NebulaGraph Java Client  release-3.8
All Classes Functions Variables
SyncConnection.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.graph.net;
7 
8 import com.facebook.thrift.TException;
9 import com.facebook.thrift.protocol.TBinaryProtocol;
10 import com.facebook.thrift.protocol.THeaderProtocol;
11 import com.facebook.thrift.protocol.TProtocol;
12 import com.facebook.thrift.transport.THeaderTransport;
13 import com.facebook.thrift.transport.THttp2Client;
14 import com.facebook.thrift.transport.TSocket;
15 import com.facebook.thrift.transport.TTransport;
16 import com.facebook.thrift.transport.TTransportException;
17 import com.facebook.thrift.utils.StandardCharsets;
18 import com.google.common.base.Charsets;
19 import com.vesoft.nebula.ErrorCode;
20 import com.vesoft.nebula.Value;
21 import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
22 import com.vesoft.nebula.client.graph.data.HostAddress;
23 import com.vesoft.nebula.client.graph.data.SSLParam;
24 import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
25 import com.vesoft.nebula.client.graph.exception.AuthFailedException;
26 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
27 import com.vesoft.nebula.client.graph.exception.IOErrorException;
28 import com.vesoft.nebula.graph.AuthResponse;
29 import com.vesoft.nebula.graph.ExecutionResponse;
30 import com.vesoft.nebula.graph.GraphService;
31 import com.vesoft.nebula.graph.VerifyClientVersionReq;
32 import com.vesoft.nebula.graph.VerifyClientVersionResp;
33 import com.vesoft.nebula.util.SslUtil;
34 import java.io.IOException;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.Map;
38 import javax.net.ssl.SSLSocketFactory;
39 import javax.net.ssl.TrustManager;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 
43 
44 public class SyncConnection extends Connection {
45 
46  private static final Logger LOGGER = LoggerFactory.getLogger(SyncConnection.class);
47 
48  protected TTransport transport = null;
49  protected TProtocol protocol = null;
50  private GraphService.Client client = null;
51  private int timeout = 0;
52  private SSLParam sslParam = null;
53  private boolean enabledSsl = false;
54  private SSLSocketFactory sslSocketFactory = null;
55  private boolean useHttp2 = false;
56 
57  private Map<String, String> headers = new HashMap<>();
58 
59  @Override
60  public void open(HostAddress address, int timeout, SSLParam sslParam)
62  this.open(address, timeout, sslParam, false, headers);
63  }
64 
65  @Override
66  public void open(HostAddress address, int timeout, SSLParam sslParam, boolean isUseHttp2,
67  Map<String, String> headers)
69  try {
70  this.serverAddr = address;
71  this.timeout = timeout <= 0 ? Integer.MAX_VALUE : timeout;
72  this.enabledSsl = true;
73  this.sslParam = sslParam;
74  this.useHttp2 = isUseHttp2;
75  this.headers = headers;
76  if (sslSocketFactory == null) {
77  if (sslParam == null) {
78  sslSocketFactory = SslUtil.getSSLSocketFactoryWithoutVerify();
79  } else if (sslParam.getSignMode() == SSLParam.SignMode.CA_SIGNED) {
80  sslSocketFactory =
81  SslUtil.getSSLSocketFactoryWithCA((CASignedSSLParam) sslParam);
82  } else {
83  sslSocketFactory =
84  SslUtil.getSSLSocketFactoryWithoutCA((SelfSignedSSLParam) sslParam);
85  }
86  }
87  if (useHttp2) {
88  getProtocolWithTlsHttp2();
89  } else {
90  getProtocolForTls();
91  }
92 
93  client = new GraphService.Client(protocol);
94 
95  VerifyClientVersionResp resp =
96  client.verifyClientVersion(new VerifyClientVersionReq());
97  if (resp.error_code != ErrorCode.SUCCEEDED) {
98  client.getInputProtocol().getTransport().close();
99  throw new ClientServerIncompatibleException(new String(resp.getError_msg(),
100  Charsets.UTF_8));
101  }
102  } catch (TException | IOException e) {
103  close();
104  throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
105  }
106  }
107 
108  @Override
109  public void open(HostAddress address, int timeout) throws IOErrorException,
111  this.open(address, timeout, false, headers);
112  }
113 
114  @Override
115  public void open(HostAddress address, int timeout,
116  boolean isUseHttp2, Map<String, String> headers)
118  try {
119  this.serverAddr = address;
120  this.timeout = timeout <= 0 ? Integer.MAX_VALUE : timeout;
121  this.useHttp2 = isUseHttp2;
122  this.headers = headers;
123  if (useHttp2) {
124  getProtocolForHttp2();
125  } else {
126  getProtocol();
127  }
128  client = new GraphService.Client(protocol);
129 
130  VerifyClientVersionResp resp =
131  client.verifyClientVersion(new VerifyClientVersionReq());
132  if (resp.error_code != ErrorCode.SUCCEEDED) {
133  client.getInputProtocol().getTransport().close();
134  throw new ClientServerIncompatibleException(new String(resp.getError_msg(),
135  Charsets.UTF_8));
136  }
137  } catch (TException e) {
138  close();
139  throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
140  }
141  }
142 
146  private void getProtocolWithTlsHttp2() {
147  String url = "https://" + serverAddr.getHost() + ":" + serverAddr.getPort();
148  TrustManager trustManager;
149  if (SslUtil.getTrustManagers() == null || SslUtil.getTrustManagers().length == 0) {
150  trustManager = null;
151  } else {
152  trustManager = SslUtil.getTrustManagers()[0];
153  }
154  this.transport = new THttp2Client(url, sslSocketFactory, trustManager)
155  .setConnectTimeout(timeout)
156  .setReadTimeout(timeout)
157  .setCustomHeaders(headers);
158 
159  transport.open();
160  this.protocol = new TBinaryProtocol(transport);
161  }
162 
166  private void getProtocolForTls() throws IOException {
167  this.transport = new THeaderTransport(new TSocket(
168  sslSocketFactory.createSocket(serverAddr.getHost(),
169  serverAddr.getPort()), this.timeout, this.timeout));
170  this.protocol = new THeaderProtocol((THeaderTransport) transport);
171  }
172 
176  private void getProtocolForHttp2() {
177  String url = "http://" + serverAddr.getHost() + ":" + serverAddr.getPort();
178  this.transport = new THttp2Client(url)
179  .setConnectTimeout(timeout)
180  .setReadTimeout(timeout)
181  .setCustomHeaders(headers);
182  transport.open();
183  this.protocol = new TBinaryProtocol(transport);
184  }
185 
189  private void getProtocol() {
190  this.transport = new THeaderTransport(new TSocket(
191  serverAddr.getHost(), serverAddr.getPort(), this.timeout, this.timeout));
192  transport.open();
193  this.protocol = new THeaderProtocol((THeaderTransport) transport);
194  }
195 
196 
197  /*
198  * Because the code generated by Fbthrift does not handle the seqID,
199  * the message will be dislocation when the timeout occurs,
200  * resulting in unexpected response,
201  * so when the timeout occurs,
202  * the connection will be reopened to avoid the impact of the message.
203  * So when timeout happend need to use reopen
204  *
205  * @throws IOErrorException if io problem happen
206  */
207  @Override
208  public void reopen() throws IOErrorException, ClientServerIncompatibleException {
209  close();
210  if (enabledSsl) {
211  open(serverAddr, timeout, sslParam, useHttp2, headers);
212  } else {
213  open(serverAddr, timeout, useHttp2, headers);
214  }
215  }
216 
217  public AuthResult authenticate(String user, String password)
219  try {
220  AuthResponse resp = client.authenticate(user.getBytes(Charsets.UTF_8),
221  password.getBytes(Charsets.UTF_8));
222  if (resp.error_code != ErrorCode.SUCCEEDED) {
223  if (resp.error_msg != null) {
224  throw new AuthFailedException(new String(resp.error_msg));
225  } else {
226  throw new AuthFailedException(
227  "The error_msg is null, "
228  + "maybe the service not set or the response is disorder.");
229  }
230  }
231  return new AuthResult(resp.getSession_id(), resp.getTime_zone_offset_seconds());
232  } catch (TException e) {
233  if (e instanceof TTransportException) {
234  TTransportException te = (TTransportException) e;
235  if (te.getType() == TTransportException.END_OF_FILE) {
236  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage());
237  } else if (te.getType() == TTransportException.TIMED_OUT
238  || te.getMessage().contains("Read timed out")) {
239  reopen();
240  throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
241  } else if (te.getType() == TTransportException.NOT_OPEN) {
242  throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
243  }
244  }
245  throw new AuthFailedException(String.format("Authenticate failed: %s", e.getMessage()));
246  }
247  }
248 
249  public ExecutionResponse execute(long sessionID, String stmt)
250  throws IOErrorException {
251  return executeWithParameter(sessionID,
252  stmt,
253  (Map<byte[], com.vesoft.nebula.Value>) Collections.EMPTY_MAP);
254  }
255 
256  public ExecutionResponse executeWithParameter(long sessionID, String stmt,
257  Map<byte[], com.vesoft.nebula.Value> parameterMap)
258  throws IOErrorException {
259  try {
260  return client.executeWithParameter(
261  sessionID,
262  stmt.getBytes(Charsets.UTF_8),
263  parameterMap);
264  } catch (TException e) {
265  if (e instanceof TTransportException) {
266  TTransportException te = (TTransportException) e;
267  if (te.getType() == TTransportException.END_OF_FILE) {
268  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage());
269  } else if (te.getType() == TTransportException.NOT_OPEN) {
270  throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
271  } else if (te.getType() == TTransportException.TIMED_OUT
272  || te.getMessage().contains("Read timed out")) {
273  try {
274  reopen();
275  } catch (ClientServerIncompatibleException ex) {
276  LOGGER.error(ex.getMessage());
277  }
278  throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
279  }
280  }
281  throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
282  }
283  }
284 
285  public ExecutionResponse executeWithParameterTimeout(long sessionID,
286  String stmt,
287  Map<byte[], Value> parameterMap,
288  long timeoutMs) throws IOErrorException {
289  try {
290  return client.executeWithTimeout(sessionID,
291  stmt.getBytes(Charsets.UTF_8),
292  parameterMap,
293  timeoutMs);
294  } catch (TException e) {
295  if (e instanceof TTransportException) {
296  TTransportException te = (TTransportException) e;
297  if (te.getType() == TTransportException.END_OF_FILE) {
298  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage());
299  } else if (te.getType() == TTransportException.NOT_OPEN) {
300  throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
301  } else if (te.getType() == TTransportException.TIMED_OUT
302  || te.getMessage().contains("Read timed out")) {
303  try {
304  reopen();
305  } catch (ClientServerIncompatibleException ex) {
306  LOGGER.error(ex.getMessage());
307  }
308  throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
309  }
310  }
311  throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
312  }
313  }
314 
315  public String executeJson(long sessionID, String stmt)
316  throws IOErrorException {
317  return executeJsonWithParameter(sessionID, stmt,
318  (Map<byte[], Value>) Collections.EMPTY_MAP);
319  }
320 
321  public String executeJsonWithParameter(long sessionID, String stmt,
322  Map<byte[], Value> parameterMap)
323  throws IOErrorException {
324  try {
325  byte[] result =
326  client.executeJsonWithParameter(
327  sessionID,
328  stmt.getBytes(Charsets.UTF_8),
329  parameterMap);
330  return new String(result, StandardCharsets.UTF_8);
331  } catch (TException e) {
332  if (e instanceof TTransportException) {
333  TTransportException te = (TTransportException) e;
334  if (te.getType() == TTransportException.END_OF_FILE) {
335  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage());
336  } else if (te.getType() == TTransportException.NOT_OPEN) {
337  throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
338  } else if (te.getType() == TTransportException.TIMED_OUT
339  || te.getMessage().contains("Read timed out")) {
340  try {
341  reopen();
342  } catch (ClientServerIncompatibleException ex) {
343  LOGGER.error(ex.getMessage());
344  }
345  throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
346  }
347  }
348  throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
349  }
350  }
351 
352  public void signout(long sessionId) {
353  client.signout(sessionId);
354  }
355 
356  @Override
357  public boolean ping() {
358  try {
359  execute(0, "YIELD 1;");
360  return true;
361  } catch (IOErrorException e) {
362  return false;
363  }
364  }
365 
366  @Override
367  public boolean ping(long sessionID) {
368  try {
369  execute(sessionID, "YIELD 1;");
370  return true;
371  } catch (IOErrorException e) {
372  return false;
373  }
374  }
375 
376 
377  public void close() {
378  if (transport != null && transport.isOpen()) {
379  transport.close();
380  transport = null;
381  }
382  }
383 
384 }