6 package com.vesoft.nebula.client.storage.scan;
8 import com.vesoft.nebula.ErrorCode;
9 import com.vesoft.nebula.HostAddr;
10 import com.vesoft.nebula.client.graph.data.HostAddress;
11 import com.vesoft.nebula.client.meta.MetaManager;
12 import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
13 import com.vesoft.nebula.client.storage.StorageConnPool;
14 import com.vesoft.nebula.storage.PartitionResult;
15 import com.vesoft.nebula.storage.ScanResponse;
16 import com.vesoft.nebula.util.NetUtil;
17 import java.io.Serializable;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 private static final Logger LOGGER = LoggerFactory.getLogger(
ScanResultIterator.class);
29 protected boolean hasNext =
true;
31 protected final Map<Integer, byte[]> partCursor;
36 protected final List<HostAddress> addresses;
37 protected final String spaceName;
38 protected final String labelName;
39 protected final boolean partSuccess;
41 protected final String user;
42 protected final String password;
44 protected final Map<HostAddr, HostAddr> storageAddressMapping =
new ConcurrentHashMap<>();
49 List<HostAddress> addresses,
55 Map<String, String> storageAddrMapping) {
56 this.metaManager = metaManager;
58 this.partScanQueue = partScanQueue;
59 this.addresses = addresses;
60 this.spaceName = spaceName;
61 this.labelName = labelName;
62 this.partSuccess = partSuccess;
63 this.partCursor =
new HashMap<>(partScanQueue.size());
65 this.password = password;
66 if (storageAddrMapping !=
null && !storageAddrMapping.isEmpty()) {
67 for (Map.Entry<String, String> et : storageAddrMapping.entrySet()) {
68 storageAddressMapping.put(NetUtil.parseHostAddr(et.getKey()),
69 NetUtil.parseHostAddr(et.getValue()));
92 protected void freshLeader(String spaceName,
int part, HostAddr leader) {
97 return new HostAddress(leader.getHost(), leader.getPort());
100 protected void handleNullResponse(
PartScanInfo partInfo, List<Exception> exceptions) {
101 LOGGER.error(
"part scan failed, response is null");
103 exceptions.add(
new Exception(
"null scan response"));
106 protected void handleNullResult(PartScanInfo partInfo, List<Exception> exceptions) {
107 LOGGER.error(
"part scan failed, response result is null");
109 exceptions.add(
new Exception(
"null scan response result"));
112 protected void throwExceptions(List<Exception> exceptions)
throws ExecuteFailedException {
113 StringBuilder errorMsg =
new StringBuilder();
114 for (
int i = 0; i < exceptions.size(); i++) {
116 errorMsg.append(
",");
118 errorMsg.append(exceptions.get(i).getMessage());
120 throw new ExecuteFailedException(
"no parts succeed, error message: " + errorMsg.toString());
123 protected boolean isSuccessful(ScanResponse response) {
124 return response !=
null && response.result.failed_parts.size() <= 0;
127 protected void handleSucceedResult(AtomicInteger existSuccess, ScanResponse response,
128 PartScanInfo partInfo) {
129 existSuccess.addAndGet(1);
130 if (response.getCursors().get(partInfo.getPart()).next_cursor ==
null) {
133 partInfo.setCursor(response.getCursors().get(partInfo.getPart()));
137 protected void handleFailedResult(ScanResponse response, PartScanInfo partInfo,
138 List<Exception> exceptions) {
139 for (PartitionResult partResult : response.getResult().getFailed_parts()) {
140 if (partResult.code == ErrorCode.E_LEADER_CHANGED) {
141 freshLeader(spaceName, partInfo.getPart(), partResult.getLeader());
142 partInfo.setLeader(getLeader(partResult.getLeader()));
144 ErrorCode code = partResult.getCode();
145 LOGGER.error(String.format(
"part scan failed, error code=%s", code));
147 exceptions.add(
new Exception(String.format(
"part scan, error code=%s", code)));
synchronized void dropPart(PartScanInfo partScanInfo)
delete part from set
boolean hasNext()
if iter has more vertex data
void freshLeader(String spaceName, int part, HostAddr leader)
fresh leader for part