NebulaGraph Java Client  release-3.8
All Classes Functions Variables
ScanVertexResultIterator.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.storage.scan;
7 
8 import com.facebook.thrift.TException;
9 import com.google.common.base.Charsets;
10 import com.vesoft.nebula.DataSet;
11 import com.vesoft.nebula.ErrorCode;
12 import com.vesoft.nebula.HostAddr;
13 import com.vesoft.nebula.client.graph.data.HostAddress;
14 import com.vesoft.nebula.client.meta.MetaManager;
15 import com.vesoft.nebula.client.storage.GraphStorageConnection;
16 import com.vesoft.nebula.client.storage.StorageConnPool;
17 import com.vesoft.nebula.client.storage.data.ScanStatus;
18 import com.vesoft.nebula.storage.ScanCursor;
19 import com.vesoft.nebula.storage.ScanResponse;
20 import com.vesoft.nebula.storage.ScanVertexRequest;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 
/** Logger for this iterator class. */
private static final Logger LOGGER = LoggerFactory.getLogger(ScanVertexResultIterator.class);

/** Request template; next() copies it for each part it scans and sets that part's cursor. */
private final ScanVertexRequest request;
/** Executor used by next(); null until the first call, then replaced by a new pool per call. */
private ExecutorService threadPool = null;
42 
/**
 * Creates the iterator. Private: instances are obtained through
 * {@link ScanVertexResultBuilder#build()}.
 *
 * <p>All arguments except {@code request} are forwarded to the parent constructor; the part
 * scan infos are wrapped in a new {@link PartScanQueue} first.
 *
 * @param metaManager           meta manager forwarded to the parent
 * @param pool                  storage connection pool forwarded to the parent
 * @param partScanInfoList      parts to scan, wrapped in a {@link PartScanQueue}
 * @param addresses             storage addresses forwarded to the parent
 * @param request               request template kept by this iterator
 * @param spaceName             space name forwarded to the parent
 * @param labelName             label (tag) name forwarded to the parent
 * @param partSuccess           partial-success flag forwarded to the parent
 * @param user                  user name forwarded to the parent
 * @param password              password forwarded to the parent
 * @param storageAddressMapping storage address mapping forwarded to the parent
 */
private ScanVertexResultIterator(
        MetaManager metaManager,
        StorageConnPool pool,
        Set<PartScanInfo> partScanInfoList,
        List<HostAddress> addresses,
        ScanVertexRequest request,
        String spaceName,
        String labelName,
        boolean partSuccess,
        String user,
        String password,
        Map<String, String> storageAddressMapping) {
    super(
            metaManager,
            pool,
            new PartScanQueue(partScanInfoList),
            addresses,
            spaceName,
            labelName,
            partSuccess,
            user,
            password,
            storageAddressMapping);
    this.request = request;
}
58 
59 
68  public ScanVertexResult next() throws Exception {
69  if (!hasNext()) {
70  throw new IllegalAccessException("iterator has no more data");
71  }
72  final List<DataSet> results =
73  Collections.synchronizedList(new ArrayList<>(addresses.size()));
74  List<Exception> exceptions =
75  Collections.synchronizedList(new ArrayList<>(addresses.size()));
76  CountDownLatch countDownLatch = new CountDownLatch(addresses.size());
77  AtomicInteger existSuccess = new AtomicInteger(0);
78 
79  threadPool = Executors.newFixedThreadPool(addresses.size());
80 
81  for (HostAddress addr : addresses) {
82  threadPool.submit(() -> {
83  HostAddress leader = addr;
84  ScanResponse response;
85  PartScanInfo partInfo = partScanQueue.getPart(leader);
86  // no part need to scan
87  if (partInfo == null) {
88  countDownLatch.countDown();
89  existSuccess.addAndGet(1);
90  return;
91  }
92 
93  GraphStorageConnection connection;
94  try {
95  connection = pool.getStorageConnection(leader);
96  } catch (Exception e) {
97  LOGGER.error("get storage client error, ", e);
98  exceptions.add(e);
99  countDownLatch.countDown();
100  return;
101  }
102 
103  Map<Integer, ScanCursor> cursorMap = new HashMap<>();
104  cursorMap.put(partInfo.getPart(), partInfo.getCursor());
105  ScanVertexRequest partRequest = new ScanVertexRequest(request);
106  partRequest.setParts(cursorMap);
107  if (user != null && password != null) {
108  partRequest.setUsername(user.getBytes(Charsets.UTF_8));
109  partRequest.setPassword(password.getBytes(Charsets.UTF_8));
110  }
111  partRequest.setNeed_authenticate(true);
112  try {
113  response = connection.scanVertex(partRequest);
114  if (!response.getResult().failed_parts.isEmpty()
115  && response.getResult().failed_parts.get(0).code
116  == ErrorCode.E_LEADER_CHANGED) {
117  pool.release(leader, connection);
118  HostAddr newLeader = response.getResult().failed_parts.get(0).leader;
119  HostAddr availableLeader = storageAddressMapping
120  .getOrDefault(newLeader, newLeader);
121  leader = new HostAddress(availableLeader.host, availableLeader.getPort());
122  connection = pool.getStorageConnection(leader);
123  response = connection.scanVertex(partRequest);
124  }
125  } catch (Exception e) {
126  LOGGER.error(String.format("Scan vertex failed for %s", e.getMessage()), e);
127  exceptions.add(e);
128  partScanQueue.dropPart(partInfo);
129  countDownLatch.countDown();
130  return;
131  } finally {
132  pool.release(leader, connection);
133  }
134 
135  if (response == null) {
136  handleNullResponse(partInfo, exceptions);
137  countDownLatch.countDown();
138  return;
139  }
140 
141  if (isSuccessful(response)) {
142  handleSucceedResult(existSuccess, response, partInfo);
143  results.add(response.getProps());
144  }
145 
146  if (response.getResult() != null) {
147  handleFailedResult(response, partInfo, exceptions);
148  } else {
149  handleNullResult(partInfo, exceptions);
150  }
151 
152  countDownLatch.countDown();
153  });
154  }
155 
156  try {
157  countDownLatch.await();
158  threadPool.shutdown();
159  } catch (InterruptedException interruptedE) {
160  LOGGER.error("scan interrupted:", interruptedE);
161  throw interruptedE;
162  }
163 
164  if (partSuccess) {
165  hasNext = partScanQueue.size() > 0;
166  // no part succeed, throw ExecuteFailedException
167  if (existSuccess.get() == 0) {
168  throwExceptions(exceptions);
169  }
170  ScanStatus status = exceptions.size() > 0 ? ScanStatus.PART_SUCCESS :
172  return new ScanVertexResult(results, status);
173  } else {
174  hasNext = partScanQueue.size() > 0 && exceptions.isEmpty();
175  // any part failed, throw ExecuteFailedException
176  if (!exceptions.isEmpty()) {
177  throwExceptions(exceptions);
178  }
179  boolean success = (existSuccess.get() == addresses.size());
180  List<DataSet> finalResults = success ? results : null;
181  return new ScanVertexResult(finalResults, ScanStatus.ALL_SUCCESS);
182  }
183  }
184 
185 
189  public static class ScanVertexResultBuilder {
190 
191  MetaManager metaManager;
192  StorageConnPool pool;
193  Set<PartScanInfo> partScanInfoList;
194  List<HostAddress> addresses;
195  ScanVertexRequest request;
196  String spaceName;
197  String tagName;
198  boolean partSuccess = false;
199 
200  String user = null;
201  String password = null;
202 
203  Map<String, String> storageAddressMapping = null;
204 
205  public ScanVertexResultBuilder withMetaClient(MetaManager metaManager) {
206  this.metaManager = metaManager;
207  return this;
208  }
209 
210  public ScanVertexResultBuilder withPool(StorageConnPool pool) {
211  this.pool = pool;
212  return this;
213  }
214 
215  public ScanVertexResultBuilder withPartScanInfo(Set<PartScanInfo> partScanInfoList) {
216  this.partScanInfoList = partScanInfoList;
217  return this;
218  }
219 
220  public ScanVertexResultBuilder withAddresses(List<HostAddress> addresses) {
221  this.addresses = addresses;
222  return this;
223  }
224 
225  public ScanVertexResultBuilder withRequest(ScanVertexRequest request) {
226  this.request = request;
227  return this;
228  }
229 
230  public ScanVertexResultBuilder withSpaceName(String spaceName) {
231  this.spaceName = spaceName;
232  return this;
233  }
234 
235  public ScanVertexResultBuilder withTagName(String tagName) {
236  this.tagName = tagName;
237  return this;
238  }
239 
240  public ScanVertexResultBuilder withPartSuccess(boolean partSuccess) {
241  this.partSuccess = partSuccess;
242  return this;
243  }
244 
245  public ScanVertexResultBuilder withUser(String user) {
246  this.user = user;
247  return this;
248  }
249 
250  public ScanVertexResultBuilder withPassword(String password) {
251  this.password = password;
252  return this;
253  }
254 
255  public ScanVertexResultBuilder withStorageAddressMapping(
256  Map<String, String> storageAddressMapping) {
257  this.storageAddressMapping = storageAddressMapping;
258  return this;
259  }
260 
261  public ScanVertexResultIterator build() {
262  return new ScanVertexResultIterator(
263  metaManager,
264  pool,
265  partScanInfoList,
266  addresses,
267  request,
268  spaceName,
269  tagName,
270  partSuccess,
271  user,
272  password,
273  storageAddressMapping);
274  }
275  }
276 }
MetaManager is a manager for meta info, such as spaces, tags and edges.
synchronized PartScanInfo getPart(HostAddress leader)
get part according to leader
synchronized void dropPart(PartScanInfo partScanInfo)
delete part from set