NebulaGraph Java Client  release-3.8
All Classes Functions Variables
ScanEdgeResultIterator.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.storage.scan;
7 
8 import com.google.common.base.Charsets;
9 import com.vesoft.nebula.DataSet;
10 import com.vesoft.nebula.ErrorCode;
11 import com.vesoft.nebula.HostAddr;
12 import com.vesoft.nebula.client.graph.data.HostAddress;
13 import com.vesoft.nebula.client.meta.MetaManager;
14 import com.vesoft.nebula.client.storage.GraphStorageConnection;
15 import com.vesoft.nebula.client.storage.StorageConnPool;
16 import com.vesoft.nebula.client.storage.data.ScanStatus;
17 import com.vesoft.nebula.storage.ScanCursor;
18 import com.vesoft.nebula.storage.ScanEdgeRequest;
19 import com.vesoft.nebula.storage.ScanResponse;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 
34  private static final Logger LOGGER = LoggerFactory.getLogger(ScanEdgeResultIterator.class);
35 
36  private final ScanEdgeRequest request;
37  private ExecutorService threadPool = null;
38 
39  private ScanEdgeResultIterator(MetaManager metaManager,
40  StorageConnPool pool,
41  Set<PartScanInfo> partScanInfoList,
42  List<HostAddress> addresses,
43  ScanEdgeRequest request,
44  String spaceName,
45  String labelName,
46  boolean partSuccess,
47  String user,
48  String password,
49  Map<String, String> storageAddressMapping) {
50  super(metaManager, pool, new PartScanQueue(partScanInfoList), addresses, spaceName,
51  labelName, partSuccess, user, password, storageAddressMapping);
52  this.request = request;
53  }
54 
55 
64  public ScanEdgeResult next() throws Exception {
65  if (!hasNext()) {
66  throw new IllegalAccessException("iterator has no more data");
67  }
68 
69  final List<DataSet> results =
70  Collections.synchronizedList(new ArrayList<>(addresses.size()));
71  List<Exception> exceptions =
72  Collections.synchronizedList(new ArrayList<>(addresses.size()));
73  CountDownLatch countDownLatch = new CountDownLatch(addresses.size());
74  AtomicInteger existSuccess = new AtomicInteger(0);
75 
76  threadPool = Executors.newFixedThreadPool(addresses.size());
77  for (HostAddress addr : addresses) {
78  threadPool.submit(() -> {
79  HostAddress leader = addr;
80  ScanResponse response;
81  PartScanInfo partInfo = partScanQueue.getPart(leader);
82  // no part need to scan
83  if (partInfo == null) {
84  countDownLatch.countDown();
85  existSuccess.addAndGet(1);
86  return;
87  }
88 
89  GraphStorageConnection connection;
90  try {
91  connection = pool.getStorageConnection(leader);
92  } catch (Exception e) {
93  LOGGER.error("get storage client error, ", e);
94  exceptions.add(e);
95  countDownLatch.countDown();
96  return;
97  }
98 
99  Map<Integer, ScanCursor> cursorMap = new HashMap<>();
100  cursorMap.put(partInfo.getPart(), partInfo.getCursor());
101  ScanEdgeRequest partRequest = new ScanEdgeRequest(request);
102  partRequest.setParts(cursorMap);
103  if (user != null && password != null) {
104  partRequest.setUsername(user.getBytes(Charsets.UTF_8));
105  partRequest.setPassword(password.getBytes(Charsets.UTF_8));
106  }
107  partRequest.setNeed_authenticate(true);
108  try {
109  response = connection.scanEdge(partRequest);
110  if (!response.getResult().failed_parts.isEmpty()
111  && response.getResult().failed_parts.get(0).code
112  == ErrorCode.E_LEADER_CHANGED) {
113  pool.release(leader, connection);
114  HostAddr newLeader = response.getResult().failed_parts.get(0).leader;
115  HostAddr availableLeader = storageAddressMapping
116  .getOrDefault(newLeader, newLeader);
117  leader = new HostAddress(availableLeader.host, availableLeader.getPort());
118  connection = pool.getStorageConnection(leader);
119  response = connection.scanEdge(partRequest);
120  }
121  } catch (Exception e) {
122  LOGGER.error(String.format("Scan edgeRow failed for %s", e.getMessage()), e);
123  exceptions.add(e);
124  partScanQueue.dropPart(partInfo);
125  countDownLatch.countDown();
126  return;
127  } finally {
128  pool.release(leader, connection);
129  }
130 
131  if (response == null) {
132  handleNullResponse(partInfo, exceptions);
133  countDownLatch.countDown();
134  return;
135  }
136 
137  if (isSuccessful(response)) {
138  handleSucceedResult(existSuccess, response, partInfo);
139  results.add(response.getProps());
140  }
141 
142  if (response.getResult() != null) {
143  handleFailedResult(response, partInfo, exceptions);
144  } else {
145  handleNullResult(partInfo, exceptions);
146  }
147  pool.release(new HostAddress(addr.getHost(), addr.getPort()), connection);
148  countDownLatch.countDown();
149  });
150 
151  }
152 
153  try {
154  countDownLatch.await();
155  threadPool.shutdown();
156  } catch (InterruptedException interruptedE) {
157  LOGGER.error("scan interrupted:", interruptedE);
158  throw interruptedE;
159  }
160 
161  if (partSuccess) {
162  hasNext = partScanQueue.size() > 0;
163  // no part succeed, throw ExecuteFailedException
164  if (existSuccess.get() == 0) {
165  throwExceptions(exceptions);
166  }
167  ScanStatus status = exceptions.size() > 0 ? ScanStatus.PART_SUCCESS :
169  return new ScanEdgeResult(results, status);
170  } else {
171  hasNext = partScanQueue.size() > 0 && exceptions.isEmpty();
172  // any part failed, throw ExecuteFailedException
173  if (!exceptions.isEmpty()) {
174  throwExceptions(exceptions);
175  }
176  boolean success = (existSuccess.get() == addresses.size());
177  List<DataSet> finalResults = success ? results : null;
178  return new ScanEdgeResult(finalResults, ScanStatus.ALL_SUCCESS);
179  }
180  }
181 
182 
186  public static class ScanEdgeResultBuilder {
187 
188  MetaManager metaManager;
189  StorageConnPool pool;
190  Set<PartScanInfo> partScanInfoList;
191  List<HostAddress> addresses;
192  ScanEdgeRequest request;
193  String spaceName;
194  String edgeName;
195  boolean partSuccess = false;
196  String user = null;
197  String password = null;
198  Map<String, String> storageAddressMapping = null;
199 
200  public ScanEdgeResultBuilder withMetaClient(MetaManager metaManager) {
201  this.metaManager = metaManager;
202  return this;
203  }
204 
205  public ScanEdgeResultBuilder withPool(StorageConnPool pool) {
206  this.pool = pool;
207  return this;
208  }
209 
210  public ScanEdgeResultBuilder withPartScanInfo(Set<PartScanInfo> partScanInfoList) {
211  this.partScanInfoList = partScanInfoList;
212  return this;
213  }
214 
215  public ScanEdgeResultBuilder withAddresses(List<HostAddress> addresses) {
216  this.addresses = addresses;
217  return this;
218  }
219 
220  public ScanEdgeResultBuilder withRequest(ScanEdgeRequest request) {
221  this.request = request;
222  return this;
223  }
224 
225  public ScanEdgeResultBuilder withSpaceName(String spaceName) {
226  this.spaceName = spaceName;
227  return this;
228  }
229 
230  public ScanEdgeResultBuilder withEdgeName(String edgeName) {
231  this.edgeName = edgeName;
232  return this;
233  }
234 
235  public ScanEdgeResultBuilder withPartSuccess(boolean partSuccess) {
236  this.partSuccess = partSuccess;
237  return this;
238  }
239 
240  public ScanEdgeResultBuilder withUser(String user) {
241  this.user = user;
242  return this;
243  }
244 
245  public ScanEdgeResultBuilder withPassword(String password) {
246  this.password = password;
247  return this;
248  }
249 
250  public ScanEdgeResultBuilder withStorageAddressMapping(
251  Map<String, String> storageAddressMapping) {
252  this.storageAddressMapping = storageAddressMapping;
253  return this;
254  }
255 
256  public ScanEdgeResultIterator build() {
257  return new ScanEdgeResultIterator(
258  metaManager,
259  pool,
260  partScanInfoList,
261  addresses,
262  request,
263  spaceName,
264  edgeName,
265  partSuccess,
266  user,
267  password,
268  storageAddressMapping);
269  }
270  }
271 }
MetaManager is a manager for meta info, such as spaces,tags and edges.
synchronized PartScanInfo getPart(HostAddress leader)
get part according to leader
synchronized void dropPart(PartScanInfo partScanInfo)
delete part from set