6 package com.vesoft.nebula.client.storage.scan;
8 import com.facebook.thrift.TException;
9 import com.vesoft.nebula.DataSet;
10 import com.vesoft.nebula.client.graph.data.HostAddress;
11 import com.vesoft.nebula.client.meta.MetaManager;
12 import com.vesoft.nebula.client.storage.GraphStorageConnection;
13 import com.vesoft.nebula.client.storage.StorageConnPool;
14 import com.vesoft.nebula.client.storage.data.ScanStatus;
15 import com.vesoft.nebula.storage.ScanCursor;
16 import com.vesoft.nebula.storage.ScanEdgeRequest;
17 import com.vesoft.nebula.storage.ScanResponse;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.List;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
34 private final ScanEdgeRequest request;
35 private ExecutorService threadPool =
null;
39 Set<PartScanInfo> partScanInfoList,
40 List<HostAddress> addresses,
41 ScanEdgeRequest request,
44 boolean partSuccess) {
45 super(metaManager, pool,
new PartScanQueue(partScanInfoList), addresses, spaceName,
46 labelName, partSuccess);
47 this.request = request;
61 throw new IllegalAccessException(
"iterator has no more data");
64 final List<DataSet> results =
65 Collections.synchronizedList(
new ArrayList<>(addresses.size()));
66 List<Exception> exceptions =
67 Collections.synchronizedList(
new ArrayList<>(addresses.size()));
68 CountDownLatch countDownLatch =
new CountDownLatch(addresses.size());
69 AtomicInteger existSuccess =
new AtomicInteger(0);
71 threadPool = Executors.newFixedThreadPool(addresses.size());
73 threadPool.submit(() -> {
74 ScanResponse response;
77 if (partInfo ==
null) {
78 countDownLatch.countDown();
79 existSuccess.addAndGet(1);
85 connection = pool.getStorageConnection(
new HostAddress(addr.getHost(),
87 }
catch (Exception e) {
88 LOGGER.error(
"get storage client error, ", e);
90 countDownLatch.countDown();
94 Map<Integer, ScanCursor> cursorMap =
new HashMap<>();
95 cursorMap.put(partInfo.getPart(), partInfo.getCursor());
96 ScanEdgeRequest partRequest =
new ScanEdgeRequest(request);
97 partRequest.setParts(cursorMap);
99 response = connection.scanEdge(partRequest);
100 }
catch (TException e) {
101 LOGGER.error(String.format(
"Scan edgeRow failed for %s", e.getMessage()), e);
104 countDownLatch.countDown();
108 if (response ==
null) {
109 handleNullResponse(partInfo, exceptions);
110 countDownLatch.countDown();
114 if (isSuccessful(response)) {
115 handleSucceedResult(existSuccess, response, partInfo);
116 results.add(response.getProps());
119 if (response.getResult() !=
null) {
120 handleFailedResult(response, partInfo, exceptions);
122 handleNullResult(partInfo, exceptions);
124 pool.release(
new HostAddress(addr.getHost(), addr.getPort()), connection);
125 countDownLatch.countDown();
131 countDownLatch.await();
132 threadPool.shutdown();
133 }
catch (InterruptedException interruptedE) {
134 LOGGER.error(
"scan interrupted:", interruptedE);
139 hasNext = partScanQueue.size() > 0;
141 if (existSuccess.get() == 0) {
142 throwExceptions(exceptions);
148 hasNext = partScanQueue.size() > 0 && exceptions.isEmpty();
150 if (!exceptions.isEmpty()) {
151 throwExceptions(exceptions);
153 boolean success = (existSuccess.get() == addresses.size());
154 List<DataSet> finalResults = success ? results :
null;
163 public static class ScanEdgeResultBuilder {
167 Set<PartScanInfo> partScanInfoList;
168 List<HostAddress> addresses;
169 ScanEdgeRequest request;
172 boolean partSuccess =
false;
174 public ScanEdgeResultBuilder withMetaClient(
MetaManager metaManager) {
175 this.metaManager = metaManager;
184 public ScanEdgeResultBuilder withPartScanInfo(Set<PartScanInfo> partScanInfoList) {
185 this.partScanInfoList = partScanInfoList;
189 public ScanEdgeResultBuilder withAddresses(List<HostAddress> addresses) {
190 this.addresses = addresses;
194 public ScanEdgeResultBuilder withRequest(ScanEdgeRequest request) {
195 this.request = request;
199 public ScanEdgeResultBuilder withSpaceName(String spaceName) {
200 this.spaceName = spaceName;
204 public ScanEdgeResultBuilder withEdgeName(String edgeName) {
205 this.edgeName = edgeName;
209 public ScanEdgeResultBuilder withPartSuccess(
boolean partSuccess) {
210 this.partSuccess = partSuccess;
215 public ScanEdgeResultIterator build() {
216 return new ScanEdgeResultIterator(
synchronized PartScanInfo getPart(HostAddress leader)
Gets the part to scan for the given leader address.
synchronized void dropPart(PartScanInfo partScanInfo)
Removes the given part from the set.
ScanEdgeResult next()
Gets the next set of edges.
PART_SUCCESS
Only some of the parts succeeded.
ALL_SUCCESS
All parts succeeded.