6 package com.vesoft.nebula.client.storage.scan;
8 import com.vesoft.nebula.ErrorCode;
9 import com.vesoft.nebula.HostAddr;
10 import com.vesoft.nebula.client.graph.data.HostAddress;
11 import com.vesoft.nebula.client.meta.MetaManager;
12 import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
13 import com.vesoft.nebula.client.storage.StorageConnPool;
14 import com.vesoft.nebula.storage.PartitionResult;
15 import com.vesoft.nebula.storage.ScanResponse;
16 import java.io.Serializable;
17 import java.util.HashMap;
18 import java.util.List;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
// Class-wide SLF4J logger, created for ScanResultIterator.
27 private static final Logger LOGGER = LoggerFactory.getLogger(
ScanResultIterator.class);
// Flag initialised to true; presumably tracks whether the iterator still has
// data to return (see hasNext()) — confirm against the full file.
29 protected boolean hasNext =
true;
// Map keyed by Integer with byte[] values; the constructor creates it as a HashMap
// sized from partScanQueue.size(). Presumably part id -> scan cursor — TODO confirm.
31 protected final Map<Integer, byte[]> partCursor;
// NOTE(review): the listing skips lines 32-35 here; the constructor also assigns
// metaManager and partScanQueue, whose declarations are not visible.
// Host addresses supplied to the constructor.
36 protected final List<HostAddress> addresses;
// Space name supplied to the constructor; handleFailedResult passes it to freshLeader.
37 protected final String spaceName;
// Label name supplied to the constructor (presumably the tag or edge being scanned — confirm).
38 protected final String labelName;
// Flag supplied to the constructor; presumably whether partial success is
// acceptable — its use is not visible in this listing.
39 protected final boolean partSuccess;
43 String spaceName, String labelName,
boolean partSuccess) {
44 this.metaManager = metaManager;
46 this.partScanQueue = partScanQueue;
47 this.addresses = addresses;
48 this.spaceName = spaceName;
49 this.labelName = labelName;
50 this.partSuccess = partSuccess;
51 this.partCursor =
new HashMap<>(partScanQueue.size());
/**
 * Refreshes the leader for a part. The body of this method is not visible in this
 * listing, so the exact behavior must be confirmed against the full file.
 *
 * @param spaceName name of the space the part belongs to
 * @param part      part id
 * @param leader    leader address reported for the part
 */
72 protected void freshLeader(String spaceName,
int part, HostAddr leader) {
// NOTE(review): the listing jumps from line 72 to line 77. This statement returns a
// HostAddress built from leader.getHost() and leader.getPort(), which cannot belong to
// the void method above; presumably it is the body of a separate helper (a
// getLeader(...) is called from handleFailedResult) — confirm against the full file.
77 return new HostAddress(leader.getHost(), leader.getPort());
/**
 * Handles a scan whose response is null: logs an error and appends an Exception
 * with the message "null scan response" to the supplied list.
 *
 * @param partInfo   scan info of the part; not used in the visible lines
 *                   (listing line 82 is missing — confirm)
 * @param exceptions list that receives the failure
 */
80 protected void handleNullResponse(
PartScanInfo partInfo, List<Exception> exceptions) {
81 LOGGER.error(
"part scan failed, response is null");
// Record the failure for the caller instead of throwing here.
83 exceptions.add(
new Exception(
"null scan response"));
/**
 * Handles a scan response whose result is null: logs an error and appends an
 * Exception with the message "null scan response result" to the supplied list.
 *
 * @param partInfo   scan info of the part; not used in the visible lines
 *                   (listing line 88 is missing — confirm)
 * @param exceptions list that receives the failure
 */
86 protected void handleNullResult(PartScanInfo partInfo, List<Exception> exceptions) {
87 LOGGER.error(
"part scan failed, response result is null");
// Record the failure for the caller instead of throwing here.
89 exceptions.add(
new Exception(
"null scan response result"));
/**
 * Concatenates the messages of the given exceptions and throws them as a single
 * ExecuteFailedException prefixed with "no parts succeed, error message: ".
 *
 * @param exceptions exceptions whose messages are combined
 * @throws ExecuteFailedException carrying the combined message
 */
92 protected void throwExceptions(List<Exception> exceptions)
throws ExecuteFailedException {
93 StringBuilder errorMsg =
new StringBuilder();
94 for (
int i = 0; i < exceptions.size(); i++) {
// NOTE(review): listing lines 95-97 are missing, so any separator handling
// between messages is not visible here.
98 errorMsg.append(exceptions.get(i).getMessage());
// NOTE(review): line 99 is missing; presumably it closes the loop so that the
// throw below runs once after all messages are appended — confirm.
100 throw new ExecuteFailedException(
"no parts succeed, error message: " + errorMsg.toString());
/**
 * Tells whether a scan response counts as successful: the response is non-null and
 * its result lists no failed parts.
 *
 * @param response scan response to inspect; may be null
 * @return true when response is non-null and failed_parts is empty
 */
103 protected boolean isSuccessful(ScanResponse response) {
// NOTE(review): response.result and failed_parts are dereferenced without a null
// check; presumably callers rule out a null result first (see handleNullResult) — confirm.
104 return response !=
null && response.result.failed_parts.size() <= 0;
/**
 * Handles a successful scan of one part: increments the success counter and
 * inspects the cursor returned for that part.
 *
 * @param existSuccess counter of successful scans, incremented by one
 * @param response     scan response holding the cursors
 * @param partInfo     scan info of the part whose cursor is read and updated
 */
107 protected void handleSucceedResult(AtomicInteger existSuccess, ScanResponse response,
108 PartScanInfo partInfo) {
109 existSuccess.addAndGet(1);
// A null next_cursor for this part is treated as a special case.
// NOTE(review): listing lines 111-112 are missing, so the body of this branch is not
// visible; presumably the part is finished and removed from the scan set — confirm.
110 if (response.getCursors().get(partInfo.getPart()).next_cursor ==
null) {
// Store the cursor returned for this part on partInfo (presumably in the else
// branch, for the next scan round — confirm).
113 partInfo.setCursor(response.getCursors().get(partInfo.getPart()));
/**
 * Handles a scan response that reports failed parts. For each failed part with
 * E_LEADER_CHANGED the leader is refreshed and stored on partInfo; the error code is
 * logged and recorded as an Exception in the supplied list.
 *
 * @param response   scan response whose failed parts are iterated
 * @param partInfo   scan info of the part; receives the new leader on leader change
 * @param exceptions list that receives one Exception per reported error code
 */
117 protected void handleFailedResult(ScanResponse response, PartScanInfo partInfo,
118 List<Exception> exceptions) {
119 for (PartitionResult partResult : response.getResult().getFailed_parts()) {
// Leader moved: refresh the leader via freshLeader and point partInfo at the new one.
120 if (partResult.code == ErrorCode.E_LEADER_CHANGED) {
121 freshLeader(spaceName, partInfo.getPart(), partResult.getLeader());
122 partInfo.setLeader(getLeader(partResult.getLeader()));
// NOTE(review): listing line 123 is missing, so it is not visible whether the
// logging below is an else branch or runs for every failed part — confirm.
124 int code = partResult.getCode().getValue();
125 LOGGER.error(String.format(
"part scan failed, error code=%d", code));
127 exceptions.add(
new Exception(String.format(
"part scan, error code=%d", code)));
synchronized void dropPart(PartScanInfo partScanInfo)
Deletes the part from the set.
boolean hasNext()
Returns whether the iterator has more vertex data.
void freshLeader(String spaceName, int part, HostAddr leader)
Refreshes the leader for the part.