6 package com.vesoft.nebula.client.storage.scan;
8 import com.facebook.thrift.TException;
9 import com.google.common.base.Charsets;
10 import com.vesoft.nebula.DataSet;
11 import com.vesoft.nebula.client.graph.data.HostAddress;
12 import com.vesoft.nebula.client.meta.MetaManager;
13 import com.vesoft.nebula.client.storage.GraphStorageConnection;
14 import com.vesoft.nebula.client.storage.StorageConnPool;
15 import com.vesoft.nebula.client.storage.data.ScanStatus;
16 import com.vesoft.nebula.storage.ScanCursor;
17 import com.vesoft.nebula.storage.ScanResponse;
18 import com.vesoft.nebula.storage.ScanVertexRequest;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.List;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
38 private final ScanVertexRequest request;
39 private ExecutorService threadPool =
null;
43 Set<PartScanInfo> partScanInfoList,
44 List<HostAddress> addresses,
45 ScanVertexRequest request,
51 super(metaManager, pool,
new PartScanQueue(partScanInfoList), addresses, spaceName,
52 labelName, partSuccess, user, password);
53 this.request = request;
67 throw new IllegalAccessException(
"iterator has no more data");
69 final List<DataSet> results =
70 Collections.synchronizedList(
new ArrayList<>(addresses.size()));
71 List<Exception> exceptions =
72 Collections.synchronizedList(
new ArrayList<>(addresses.size()));
73 CountDownLatch countDownLatch =
new CountDownLatch(addresses.size());
74 AtomicInteger existSuccess =
new AtomicInteger(0);
76 threadPool = Executors.newFixedThreadPool(addresses.size());
79 threadPool.submit(() -> {
80 ScanResponse response;
83 if (partInfo ==
null) {
84 countDownLatch.countDown();
85 existSuccess.addAndGet(1);
91 connection = pool.getStorageConnection(addr);
92 }
catch (Exception e) {
93 LOGGER.error(
"get storage client error, ", e);
95 countDownLatch.countDown();
99 Map<Integer, ScanCursor> cursorMap =
new HashMap<>();
100 cursorMap.put(partInfo.getPart(), partInfo.getCursor());
101 ScanVertexRequest partRequest =
new ScanVertexRequest(request);
102 partRequest.setParts(cursorMap);
103 if (user !=
null && password !=
null) {
104 partRequest.setUsername(user.getBytes(Charsets.UTF_8));
105 partRequest.setPassword(password.getBytes(Charsets.UTF_8));
107 partRequest.setNeed_authenticate(
true);
109 response = connection.scanVertex(partRequest);
110 }
catch (TException e) {
111 LOGGER.error(String.format(
"Scan vertex failed for %s", e.getMessage()), e);
114 countDownLatch.countDown();
118 if (response ==
null) {
119 handleNullResponse(partInfo, exceptions);
120 countDownLatch.countDown();
124 if (isSuccessful(response)) {
125 handleSucceedResult(existSuccess, response, partInfo);
126 results.add(response.getProps());
129 if (response.getResult() !=
null) {
130 handleFailedResult(response, partInfo, exceptions);
132 handleNullResult(partInfo, exceptions);
134 pool.release(addr, connection);
135 countDownLatch.countDown();
140 countDownLatch.await();
141 threadPool.shutdown();
142 }
catch (InterruptedException interruptedE) {
143 LOGGER.error(
"scan interrupted:", interruptedE);
148 hasNext = partScanQueue.size() > 0;
150 if (existSuccess.get() == 0) {
151 throwExceptions(exceptions);
157 hasNext = partScanQueue.size() > 0 && exceptions.isEmpty();
159 if (!exceptions.isEmpty()) {
160 throwExceptions(exceptions);
162 boolean success = (existSuccess.get() == addresses.size());
163 List<DataSet> finalResults = success ? results :
null;
172 public static class ScanVertexResultBuilder {
176 Set<PartScanInfo> partScanInfoList;
177 List<HostAddress> addresses;
178 ScanVertexRequest request;
181 boolean partSuccess =
false;
184 String password =
null;
186 public ScanVertexResultBuilder withMetaClient(
MetaManager metaManager) {
187 this.metaManager = metaManager;
196 public ScanVertexResultBuilder withPartScanInfo(Set<PartScanInfo> partScanInfoList) {
197 this.partScanInfoList = partScanInfoList;
201 public ScanVertexResultBuilder withAddresses(List<HostAddress> addresses) {
202 this.addresses = addresses;
206 public ScanVertexResultBuilder withRequest(ScanVertexRequest request) {
207 this.request = request;
211 public ScanVertexResultBuilder withSpaceName(String spaceName) {
212 this.spaceName = spaceName;
216 public ScanVertexResultBuilder withTagName(String tagName) {
217 this.tagName = tagName;
221 public ScanVertexResultBuilder withPartSuccess(
boolean partSuccess) {
222 this.partSuccess = partSuccess;
226 public ScanVertexResultBuilder withUser(String user) {
231 public ScanVertexResultBuilder withPassword(String password) {
232 this.password = password;
236 public ScanVertexResultIterator build() {
237 return new ScanVertexResultIterator(
synchronized PartScanInfo getPart(HostAddress leader)
get part according to leader
synchronized void dropPart(PartScanInfo partScanInfo)
delete part from set
ScanVertexResult's iterator.
ScanVertexResult next()
get the next vertex set
PART_SUCCESS
part of parts succeed
ALL_SUCCESS
all parts succeed