NebulaGraph Java Client  release-3.8
All Classes Functions Variables
ScanResultIterator.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.storage.scan;
7 
8 import com.vesoft.nebula.ErrorCode;
9 import com.vesoft.nebula.HostAddr;
10 import com.vesoft.nebula.client.graph.data.HostAddress;
11 import com.vesoft.nebula.client.meta.MetaManager;
12 import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
13 import com.vesoft.nebula.client.storage.StorageConnPool;
14 import com.vesoft.nebula.storage.PartitionResult;
15 import com.vesoft.nebula.storage.ScanResponse;
16 import com.vesoft.nebula.util.NetUtil;
17 import java.io.Serializable;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 
26 public class ScanResultIterator implements Serializable {
27  private static final Logger LOGGER = LoggerFactory.getLogger(ScanResultIterator.class);
28 
29  protected boolean hasNext = true;
30 
31  protected final Map<Integer, byte[]> partCursor;
32 
33  protected final MetaManager metaManager;
34  protected final StorageConnPool pool;
35  protected final PartScanQueue partScanQueue;
36  protected final List<HostAddress> addresses;
37  protected final String spaceName;
38  protected final String labelName;
39  protected final boolean partSuccess;
40 
41  protected final String user;
42  protected final String password;
43 
44  protected final Map<HostAddr, HostAddr> storageAddressMapping = new ConcurrentHashMap<>();
45 
46  protected ScanResultIterator(MetaManager metaManager,
47  StorageConnPool pool,
48  PartScanQueue partScanQueue,
49  List<HostAddress> addresses,
50  String spaceName,
51  String labelName,
52  boolean partSuccess,
53  String user,
54  String password,
55  Map<String, String> storageAddrMapping) {
56  this.metaManager = metaManager;
57  this.pool = pool;
58  this.partScanQueue = partScanQueue;
59  this.addresses = addresses;
60  this.spaceName = spaceName;
61  this.labelName = labelName;
62  this.partSuccess = partSuccess;
63  this.partCursor = new HashMap<>(partScanQueue.size());
64  this.user = user;
65  this.password = password;
66  if (storageAddrMapping != null && !storageAddrMapping.isEmpty()) {
67  for (Map.Entry<String, String> et : storageAddrMapping.entrySet()) {
68  storageAddressMapping.put(NetUtil.parseHostAddr(et.getKey()),
69  NetUtil.parseHostAddr(et.getValue()));
70  }
71  }
72  }
73 
74 
80  public boolean hasNext() {
81  return hasNext;
82  }
83 
84 
92  protected void freshLeader(String spaceName, int part, HostAddr leader) {
93  metaManager.updateLeader(spaceName, part, leader);
94  }
95 
96  protected HostAddress getLeader(HostAddr leader) {
97  return new HostAddress(leader.getHost(), leader.getPort());
98  }
99 
100  protected void handleNullResponse(PartScanInfo partInfo, List<Exception> exceptions) {
101  LOGGER.error("part scan failed, response is null");
102  partScanQueue.dropPart(partInfo);
103  exceptions.add(new Exception("null scan response"));
104  }
105 
106  protected void handleNullResult(PartScanInfo partInfo, List<Exception> exceptions) {
107  LOGGER.error("part scan failed, response result is null");
108  partScanQueue.dropPart(partInfo);
109  exceptions.add(new Exception("null scan response result"));
110  }
111 
112  protected void throwExceptions(List<Exception> exceptions) throws ExecuteFailedException {
113  StringBuilder errorMsg = new StringBuilder();
114  for (int i = 0; i < exceptions.size(); i++) {
115  if (i != 0) {
116  errorMsg.append(",");
117  }
118  errorMsg.append(exceptions.get(i).getMessage());
119  }
120  throw new ExecuteFailedException("no parts succeed, error message: " + errorMsg.toString());
121  }
122 
123  protected boolean isSuccessful(ScanResponse response) {
124  return response != null && response.result.failed_parts.size() <= 0;
125  }
126 
127  protected void handleSucceedResult(AtomicInteger existSuccess, ScanResponse response,
128  PartScanInfo partInfo) {
129  existSuccess.addAndGet(1);
130  if (response.getCursors().get(partInfo.getPart()).next_cursor == null) {
131  partScanQueue.dropPart(partInfo);
132  } else {
133  partInfo.setCursor(response.getCursors().get(partInfo.getPart()));
134  }
135  }
136 
137  protected void handleFailedResult(ScanResponse response, PartScanInfo partInfo,
138  List<Exception> exceptions) {
139  for (PartitionResult partResult : response.getResult().getFailed_parts()) {
140  if (partResult.code == ErrorCode.E_LEADER_CHANGED) {
141  freshLeader(spaceName, partInfo.getPart(), partResult.getLeader());
142  partInfo.setLeader(getLeader(partResult.getLeader()));
143  } else {
144  ErrorCode code = partResult.getCode();
145  LOGGER.error(String.format("part scan failed, error code=%s", code));
146  partScanQueue.dropPart(partInfo);
147  exceptions.add(new Exception(String.format("part scan, error code=%s", code)));
148  }
149  }
150  }
151 }
MetaManager is a manager for meta info, such as spaces, tags and edges.
void updateLeader(String spaceName, int part, HostAddr newLeader)
cache new leader for part
synchronized void dropPart(PartScanInfo partScanInfo)
delete part from set
void freshLeader(String spaceName, int part, HostAddr leader)
refresh the leader for part