NebulaGraph Java Client  release-3.6
All Classes Functions Variables
ScanVertexResultIterator.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.storage.scan;
7 
8 import com.facebook.thrift.TException;
9 import com.vesoft.nebula.DataSet;
10 import com.vesoft.nebula.client.graph.data.HostAddress;
11 import com.vesoft.nebula.client.meta.MetaManager;
12 import com.vesoft.nebula.client.storage.GraphStorageConnection;
13 import com.vesoft.nebula.client.storage.StorageConnPool;
14 import com.vesoft.nebula.client.storage.data.ScanStatus;
15 import com.vesoft.nebula.storage.ScanCursor;
16 import com.vesoft.nebula.storage.ScanResponse;
17 import com.vesoft.nebula.storage.ScanVertexRequest;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 
// Logger shared by every instance of this iterator.
35  private static final Logger LOGGER = LoggerFactory.getLogger(ScanVertexResultIterator.class);
36 
// Template scan request; next() copies it for each part and sets that part's cursor on the copy.
37  private final ScanVertexRequest request;
// Worker pool used by next(); a new fixed-size pool is assigned on each next() call.
38  private ExecutorService threadPool = null;
39 
/**
 * Creates the iterator; instances are obtained through {@link ScanVertexResultBuilder#build()}.
 *
 * @param metaManager      meta manager handed to the parent iterator
 * @param pool             storage connection pool handed to the parent iterator
 * @param partScanInfoList parts to scan; wrapped in a {@link PartScanQueue} for the parent
 * @param addresses        storage addresses handed to the parent iterator
 * @param request          scan request kept by this iterator and copied for every part scan
 * @param spaceName        space name handed to the parent iterator
 * @param labelName        label (tag) name handed to the parent iterator
 * @param partSuccess      partial-success flag handed to the parent iterator
 */
private ScanVertexResultIterator(
        MetaManager metaManager,
        StorageConnPool pool,
        Set<PartScanInfo> partScanInfoList,
        List<HostAddress> addresses,
        ScanVertexRequest request,
        String spaceName,
        String labelName,
        boolean partSuccess) {
    super(
            metaManager,
            pool,
            new PartScanQueue(partScanInfoList),
            addresses,
            spaceName,
            labelName,
            partSuccess);
    this.request = request;
}
52 
53 
62  public ScanVertexResult next() throws Exception {
63  if (!hasNext()) {
64  throw new IllegalAccessException("iterator has no more data");
65  }
66  final List<DataSet> results =
67  Collections.synchronizedList(new ArrayList<>(addresses.size()));
68  List<Exception> exceptions =
69  Collections.synchronizedList(new ArrayList<>(addresses.size()));
70  CountDownLatch countDownLatch = new CountDownLatch(addresses.size());
71  AtomicInteger existSuccess = new AtomicInteger(0);
72 
73  threadPool = Executors.newFixedThreadPool(addresses.size());
74 
75  for (HostAddress addr : addresses) {
76  threadPool.submit(() -> {
77  ScanResponse response;
78  PartScanInfo partInfo = partScanQueue.getPart(addr);
79  // no part need to scan
80  if (partInfo == null) {
81  countDownLatch.countDown();
82  existSuccess.addAndGet(1);
83  return;
84  }
85 
86  GraphStorageConnection connection;
87  try {
88  connection = pool.getStorageConnection(addr);
89  } catch (Exception e) {
90  LOGGER.error("get storage client error, ", e);
91  exceptions.add(e);
92  countDownLatch.countDown();
93  return;
94  }
95 
96  Map<Integer, ScanCursor> cursorMap = new HashMap<>();
97  cursorMap.put(partInfo.getPart(), partInfo.getCursor());
98  ScanVertexRequest partRequest = new ScanVertexRequest(request);
99  partRequest.setParts(cursorMap);
100  try {
101  response = connection.scanVertex(partRequest);
102  } catch (TException e) {
103  LOGGER.error(String.format("Scan vertex failed for %s", e.getMessage()), e);
104  exceptions.add(e);
105  partScanQueue.dropPart(partInfo);
106  countDownLatch.countDown();
107  return;
108  }
109 
110  if (response == null) {
111  handleNullResponse(partInfo, exceptions);
112  countDownLatch.countDown();
113  return;
114  }
115 
116  if (isSuccessful(response)) {
117  handleSucceedResult(existSuccess, response, partInfo);
118  results.add(response.getProps());
119  }
120 
121  if (response.getResult() != null) {
122  handleFailedResult(response, partInfo, exceptions);
123  } else {
124  handleNullResult(partInfo, exceptions);
125  }
126  pool.release(addr, connection);
127  countDownLatch.countDown();
128  });
129  }
130 
131  try {
132  countDownLatch.await();
133  threadPool.shutdown();
134  } catch (InterruptedException interruptedE) {
135  LOGGER.error("scan interrupted:", interruptedE);
136  throw interruptedE;
137  }
138 
139  if (partSuccess) {
140  hasNext = partScanQueue.size() > 0;
141  // no part succeed, throw ExecuteFailedException
142  if (existSuccess.get() == 0) {
143  throwExceptions(exceptions);
144  }
145  ScanStatus status = exceptions.size() > 0 ? ScanStatus.PART_SUCCESS :
147  return new ScanVertexResult(results, status);
148  } else {
149  hasNext = partScanQueue.size() > 0 && exceptions.isEmpty();
150  // any part failed, throw ExecuteFailedException
151  if (!exceptions.isEmpty()) {
152  throwExceptions(exceptions);
153  }
154  boolean success = (existSuccess.get() == addresses.size());
155  List<DataSet> finalResults = success ? results : null;
156  return new ScanVertexResult(finalResults, ScanStatus.ALL_SUCCESS);
157  }
158  }
159 
160 
161 
162 
163 
167  public static class ScanVertexResultBuilder {
168 
169  MetaManager metaManager;
170  StorageConnPool pool;
171  Set<PartScanInfo> partScanInfoList;
172  List<HostAddress> addresses;
173  ScanVertexRequest request;
174  String spaceName;
175  String tagName;
176  boolean partSuccess = false;
177 
178  public ScanVertexResultBuilder withMetaClient(MetaManager metaManager) {
179  this.metaManager = metaManager;
180  return this;
181  }
182 
183  public ScanVertexResultBuilder withPool(StorageConnPool pool) {
184  this.pool = pool;
185  return this;
186  }
187 
188  public ScanVertexResultBuilder withPartScanInfo(Set<PartScanInfo> partScanInfoList) {
189  this.partScanInfoList = partScanInfoList;
190  return this;
191  }
192 
193  public ScanVertexResultBuilder withAddresses(List<HostAddress> addresses) {
194  this.addresses = addresses;
195  return this;
196  }
197 
198  public ScanVertexResultBuilder withRequest(ScanVertexRequest request) {
199  this.request = request;
200  return this;
201  }
202 
203  public ScanVertexResultBuilder withSpaceName(String spaceName) {
204  this.spaceName = spaceName;
205  return this;
206  }
207 
208  public ScanVertexResultBuilder withTagName(String tagName) {
209  this.tagName = tagName;
210  return this;
211  }
212 
213  public ScanVertexResultBuilder withPartSuccess(boolean partSuccess) {
214  this.partSuccess = partSuccess;
215  return this;
216  }
217 
218  public ScanVertexResultIterator build() {
219  return new ScanVertexResultIterator(
220  metaManager,
221  pool,
222  partScanInfoList,
223  addresses,
224  request,
225  spaceName,
226  tagName,
227  partSuccess);
228  }
229  }
230 }
MetaManager is a manager for meta info, such as spaces, tags and edges.
synchronized PartScanInfo getPart(HostAddress leader)
Gets a part according to its leader.
synchronized void dropPart(PartScanInfo partScanInfo)
Deletes the part from the set.