NebulaGraph Java Client  release-3.8
SyncConnection.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.graph.net;
7 
8 import com.facebook.thrift.TException;
9 import com.facebook.thrift.protocol.TBinaryProtocol;
10 import com.facebook.thrift.protocol.THeaderProtocol;
11 import com.facebook.thrift.protocol.TProtocol;
12 import com.facebook.thrift.transport.THeaderTransport;
13 import com.facebook.thrift.transport.THttp2Client;
14 import com.facebook.thrift.transport.TSocket;
15 import com.facebook.thrift.transport.TTransport;
16 import com.facebook.thrift.transport.TTransportException;
17 import com.facebook.thrift.utils.StandardCharsets;
18 import com.google.common.base.Charsets;
19 import com.vesoft.nebula.ErrorCode;
20 import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
21 import com.vesoft.nebula.client.graph.data.HostAddress;
22 import com.vesoft.nebula.client.graph.data.SSLParam;
23 import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
24 import com.vesoft.nebula.client.graph.exception.AuthFailedException;
25 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
26 import com.vesoft.nebula.client.graph.exception.IOErrorException;
27 import com.vesoft.nebula.graph.AuthResponse;
28 import com.vesoft.nebula.graph.ExecutionResponse;
29 import com.vesoft.nebula.graph.GraphService;
30 import com.vesoft.nebula.graph.VerifyClientVersionReq;
31 import com.vesoft.nebula.graph.VerifyClientVersionResp;
32 import com.vesoft.nebula.util.SslUtil;
33 import java.io.IOException;
34 import java.util.Collections;
35 import java.util.HashMap;
36 import java.util.Map;
37 import javax.net.ssl.SSLSocketFactory;
38 import javax.net.ssl.TrustManager;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 
42 
43 public class SyncConnection extends Connection {
44 
45  private static final Logger LOGGER = LoggerFactory.getLogger(SyncConnection.class);
46 
47  protected TTransport transport = null;
48  protected TProtocol protocol = null;
49  private GraphService.Client client = null;
50  private int timeout = 0;
51  private SSLParam sslParam = null;
52  private boolean enabledSsl = false;
53  private SSLSocketFactory sslSocketFactory = null;
54  private boolean useHttp2 = false;
55 
56  private Map<String, String> headers = new HashMap<>();
57 
58  @Override
59  public void open(HostAddress address, int timeout, SSLParam sslParam)
61  this.open(address, timeout, sslParam, false, headers);
62  }
63 
64  @Override
65  public void open(HostAddress address, int timeout, SSLParam sslParam, boolean isUseHttp2,
66  Map<String, String> headers)
68  try {
69  this.serverAddr = address;
70  this.timeout = timeout <= 0 ? Integer.MAX_VALUE : timeout;
71  this.enabledSsl = true;
72  this.sslParam = sslParam;
73  this.useHttp2 = isUseHttp2;
74  this.headers = headers;
75  if (sslSocketFactory == null) {
76  if (sslParam == null) {
77  sslSocketFactory = SslUtil.getSSLSocketFactoryWithoutVerify();
78  } else if (sslParam.getSignMode() == SSLParam.SignMode.CA_SIGNED) {
79  sslSocketFactory =
80  SslUtil.getSSLSocketFactoryWithCA((CASignedSSLParam) sslParam);
81  } else {
82  sslSocketFactory =
83  SslUtil.getSSLSocketFactoryWithoutCA((SelfSignedSSLParam) sslParam);
84  }
85  }
86  if (useHttp2) {
87  getProtocolWithTlsHttp2();
88  } else {
89  getProtocolForTls();
90  }
91 
92  client = new GraphService.Client(protocol);
93 
94  VerifyClientVersionResp resp =
95  client.verifyClientVersion(new VerifyClientVersionReq());
96  if (resp.error_code != ErrorCode.SUCCEEDED) {
97  client.getInputProtocol().getTransport().close();
98  throw new ClientServerIncompatibleException(new String(resp.getError_msg(),
99  Charsets.UTF_8));
100  }
101  } catch (TException | IOException e) {
102  close();
103  throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
104  }
105  }
106 
107  @Override
108  public void open(HostAddress address, int timeout) throws IOErrorException,
110  this.open(address, timeout, false, headers);
111  }
112 
113  @Override
114  public void open(HostAddress address, int timeout,
115  boolean isUseHttp2, Map<String, String> headers)
117  try {
118  this.serverAddr = address;
119  this.timeout = timeout <= 0 ? Integer.MAX_VALUE : timeout;
120  this.useHttp2 = isUseHttp2;
121  this.headers = headers;
122  if (useHttp2) {
123  getProtocolForHttp2();
124  } else {
125  getProtocol();
126  }
127  client = new GraphService.Client(protocol);
128 
129  VerifyClientVersionResp resp =
130  client.verifyClientVersion(new VerifyClientVersionReq());
131  if (resp.error_code != ErrorCode.SUCCEEDED) {
132  client.getInputProtocol().getTransport().close();
133  throw new ClientServerIncompatibleException(new String(resp.getError_msg(),
134  Charsets.UTF_8));
135  }
136  } catch (TException e) {
137  close();
138  throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
139  }
140  }
141 
145  private void getProtocolWithTlsHttp2() {
146  String url = "https://" + serverAddr.getHost() + ":" + serverAddr.getPort();
147  TrustManager trustManager;
148  if (SslUtil.getTrustManagers() == null || SslUtil.getTrustManagers().length == 0) {
149  trustManager = null;
150  } else {
151  trustManager = SslUtil.getTrustManagers()[0];
152  }
153  this.transport = new THttp2Client(url, sslSocketFactory, trustManager)
154  .setConnectTimeout(timeout)
155  .setReadTimeout(timeout)
156  .setCustomHeaders(headers);
157 
158  transport.open();
159  this.protocol = new TBinaryProtocol(transport);
160  }
161 
165  private void getProtocolForTls() throws IOException {
166  this.transport = new THeaderTransport(new TSocket(
167  sslSocketFactory.createSocket(serverAddr.getHost(),
168  serverAddr.getPort()), this.timeout, this.timeout));
169  this.protocol = new THeaderProtocol((THeaderTransport) transport);
170  }
171 
175  private void getProtocolForHttp2() {
176  String url = "http://" + serverAddr.getHost() + ":" + serverAddr.getPort();
177  this.transport = new THttp2Client(url)
178  .setConnectTimeout(timeout)
179  .setReadTimeout(timeout)
180  .setCustomHeaders(headers);
181  transport.open();
182  this.protocol = new TBinaryProtocol(transport);
183  }
184 
188  private void getProtocol() {
189  this.transport = new THeaderTransport(new TSocket(
190  serverAddr.getHost(), serverAddr.getPort(), this.timeout, this.timeout));
191  transport.open();
192  this.protocol = new THeaderProtocol((THeaderTransport) transport);
193  }
194 
195 
196  /*
197  * Because the code generated by Fbthrift does not handle the seqID,
198  * the message will be dislocation when the timeout occurs,
199  * resulting in unexpected response,
200  * so when the timeout occurs,
201  * the connection will be reopened to avoid the impact of the message.
202  * So when timeout happend need to use reopen
203  *
204  * @throws IOErrorException if io problem happen
205  */
206  @Override
207  public void reopen() throws IOErrorException, ClientServerIncompatibleException {
208  close();
209  if (enabledSsl) {
210  open(serverAddr, timeout, sslParam, useHttp2, headers);
211  } else {
212  open(serverAddr, timeout, useHttp2, headers);
213  }
214  }
215 
216  public AuthResult authenticate(String user, String password)
218  try {
219  AuthResponse resp = client.authenticate(user.getBytes(Charsets.UTF_8),
220  password.getBytes(Charsets.UTF_8));
221  if (resp.error_code != ErrorCode.SUCCEEDED) {
222  if (resp.error_msg != null) {
223  throw new AuthFailedException(new String(resp.error_msg));
224  } else {
225  throw new AuthFailedException(
226  "The error_msg is null, "
227  + "maybe the service not set or the response is disorder.");
228  }
229  }
230  return new AuthResult(resp.getSession_id(), resp.getTime_zone_offset_seconds());
231  } catch (TException e) {
232  if (e instanceof TTransportException) {
233  TTransportException te = (TTransportException) e;
234  if (te.getType() == TTransportException.END_OF_FILE) {
235  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage());
236  } else if (te.getType() == TTransportException.TIMED_OUT
237  || te.getMessage().contains("Read timed out")) {
238  reopen();
239  throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
240  } else if (te.getType() == TTransportException.NOT_OPEN) {
241  throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
242  }
243  }
244  throw new AuthFailedException(String.format("Authenticate failed: %s", e.getMessage()));
245  }
246  }
247 
248  public ExecutionResponse execute(long sessionID, String stmt)
249  throws IOErrorException {
250  return executeWithParameter(sessionID,
251  stmt, (Map<byte[], com.vesoft.nebula.Value>) Collections.EMPTY_MAP);
252  }
253 
254  public ExecutionResponse executeWithParameter(long sessionID, String stmt,
255  Map<byte[], com.vesoft.nebula.Value> parameterMap)
256  throws IOErrorException {
257  try {
258  return client.executeWithParameter(
259  sessionID,
260  stmt.getBytes(Charsets.UTF_8),
261  parameterMap);
262  } catch (TException e) {
263  if (e instanceof TTransportException) {
264  TTransportException te = (TTransportException) e;
265  if (te.getType() == TTransportException.END_OF_FILE) {
266  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage());
267  } else if (te.getType() == TTransportException.NOT_OPEN) {
268  throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
269  } else if (te.getType() == TTransportException.TIMED_OUT
270  || te.getMessage().contains("Read timed out")) {
271  try {
272  reopen();
273  } catch (ClientServerIncompatibleException ex) {
274  LOGGER.error(ex.getMessage());
275  }
276  throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
277  }
278  }
279  throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
280  }
281  }
282 
283  public String executeJson(long sessionID, String stmt)
284  throws IOErrorException {
285  return executeJsonWithParameter(sessionID, stmt,
286  (Map<byte[], com.vesoft.nebula.Value>) Collections.EMPTY_MAP);
287  }
288 
289  public String executeJsonWithParameter(long sessionID, String stmt,
290  Map<byte[], com.vesoft.nebula.Value> parameterMap)
291  throws IOErrorException {
292  try {
293  byte[] result =
294  client.executeJsonWithParameter(
295  sessionID,
296  stmt.getBytes(Charsets.UTF_8),
297  parameterMap);
298  return new String(result, StandardCharsets.UTF_8);
299  } catch (TException e) {
300  if (e instanceof TTransportException) {
301  TTransportException te = (TTransportException) e;
302  if (te.getType() == TTransportException.END_OF_FILE) {
303  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage());
304  } else if (te.getType() == TTransportException.NOT_OPEN) {
305  throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
306  } else if (te.getType() == TTransportException.TIMED_OUT
307  || te.getMessage().contains("Read timed out")) {
308  try {
309  reopen();
310  } catch (ClientServerIncompatibleException ex) {
311  LOGGER.error(ex.getMessage());
312  }
313  throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
314  }
315  }
316  throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
317  }
318  }
319 
320  public void signout(long sessionId) {
321  client.signout(sessionId);
322  }
323 
324  @Override
325  public boolean ping() {
326  try {
327  execute(0, "YIELD 1;");
328  return true;
329  } catch (IOErrorException e) {
330  return false;
331  }
332  }
333 
334  @Override
335  public boolean ping(long sessionID) {
336  try {
337  execute(sessionID, "YIELD 1;");
338  return true;
339  } catch (IOErrorException e) {
340  return false;
341  }
342  }
343 
344 
345  public void close() {
346  if (transport != null && transport.isOpen()) {
347  transport.close();
348  transport = null;
349  }
350  }
351 
352 }