NebulaGraph Java Client  release-3.8
ScanEdgeResultIterator.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.storage.scan;
7 
8 import com.facebook.thrift.TException;
9 import com.google.common.base.Charsets;
10 import com.vesoft.nebula.DataSet;
11 import com.vesoft.nebula.client.graph.data.HostAddress;
12 import com.vesoft.nebula.client.meta.MetaManager;
13 import com.vesoft.nebula.client.storage.GraphStorageConnection;
14 import com.vesoft.nebula.client.storage.StorageConnPool;
15 import com.vesoft.nebula.client.storage.data.ScanStatus;
16 import com.vesoft.nebula.storage.ScanCursor;
17 import com.vesoft.nebula.storage.ScanEdgeRequest;
18 import com.vesoft.nebula.storage.ScanResponse;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 
// Class-wide SLF4J logger used by next() to report connection and scan failures.
33  private static final Logger LOGGER = LoggerFactory.getLogger(ScanEdgeResultIterator.class);
34 
// Template scan request handed in at construction; next() copies it for every part it scans.
35  private final ScanEdgeRequest request;
// Worker pool; next() assigns a new fixed-size pool (one thread per address) on each call.
36  private ExecutorService threadPool = null;
37 
/**
 * Creates an edge scan iterator. The constructor is private; instances are produced by
 * {@link ScanEdgeResultBuilder#build()}.
 *
 * <p>The part set is wrapped in a {@link PartScanQueue} and passed, together with the shared
 * settings, to the superclass constructor. Only the scan request is stored by this class.
 *
 * @param metaManager      meta manager, forwarded to the superclass
 * @param pool             storage connection pool, forwarded to the superclass
 * @param partScanInfoList parts to scan; wrapped in a new {@link PartScanQueue}
 * @param addresses        storage hosts; {@code next()} submits one scan task per address
 * @param request          template request that {@code next()} copies for each part
 * @param spaceName        space name, forwarded to the superclass
 * @param labelName        label name, forwarded to the superclass (the builder passes the
 *                         edge name here)
 * @param partSuccess      whether {@code next()} may return a partially successful result
 * @param user             user name set on each part request when both it and the password
 *                         are non-null
 * @param password         password set on each part request when both it and the user name
 *                         are non-null
 */
private ScanEdgeResultIterator(
        MetaManager metaManager, StorageConnPool pool,
        Set<PartScanInfo> partScanInfoList, List<HostAddress> addresses,
        ScanEdgeRequest request,
        String spaceName, String labelName,
        boolean partSuccess,
        String user, String password) {
    super(
            metaManager,
            pool,
            new PartScanQueue(partScanInfoList),
            addresses,
            spaceName,
            labelName,
            partSuccess,
            user,
            password);
    this.request = request;
}
52 
53 
62  public ScanEdgeResult next() throws Exception {
63  if (!hasNext()) {
64  throw new IllegalAccessException("iterator has no more data");
65  }
66 
67  final List<DataSet> results =
68  Collections.synchronizedList(new ArrayList<>(addresses.size()));
69  List<Exception> exceptions =
70  Collections.synchronizedList(new ArrayList<>(addresses.size()));
71  CountDownLatch countDownLatch = new CountDownLatch(addresses.size());
72  AtomicInteger existSuccess = new AtomicInteger(0);
73 
74  threadPool = Executors.newFixedThreadPool(addresses.size());
75  for (HostAddress addr : addresses) {
76  threadPool.submit(() -> {
77  ScanResponse response;
78  PartScanInfo partInfo = partScanQueue.getPart(addr);
79  // no part need to scan
80  if (partInfo == null) {
81  countDownLatch.countDown();
82  existSuccess.addAndGet(1);
83  return;
84  }
85 
86  GraphStorageConnection connection;
87  try {
88  connection = pool.getStorageConnection(new HostAddress(addr.getHost(),
89  addr.getPort()));
90  } catch (Exception e) {
91  LOGGER.error("get storage client error, ", e);
92  exceptions.add(e);
93  countDownLatch.countDown();
94  return;
95  }
96 
97  Map<Integer, ScanCursor> cursorMap = new HashMap<>();
98  cursorMap.put(partInfo.getPart(), partInfo.getCursor());
99  ScanEdgeRequest partRequest = new ScanEdgeRequest(request);
100  partRequest.setParts(cursorMap);
101  if (user != null && password != null) {
102  partRequest.setUsername(user.getBytes(Charsets.UTF_8));
103  partRequest.setPassword(password.getBytes(Charsets.UTF_8));
104  }
105  partRequest.setNeed_authenticate(true);
106  try {
107  response = connection.scanEdge(partRequest);
108  } catch (TException e) {
109  LOGGER.error(String.format("Scan edgeRow failed for %s", e.getMessage()), e);
110  exceptions.add(e);
111  partScanQueue.dropPart(partInfo);
112  countDownLatch.countDown();
113  return;
114  }
115 
116  if (response == null) {
117  handleNullResponse(partInfo, exceptions);
118  countDownLatch.countDown();
119  return;
120  }
121 
122  if (isSuccessful(response)) {
123  handleSucceedResult(existSuccess, response, partInfo);
124  results.add(response.getProps());
125  }
126 
127  if (response.getResult() != null) {
128  handleFailedResult(response, partInfo, exceptions);
129  } else {
130  handleNullResult(partInfo, exceptions);
131  }
132  pool.release(new HostAddress(addr.getHost(), addr.getPort()), connection);
133  countDownLatch.countDown();
134  });
135 
136  }
137 
138  try {
139  countDownLatch.await();
140  threadPool.shutdown();
141  } catch (InterruptedException interruptedE) {
142  LOGGER.error("scan interrupted:", interruptedE);
143  throw interruptedE;
144  }
145 
146  if (partSuccess) {
147  hasNext = partScanQueue.size() > 0;
148  // no part succeed, throw ExecuteFailedException
149  if (existSuccess.get() == 0) {
150  throwExceptions(exceptions);
151  }
152  ScanStatus status = exceptions.size() > 0 ? ScanStatus.PART_SUCCESS :
154  return new ScanEdgeResult(results, status);
155  } else {
156  hasNext = partScanQueue.size() > 0 && exceptions.isEmpty();
157  // any part failed, throw ExecuteFailedException
158  if (!exceptions.isEmpty()) {
159  throwExceptions(exceptions);
160  }
161  boolean success = (existSuccess.get() == addresses.size());
162  List<DataSet> finalResults = success ? results : null;
163  return new ScanEdgeResult(finalResults, ScanStatus.ALL_SUCCESS);
164  }
165  }
166 
167 
171  public static class ScanEdgeResultBuilder {
172 
173  MetaManager metaManager;
174  StorageConnPool pool;
175  Set<PartScanInfo> partScanInfoList;
176  List<HostAddress> addresses;
177  ScanEdgeRequest request;
178  String spaceName;
179  String edgeName;
180  boolean partSuccess = false;
181  String user = null;
182  String password = null;
183 
184  public ScanEdgeResultBuilder withMetaClient(MetaManager metaManager) {
185  this.metaManager = metaManager;
186  return this;
187  }
188 
189  public ScanEdgeResultBuilder withPool(StorageConnPool pool) {
190  this.pool = pool;
191  return this;
192  }
193 
194  public ScanEdgeResultBuilder withPartScanInfo(Set<PartScanInfo> partScanInfoList) {
195  this.partScanInfoList = partScanInfoList;
196  return this;
197  }
198 
199  public ScanEdgeResultBuilder withAddresses(List<HostAddress> addresses) {
200  this.addresses = addresses;
201  return this;
202  }
203 
204  public ScanEdgeResultBuilder withRequest(ScanEdgeRequest request) {
205  this.request = request;
206  return this;
207  }
208 
209  public ScanEdgeResultBuilder withSpaceName(String spaceName) {
210  this.spaceName = spaceName;
211  return this;
212  }
213 
214  public ScanEdgeResultBuilder withEdgeName(String edgeName) {
215  this.edgeName = edgeName;
216  return this;
217  }
218 
219  public ScanEdgeResultBuilder withPartSuccess(boolean partSuccess) {
220  this.partSuccess = partSuccess;
221  return this;
222  }
223 
224  public ScanEdgeResultBuilder withUser(String user) {
225  this.user = user;
226  return this;
227  }
228 
229  public ScanEdgeResultBuilder withPassword(String password) {
230  this.password = password;
231  return this;
232  }
233 
234  public ScanEdgeResultIterator build() {
235  return new ScanEdgeResultIterator(
236  metaManager,
237  pool,
238  partScanInfoList,
239  addresses,
240  request,
241  spaceName,
242  edgeName,
243  partSuccess,
244  user,
245  password);
246  }
247  }
248 }
MetaManager is a manager for meta info, such as spaces, tags and edges.
synchronized PartScanInfo getPart(HostAddress leader)
Gets a part according to the given leader.
synchronized void dropPart(PartScanInfo partScanInfo)
Deletes the given part from the set.