NebulaGraph Java Client  release-3.6
All Classes Functions Variables
ScanEdgeResultIterator.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.storage.scan;
7 
8 import com.facebook.thrift.TException;
9 import com.vesoft.nebula.DataSet;
10 import com.vesoft.nebula.client.graph.data.HostAddress;
11 import com.vesoft.nebula.client.meta.MetaManager;
12 import com.vesoft.nebula.client.storage.GraphStorageConnection;
13 import com.vesoft.nebula.client.storage.StorageConnPool;
14 import com.vesoft.nebula.client.storage.data.ScanStatus;
15 import com.vesoft.nebula.storage.ScanCursor;
16 import com.vesoft.nebula.storage.ScanEdgeRequest;
17 import com.vesoft.nebula.storage.ScanResponse;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 
32  private static final Logger LOGGER = LoggerFactory.getLogger(ScanEdgeResultIterator.class);
33 
34  private final ScanEdgeRequest request;
35  private ExecutorService threadPool = null;
36 
37  private ScanEdgeResultIterator(MetaManager metaManager,
38  StorageConnPool pool,
39  Set<PartScanInfo> partScanInfoList,
40  List<HostAddress> addresses,
41  ScanEdgeRequest request,
42  String spaceName,
43  String labelName,
44  boolean partSuccess) {
45  super(metaManager, pool, new PartScanQueue(partScanInfoList), addresses, spaceName,
46  labelName, partSuccess);
47  this.request = request;
48  }
49 
50 
59  public ScanEdgeResult next() throws Exception {
60  if (!hasNext()) {
61  throw new IllegalAccessException("iterator has no more data");
62  }
63 
64  final List<DataSet> results =
65  Collections.synchronizedList(new ArrayList<>(addresses.size()));
66  List<Exception> exceptions =
67  Collections.synchronizedList(new ArrayList<>(addresses.size()));
68  CountDownLatch countDownLatch = new CountDownLatch(addresses.size());
69  AtomicInteger existSuccess = new AtomicInteger(0);
70 
71  threadPool = Executors.newFixedThreadPool(addresses.size());
72  for (HostAddress addr : addresses) {
73  threadPool.submit(() -> {
74  ScanResponse response;
75  PartScanInfo partInfo = partScanQueue.getPart(addr);
76  // no part need to scan
77  if (partInfo == null) {
78  countDownLatch.countDown();
79  existSuccess.addAndGet(1);
80  return;
81  }
82 
83  GraphStorageConnection connection;
84  try {
85  connection = pool.getStorageConnection(new HostAddress(addr.getHost(),
86  addr.getPort()));
87  } catch (Exception e) {
88  LOGGER.error("get storage client error, ", e);
89  exceptions.add(e);
90  countDownLatch.countDown();
91  return;
92  }
93 
94  Map<Integer, ScanCursor> cursorMap = new HashMap<>();
95  cursorMap.put(partInfo.getPart(), partInfo.getCursor());
96  ScanEdgeRequest partRequest = new ScanEdgeRequest(request);
97  partRequest.setParts(cursorMap);
98  try {
99  response = connection.scanEdge(partRequest);
100  } catch (TException e) {
101  LOGGER.error(String.format("Scan edgeRow failed for %s", e.getMessage()), e);
102  exceptions.add(e);
103  partScanQueue.dropPart(partInfo);
104  countDownLatch.countDown();
105  return;
106  }
107 
108  if (response == null) {
109  handleNullResponse(partInfo, exceptions);
110  countDownLatch.countDown();
111  return;
112  }
113 
114  if (isSuccessful(response)) {
115  handleSucceedResult(existSuccess, response, partInfo);
116  results.add(response.getProps());
117  }
118 
119  if (response.getResult() != null) {
120  handleFailedResult(response, partInfo, exceptions);
121  } else {
122  handleNullResult(partInfo, exceptions);
123  }
124  pool.release(new HostAddress(addr.getHost(), addr.getPort()), connection);
125  countDownLatch.countDown();
126  });
127 
128  }
129 
130  try {
131  countDownLatch.await();
132  threadPool.shutdown();
133  } catch (InterruptedException interruptedE) {
134  LOGGER.error("scan interrupted:", interruptedE);
135  throw interruptedE;
136  }
137 
138  if (partSuccess) {
139  hasNext = partScanQueue.size() > 0;
140  // no part succeed, throw ExecuteFailedException
141  if (existSuccess.get() == 0) {
142  throwExceptions(exceptions);
143  }
144  ScanStatus status = exceptions.size() > 0 ? ScanStatus.PART_SUCCESS :
146  return new ScanEdgeResult(results, status);
147  } else {
148  hasNext = partScanQueue.size() > 0 && exceptions.isEmpty();
149  // any part failed, throw ExecuteFailedException
150  if (!exceptions.isEmpty()) {
151  throwExceptions(exceptions);
152  }
153  boolean success = (existSuccess.get() == addresses.size());
154  List<DataSet> finalResults = success ? results : null;
155  return new ScanEdgeResult(finalResults, ScanStatus.ALL_SUCCESS);
156  }
157  }
158 
159 
163  public static class ScanEdgeResultBuilder {
164 
165  MetaManager metaManager;
166  StorageConnPool pool;
167  Set<PartScanInfo> partScanInfoList;
168  List<HostAddress> addresses;
169  ScanEdgeRequest request;
170  String spaceName;
171  String edgeName;
172  boolean partSuccess = false;
173 
174  public ScanEdgeResultBuilder withMetaClient(MetaManager metaManager) {
175  this.metaManager = metaManager;
176  return this;
177  }
178 
179  public ScanEdgeResultBuilder withPool(StorageConnPool pool) {
180  this.pool = pool;
181  return this;
182  }
183 
184  public ScanEdgeResultBuilder withPartScanInfo(Set<PartScanInfo> partScanInfoList) {
185  this.partScanInfoList = partScanInfoList;
186  return this;
187  }
188 
189  public ScanEdgeResultBuilder withAddresses(List<HostAddress> addresses) {
190  this.addresses = addresses;
191  return this;
192  }
193 
194  public ScanEdgeResultBuilder withRequest(ScanEdgeRequest request) {
195  this.request = request;
196  return this;
197  }
198 
199  public ScanEdgeResultBuilder withSpaceName(String spaceName) {
200  this.spaceName = spaceName;
201  return this;
202  }
203 
204  public ScanEdgeResultBuilder withEdgeName(String edgeName) {
205  this.edgeName = edgeName;
206  return this;
207  }
208 
209  public ScanEdgeResultBuilder withPartSuccess(boolean partSuccess) {
210  this.partSuccess = partSuccess;
211  return this;
212  }
213 
214 
215  public ScanEdgeResultIterator build() {
216  return new ScanEdgeResultIterator(
217  metaManager,
218  pool,
219  partScanInfoList,
220  addresses,
221  request,
222  spaceName,
223  edgeName,
224  partSuccess);
225  }
226  }
227 }
MetaManager is a manager for meta info, such as spaces,tags and edges.
synchronized PartScanInfo getPart(HostAddress leader)
get part according to leader
synchronized void dropPart(PartScanInfo partScanInfo)
delete part from set