6 package com.vesoft.nebula.client.storage.scan;
8 import com.facebook.thrift.TException;
9 import com.google.common.base.Charsets;
10 import com.vesoft.nebula.DataSet;
11 import com.vesoft.nebula.client.graph.data.HostAddress;
12 import com.vesoft.nebula.client.meta.MetaManager;
13 import com.vesoft.nebula.client.storage.GraphStorageConnection;
14 import com.vesoft.nebula.client.storage.StorageConnPool;
15 import com.vesoft.nebula.client.storage.data.ScanStatus;
16 import com.vesoft.nebula.storage.ScanCursor;
17 import com.vesoft.nebula.storage.ScanEdgeRequest;
18 import com.vesoft.nebula.storage.ScanResponse;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.List;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
35 private final ScanEdgeRequest request;
36 private ExecutorService threadPool =
null;
40 Set<PartScanInfo> partScanInfoList,
41 List<HostAddress> addresses,
42 ScanEdgeRequest request,
48 super(metaManager, pool,
new PartScanQueue(partScanInfoList), addresses, spaceName,
49 labelName, partSuccess, user, password);
50 this.request = request;
64 throw new IllegalAccessException(
"iterator has no more data");
67 final List<DataSet> results =
68 Collections.synchronizedList(
new ArrayList<>(addresses.size()));
69 List<Exception> exceptions =
70 Collections.synchronizedList(
new ArrayList<>(addresses.size()));
71 CountDownLatch countDownLatch =
new CountDownLatch(addresses.size());
72 AtomicInteger existSuccess =
new AtomicInteger(0);
74 threadPool = Executors.newFixedThreadPool(addresses.size());
76 threadPool.submit(() -> {
77 ScanResponse response;
80 if (partInfo ==
null) {
81 countDownLatch.countDown();
82 existSuccess.addAndGet(1);
88 connection = pool.getStorageConnection(
new HostAddress(addr.getHost(),
90 }
catch (Exception e) {
91 LOGGER.error(
"get storage client error, ", e);
93 countDownLatch.countDown();
97 Map<Integer, ScanCursor> cursorMap =
new HashMap<>();
98 cursorMap.put(partInfo.getPart(), partInfo.getCursor());
99 ScanEdgeRequest partRequest =
new ScanEdgeRequest(request);
100 partRequest.setParts(cursorMap);
101 if (user !=
null && password !=
null) {
102 partRequest.setUsername(user.getBytes(Charsets.UTF_8));
103 partRequest.setPassword(password.getBytes(Charsets.UTF_8));
105 partRequest.setNeed_authenticate(
true);
107 response = connection.scanEdge(partRequest);
108 }
catch (TException e) {
109 LOGGER.error(String.format(
"Scan edgeRow failed for %s", e.getMessage()), e);
112 countDownLatch.countDown();
116 if (response ==
null) {
117 handleNullResponse(partInfo, exceptions);
118 countDownLatch.countDown();
122 if (isSuccessful(response)) {
123 handleSucceedResult(existSuccess, response, partInfo);
124 results.add(response.getProps());
127 if (response.getResult() !=
null) {
128 handleFailedResult(response, partInfo, exceptions);
130 handleNullResult(partInfo, exceptions);
132 pool.release(
new HostAddress(addr.getHost(), addr.getPort()), connection);
133 countDownLatch.countDown();
139 countDownLatch.await();
140 threadPool.shutdown();
141 }
catch (InterruptedException interruptedE) {
142 LOGGER.error(
"scan interrupted:", interruptedE);
147 hasNext = partScanQueue.size() > 0;
149 if (existSuccess.get() == 0) {
150 throwExceptions(exceptions);
156 hasNext = partScanQueue.size() > 0 && exceptions.isEmpty();
158 if (!exceptions.isEmpty()) {
159 throwExceptions(exceptions);
161 boolean success = (existSuccess.get() == addresses.size());
162 List<DataSet> finalResults = success ? results :
null;
171 public static class ScanEdgeResultBuilder {
175 Set<PartScanInfo> partScanInfoList;
176 List<HostAddress> addresses;
177 ScanEdgeRequest request;
180 boolean partSuccess =
false;
182 String password =
null;
184 public ScanEdgeResultBuilder withMetaClient(
MetaManager metaManager) {
185 this.metaManager = metaManager;
194 public ScanEdgeResultBuilder withPartScanInfo(Set<PartScanInfo> partScanInfoList) {
195 this.partScanInfoList = partScanInfoList;
199 public ScanEdgeResultBuilder withAddresses(List<HostAddress> addresses) {
200 this.addresses = addresses;
204 public ScanEdgeResultBuilder withRequest(ScanEdgeRequest request) {
205 this.request = request;
209 public ScanEdgeResultBuilder withSpaceName(String spaceName) {
210 this.spaceName = spaceName;
214 public ScanEdgeResultBuilder withEdgeName(String edgeName) {
215 this.edgeName = edgeName;
219 public ScanEdgeResultBuilder withPartSuccess(
boolean partSuccess) {
220 this.partSuccess = partSuccess;
224 public ScanEdgeResultBuilder withUser(String user) {
229 public ScanEdgeResultBuilder withPassword(String password) {
230 this.password = password;
234 public ScanEdgeResultIterator build() {
235 return new ScanEdgeResultIterator(
synchronized PartScanInfo getPart(HostAddress leader)
get a part according to the given leader
synchronized void dropPart(PartScanInfo partScanInfo)
delete the given part from the set
ScanEdgeResult next()
get the next edge set
PART_SUCCESS
some of the parts succeed
ALL_SUCCESS
all parts succeed