6 package com.vesoft.nebula.client.meta;
8 import com.facebook.thrift.TException;
9 import com.facebook.thrift.protocol.TCompactProtocol;
10 import com.facebook.thrift.protocol.THeaderProtocol;
11 import com.facebook.thrift.transport.THeaderTransport;
12 import com.facebook.thrift.transport.TSocket;
13 import com.facebook.thrift.transport.TTransportException;
14 import com.google.common.base.Charsets;
15 import com.vesoft.nebula.ErrorCode;
16 import com.vesoft.nebula.HostAddr;
17 import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
18 import com.vesoft.nebula.client.graph.data.HostAddress;
19 import com.vesoft.nebula.client.graph.data.SSLParam;
20 import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
21 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
22 import com.vesoft.nebula.client.graph.exception.IOErrorException;
23 import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
24 import com.vesoft.nebula.meta.EdgeItem;
25 import com.vesoft.nebula.meta.GetEdgeReq;
26 import com.vesoft.nebula.meta.GetEdgeResp;
27 import com.vesoft.nebula.meta.GetPartsAllocReq;
28 import com.vesoft.nebula.meta.GetPartsAllocResp;
29 import com.vesoft.nebula.meta.GetSpaceReq;
30 import com.vesoft.nebula.meta.GetSpaceResp;
31 import com.vesoft.nebula.meta.GetTagReq;
32 import com.vesoft.nebula.meta.GetTagResp;
33 import com.vesoft.nebula.meta.HostItem;
34 import com.vesoft.nebula.meta.HostStatus;
35 import com.vesoft.nebula.meta.IdName;
36 import com.vesoft.nebula.meta.ListEdgesReq;
37 import com.vesoft.nebula.meta.ListEdgesResp;
38 import com.vesoft.nebula.meta.ListHostType;
39 import com.vesoft.nebula.meta.ListHostsReq;
40 import com.vesoft.nebula.meta.ListHostsResp;
41 import com.vesoft.nebula.meta.ListSpacesReq;
42 import com.vesoft.nebula.meta.ListSpacesResp;
43 import com.vesoft.nebula.meta.ListTagsReq;
44 import com.vesoft.nebula.meta.ListTagsResp;
45 import com.vesoft.nebula.meta.MetaService;
46 import com.vesoft.nebula.meta.Schema;
47 import com.vesoft.nebula.meta.SpaceItem;
48 import com.vesoft.nebula.meta.TagItem;
49 import com.vesoft.nebula.meta.VerifyClientVersionReq;
50 import com.vesoft.nebula.meta.VerifyClientVersionResp;
51 import com.vesoft.nebula.util.SslUtil;
52 import java.io.IOException;
53 import java.io.Serializable;
54 import java.net.UnknownHostException;
55 import java.util.Arrays;
56 import java.util.HashSet;
57 import java.util.List;
59 import java.util.Random;
61 import javax.net.ssl.SSLSocketFactory;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
// SLF4J logger shared by all instances of this client.
67 private static final Logger LOGGER = LoggerFactory.getLogger(
MetaClient.class);
// Version value sent with the tag and edge schema requests in getTag / getEdge.
69 public static final int LATEST_SCHEMA_VERSION = -1;
// Timeout handed to the four-argument constructor when the caller supplies none.
71 private static final int DEFAULT_TIMEOUT_MS = 1000;
// Retry sizes used by the constructors that do not take explicit retry arguments.
72 private static final int DEFAULT_CONNECTION_RETRY_SIZE = 3;
73 private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;
// Initial value of the per-request retry counter. The request loops test
// `retry-- >= 0`, so each request is sent at most RETRY_TIMES + 1 times.
74 private static final int RETRY_TIMES = 1;
// Off unless the SSL-aware constructor assigns it.
76 private boolean enableSSL =
false;
// Thrift client for the meta service; assigned in getClient.
79 private MetaService.Client client;
// Meta server addresses supplied at construction; doConnect indexes into this list at random.
80 private final List<HostAddress> addresses;
/**
 * Constructor taking a single host and port. Its body is on lines missing from this listing.
 *
 * @param host host name or address to connect to
 * @param port port to connect to
 * @throws UnknownHostException declared; the condition that raises it is not visible here
 */
82 public MetaClient(String host,
int port)
throws UnknownHostException {
// NOTE(review): `address` is not a parameter of the signature above, and a `this(...)`
// call must be the first statement of a constructor, so this delegation most likely
// belongs to a separate single-address overload whose header is missing from this
// listing - confirm against the full file. It wraps the address in a one-element list
// and applies the default connection and execution retry sizes.
87 this(Arrays.asList(address), DEFAULT_CONNECTION_RETRY_SIZE, DEFAULT_EXECUTION_RETRY_SIZE);
/**
 * Creates a client for the given addresses using the default connection and
 * execution retry sizes.
 *
 * @param addresses addresses forwarded unchanged to the three-argument constructor
 * @throws UnknownHostException declared; propagated from the delegated constructor
 */
90 public MetaClient(List<HostAddress> addresses)
throws UnknownHostException {
91 this(addresses, DEFAULT_CONNECTION_RETRY_SIZE, DEFAULT_EXECUTION_RETRY_SIZE);
/**
 * Creates a client with explicit retry sizes and the default timeout
 * (DEFAULT_TIMEOUT_MS).
 *
 * @param addresses       addresses forwarded to the four-argument constructor
 * @param connectionRetry connection retry size, forwarded unchanged
 * @param executionRetry  execution retry size, forwarded unchanged
 * @throws UnknownHostException declared; propagated from the delegated constructor
 */
94 public MetaClient(List<HostAddress> addresses,
int connectionRetry,
int executionRetry)
95 throws UnknownHostException {
96 this(addresses, DEFAULT_TIMEOUT_MS, connectionRetry, executionRetry);
/**
 * Creates a client with explicit timeout and retry sizes. All four arguments are
 * passed to the superclass constructor, and the address list is also kept in this
 * class for use when choosing a server to connect to.
 *
 * @param addresses       addresses passed to the superclass and stored in this instance
 * @param timeout         timeout passed to the superclass (unit not stated here; the
 *                        default constant is named DEFAULT_TIMEOUT_MS)
 * @param connectionRetry connection retry size passed to the superclass
 * @param executionRetry  execution retry size passed to the superclass
 * @throws UnknownHostException declared; the condition that raises it is not visible here
 */
99 public MetaClient(List<HostAddress> addresses,
int timeout,
int connectionRetry,
100 int executionRetry)
throws UnknownHostException {
101 super(addresses, timeout, connectionRetry, executionRetry);
// The list reference is stored as-is (no defensive copy).
102 this.addresses = addresses;
/**
 * Creates a client with explicit timeout, retry sizes and SSL settings.
 *
 * @param addresses       addresses passed to the superclass and stored in this instance
 * @param timeout         timeout passed to the superclass
 * @param connectionRetry connection retry size passed to the superclass
 * @param executionRetry  execution retry size passed to the superclass
 * @param enableSSL       whether SSL is enabled for this client
 * @param sslParam        SSL parameters; must be non-null when enableSSL is true
 * @throws UnknownHostException     declared; the condition that raises it is not visible here
 * @throws IllegalArgumentException if enableSSL is true and sslParam is null
 */
105 public MetaClient(List<HostAddress> addresses,
int timeout,
int connectionRetry,
106 int executionRetry,
boolean enableSSL,
SSLParam sslParam)
107 throws UnknownHostException {
108 super(addresses, timeout, connectionRetry, executionRetry);
109 this.addresses = addresses;
110 this.enableSSL = enableSSL;
111 this.sslParam = sslParam;
// Validation runs after the superclass constructor and the field assignments above.
112 if (enableSSL && sslParam ==
null) {
113 throw new IllegalArgumentException(
"SSL is enabled, but SSLParam is null.");
/**
 * Public connect entry point. Only the signature is present in this listing; the
 * throws clause and body are not shown (presumably it delegates to doConnect - confirm).
 */
117 public void connect()
/**
 * Picks a random index into the configured address list and builds the client
 * for an address via getClient.
 */
125 private void doConnect()
// The generator is seeded from the wall clock on every call.
127 Random random =
new Random(System.currentTimeMillis());
// NOTE(review): Random.nextInt(bound) rejects a non-positive bound, so an empty
// address list would fail here with IllegalArgumentException.
128 int position = random.nextInt(addresses.size());
// `address` is assigned on a line missing from this listing; presumably
// addresses.get(position) - confirm.
130 getClient(address.getHost(), address.getPort());
/**
 * Builds the transport, protocol and MetaService client for the given host and
 * port, then asks the server to verify the client version.
 *
 * <p>Two transport constructions are visible: one wrapping a socket created by an
 * SSLSocketFactory and one using a plain host/port TSocket. The conditions that
 * select between them are on lines missing from this listing (presumably keyed on
 * enableSSL and the concrete SSLParam type - confirm).
 *
 * <p>If version verification does not return SUCCEEDED, the transport is closed
 * and a message is assembled from the server's error message (or the literal
 * "Error code: " when it is null) followed by the numeric code. The statement
 * consuming that message is not visible; presumably an exception is thrown - confirm.
 *
 * @param host host to open the socket to
 * @param port port to open the socket to
 */
133 private void getClient(String host,
int port)
136 SSLSocketFactory sslSocketFactory;
// CA-signed path: the cast assumes sslParam is a CASignedSSLParam at this point.
138 sslSocketFactory = SslUtil.getSSLSocketFactoryWithCA((
CASignedSSLParam) sslParam);
// SSL transport: the same timeout value is passed for both TSocket timeout arguments.
144 transport =
new THeaderTransport(
145 new TSocket(sslSocketFactory.createSocket(host, port), timeout, timeout));
146 }
catch (IOException e) {
// Plain (non-SSL) transport.
150 transport =
new THeaderTransport(
new TSocket(host, port, timeout, timeout));
154 protocol =
new THeaderProtocol(transport);
155 client =
new MetaService.Client(protocol);
// Version handshake using a request with no fields set explicitly here.
158 VerifyClientVersionResp resp = client
159 .verifyClientVersion(
new VerifyClientVersionReq());
160 if (resp.getCode() != ErrorCode.SUCCEEDED) {
// Close the transport before reporting the failure.
161 client.getInputProtocol().getTransport().close();
162 if (resp.getError_msg() ==
null) {
164 new String(
"Error code: ")
165 + String.valueOf(resp.getCode().getValue()));
168 new String(resp.getError_msg())
169 + String.valueOf(resp.getCode().getValue()));
/**
 * Rebuilds the client against the leader reported by the meta service.
 *
 * @param leader leader address taken from a response; its host may be null or empty
 * @throws TTransportException declared; the statement that raises it is not visible here
 */
173 private void freshClient(HostAddr leader)
throws TTransportException {
// A null or empty leader host is handled by a separate branch whose body is
// missing from this listing.
176 if (leader.getHost() ==
null ||
"".equals(leader.getHost())) {
// Reconnect using the leader's host and port.
179 getClient(leader.getHost(), leader.getPort());
// Only the exception message is logged here; the enclosing catch clause and
// whatever follows the log call are not visible.
182 LOGGER.error(e.getMessage());
// Fragment of a method whose header and body are missing from this listing.
// The guard is true only when a transport exists and reports itself open.
190 if (transport !=
null && transport.isOpen()) {
// Body of the space-listing request; the method header is on a line missing from
// this listing. Sends a ListSpacesReq, follows leader changes, and returns the
// spaces carried by a SUCCEEDED response.
201 int retry = RETRY_TIMES;
202 ListSpacesReq request =
new ListSpacesReq();
203 ListSpacesResp response =
null;
// `retry-- >= 0` starting from RETRY_TIMES allows at most RETRY_TIMES + 1 requests.
205 while (retry-- >= 0) {
206 response = client.listSpaces(request);
207 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
// Leader moved: rebuild the client against the leader named in the response.
208 freshClient(response.getLeader());
213 }
catch (TException e) {
// Only the exception message is logged; what follows the log call is not visible.
214 LOGGER.error(String.format(
"List Spaces Error: %s", e.getMessage()));
217 if (response.getCode() == ErrorCode.SUCCEEDED) {
218 return response.getSpaces();
// Any other code is logged. The repeated message after the log call is presumably
// the argument of a throw whose first line is missing from this listing - confirm.
220 LOGGER.error(
"Get Spaces execute failed, errorCode: " + response.getCode());
222 "Get Spaces execute failed, errorCode: " + response.getCode());
/**
 * Fetches the SpaceItem for the named space from the meta service.
 *
 * <p>Sends a GetSpaceReq; when the response code is E_LEADER_CHANGED the client is
 * rebuilt against the reported leader via freshClient. The loop condition allows at
 * most RETRY_TIMES + 1 requests. A SUCCEEDED response returns its item; any other
 * code is logged (the statement after the log is not fully visible; presumably a
 * throw - confirm).
 *
 * @param spaceName name of the space to look up; must not be null (it is dereferenced)
 * @return the item carried by a SUCCEEDED response
 * @throws TException declared; the remainder of the throws clause is cut off in this listing
 */
232 public synchronized SpaceItem
getSpace(String spaceName)
throws TException,
234 int retry = RETRY_TIMES;
235 GetSpaceReq request =
new GetSpaceReq();
// NOTE(review): getBytes() without a charset argument uses the JVM default charset;
// this file imports Charsets, so an explicit charset may have been intended - confirm.
236 request.setSpace_name(spaceName.getBytes());
237 GetSpaceResp response =
null;
239 while (retry-- >= 0) {
240 response = client.getSpace(request);
241 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
// Leader moved: rebuild the client against the leader named in the response.
242 freshClient(response.getLeader());
247 }
catch (TException e) {
248 LOGGER.error(String.format(
"Get Space Error: %s", e.getMessage()));
251 if (response.getCode() == ErrorCode.SUCCEEDED) {
252 return response.getItem();
254 LOGGER.error(
"Get Space execute failed, errorCode: " + response.getCode());
256 "Get Space execute failed, errorCode: " + response.getCode());
/**
 * Lists the tags of the named space.
 *
 * <p>Resolves the space id through getSpace, sends a ListTagsReq for that id, and
 * rebuilds the client via freshClient when the response code is E_LEADER_CHANGED.
 * The loop condition allows at most RETRY_TIMES + 1 requests. A SUCCEEDED response
 * returns its tags; any other code is logged (the statement after the log is not
 * fully visible; presumably a throw - confirm).
 *
 * @param spaceName name of the space whose tags are listed
 * @return the tags carried by a SUCCEEDED response
 */
266 public synchronized List<TagItem>
getTags(String spaceName)
268 int retry = RETRY_TIMES;
// NOTE(review): this reads the space_id field directly, while the sibling methods
// call getSpace_id(); both are visible in this file.
270 int spaceID =
getSpace(spaceName).space_id;
271 ListTagsReq request =
new ListTagsReq(spaceID);
272 ListTagsResp response =
null;
274 while (retry-- >= 0) {
275 response = client.listTags(request);
276 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
// Leader moved: rebuild the client against the leader named in the response.
277 freshClient(response.getLeader());
282 }
catch (TException e) {
// NOTE(review): the message says "Get Tag" although this method lists tags.
283 LOGGER.error(String.format(
"Get Tag Error: %s", e.getMessage()));
286 if (response.getCode() == ErrorCode.SUCCEEDED) {
287 return response.getTags();
289 LOGGER.error(
"Get tags execute failed, errorCode: " + response.getCode());
291 "Get Tags execute failed, errorCode: " + response.getCode());
/**
 * Fetches the schema of one tag in the named space.
 *
 * <p>Resolves the space id through getSpace, then sends a GetTagReq carrying the
 * space id, the tag name bytes and LATEST_SCHEMA_VERSION. When the response code is
 * E_LEADER_CHANGED the client is rebuilt via freshClient. The loop condition allows
 * at most RETRY_TIMES + 1 requests. A SUCCEEDED response returns its schema; any
 * other code is logged (the statement after the log is not fully visible;
 * presumably a throw - confirm).
 *
 * @param spaceName name of the space containing the tag
 * @param tagName   name of the tag; must not be null (it is dereferenced)
 * @return the schema carried by a SUCCEEDED response
 */
303 public synchronized Schema
getTag(String spaceName, String tagName)
305 int retry = RETRY_TIMES;
306 GetTagReq request =
new GetTagReq();
307 int spaceID =
getSpace(spaceName).getSpace_id();
308 request.setSpace_id(spaceID);
// NOTE(review): getBytes() without a charset argument uses the JVM default charset.
309 request.setTag_name(tagName.getBytes());
310 request.setVersion(LATEST_SCHEMA_VERSION);
311 GetTagResp response =
null;
314 while (retry-- >= 0) {
315 response = client.getTag(request);
316 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
// Leader moved: rebuild the client against the leader named in the response.
317 freshClient(response.getLeader());
322 }
catch (TException e) {
323 LOGGER.error(String.format(
"Get Tag Error: %s", e.getMessage()));
326 if (response.getCode() == ErrorCode.SUCCEEDED) {
327 return response.getSchema();
329 LOGGER.error(
"Get tag execute failed, errorCode: " + response.getCode());
331 "Get tag execute failed, errorCode: " + response.getCode());
/**
 * Lists the edges of the named space.
 *
 * <p>Resolves the space id through getSpace, sends a ListEdgesReq for that id, and
 * rebuilds the client via freshClient when the response code is E_LEADER_CHANGED.
 * The loop condition allows at most RETRY_TIMES + 1 requests. A SUCCEEDED response
 * returns its edges; any other code is logged (the statement after the log is not
 * fully visible; presumably a throw - confirm).
 *
 * @param spaceName name of the space whose edges are listed
 * @return the edges carried by a SUCCEEDED response
 */
342 public synchronized List<EdgeItem>
getEdges(String spaceName)
344 int retry = RETRY_TIMES;
345 int spaceID =
getSpace(spaceName).getSpace_id();
346 ListEdgesReq request =
new ListEdgesReq(spaceID);
347 ListEdgesResp response =
null;
349 while (retry-- >= 0) {
350 response = client.listEdges(request);
351 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
// Leader moved: rebuild the client against the leader named in the response.
352 freshClient(response.getLeader());
357 }
catch (TException e) {
358 LOGGER.error(String.format(
"Get Edge Error: %s", e.getMessage()));
361 if (response.getCode() == ErrorCode.SUCCEEDED) {
362 return response.getEdges();
// NOTE(review): the two failure messages below are worded differently
// ("Get edges execute failed:" vs "Get execute edges failed,").
364 LOGGER.error(
"Get edges execute failed: errorCode: " + response.getCode());
366 "Get execute edges failed, errorCode: " + response.getCode());
/**
 * Fetches the schema of one edge type in the named space.
 *
 * <p>Resolves the space id through getSpace, then sends a GetEdgeReq carrying the
 * space id, the edge name bytes and LATEST_SCHEMA_VERSION. When the response code
 * is E_LEADER_CHANGED the client is rebuilt via freshClient. The loop condition
 * allows at most RETRY_TIMES + 1 requests. A SUCCEEDED response returns its schema;
 * any other code is logged (the statement after the log is not fully visible;
 * presumably a throw - confirm).
 *
 * @param spaceName name of the space containing the edge
 * @param edgeName  name of the edge; must not be null (it is dereferenced)
 * @return the schema carried by a SUCCEEDED response
 */
377 public synchronized Schema
getEdge(String spaceName, String edgeName)
379 int retry = RETRY_TIMES;
380 GetEdgeReq request =
new GetEdgeReq();
381 int spaceID =
getSpace(spaceName).getSpace_id();
382 request.setSpace_id(spaceID);
// NOTE(review): getBytes() without a charset argument uses the JVM default charset.
383 request.setEdge_name(edgeName.getBytes());
384 request.setVersion(LATEST_SCHEMA_VERSION);
385 GetEdgeResp response =
null;
388 while (retry-- >= 0) {
389 response = client.getEdge(request);
390 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
// Leader moved: rebuild the client against the leader named in the response.
391 freshClient(response.getLeader());
396 }
catch (TException e) {
397 LOGGER.error(String.format(
"Get Edge Error: %s", e.getMessage()));
400 if (response.getCode() == ErrorCode.SUCCEEDED) {
401 return response.getSchema();
403 LOGGER.error(
"Get Edge execute failed, errorCode: " + response.getCode());
405 "Get Edge execute failed, errorCode: " + response.getCode());
/**
 * Fetches the partition allocation of the named space.
 *
 * <p>Resolves the space id through getSpace, sends a GetPartsAllocReq for that id,
 * and rebuilds the client via freshClient when the response code is
 * E_LEADER_CHANGED. The loop condition allows at most RETRY_TIMES + 1 requests. A
 * SUCCEEDED response returns its parts map; any other code is logged (the statement
 * after the log is not fully visible; presumably a throw - confirm).
 *
 * @param spaceName name of the space whose allocation is requested
 * @return the map of Integer keys to host address lists carried by a SUCCEEDED
 *         response (keys are presumably partition ids - confirm)
 */
417 public synchronized Map<Integer, List<HostAddr>>
getPartsAlloc(String spaceName)
419 int retry = RETRY_TIMES;
420 GetPartsAllocReq request =
new GetPartsAllocReq();
421 int spaceID =
getSpace(spaceName).getSpace_id();
422 request.setSpace_id(spaceID);
424 GetPartsAllocResp response =
null;
426 while (retry-- >= 0) {
427 response = client.getPartsAlloc(request);
428 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
// Leader moved: rebuild the client against the leader named in the response.
429 freshClient(response.getLeader());
434 }
catch (TException e) {
435 LOGGER.error(String.format(
"Get Parts Error: %s", e.getMessage()));
438 if (response.getCode() == ErrorCode.SUCCEEDED) {
439 return response.getParts();
// NOTE(review): unlike the sibling methods, these messages have no ": " separator
// between "errorCode" and the code value.
441 LOGGER.error(
"Get Parts execute failed, errorCode" + response.getCode());
443 "Get Parts execute failed, errorCode" + response.getCode());
// Body of the host-listing request; the method header is on a line missing from
// this listing. Requests hosts of type STORAGE, follows leader changes, and collects
// the addresses of hosts whose status is ONLINE into a set.
451 int retry = RETRY_TIMES;
452 ListHostsReq request =
new ListHostsReq();
453 request.setType(ListHostType.STORAGE);
454 ListHostsResp resp =
null;
// `retry-- >= 0` starting from RETRY_TIMES allows at most RETRY_TIMES + 1 requests.
456 while (retry-- >= 0) {
457 resp = client.listHosts(request);
458 if (resp.getCode() == ErrorCode.E_LEADER_CHANGED) {
// Leader moved: rebuild the client against the leader named in the response.
459 freshClient(resp.getLeader());
464 }
catch (TException e) {
// The exception object itself is passed to the logger here, not just its message.
465 LOGGER.error(
"listHosts error", e);
// A non-SUCCEEDED code is logged; what happens after the log call is on a line
// missing from this listing.
468 if (resp.getCode() != ErrorCode.SUCCEEDED) {
469 LOGGER.error(
"listHosts execute failed, errorCode: " + resp.getCode());
472 Set<HostAddr> hostAddrs =
new HashSet<>();
// Iterates the response's hosts field directly and keeps only ONLINE hosts,
// comparing the numeric status values.
473 for (HostItem hostItem : resp.hosts) {
474 if (hostItem.getStatus().getValue() == HostStatus.ONLINE.getValue()) {
475 hostAddrs.add(hostItem.getHostAddr());