6 package com.vesoft.nebula.client.storage.scan;
8 import com.google.common.base.Charsets;
9 import com.vesoft.nebula.DataSet;
10 import com.vesoft.nebula.ErrorCode;
11 import com.vesoft.nebula.HostAddr;
12 import com.vesoft.nebula.client.graph.data.HostAddress;
13 import com.vesoft.nebula.client.meta.MetaManager;
14 import com.vesoft.nebula.client.storage.GraphStorageConnection;
15 import com.vesoft.nebula.client.storage.StorageConnPool;
16 import com.vesoft.nebula.client.storage.data.ScanStatus;
17 import com.vesoft.nebula.storage.ScanCursor;
18 import com.vesoft.nebula.storage.ScanEdgeRequest;
19 import com.vesoft.nebula.storage.ScanResponse;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.List;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
36 private final ScanEdgeRequest request;
37 private ExecutorService threadPool =
null;
41 Set<PartScanInfo> partScanInfoList,
42 List<HostAddress> addresses,
43 ScanEdgeRequest request,
49 Map<String, String> storageAddressMapping) {
50 super(metaManager, pool,
new PartScanQueue(partScanInfoList), addresses, spaceName,
51 labelName, partSuccess, user, password, storageAddressMapping);
52 this.request = request;
66 throw new IllegalAccessException(
"iterator has no more data");
69 final List<DataSet> results =
70 Collections.synchronizedList(
new ArrayList<>(addresses.size()));
71 List<Exception> exceptions =
72 Collections.synchronizedList(
new ArrayList<>(addresses.size()));
73 CountDownLatch countDownLatch =
new CountDownLatch(addresses.size());
74 AtomicInteger existSuccess =
new AtomicInteger(0);
76 threadPool = Executors.newFixedThreadPool(addresses.size());
78 threadPool.submit(() -> {
80 ScanResponse response;
83 if (partInfo ==
null) {
84 countDownLatch.countDown();
85 existSuccess.addAndGet(1);
91 connection = pool.getStorageConnection(leader);
92 }
catch (Exception e) {
93 LOGGER.error(
"get storage client error, ", e);
95 countDownLatch.countDown();
99 Map<Integer, ScanCursor> cursorMap =
new HashMap<>();
100 cursorMap.put(partInfo.getPart(), partInfo.getCursor());
101 ScanEdgeRequest partRequest =
new ScanEdgeRequest(request);
102 partRequest.setParts(cursorMap);
103 if (user !=
null && password !=
null) {
104 partRequest.setUsername(user.getBytes(Charsets.UTF_8));
105 partRequest.setPassword(password.getBytes(Charsets.UTF_8));
107 partRequest.setNeed_authenticate(
true);
109 response = connection.scanEdge(partRequest);
110 if (!response.getResult().failed_parts.isEmpty()
111 && response.getResult().failed_parts.get(0).code
112 == ErrorCode.E_LEADER_CHANGED) {
113 pool.release(leader, connection);
114 HostAddr newLeader = response.getResult().failed_parts.get(0).leader;
115 HostAddr availableLeader = storageAddressMapping
116 .getOrDefault(newLeader, newLeader);
117 leader =
new HostAddress(availableLeader.host, availableLeader.getPort());
118 connection = pool.getStorageConnection(leader);
119 response = connection.scanEdge(partRequest);
121 }
catch (Exception e) {
122 LOGGER.error(String.format(
"Scan edgeRow failed for %s", e.getMessage()), e);
125 countDownLatch.countDown();
128 pool.release(leader, connection);
131 if (response ==
null) {
132 handleNullResponse(partInfo, exceptions);
133 countDownLatch.countDown();
137 if (isSuccessful(response)) {
138 handleSucceedResult(existSuccess, response, partInfo);
139 results.add(response.getProps());
142 if (response.getResult() !=
null) {
143 handleFailedResult(response, partInfo, exceptions);
145 handleNullResult(partInfo, exceptions);
147 pool.release(
new HostAddress(addr.getHost(), addr.getPort()), connection);
148 countDownLatch.countDown();
154 countDownLatch.await();
155 threadPool.shutdown();
156 }
catch (InterruptedException interruptedE) {
157 LOGGER.error(
"scan interrupted:", interruptedE);
162 hasNext = partScanQueue.size() > 0;
164 if (existSuccess.get() == 0) {
165 throwExceptions(exceptions);
171 hasNext = partScanQueue.size() > 0 && exceptions.isEmpty();
173 if (!exceptions.isEmpty()) {
174 throwExceptions(exceptions);
176 boolean success = (existSuccess.get() == addresses.size());
177 List<DataSet> finalResults = success ? results :
null;
186 public static class ScanEdgeResultBuilder {
190 Set<PartScanInfo> partScanInfoList;
191 List<HostAddress> addresses;
192 ScanEdgeRequest request;
195 boolean partSuccess =
false;
197 String password =
null;
198 Map<String, String> storageAddressMapping =
null;
200 public ScanEdgeResultBuilder withMetaClient(
MetaManager metaManager) {
201 this.metaManager = metaManager;
210 public ScanEdgeResultBuilder withPartScanInfo(Set<PartScanInfo> partScanInfoList) {
211 this.partScanInfoList = partScanInfoList;
215 public ScanEdgeResultBuilder withAddresses(List<HostAddress> addresses) {
216 this.addresses = addresses;
220 public ScanEdgeResultBuilder withRequest(ScanEdgeRequest request) {
221 this.request = request;
225 public ScanEdgeResultBuilder withSpaceName(String spaceName) {
226 this.spaceName = spaceName;
230 public ScanEdgeResultBuilder withEdgeName(String edgeName) {
231 this.edgeName = edgeName;
235 public ScanEdgeResultBuilder withPartSuccess(
boolean partSuccess) {
236 this.partSuccess = partSuccess;
240 public ScanEdgeResultBuilder withUser(String user) {
245 public ScanEdgeResultBuilder withPassword(String password) {
246 this.password = password;
250 public ScanEdgeResultBuilder withStorageAddressMapping(
251 Map<String, String> storageAddressMapping) {
252 this.storageAddressMapping = storageAddressMapping;
256 public ScanEdgeResultIterator build() {
257 return new ScanEdgeResultIterator(
268 storageAddressMapping);
synchronized PartScanInfo getPart(HostAddress leader)
Gets a part according to the given leader.
synchronized void dropPart(PartScanInfo partScanInfo)
Removes the given part from the set.
ScanEdgeResult next()
Gets the next set of edges.
PART_SUCCESS
Some of the parts succeeded.
ALL_SUCCESS
All parts succeeded.