NebulaGraph Java Client  release-3.8
ScanVertexResultIterator.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.storage.scan;
7 
8 import com.facebook.thrift.TException;
9 import com.google.common.base.Charsets;
10 import com.vesoft.nebula.DataSet;
11 import com.vesoft.nebula.client.graph.data.HostAddress;
12 import com.vesoft.nebula.client.meta.MetaManager;
13 import com.vesoft.nebula.client.storage.GraphStorageConnection;
14 import com.vesoft.nebula.client.storage.StorageConnPool;
15 import com.vesoft.nebula.client.storage.data.ScanStatus;
16 import com.vesoft.nebula.storage.ScanCursor;
17 import com.vesoft.nebula.storage.ScanResponse;
18 import com.vesoft.nebula.storage.ScanVertexRequest;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 
36  private static final Logger LOGGER = LoggerFactory.getLogger(ScanVertexResultIterator.class);
37 
38  private final ScanVertexRequest request;
39  private ExecutorService threadPool = null;
40 
41  private ScanVertexResultIterator(MetaManager metaManager,
42  StorageConnPool pool,
43  Set<PartScanInfo> partScanInfoList,
44  List<HostAddress> addresses,
45  ScanVertexRequest request,
46  String spaceName,
47  String labelName,
48  boolean partSuccess,
49  String user,
50  String password) {
51  super(metaManager, pool, new PartScanQueue(partScanInfoList), addresses, spaceName,
52  labelName, partSuccess, user, password);
53  this.request = request;
54  }
55 
56 
65  public ScanVertexResult next() throws Exception {
66  if (!hasNext()) {
67  throw new IllegalAccessException("iterator has no more data");
68  }
69  final List<DataSet> results =
70  Collections.synchronizedList(new ArrayList<>(addresses.size()));
71  List<Exception> exceptions =
72  Collections.synchronizedList(new ArrayList<>(addresses.size()));
73  CountDownLatch countDownLatch = new CountDownLatch(addresses.size());
74  AtomicInteger existSuccess = new AtomicInteger(0);
75 
76  threadPool = Executors.newFixedThreadPool(addresses.size());
77 
78  for (HostAddress addr : addresses) {
79  threadPool.submit(() -> {
80  ScanResponse response;
81  PartScanInfo partInfo = partScanQueue.getPart(addr);
82  // no part need to scan
83  if (partInfo == null) {
84  countDownLatch.countDown();
85  existSuccess.addAndGet(1);
86  return;
87  }
88 
89  GraphStorageConnection connection;
90  try {
91  connection = pool.getStorageConnection(addr);
92  } catch (Exception e) {
93  LOGGER.error("get storage client error, ", e);
94  exceptions.add(e);
95  countDownLatch.countDown();
96  return;
97  }
98 
99  Map<Integer, ScanCursor> cursorMap = new HashMap<>();
100  cursorMap.put(partInfo.getPart(), partInfo.getCursor());
101  ScanVertexRequest partRequest = new ScanVertexRequest(request);
102  partRequest.setParts(cursorMap);
103  if (user != null && password != null) {
104  partRequest.setUsername(user.getBytes(Charsets.UTF_8));
105  partRequest.setPassword(password.getBytes(Charsets.UTF_8));
106  }
107  partRequest.setNeed_authenticate(true);
108  try {
109  response = connection.scanVertex(partRequest);
110  } catch (TException e) {
111  LOGGER.error(String.format("Scan vertex failed for %s", e.getMessage()), e);
112  exceptions.add(e);
113  partScanQueue.dropPart(partInfo);
114  countDownLatch.countDown();
115  return;
116  }
117 
118  if (response == null) {
119  handleNullResponse(partInfo, exceptions);
120  countDownLatch.countDown();
121  return;
122  }
123 
124  if (isSuccessful(response)) {
125  handleSucceedResult(existSuccess, response, partInfo);
126  results.add(response.getProps());
127  }
128 
129  if (response.getResult() != null) {
130  handleFailedResult(response, partInfo, exceptions);
131  } else {
132  handleNullResult(partInfo, exceptions);
133  }
134  pool.release(addr, connection);
135  countDownLatch.countDown();
136  });
137  }
138 
139  try {
140  countDownLatch.await();
141  threadPool.shutdown();
142  } catch (InterruptedException interruptedE) {
143  LOGGER.error("scan interrupted:", interruptedE);
144  throw interruptedE;
145  }
146 
147  if (partSuccess) {
148  hasNext = partScanQueue.size() > 0;
149  // no part succeed, throw ExecuteFailedException
150  if (existSuccess.get() == 0) {
151  throwExceptions(exceptions);
152  }
153  ScanStatus status = exceptions.size() > 0 ? ScanStatus.PART_SUCCESS :
155  return new ScanVertexResult(results, status);
156  } else {
157  hasNext = partScanQueue.size() > 0 && exceptions.isEmpty();
158  // any part failed, throw ExecuteFailedException
159  if (!exceptions.isEmpty()) {
160  throwExceptions(exceptions);
161  }
162  boolean success = (existSuccess.get() == addresses.size());
163  List<DataSet> finalResults = success ? results : null;
164  return new ScanVertexResult(finalResults, ScanStatus.ALL_SUCCESS);
165  }
166  }
167 
168 
172  public static class ScanVertexResultBuilder {
173 
174  MetaManager metaManager;
175  StorageConnPool pool;
176  Set<PartScanInfo> partScanInfoList;
177  List<HostAddress> addresses;
178  ScanVertexRequest request;
179  String spaceName;
180  String tagName;
181  boolean partSuccess = false;
182 
183  String user = null;
184  String password = null;
185 
186  public ScanVertexResultBuilder withMetaClient(MetaManager metaManager) {
187  this.metaManager = metaManager;
188  return this;
189  }
190 
191  public ScanVertexResultBuilder withPool(StorageConnPool pool) {
192  this.pool = pool;
193  return this;
194  }
195 
196  public ScanVertexResultBuilder withPartScanInfo(Set<PartScanInfo> partScanInfoList) {
197  this.partScanInfoList = partScanInfoList;
198  return this;
199  }
200 
201  public ScanVertexResultBuilder withAddresses(List<HostAddress> addresses) {
202  this.addresses = addresses;
203  return this;
204  }
205 
206  public ScanVertexResultBuilder withRequest(ScanVertexRequest request) {
207  this.request = request;
208  return this;
209  }
210 
211  public ScanVertexResultBuilder withSpaceName(String spaceName) {
212  this.spaceName = spaceName;
213  return this;
214  }
215 
216  public ScanVertexResultBuilder withTagName(String tagName) {
217  this.tagName = tagName;
218  return this;
219  }
220 
221  public ScanVertexResultBuilder withPartSuccess(boolean partSuccess) {
222  this.partSuccess = partSuccess;
223  return this;
224  }
225 
226  public ScanVertexResultBuilder withUser(String user) {
227  this.user = user;
228  return this;
229  }
230 
231  public ScanVertexResultBuilder withPassword(String password) {
232  this.password = password;
233  return this;
234  }
235 
236  public ScanVertexResultIterator build() {
237  return new ScanVertexResultIterator(
238  metaManager,
239  pool,
240  partScanInfoList,
241  addresses,
242  request,
243  spaceName,
244  tagName,
245  partSuccess,
246  user,
247  password);
248  }
249  }
250 }
MetaManager is a manager for meta info, such as spaces,tags and edges.
synchronized PartScanInfo getPart(HostAddress leader)
get part according to leader
synchronized void dropPart(PartScanInfo partScanInfo)
delete part from set