NebulaGraph Java Client  release-3.6
All Classes Functions Variables
MetaClient.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.meta;
7 
8 import com.facebook.thrift.TException;
9 import com.facebook.thrift.protocol.TCompactProtocol;
10 import com.facebook.thrift.protocol.THeaderProtocol;
11 import com.facebook.thrift.transport.THeaderTransport;
12 import com.facebook.thrift.transport.TSocket;
13 import com.facebook.thrift.transport.TTransportException;
14 import com.google.common.base.Charsets;
15 import com.vesoft.nebula.ErrorCode;
16 import com.vesoft.nebula.HostAddr;
17 import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
18 import com.vesoft.nebula.client.graph.data.HostAddress;
19 import com.vesoft.nebula.client.graph.data.SSLParam;
20 import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
21 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
22 import com.vesoft.nebula.client.graph.exception.IOErrorException;
23 import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
24 import com.vesoft.nebula.meta.EdgeItem;
25 import com.vesoft.nebula.meta.GetEdgeReq;
26 import com.vesoft.nebula.meta.GetEdgeResp;
27 import com.vesoft.nebula.meta.GetPartsAllocReq;
28 import com.vesoft.nebula.meta.GetPartsAllocResp;
29 import com.vesoft.nebula.meta.GetSpaceReq;
30 import com.vesoft.nebula.meta.GetSpaceResp;
31 import com.vesoft.nebula.meta.GetTagReq;
32 import com.vesoft.nebula.meta.GetTagResp;
33 import com.vesoft.nebula.meta.HostItem;
34 import com.vesoft.nebula.meta.HostStatus;
35 import com.vesoft.nebula.meta.IdName;
36 import com.vesoft.nebula.meta.ListEdgesReq;
37 import com.vesoft.nebula.meta.ListEdgesResp;
38 import com.vesoft.nebula.meta.ListHostType;
39 import com.vesoft.nebula.meta.ListHostsReq;
40 import com.vesoft.nebula.meta.ListHostsResp;
41 import com.vesoft.nebula.meta.ListSpacesReq;
42 import com.vesoft.nebula.meta.ListSpacesResp;
43 import com.vesoft.nebula.meta.ListTagsReq;
44 import com.vesoft.nebula.meta.ListTagsResp;
45 import com.vesoft.nebula.meta.MetaService;
46 import com.vesoft.nebula.meta.Schema;
47 import com.vesoft.nebula.meta.SpaceItem;
48 import com.vesoft.nebula.meta.TagItem;
49 import com.vesoft.nebula.meta.VerifyClientVersionReq;
50 import com.vesoft.nebula.meta.VerifyClientVersionResp;
51 import com.vesoft.nebula.util.SslUtil;
52 import java.io.IOException;
53 import java.io.Serializable;
54 import java.net.UnknownHostException;
55 import java.util.Arrays;
56 import java.util.HashSet;
57 import java.util.List;
58 import java.util.Map;
59 import java.util.Random;
60 import java.util.Set;
61 import javax.net.ssl.SSLSocketFactory;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 
65 public class MetaClient extends AbstractMetaClient {
66 
67  private static final Logger LOGGER = LoggerFactory.getLogger(MetaClient.class);
68 
69  public static final int LATEST_SCHEMA_VERSION = -1;
70 
71  private static final int DEFAULT_TIMEOUT_MS = 1000;
72  private static final int DEFAULT_CONNECTION_RETRY_SIZE = 3;
73  private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;
74  private static final int RETRY_TIMES = 1;
75 
76  private boolean enableSSL = false;
77  private SSLParam sslParam = null;
78 
79  private MetaService.Client client;
80  private final List<HostAddress> addresses;
81 
82  public MetaClient(String host, int port) throws UnknownHostException {
83  this(new HostAddress(host, port));
84  }
85 
86  public MetaClient(HostAddress address) throws UnknownHostException {
87  this(Arrays.asList(address), DEFAULT_CONNECTION_RETRY_SIZE, DEFAULT_EXECUTION_RETRY_SIZE);
88  }
89 
90  public MetaClient(List<HostAddress> addresses) throws UnknownHostException {
91  this(addresses, DEFAULT_CONNECTION_RETRY_SIZE, DEFAULT_EXECUTION_RETRY_SIZE);
92  }
93 
94  public MetaClient(List<HostAddress> addresses, int connectionRetry, int executionRetry)
95  throws UnknownHostException {
96  this(addresses, DEFAULT_TIMEOUT_MS, connectionRetry, executionRetry);
97  }
98 
99  public MetaClient(List<HostAddress> addresses, int timeout, int connectionRetry,
100  int executionRetry) throws UnknownHostException {
101  super(addresses, timeout, connectionRetry, executionRetry);
102  this.addresses = addresses;
103  }
104 
105  public MetaClient(List<HostAddress> addresses, int timeout, int connectionRetry,
106  int executionRetry, boolean enableSSL, SSLParam sslParam)
107  throws UnknownHostException {
108  super(addresses, timeout, connectionRetry, executionRetry);
109  this.addresses = addresses;
110  this.enableSSL = enableSSL;
111  this.sslParam = sslParam;
112  if (enableSSL && sslParam == null) {
113  throw new IllegalArgumentException("SSL is enabled, but SSLParam is null.");
114  }
115  }
116 
117  public void connect()
118  throws TException, ClientServerIncompatibleException {
119  doConnect();
120  }
121 
125  private void doConnect()
126  throws TTransportException, ClientServerIncompatibleException {
127  Random random = new Random(System.currentTimeMillis());
128  int position = random.nextInt(addresses.size());
129  HostAddress address = addresses.get(position);
130  getClient(address.getHost(), address.getPort());
131  }
132 
133  private void getClient(String host, int port)
134  throws TTransportException, ClientServerIncompatibleException {
135  if (enableSSL) {
136  SSLSocketFactory sslSocketFactory;
137  if (sslParam.getSignMode() == SSLParam.SignMode.CA_SIGNED) {
138  sslSocketFactory = SslUtil.getSSLSocketFactoryWithCA((CASignedSSLParam) sslParam);
139  } else {
140  sslSocketFactory =
141  SslUtil.getSSLSocketFactoryWithoutCA((SelfSignedSSLParam) sslParam);
142  }
143  try {
144  transport = new THeaderTransport(
145  new TSocket(sslSocketFactory.createSocket(host, port), timeout, timeout));
146  } catch (IOException e) {
147  throw new TTransportException(IOErrorException.E_UNKNOWN, e);
148  }
149  } else {
150  transport = new THeaderTransport(new TSocket(host, port, timeout, timeout));
151  transport.open();
152  }
153 
154  protocol = new THeaderProtocol(transport);
155  client = new MetaService.Client(protocol);
156 
157  // check if client version matches server version
158  VerifyClientVersionResp resp = client
159  .verifyClientVersion(new VerifyClientVersionReq());
160  if (resp.getCode() != ErrorCode.SUCCEEDED) {
161  client.getInputProtocol().getTransport().close();
162  if (resp.getError_msg() == null) {
164  new String("Error code: ")
165  + String.valueOf(resp.getCode().getValue()));
166  }
168  new String(resp.getError_msg())
169  + String.valueOf(resp.getCode().getValue()));
170  }
171  }
172 
173  private void freshClient(HostAddr leader) throws TTransportException {
174  close();
175  try {
176  if (leader.getHost() == null || "".equals(leader.getHost())) {
177  doConnect();
178  } else {
179  getClient(leader.getHost(), leader.getPort());
180  }
182  LOGGER.error(e.getMessage());
183  }
184  }
185 
189  public void close() {
190  if (transport != null && transport.isOpen()) {
191  transport.close();
192  }
193  }
194 
200  public synchronized List<IdName> getSpaces() throws TException, ExecuteFailedException {
201  int retry = RETRY_TIMES;
202  ListSpacesReq request = new ListSpacesReq();
203  ListSpacesResp response = null;
204  try {
205  while (retry-- >= 0) {
206  response = client.listSpaces(request);
207  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
208  freshClient(response.getLeader());
209  } else {
210  break;
211  }
212  }
213  } catch (TException e) {
214  LOGGER.error(String.format("List Spaces Error: %s", e.getMessage()));
215  throw e;
216  }
217  if (response.getCode() == ErrorCode.SUCCEEDED) {
218  return response.getSpaces();
219  } else {
220  LOGGER.error("Get Spaces execute failed, errorCode: " + response.getCode());
221  throw new ExecuteFailedException(
222  "Get Spaces execute failed, errorCode: " + response.getCode());
223  }
224  }
225 
232  public synchronized SpaceItem getSpace(String spaceName) throws TException,
234  int retry = RETRY_TIMES;
235  GetSpaceReq request = new GetSpaceReq();
236  request.setSpace_name(spaceName.getBytes());
237  GetSpaceResp response = null;
238  try {
239  while (retry-- >= 0) {
240  response = client.getSpace(request);
241  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
242  freshClient(response.getLeader());
243  } else {
244  break;
245  }
246  }
247  } catch (TException e) {
248  LOGGER.error(String.format("Get Space Error: %s", e.getMessage()));
249  throw e;
250  }
251  if (response.getCode() == ErrorCode.SUCCEEDED) {
252  return response.getItem();
253  } else {
254  LOGGER.error("Get Space execute failed, errorCode: " + response.getCode());
255  throw new ExecuteFailedException(
256  "Get Space execute failed, errorCode: " + response.getCode());
257  }
258  }
259 
266  public synchronized List<TagItem> getTags(String spaceName)
267  throws TException, ExecuteFailedException {
268  int retry = RETRY_TIMES;
269 
270  int spaceID = getSpace(spaceName).space_id;
271  ListTagsReq request = new ListTagsReq(spaceID);
272  ListTagsResp response = null;
273  try {
274  while (retry-- >= 0) {
275  response = client.listTags(request);
276  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
277  freshClient(response.getLeader());
278  } else {
279  break;
280  }
281  }
282  } catch (TException e) {
283  LOGGER.error(String.format("Get Tag Error: %s", e.getMessage()));
284  throw e;
285  }
286  if (response.getCode() == ErrorCode.SUCCEEDED) {
287  return response.getTags();
288  } else {
289  LOGGER.error("Get tags execute failed, errorCode: " + response.getCode());
290  throw new ExecuteFailedException(
291  "Get Tags execute failed, errorCode: " + response.getCode());
292  }
293  }
294 
295 
303  public synchronized Schema getTag(String spaceName, String tagName)
304  throws TException, ExecuteFailedException {
305  int retry = RETRY_TIMES;
306  GetTagReq request = new GetTagReq();
307  int spaceID = getSpace(spaceName).getSpace_id();
308  request.setSpace_id(spaceID);
309  request.setTag_name(tagName.getBytes());
310  request.setVersion(LATEST_SCHEMA_VERSION);
311  GetTagResp response = null;
312 
313  try {
314  while (retry-- >= 0) {
315  response = client.getTag(request);
316  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
317  freshClient(response.getLeader());
318  } else {
319  break;
320  }
321  }
322  } catch (TException e) {
323  LOGGER.error(String.format("Get Tag Error: %s", e.getMessage()));
324  throw e;
325  }
326  if (response.getCode() == ErrorCode.SUCCEEDED) {
327  return response.getSchema();
328  } else {
329  LOGGER.error("Get tag execute failed, errorCode: " + response.getCode());
330  throw new ExecuteFailedException(
331  "Get tag execute failed, errorCode: " + response.getCode());
332  }
333  }
334 
335 
342  public synchronized List<EdgeItem> getEdges(String spaceName)
343  throws TException, ExecuteFailedException {
344  int retry = RETRY_TIMES;
345  int spaceID = getSpace(spaceName).getSpace_id();
346  ListEdgesReq request = new ListEdgesReq(spaceID);
347  ListEdgesResp response = null;
348  try {
349  while (retry-- >= 0) {
350  response = client.listEdges(request);
351  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
352  freshClient(response.getLeader());
353  } else {
354  break;
355  }
356  }
357  } catch (TException e) {
358  LOGGER.error(String.format("Get Edge Error: %s", e.getMessage()));
359  throw e;
360  }
361  if (response.getCode() == ErrorCode.SUCCEEDED) {
362  return response.getEdges();
363  } else {
364  LOGGER.error("Get edges execute failed: errorCode: " + response.getCode());
365  throw new ExecuteFailedException(
366  "Get execute edges failed, errorCode: " + response.getCode());
367  }
368  }
369 
377  public synchronized Schema getEdge(String spaceName, String edgeName)
378  throws TException, ExecuteFailedException {
379  int retry = RETRY_TIMES;
380  GetEdgeReq request = new GetEdgeReq();
381  int spaceID = getSpace(spaceName).getSpace_id();
382  request.setSpace_id(spaceID);
383  request.setEdge_name(edgeName.getBytes());
384  request.setVersion(LATEST_SCHEMA_VERSION);
385  GetEdgeResp response = null;
386 
387  try {
388  while (retry-- >= 0) {
389  response = client.getEdge(request);
390  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
391  freshClient(response.getLeader());
392  } else {
393  break;
394  }
395  }
396  } catch (TException e) {
397  LOGGER.error(String.format("Get Edge Error: %s", e.getMessage()));
398  throw e;
399  }
400  if (response.getCode() == ErrorCode.SUCCEEDED) {
401  return response.getSchema();
402  } else {
403  LOGGER.error("Get Edge execute failed, errorCode: " + response.getCode());
404  throw new ExecuteFailedException(
405  "Get Edge execute failed, errorCode: " + response.getCode());
406  }
407  }
408 
409 
417  public synchronized Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
418  throws ExecuteFailedException, TException {
419  int retry = RETRY_TIMES;
420  GetPartsAllocReq request = new GetPartsAllocReq();
421  int spaceID = getSpace(spaceName).getSpace_id();
422  request.setSpace_id(spaceID);
423 
424  GetPartsAllocResp response = null;
425  try {
426  while (retry-- >= 0) {
427  response = client.getPartsAlloc(request);
428  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
429  freshClient(response.getLeader());
430  } else {
431  break;
432  }
433  }
434  } catch (TException e) {
435  LOGGER.error(String.format("Get Parts Error: %s", e.getMessage()));
436  throw e;
437  }
438  if (response.getCode() == ErrorCode.SUCCEEDED) {
439  return response.getParts();
440  } else {
441  LOGGER.error("Get Parts execute failed, errorCode" + response.getCode());
442  throw new ExecuteFailedException(
443  "Get Parts execute failed, errorCode" + response.getCode());
444  }
445  }
446 
450  public synchronized Set<HostAddr> listHosts() {
451  int retry = RETRY_TIMES;
452  ListHostsReq request = new ListHostsReq();
453  request.setType(ListHostType.STORAGE);
454  ListHostsResp resp = null;
455  try {
456  while (retry-- >= 0) {
457  resp = client.listHosts(request);
458  if (resp.getCode() == ErrorCode.E_LEADER_CHANGED) {
459  freshClient(resp.getLeader());
460  } else {
461  break;
462  }
463  }
464  } catch (TException e) {
465  LOGGER.error("listHosts error", e);
466  return null;
467  }
468  if (resp.getCode() != ErrorCode.SUCCEEDED) {
469  LOGGER.error("listHosts execute failed, errorCode: " + resp.getCode());
470  return null;
471  }
472  Set<HostAddr> hostAddrs = new HashSet<>();
473  for (HostItem hostItem : resp.hosts) {
474  if (hostItem.getStatus().getValue() == HostStatus.ONLINE.getValue()) {
475  hostAddrs.add(hostItem.getHostAddr());
476  }
477  }
478  return hostAddrs;
479  }
480 }
synchronized Schema getTag(String spaceName, String tagName)
get schema of specific tag
synchronized SpaceItem getSpace(String spaceName)
get one space
synchronized Set< HostAddr > listHosts()
get all Storaged servers
synchronized List< IdName > getSpaces()
get all spaces
synchronized List< TagItem > getTags(String spaceName)
get all tags of spaceName
synchronized Schema getEdge(String spaceName, String edgeName)
get schema of specific edge
synchronized Map< Integer, List< HostAddr > > getPartsAlloc(String spaceName)
Get all parts of a space and the host addresses of each part; the mapping is returned to the caller.
synchronized List< EdgeItem > getEdges(String spaceName)
get all edges of specific space