NebulaGraph Java Client  release-3.8
ScanResultIterator.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.storage.scan;
7 
8 import com.vesoft.nebula.ErrorCode;
9 import com.vesoft.nebula.HostAddr;
10 import com.vesoft.nebula.client.graph.data.HostAddress;
11 import com.vesoft.nebula.client.meta.MetaManager;
12 import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
13 import com.vesoft.nebula.client.storage.StorageConnPool;
14 import com.vesoft.nebula.storage.PartitionResult;
15 import com.vesoft.nebula.storage.ScanResponse;
16 import java.io.Serializable;
17 import java.util.HashMap;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 
26 public class ScanResultIterator implements Serializable {
27  private static final Logger LOGGER = LoggerFactory.getLogger(ScanResultIterator.class);
28 
29  protected boolean hasNext = true;
30 
31  protected final Map<Integer, byte[]> partCursor;
32 
33  protected final MetaManager metaManager;
34  protected final StorageConnPool pool;
35  protected final PartScanQueue partScanQueue;
36  protected final List<HostAddress> addresses;
37  protected final String spaceName;
38  protected final String labelName;
39  protected final boolean partSuccess;
40 
41  protected final String user;
42  protected final String password;
43 
44  protected ScanResultIterator(MetaManager metaManager, StorageConnPool pool,
45  PartScanQueue partScanQueue, List<HostAddress> addresses,
46  String spaceName, String labelName, boolean partSuccess,
47  String user, String password) {
48  this.metaManager = metaManager;
49  this.pool = pool;
50  this.partScanQueue = partScanQueue;
51  this.addresses = addresses;
52  this.spaceName = spaceName;
53  this.labelName = labelName;
54  this.partSuccess = partSuccess;
55  this.partCursor = new HashMap<>(partScanQueue.size());
56  this.user = user;
57  this.password = password;
58  }
59 
60 
66  public boolean hasNext() {
67  return hasNext;
68  }
69 
70 
78  protected void freshLeader(String spaceName, int part, HostAddr leader) {
79  metaManager.updateLeader(spaceName, part, leader);
80  }
81 
82  protected HostAddress getLeader(HostAddr leader) {
83  return new HostAddress(leader.getHost(), leader.getPort());
84  }
85 
86  protected void handleNullResponse(PartScanInfo partInfo, List<Exception> exceptions) {
87  LOGGER.error("part scan failed, response is null");
88  partScanQueue.dropPart(partInfo);
89  exceptions.add(new Exception("null scan response"));
90  }
91 
92  protected void handleNullResult(PartScanInfo partInfo, List<Exception> exceptions) {
93  LOGGER.error("part scan failed, response result is null");
94  partScanQueue.dropPart(partInfo);
95  exceptions.add(new Exception("null scan response result"));
96  }
97 
98  protected void throwExceptions(List<Exception> exceptions) throws ExecuteFailedException {
99  StringBuilder errorMsg = new StringBuilder();
100  for (int i = 0; i < exceptions.size(); i++) {
101  if (i != 0) {
102  errorMsg.append(",");
103  }
104  errorMsg.append(exceptions.get(i).getMessage());
105  }
106  throw new ExecuteFailedException("no parts succeed, error message: " + errorMsg.toString());
107  }
108 
109  protected boolean isSuccessful(ScanResponse response) {
110  return response != null && response.result.failed_parts.size() <= 0;
111  }
112 
113  protected void handleSucceedResult(AtomicInteger existSuccess, ScanResponse response,
114  PartScanInfo partInfo) {
115  existSuccess.addAndGet(1);
116  if (response.getCursors().get(partInfo.getPart()).next_cursor == null) {
117  partScanQueue.dropPart(partInfo);
118  } else {
119  partInfo.setCursor(response.getCursors().get(partInfo.getPart()));
120  }
121  }
122 
123  protected void handleFailedResult(ScanResponse response, PartScanInfo partInfo,
124  List<Exception> exceptions) {
125  for (PartitionResult partResult : response.getResult().getFailed_parts()) {
126  if (partResult.code == ErrorCode.E_LEADER_CHANGED) {
127  freshLeader(spaceName, partInfo.getPart(), partResult.getLeader());
128  partInfo.setLeader(getLeader(partResult.getLeader()));
129  } else {
130  ErrorCode code = partResult.getCode();
131  LOGGER.error(String.format("part scan failed, error code=%s", code));
132  partScanQueue.dropPart(partInfo);
133  exceptions.add(new Exception(String.format("part scan, error code=%s", code)));
134  }
135  }
136  }
137 }
MetaManager is a manager for meta info, such as spaces, tags and edges.
void updateLeader(String spaceName, int part, HostAddr newLeader)
cache new leader for part
synchronized void dropPart(PartScanInfo partScanInfo)
delete part from set
void freshLeader(String spaceName, int part, HostAddr leader)
refresh the leader for part