NebulaGraph Java Client  release-3.8
MetaClient.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.meta;
7 
8 import com.facebook.thrift.TException;
9 import com.facebook.thrift.protocol.THeaderProtocol;
10 import com.facebook.thrift.transport.THeaderTransport;
11 import com.facebook.thrift.transport.TSocket;
12 import com.facebook.thrift.transport.TTransportException;
13 import com.google.common.base.Charsets;
14 import com.vesoft.nebula.ErrorCode;
15 import com.vesoft.nebula.HostAddr;
16 import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
17 import com.vesoft.nebula.client.graph.data.HostAddress;
18 import com.vesoft.nebula.client.graph.data.SSLParam;
19 import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
20 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
21 import com.vesoft.nebula.client.graph.exception.IOErrorException;
22 import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
23 import com.vesoft.nebula.meta.EdgeItem;
24 import com.vesoft.nebula.meta.GetEdgeReq;
25 import com.vesoft.nebula.meta.GetEdgeResp;
26 import com.vesoft.nebula.meta.GetPartsAllocReq;
27 import com.vesoft.nebula.meta.GetPartsAllocResp;
28 import com.vesoft.nebula.meta.GetSpaceReq;
29 import com.vesoft.nebula.meta.GetSpaceResp;
30 import com.vesoft.nebula.meta.GetTagReq;
31 import com.vesoft.nebula.meta.GetTagResp;
32 import com.vesoft.nebula.meta.HostItem;
33 import com.vesoft.nebula.meta.HostStatus;
34 import com.vesoft.nebula.meta.IdName;
35 import com.vesoft.nebula.meta.ListEdgesReq;
36 import com.vesoft.nebula.meta.ListEdgesResp;
37 import com.vesoft.nebula.meta.ListHostType;
38 import com.vesoft.nebula.meta.ListHostsReq;
39 import com.vesoft.nebula.meta.ListHostsResp;
40 import com.vesoft.nebula.meta.ListSpacesReq;
41 import com.vesoft.nebula.meta.ListSpacesResp;
42 import com.vesoft.nebula.meta.ListTagsReq;
43 import com.vesoft.nebula.meta.ListTagsResp;
44 import com.vesoft.nebula.meta.MetaService;
45 import com.vesoft.nebula.meta.Schema;
46 import com.vesoft.nebula.meta.SpaceItem;
47 import com.vesoft.nebula.meta.TagItem;
48 import com.vesoft.nebula.meta.VerifyClientVersionReq;
49 import com.vesoft.nebula.meta.VerifyClientVersionResp;
50 import com.vesoft.nebula.util.SslUtil;
51 import java.io.IOException;
52 import java.net.UnknownHostException;
53 import java.util.Arrays;
54 import java.util.HashSet;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.Random;
58 import java.util.Set;
59 import javax.net.ssl.SSLSocketFactory;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62 
63 public class MetaClient extends AbstractMetaClient {
64 
65  private static final Logger LOGGER = LoggerFactory.getLogger(MetaClient.class);
66 
67  public static final int LATEST_SCHEMA_VERSION = -1;
68 
69  private static final int DEFAULT_TIMEOUT_MS = 1000;
70  private static final int DEFAULT_CONNECTION_RETRY_SIZE = 3;
71  private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;
72  private static final int RETRY_TIMES = 1;
73 
74  private boolean enableSSL = false;
75  private SSLParam sslParam = null;
76 
77  private MetaService.Client client;
78  private final List<HostAddress> addresses;
79 
80  public MetaClient(String host, int port) throws UnknownHostException {
81  this(new HostAddress(host, port));
82  }
83 
84  public MetaClient(HostAddress address) throws UnknownHostException {
85  this(Arrays.asList(address), DEFAULT_CONNECTION_RETRY_SIZE, DEFAULT_EXECUTION_RETRY_SIZE);
86  }
87 
88  public MetaClient(List<HostAddress> addresses) throws UnknownHostException {
89  this(addresses, DEFAULT_CONNECTION_RETRY_SIZE, DEFAULT_EXECUTION_RETRY_SIZE);
90  }
91 
92  public MetaClient(List<HostAddress> addresses, int connectionRetry, int executionRetry)
93  throws UnknownHostException {
94  this(addresses, DEFAULT_TIMEOUT_MS, connectionRetry, executionRetry);
95  }
96 
97  public MetaClient(List<HostAddress> addresses, int timeout, int connectionRetry,
98  int executionRetry) throws UnknownHostException {
99  super(addresses, timeout, connectionRetry, executionRetry);
100  this.addresses = addresses;
101  }
102 
103  public MetaClient(List<HostAddress> addresses, int timeout, int connectionRetry,
104  int executionRetry, boolean enableSSL, SSLParam sslParam)
105  throws UnknownHostException {
106  super(addresses, timeout, connectionRetry, executionRetry);
107  this.addresses = addresses;
108  this.enableSSL = enableSSL;
109  this.sslParam = sslParam;
110  if (enableSSL && sslParam == null) {
111  throw new IllegalArgumentException("SSL is enabled, but SSLParam is null.");
112  }
113  }
114 
115  public void connect()
116  throws TException, ClientServerIncompatibleException {
117  doConnect();
118  }
119 
123  private void doConnect()
124  throws TTransportException, ClientServerIncompatibleException {
125  Random random = new Random(System.currentTimeMillis());
126  int position = random.nextInt(addresses.size());
127  HostAddress address = addresses.get(position);
128  getClient(address.getHost(), address.getPort());
129  }
130 
131  private void getClient(String host, int port)
132  throws TTransportException, ClientServerIncompatibleException {
133  if (enableSSL) {
134  SSLSocketFactory sslSocketFactory;
135  if (sslParam.getSignMode() == SSLParam.SignMode.CA_SIGNED) {
136  sslSocketFactory = SslUtil.getSSLSocketFactoryWithCA((CASignedSSLParam) sslParam);
137  } else {
138  sslSocketFactory =
139  SslUtil.getSSLSocketFactoryWithoutCA((SelfSignedSSLParam) sslParam);
140  }
141  try {
142  transport = new THeaderTransport(
143  new TSocket(sslSocketFactory.createSocket(host, port), timeout, timeout));
144  } catch (IOException e) {
145  throw new TTransportException(IOErrorException.E_UNKNOWN, e);
146  }
147  } else {
148  transport = new THeaderTransport(new TSocket(host, port, timeout, timeout));
149  transport.open();
150  }
151 
152  protocol = new THeaderProtocol(transport);
153  client = new MetaService.Client(protocol);
154 
155  VerifyClientVersionResp resp = client.verifyClientVersion(new VerifyClientVersionReq());
156  if (resp.getCode() != ErrorCode.SUCCEEDED) {
157  client.getInputProtocol().getTransport().close();
158  if (resp.getError_msg() == null) {
160  new String("Error code: ")
161  + String.valueOf(resp.getCode().getValue()));
162  }
163  throw new ClientServerIncompatibleException(new String(resp.getError_msg()));
164  }
165  }
166 
167  private void freshClient(HostAddr leader) throws TTransportException {
168  close();
169  try {
170  if (leader.getHost() == null || "".equals(leader.getHost())) {
171  doConnect();
172  } else {
173  getClient(leader.getHost(), leader.getPort());
174  }
176  LOGGER.error(e.getMessage());
177  }
178  }
179 
183  public void close() {
184  if (transport != null && transport.isOpen()) {
185  transport.close();
186  }
187  }
188 
194  public synchronized List<IdName> getSpaces() throws TException, ExecuteFailedException {
195  int retry = RETRY_TIMES;
196  ListSpacesReq request = new ListSpacesReq();
197  ListSpacesResp response = null;
198  try {
199  while (retry-- >= 0) {
200  response = client.listSpaces(request);
201  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
202  freshClient(response.getLeader());
203  } else {
204  break;
205  }
206  }
207  } catch (TException e) {
208  LOGGER.error(String.format("List Spaces Error: %s", e.getMessage()));
209  throw e;
210  }
211  if (response.getCode() == ErrorCode.SUCCEEDED) {
212  return response.getSpaces();
213  } else {
214  LOGGER.error("Get Spaces execute failed, errorCode: " + response.getCode());
215  throw new ExecuteFailedException(
216  "Get Spaces execute failed, errorCode: " + response.getCode());
217  }
218  }
219 
226  public synchronized SpaceItem getSpace(String spaceName) throws TException,
228  int retry = RETRY_TIMES;
229  GetSpaceReq request = new GetSpaceReq();
230  request.setSpace_name(spaceName.getBytes());
231  GetSpaceResp response = null;
232  try {
233  while (retry-- >= 0) {
234  response = client.getSpace(request);
235  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
236  freshClient(response.getLeader());
237  } else {
238  break;
239  }
240  }
241  } catch (TException e) {
242  LOGGER.error(String.format("Get Space Error: %s", e.getMessage()));
243  throw e;
244  }
245  if (response.getCode() == ErrorCode.SUCCEEDED) {
246  return response.getItem();
247  } else {
248  LOGGER.error("Get Space execute failed, errorCode: " + response.getCode());
249  throw new ExecuteFailedException(
250  "Get Space execute failed, errorCode: " + response.getCode());
251  }
252  }
253 
260  public synchronized List<TagItem> getTags(String spaceName)
261  throws TException, ExecuteFailedException {
262  int retry = RETRY_TIMES;
263 
264  int spaceID = getSpace(spaceName).space_id;
265  ListTagsReq request = new ListTagsReq(spaceID);
266  ListTagsResp response = null;
267  try {
268  while (retry-- >= 0) {
269  response = client.listTags(request);
270  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
271  freshClient(response.getLeader());
272  } else {
273  break;
274  }
275  }
276  } catch (TException e) {
277  LOGGER.error(String.format("Get Tag Error: %s", e.getMessage()));
278  throw e;
279  }
280  if (response.getCode() == ErrorCode.SUCCEEDED) {
281  return response.getTags();
282  } else {
283  LOGGER.error("Get tags execute failed, errorCode: " + response.getCode());
284  throw new ExecuteFailedException(
285  "Get Tags execute failed, errorCode: " + response.getCode());
286  }
287  }
288 
289 
297  public synchronized Schema getTag(String spaceName, String tagName)
298  throws TException, ExecuteFailedException {
299  int retry = RETRY_TIMES;
300  GetTagReq request = new GetTagReq();
301  int spaceID = getSpace(spaceName).getSpace_id();
302  request.setSpace_id(spaceID);
303  request.setTag_name(tagName.getBytes());
304  request.setVersion(LATEST_SCHEMA_VERSION);
305  GetTagResp response = null;
306 
307  try {
308  while (retry-- >= 0) {
309  response = client.getTag(request);
310  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
311  freshClient(response.getLeader());
312  } else {
313  break;
314  }
315  }
316  } catch (TException e) {
317  LOGGER.error(String.format("Get Tag Error: %s", e.getMessage()));
318  throw e;
319  }
320  if (response.getCode() == ErrorCode.SUCCEEDED) {
321  return response.getSchema();
322  } else {
323  LOGGER.error("Get tag execute failed, errorCode: " + response.getCode());
324  throw new ExecuteFailedException(
325  "Get tag execute failed, errorCode: " + response.getCode());
326  }
327  }
328 
329 
336  public synchronized List<EdgeItem> getEdges(String spaceName)
337  throws TException, ExecuteFailedException {
338  int retry = RETRY_TIMES;
339  int spaceID = getSpace(spaceName).getSpace_id();
340  ListEdgesReq request = new ListEdgesReq(spaceID);
341  ListEdgesResp response = null;
342  try {
343  while (retry-- >= 0) {
344  response = client.listEdges(request);
345  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
346  freshClient(response.getLeader());
347  } else {
348  break;
349  }
350  }
351  } catch (TException e) {
352  LOGGER.error(String.format("Get Edge Error: %s", e.getMessage()));
353  throw e;
354  }
355  if (response.getCode() == ErrorCode.SUCCEEDED) {
356  return response.getEdges();
357  } else {
358  LOGGER.error("Get edges execute failed: errorCode: " + response.getCode());
359  throw new ExecuteFailedException(
360  "Get execute edges failed, errorCode: " + response.getCode());
361  }
362  }
363 
371  public synchronized Schema getEdge(String spaceName, String edgeName)
372  throws TException, ExecuteFailedException {
373  int retry = RETRY_TIMES;
374  GetEdgeReq request = new GetEdgeReq();
375  int spaceID = getSpace(spaceName).getSpace_id();
376  request.setSpace_id(spaceID);
377  request.setEdge_name(edgeName.getBytes());
378  request.setVersion(LATEST_SCHEMA_VERSION);
379  GetEdgeResp response = null;
380 
381  try {
382  while (retry-- >= 0) {
383  response = client.getEdge(request);
384  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
385  freshClient(response.getLeader());
386  } else {
387  break;
388  }
389  }
390  } catch (TException e) {
391  LOGGER.error(String.format("Get Edge Error: %s", e.getMessage()));
392  throw e;
393  }
394  if (response.getCode() == ErrorCode.SUCCEEDED) {
395  return response.getSchema();
396  } else {
397  LOGGER.error("Get Edge execute failed, errorCode: " + response.getCode());
398  throw new ExecuteFailedException(
399  "Get Edge execute failed, errorCode: " + response.getCode());
400  }
401  }
402 
403 
411  public synchronized Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
412  throws ExecuteFailedException, TException {
413  int retry = RETRY_TIMES;
414  GetPartsAllocReq request = new GetPartsAllocReq();
415  int spaceID = getSpace(spaceName).getSpace_id();
416  request.setSpace_id(spaceID);
417 
418  GetPartsAllocResp response = null;
419  try {
420  while (retry-- >= 0) {
421  response = client.getPartsAlloc(request);
422  if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
423  freshClient(response.getLeader());
424  } else {
425  break;
426  }
427  }
428  } catch (TException e) {
429  LOGGER.error(String.format("Get Parts Error: %s", e.getMessage()));
430  throw e;
431  }
432  if (response.getCode() == ErrorCode.SUCCEEDED) {
433  return response.getParts();
434  } else {
435  LOGGER.error("Get Parts execute failed, errorCode" + response.getCode());
436  throw new ExecuteFailedException(
437  "Get Parts execute failed, errorCode" + response.getCode());
438  }
439  }
440 
444  public synchronized Set<HostAddr> listHosts() {
445  int retry = RETRY_TIMES;
446  ListHostsReq request = new ListHostsReq();
447  request.setType(ListHostType.STORAGE);
448  ListHostsResp resp = null;
449  try {
450  while (retry-- >= 0) {
451  resp = client.listHosts(request);
452  if (resp.getCode() == ErrorCode.E_LEADER_CHANGED) {
453  freshClient(resp.getLeader());
454  } else {
455  break;
456  }
457  }
458  } catch (TException e) {
459  LOGGER.error("listHosts error", e);
460  return null;
461  }
462  if (resp.getCode() != ErrorCode.SUCCEEDED) {
463  LOGGER.error("listHosts execute failed, errorCode: " + resp.getCode());
464  return null;
465  }
466  Set<HostAddr> hostAddrs = new HashSet<>();
467  for (HostItem hostItem : resp.hosts) {
468  if (hostItem.getStatus().getValue() == HostStatus.ONLINE.getValue()) {
469  hostAddrs.add(hostItem.getHostAddr());
470  }
471  }
472  return hostAddrs;
473  }
474 }
synchronized Schema getTag(String spaceName, String tagName)
get schema of specific tag
synchronized SpaceItem getSpace(String spaceName)
get one space
synchronized Set< HostAddr > listHosts()
get all online Storaged servers
synchronized List< IdName > getSpaces()
get all spaces
synchronized List< TagItem > getTags(String spaceName)
get all tags of spaceName
synchronized Schema getEdge(String spaceName, String edgeName)
get schema of a specific edge
synchronized Map< Integer, List< HostAddr > > getPartsAlloc(String spaceName)
Get all parts in a space together with the host addresses of each part; the result is returned as a map.
synchronized List< EdgeItem > getEdges(String spaceName)
get all edges of specific space