6 package com.vesoft.nebula.client.storage.scan;
8 import com.facebook.thrift.TException;
9 import com.vesoft.nebula.DataSet;
10 import com.vesoft.nebula.client.graph.data.HostAddress;
11 import com.vesoft.nebula.client.meta.MetaManager;
12 import com.vesoft.nebula.client.storage.GraphStorageConnection;
13 import com.vesoft.nebula.client.storage.StorageConnPool;
14 import com.vesoft.nebula.client.storage.data.ScanStatus;
15 import com.vesoft.nebula.storage.ScanCursor;
16 import com.vesoft.nebula.storage.ScanResponse;
17 import com.vesoft.nebula.storage.ScanVertexRequest;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.List;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
37 private final ScanVertexRequest request;
38 private ExecutorService threadPool =
null;
42 Set<PartScanInfo> partScanInfoList,
43 List<HostAddress> addresses,
44 ScanVertexRequest request,
47 boolean partSuccess) {
48 super(metaManager, pool,
new PartScanQueue(partScanInfoList), addresses, spaceName,
49 labelName, partSuccess);
50 this.request = request;
64 throw new IllegalAccessException(
"iterator has no more data");
66 final List<DataSet> results =
67 Collections.synchronizedList(
new ArrayList<>(addresses.size()));
68 List<Exception> exceptions =
69 Collections.synchronizedList(
new ArrayList<>(addresses.size()));
70 CountDownLatch countDownLatch =
new CountDownLatch(addresses.size());
71 AtomicInteger existSuccess =
new AtomicInteger(0);
73 threadPool = Executors.newFixedThreadPool(addresses.size());
76 threadPool.submit(() -> {
77 ScanResponse response;
80 if (partInfo ==
null) {
81 countDownLatch.countDown();
82 existSuccess.addAndGet(1);
88 connection = pool.getStorageConnection(addr);
89 }
catch (Exception e) {
90 LOGGER.error(
"get storage client error, ", e);
92 countDownLatch.countDown();
96 Map<Integer, ScanCursor> cursorMap =
new HashMap<>();
97 cursorMap.put(partInfo.getPart(), partInfo.getCursor());
98 ScanVertexRequest partRequest =
new ScanVertexRequest(request);
99 partRequest.setParts(cursorMap);
101 response = connection.scanVertex(partRequest);
102 }
catch (TException e) {
103 LOGGER.error(String.format(
"Scan vertex failed for %s", e.getMessage()), e);
106 countDownLatch.countDown();
110 if (response ==
null) {
111 handleNullResponse(partInfo, exceptions);
112 countDownLatch.countDown();
116 if (isSuccessful(response)) {
117 handleSucceedResult(existSuccess, response, partInfo);
118 results.add(response.getProps());
121 if (response.getResult() !=
null) {
122 handleFailedResult(response, partInfo, exceptions);
124 handleNullResult(partInfo, exceptions);
126 pool.release(addr, connection);
127 countDownLatch.countDown();
132 countDownLatch.await();
133 threadPool.shutdown();
134 }
catch (InterruptedException interruptedE) {
135 LOGGER.error(
"scan interrupted:", interruptedE);
140 hasNext = partScanQueue.size() > 0;
142 if (existSuccess.get() == 0) {
143 throwExceptions(exceptions);
149 hasNext = partScanQueue.size() > 0 && exceptions.isEmpty();
151 if (!exceptions.isEmpty()) {
152 throwExceptions(exceptions);
154 boolean success = (existSuccess.get() == addresses.size());
155 List<DataSet> finalResults = success ? results :
null;
167 public static class ScanVertexResultBuilder {
171 Set<PartScanInfo> partScanInfoList;
172 List<HostAddress> addresses;
173 ScanVertexRequest request;
176 boolean partSuccess =
false;
178 public ScanVertexResultBuilder withMetaClient(
MetaManager metaManager) {
179 this.metaManager = metaManager;
188 public ScanVertexResultBuilder withPartScanInfo(Set<PartScanInfo> partScanInfoList) {
189 this.partScanInfoList = partScanInfoList;
193 public ScanVertexResultBuilder withAddresses(List<HostAddress> addresses) {
194 this.addresses = addresses;
198 public ScanVertexResultBuilder withRequest(ScanVertexRequest request) {
199 this.request = request;
203 public ScanVertexResultBuilder withSpaceName(String spaceName) {
204 this.spaceName = spaceName;
208 public ScanVertexResultBuilder withTagName(String tagName) {
209 this.tagName = tagName;
213 public ScanVertexResultBuilder withPartSuccess(
boolean partSuccess) {
214 this.partSuccess = partSuccess;
218 public ScanVertexResultIterator build() {
219 return new ScanVertexResultIterator(
synchronized PartScanInfo getPart(HostAddress leader)
get part according to leader
synchronized void dropPart(PartScanInfo partScanInfo)
delete part from set
ScanVertexResult's iterator.
ScanVertexResult next()
get the next vertex set
PART_SUCCESS
part of parts succeed
ALL_SUCCESS
all parts succeed