6 package com.vesoft.nebula.client.storage.scan;
8 import com.facebook.thrift.TException;
9 import com.google.common.base.Charsets;
10 import com.vesoft.nebula.DataSet;
11 import com.vesoft.nebula.ErrorCode;
12 import com.vesoft.nebula.HostAddr;
13 import com.vesoft.nebula.client.graph.data.HostAddress;
14 import com.vesoft.nebula.client.meta.MetaManager;
15 import com.vesoft.nebula.client.storage.GraphStorageConnection;
16 import com.vesoft.nebula.client.storage.StorageConnPool;
17 import com.vesoft.nebula.client.storage.data.ScanStatus;
18 import com.vesoft.nebula.storage.ScanCursor;
19 import com.vesoft.nebula.storage.ScanResponse;
20 import com.vesoft.nebula.storage.ScanVertexRequest;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
40 private final ScanVertexRequest request;
41 private ExecutorService threadPool =
null;
45 Set<PartScanInfo> partScanInfoList,
46 List<HostAddress> addresses,
47 ScanVertexRequest request,
53 Map<String, String> storageAddressMapping) {
54 super(metaManager, pool,
new PartScanQueue(partScanInfoList), addresses, spaceName,
55 labelName, partSuccess, user, password, storageAddressMapping);
56 this.request = request;
70 throw new IllegalAccessException(
"iterator has no more data");
72 final List<DataSet> results =
73 Collections.synchronizedList(
new ArrayList<>(addresses.size()));
74 List<Exception> exceptions =
75 Collections.synchronizedList(
new ArrayList<>(addresses.size()));
76 CountDownLatch countDownLatch =
new CountDownLatch(addresses.size());
77 AtomicInteger existSuccess =
new AtomicInteger(0);
79 threadPool = Executors.newFixedThreadPool(addresses.size());
82 threadPool.submit(() -> {
84 ScanResponse response;
87 if (partInfo ==
null) {
88 countDownLatch.countDown();
89 existSuccess.addAndGet(1);
95 connection = pool.getStorageConnection(leader);
96 }
catch (Exception e) {
97 LOGGER.error(
"get storage client error, ", e);
99 countDownLatch.countDown();
103 Map<Integer, ScanCursor> cursorMap =
new HashMap<>();
104 cursorMap.put(partInfo.getPart(), partInfo.getCursor());
105 ScanVertexRequest partRequest =
new ScanVertexRequest(request);
106 partRequest.setParts(cursorMap);
107 if (user !=
null && password !=
null) {
108 partRequest.setUsername(user.getBytes(Charsets.UTF_8));
109 partRequest.setPassword(password.getBytes(Charsets.UTF_8));
111 partRequest.setNeed_authenticate(
true);
113 response = connection.scanVertex(partRequest);
114 if (!response.getResult().failed_parts.isEmpty()
115 && response.getResult().failed_parts.get(0).code
116 == ErrorCode.E_LEADER_CHANGED) {
117 pool.release(leader, connection);
118 HostAddr newLeader = response.getResult().failed_parts.get(0).leader;
119 HostAddr availableLeader = storageAddressMapping
120 .getOrDefault(newLeader, newLeader);
121 leader =
new HostAddress(availableLeader.host, availableLeader.getPort());
122 connection = pool.getStorageConnection(leader);
123 response = connection.scanVertex(partRequest);
125 }
catch (Exception e) {
126 LOGGER.error(String.format(
"Scan vertex failed for %s", e.getMessage()), e);
129 countDownLatch.countDown();
132 pool.release(leader, connection);
135 if (response ==
null) {
136 handleNullResponse(partInfo, exceptions);
137 countDownLatch.countDown();
141 if (isSuccessful(response)) {
142 handleSucceedResult(existSuccess, response, partInfo);
143 results.add(response.getProps());
146 if (response.getResult() !=
null) {
147 handleFailedResult(response, partInfo, exceptions);
149 handleNullResult(partInfo, exceptions);
152 countDownLatch.countDown();
157 countDownLatch.await();
158 threadPool.shutdown();
159 }
catch (InterruptedException interruptedE) {
160 LOGGER.error(
"scan interrupted:", interruptedE);
165 hasNext = partScanQueue.size() > 0;
167 if (existSuccess.get() == 0) {
168 throwExceptions(exceptions);
174 hasNext = partScanQueue.size() > 0 && exceptions.isEmpty();
176 if (!exceptions.isEmpty()) {
177 throwExceptions(exceptions);
179 boolean success = (existSuccess.get() == addresses.size());
180 List<DataSet> finalResults = success ? results :
null;
189 public static class ScanVertexResultBuilder {
193 Set<PartScanInfo> partScanInfoList;
194 List<HostAddress> addresses;
195 ScanVertexRequest request;
198 boolean partSuccess =
false;
201 String password =
null;
203 Map<String, String> storageAddressMapping =
null;
205 public ScanVertexResultBuilder withMetaClient(
MetaManager metaManager) {
206 this.metaManager = metaManager;
215 public ScanVertexResultBuilder withPartScanInfo(Set<PartScanInfo> partScanInfoList) {
216 this.partScanInfoList = partScanInfoList;
220 public ScanVertexResultBuilder withAddresses(List<HostAddress> addresses) {
221 this.addresses = addresses;
225 public ScanVertexResultBuilder withRequest(ScanVertexRequest request) {
226 this.request = request;
230 public ScanVertexResultBuilder withSpaceName(String spaceName) {
231 this.spaceName = spaceName;
235 public ScanVertexResultBuilder withTagName(String tagName) {
236 this.tagName = tagName;
240 public ScanVertexResultBuilder withPartSuccess(
boolean partSuccess) {
241 this.partSuccess = partSuccess;
245 public ScanVertexResultBuilder withUser(String user) {
250 public ScanVertexResultBuilder withPassword(String password) {
251 this.password = password;
255 public ScanVertexResultBuilder withStorageAddressMapping(
256 Map<String, String> storageAddressMapping) {
257 this.storageAddressMapping = storageAddressMapping;
261 public ScanVertexResultIterator build() {
262 return new ScanVertexResultIterator(
273 storageAddressMapping);
synchronized PartScanInfo getPart(HostAddress leader)
get part according to leader
synchronized void dropPart(PartScanInfo partScanInfo)
delete part from set
ScanVertexResult's iterator.
ScanVertexResult next()
get the next vertex set
PART_SUCCESS
part of parts succeed
ALL_SUCCESS
all parts succeed