NebulaGraph Java Client  release-3.6
All Classes Functions Variables
ScanResultIterator.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.storage.scan;
7 
8 import com.vesoft.nebula.ErrorCode;
9 import com.vesoft.nebula.HostAddr;
10 import com.vesoft.nebula.client.graph.data.HostAddress;
11 import com.vesoft.nebula.client.meta.MetaManager;
12 import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
13 import com.vesoft.nebula.client.storage.StorageConnPool;
14 import com.vesoft.nebula.storage.PartitionResult;
15 import com.vesoft.nebula.storage.ScanResponse;
16 import java.io.Serializable;
17 import java.util.HashMap;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 
26 public class ScanResultIterator implements Serializable {
27  private static final Logger LOGGER = LoggerFactory.getLogger(ScanResultIterator.class);
28 
29  protected boolean hasNext = true;
30 
31  protected final Map<Integer, byte[]> partCursor;
32 
33  protected final MetaManager metaManager;
34  protected final StorageConnPool pool;
35  protected final PartScanQueue partScanQueue;
36  protected final List<HostAddress> addresses;
37  protected final String spaceName;
38  protected final String labelName;
39  protected final boolean partSuccess;
40 
41  protected ScanResultIterator(MetaManager metaManager, StorageConnPool pool,
42  PartScanQueue partScanQueue, List<HostAddress> addresses,
43  String spaceName, String labelName, boolean partSuccess) {
44  this.metaManager = metaManager;
45  this.pool = pool;
46  this.partScanQueue = partScanQueue;
47  this.addresses = addresses;
48  this.spaceName = spaceName;
49  this.labelName = labelName;
50  this.partSuccess = partSuccess;
51  this.partCursor = new HashMap<>(partScanQueue.size());
52  }
53 
54 
60  public boolean hasNext() {
61  return hasNext;
62  }
63 
64 
72  protected void freshLeader(String spaceName, int part, HostAddr leader) {
73  metaManager.updateLeader(spaceName, part, leader);
74  }
75 
76  protected HostAddress getLeader(HostAddr leader) {
77  return new HostAddress(leader.getHost(), leader.getPort());
78  }
79 
80  protected void handleNullResponse(PartScanInfo partInfo, List<Exception> exceptions) {
81  LOGGER.error("part scan failed, response is null");
82  partScanQueue.dropPart(partInfo);
83  exceptions.add(new Exception("null scan response"));
84  }
85 
86  protected void handleNullResult(PartScanInfo partInfo, List<Exception> exceptions) {
87  LOGGER.error("part scan failed, response result is null");
88  partScanQueue.dropPart(partInfo);
89  exceptions.add(new Exception("null scan response result"));
90  }
91 
92  protected void throwExceptions(List<Exception> exceptions) throws ExecuteFailedException {
93  StringBuilder errorMsg = new StringBuilder();
94  for (int i = 0; i < exceptions.size(); i++) {
95  if (i != 0) {
96  errorMsg.append(",");
97  }
98  errorMsg.append(exceptions.get(i).getMessage());
99  }
100  throw new ExecuteFailedException("no parts succeed, error message: " + errorMsg.toString());
101  }
102 
103  protected boolean isSuccessful(ScanResponse response) {
104  return response != null && response.result.failed_parts.size() <= 0;
105  }
106 
107  protected void handleSucceedResult(AtomicInteger existSuccess, ScanResponse response,
108  PartScanInfo partInfo) {
109  existSuccess.addAndGet(1);
110  if (response.getCursors().get(partInfo.getPart()).next_cursor == null) {
111  partScanQueue.dropPart(partInfo);
112  } else {
113  partInfo.setCursor(response.getCursors().get(partInfo.getPart()));
114  }
115  }
116 
117  protected void handleFailedResult(ScanResponse response, PartScanInfo partInfo,
118  List<Exception> exceptions) {
119  for (PartitionResult partResult : response.getResult().getFailed_parts()) {
120  if (partResult.code == ErrorCode.E_LEADER_CHANGED) {
121  freshLeader(spaceName, partInfo.getPart(), partResult.getLeader());
122  partInfo.setLeader(getLeader(partResult.getLeader()));
123  } else {
124  int code = partResult.getCode().getValue();
125  LOGGER.error(String.format("part scan failed, error code=%d", code));
126  partScanQueue.dropPart(partInfo);
127  exceptions.add(new Exception(String.format("part scan, error code=%d", code)));
128  }
129  }
130  }
131 }
MetaManager is a manager for meta info, such as spaces,tags and edges.
void updateLeader(String spaceName, int part, HostAddr newLeader)
cache new leader for part
synchronized void dropPart(PartScanInfo partScanInfo)
delete part from set
void freshLeader(String spaceName, int part, HostAddr leader)
fresh leader for part