6 package com.vesoft.nebula.client.meta;
8 import com.facebook.thrift.TException;
9 import com.facebook.thrift.protocol.THeaderProtocol;
10 import com.facebook.thrift.transport.THeaderTransport;
11 import com.facebook.thrift.transport.TSocket;
12 import com.facebook.thrift.transport.TTransportException;
13 import com.google.common.base.Charsets;
14 import com.vesoft.nebula.ErrorCode;
15 import com.vesoft.nebula.HostAddr;
16 import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
17 import com.vesoft.nebula.client.graph.data.HostAddress;
18 import com.vesoft.nebula.client.graph.data.SSLParam;
19 import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
20 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
21 import com.vesoft.nebula.client.graph.exception.IOErrorException;
22 import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
23 import com.vesoft.nebula.meta.EdgeItem;
24 import com.vesoft.nebula.meta.GetEdgeReq;
25 import com.vesoft.nebula.meta.GetEdgeResp;
26 import com.vesoft.nebula.meta.GetPartsAllocReq;
27 import com.vesoft.nebula.meta.GetPartsAllocResp;
28 import com.vesoft.nebula.meta.GetSpaceReq;
29 import com.vesoft.nebula.meta.GetSpaceResp;
30 import com.vesoft.nebula.meta.GetTagReq;
31 import com.vesoft.nebula.meta.GetTagResp;
32 import com.vesoft.nebula.meta.HostItem;
33 import com.vesoft.nebula.meta.HostStatus;
34 import com.vesoft.nebula.meta.IdName;
35 import com.vesoft.nebula.meta.ListEdgesReq;
36 import com.vesoft.nebula.meta.ListEdgesResp;
37 import com.vesoft.nebula.meta.ListHostType;
38 import com.vesoft.nebula.meta.ListHostsReq;
39 import com.vesoft.nebula.meta.ListHostsResp;
40 import com.vesoft.nebula.meta.ListSpacesReq;
41 import com.vesoft.nebula.meta.ListSpacesResp;
42 import com.vesoft.nebula.meta.ListTagsReq;
43 import com.vesoft.nebula.meta.ListTagsResp;
44 import com.vesoft.nebula.meta.MetaService;
45 import com.vesoft.nebula.meta.Schema;
46 import com.vesoft.nebula.meta.SpaceItem;
47 import com.vesoft.nebula.meta.TagItem;
48 import com.vesoft.nebula.meta.VerifyClientVersionReq;
49 import com.vesoft.nebula.meta.VerifyClientVersionResp;
50 import com.vesoft.nebula.util.SslUtil;
51 import java.io.IOException;
52 import java.net.UnknownHostException;
53 import java.util.Arrays;
54 import java.util.HashSet;
55 import java.util.List;
57 import java.util.Random;
59 import javax.net.ssl.SSLSocketFactory;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
65 private static final Logger LOGGER = LoggerFactory.getLogger(
MetaClient.class);
67 public static final int LATEST_SCHEMA_VERSION = -1;
69 private static final int DEFAULT_TIMEOUT_MS = 1000;
70 private static final int DEFAULT_CONNECTION_RETRY_SIZE = 3;
71 private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;
72 private static final int RETRY_TIMES = 1;
74 private boolean enableSSL =
false;
77 private MetaService.Client client;
78 private final List<HostAddress> addresses;
80 public MetaClient(String host,
int port)
throws UnknownHostException {
85 this(Arrays.asList(address), DEFAULT_CONNECTION_RETRY_SIZE, DEFAULT_EXECUTION_RETRY_SIZE);
88 public MetaClient(List<HostAddress> addresses)
throws UnknownHostException {
89 this(addresses, DEFAULT_CONNECTION_RETRY_SIZE, DEFAULT_EXECUTION_RETRY_SIZE);
92 public MetaClient(List<HostAddress> addresses,
int connectionRetry,
int executionRetry)
93 throws UnknownHostException {
94 this(addresses, DEFAULT_TIMEOUT_MS, connectionRetry, executionRetry);
97 public MetaClient(List<HostAddress> addresses,
int timeout,
int connectionRetry,
98 int executionRetry)
throws UnknownHostException {
99 super(addresses, timeout, connectionRetry, executionRetry);
100 this.addresses = addresses;
103 public MetaClient(List<HostAddress> addresses,
int timeout,
int connectionRetry,
104 int executionRetry,
boolean enableSSL,
SSLParam sslParam)
105 throws UnknownHostException {
106 super(addresses, timeout, connectionRetry, executionRetry);
107 this.addresses = addresses;
108 this.enableSSL = enableSSL;
109 this.sslParam = sslParam;
110 if (enableSSL && sslParam ==
null) {
111 throw new IllegalArgumentException(
"SSL is enabled, but SSLParam is null.");
115 public void connect()
123 private void doConnect()
125 Random random =
new Random(System.currentTimeMillis());
126 int position = random.nextInt(addresses.size());
128 getClient(address.getHost(), address.getPort());
131 private void getClient(String host,
int port)
134 SSLSocketFactory sslSocketFactory;
136 sslSocketFactory = SslUtil.getSSLSocketFactoryWithCA((
CASignedSSLParam) sslParam);
142 transport =
new THeaderTransport(
143 new TSocket(sslSocketFactory.createSocket(host, port), timeout, timeout));
144 }
catch (IOException e) {
148 transport =
new THeaderTransport(
new TSocket(host, port, timeout, timeout));
152 protocol =
new THeaderProtocol(transport);
153 client =
new MetaService.Client(protocol);
155 VerifyClientVersionResp resp = client.verifyClientVersion(
new VerifyClientVersionReq());
156 if (resp.getCode() != ErrorCode.SUCCEEDED) {
157 client.getInputProtocol().getTransport().close();
158 if (resp.getError_msg() ==
null) {
160 new String(
"Error code: ")
161 + String.valueOf(resp.getCode().getValue()));
167 private void freshClient(HostAddr leader)
throws TTransportException {
170 if (leader.getHost() ==
null ||
"".equals(leader.getHost())) {
173 getClient(leader.getHost(), leader.getPort());
176 LOGGER.error(e.getMessage());
184 if (transport !=
null && transport.isOpen()) {
195 int retry = RETRY_TIMES;
196 ListSpacesReq request =
new ListSpacesReq();
197 ListSpacesResp response =
null;
199 while (retry-- >= 0) {
200 response = client.listSpaces(request);
201 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
202 freshClient(response.getLeader());
207 }
catch (TException e) {
208 LOGGER.error(String.format(
"List Spaces Error: %s", e.getMessage()));
211 if (response.getCode() == ErrorCode.SUCCEEDED) {
212 return response.getSpaces();
214 LOGGER.error(
"Get Spaces execute failed, errorCode: " + response.getCode());
216 "Get Spaces execute failed, errorCode: " + response.getCode());
226 public synchronized SpaceItem
getSpace(String spaceName)
throws TException,
228 int retry = RETRY_TIMES;
229 GetSpaceReq request =
new GetSpaceReq();
230 request.setSpace_name(spaceName.getBytes());
231 GetSpaceResp response =
null;
233 while (retry-- >= 0) {
234 response = client.getSpace(request);
235 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
236 freshClient(response.getLeader());
241 }
catch (TException e) {
242 LOGGER.error(String.format(
"Get Space Error: %s", e.getMessage()));
245 if (response.getCode() == ErrorCode.SUCCEEDED) {
246 return response.getItem();
248 LOGGER.error(
"Get Space execute failed, errorCode: " + response.getCode());
250 "Get Space execute failed, errorCode: " + response.getCode());
260 public synchronized List<TagItem>
getTags(String spaceName)
262 int retry = RETRY_TIMES;
264 int spaceID =
getSpace(spaceName).space_id;
265 ListTagsReq request =
new ListTagsReq(spaceID);
266 ListTagsResp response =
null;
268 while (retry-- >= 0) {
269 response = client.listTags(request);
270 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
271 freshClient(response.getLeader());
276 }
catch (TException e) {
277 LOGGER.error(String.format(
"Get Tag Error: %s", e.getMessage()));
280 if (response.getCode() == ErrorCode.SUCCEEDED) {
281 return response.getTags();
283 LOGGER.error(
"Get tags execute failed, errorCode: " + response.getCode());
285 "Get Tags execute failed, errorCode: " + response.getCode());
297 public synchronized Schema
getTag(String spaceName, String tagName)
299 int retry = RETRY_TIMES;
300 GetTagReq request =
new GetTagReq();
301 int spaceID =
getSpace(spaceName).getSpace_id();
302 request.setSpace_id(spaceID);
303 request.setTag_name(tagName.getBytes());
304 request.setVersion(LATEST_SCHEMA_VERSION);
305 GetTagResp response =
null;
308 while (retry-- >= 0) {
309 response = client.getTag(request);
310 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
311 freshClient(response.getLeader());
316 }
catch (TException e) {
317 LOGGER.error(String.format(
"Get Tag Error: %s", e.getMessage()));
320 if (response.getCode() == ErrorCode.SUCCEEDED) {
321 return response.getSchema();
323 LOGGER.error(
"Get tag execute failed, errorCode: " + response.getCode());
325 "Get tag execute failed, errorCode: " + response.getCode());
336 public synchronized List<EdgeItem>
getEdges(String spaceName)
338 int retry = RETRY_TIMES;
339 int spaceID =
getSpace(spaceName).getSpace_id();
340 ListEdgesReq request =
new ListEdgesReq(spaceID);
341 ListEdgesResp response =
null;
343 while (retry-- >= 0) {
344 response = client.listEdges(request);
345 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
346 freshClient(response.getLeader());
351 }
catch (TException e) {
352 LOGGER.error(String.format(
"Get Edge Error: %s", e.getMessage()));
355 if (response.getCode() == ErrorCode.SUCCEEDED) {
356 return response.getEdges();
358 LOGGER.error(
"Get edges execute failed: errorCode: " + response.getCode());
360 "Get execute edges failed, errorCode: " + response.getCode());
371 public synchronized Schema
getEdge(String spaceName, String edgeName)
373 int retry = RETRY_TIMES;
374 GetEdgeReq request =
new GetEdgeReq();
375 int spaceID =
getSpace(spaceName).getSpace_id();
376 request.setSpace_id(spaceID);
377 request.setEdge_name(edgeName.getBytes());
378 request.setVersion(LATEST_SCHEMA_VERSION);
379 GetEdgeResp response =
null;
382 while (retry-- >= 0) {
383 response = client.getEdge(request);
384 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
385 freshClient(response.getLeader());
390 }
catch (TException e) {
391 LOGGER.error(String.format(
"Get Edge Error: %s", e.getMessage()));
394 if (response.getCode() == ErrorCode.SUCCEEDED) {
395 return response.getSchema();
397 LOGGER.error(
"Get Edge execute failed, errorCode: " + response.getCode());
399 "Get Edge execute failed, errorCode: " + response.getCode());
411 public synchronized Map<Integer, List<HostAddr>>
getPartsAlloc(String spaceName)
413 int retry = RETRY_TIMES;
414 GetPartsAllocReq request =
new GetPartsAllocReq();
415 int spaceID =
getSpace(spaceName).getSpace_id();
416 request.setSpace_id(spaceID);
418 GetPartsAllocResp response =
null;
420 while (retry-- >= 0) {
421 response = client.getPartsAlloc(request);
422 if (response.getCode() == ErrorCode.E_LEADER_CHANGED) {
423 freshClient(response.getLeader());
428 }
catch (TException e) {
429 LOGGER.error(String.format(
"Get Parts Error: %s", e.getMessage()));
432 if (response.getCode() == ErrorCode.SUCCEEDED) {
433 return response.getParts();
435 LOGGER.error(
"Get Parts execute failed, errorCode" + response.getCode());
437 "Get Parts execute failed, errorCode" + response.getCode());
445 int retry = RETRY_TIMES;
446 ListHostsReq request =
new ListHostsReq();
447 request.setType(ListHostType.STORAGE);
448 ListHostsResp resp =
null;
450 while (retry-- >= 0) {
451 resp = client.listHosts(request);
452 if (resp.getCode() == ErrorCode.E_LEADER_CHANGED) {
453 freshClient(resp.getLeader());
458 }
catch (TException e) {
459 LOGGER.error(
"listHosts error", e);
460 throw new RuntimeException(
"listHosts error", e);
462 if (resp.getCode() != ErrorCode.SUCCEEDED) {
463 LOGGER.error(
"listHosts execute failed, errorCode: " + resp.getCode());
464 throw new RuntimeException(
"listHosts execute failed, errorCode:" + resp.getCode());
466 Set<HostAddr> hostAddrs =
new HashSet<>();
467 for (HostItem hostItem : resp.hosts) {
468 if (hostItem.getStatus().getValue() == HostStatus.ONLINE.getValue()) {
469 hostAddrs.add(hostItem.getHostAddr());
479 int retry = RETRY_TIMES;
480 ListHostsReq request =
new ListHostsReq();
481 request.setType(ListHostType.ALLOC);
482 ListHostsResp resp =
null;
484 while (retry-- >= 0) {
485 resp = client.listHosts(request);
486 if (resp.getCode() == ErrorCode.E_LEADER_CHANGED) {
487 freshClient(resp.getLeader());
492 }
catch (TException e) {
493 LOGGER.error(
"listHosts error", e);
496 if (resp.getCode() != ErrorCode.SUCCEEDED) {
497 LOGGER.error(
"listHosts execute failed, errorCode: " + resp.getCode());
500 Set<HostItem> hostItems =
new HashSet<>();
501 for (HostItem hostItem : resp.hosts) {
502 if (hostItem.getStatus().getValue() == HostStatus.ONLINE.getValue()) {
503 hostItems.add(hostItem);