NebulaGraph Java Client  release-3.6
All Classes Functions Variables
SyncConnection.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.graph.net;
7 
8 import com.facebook.thrift.TException;
9 import com.facebook.thrift.protocol.TBinaryProtocol;
10 import com.facebook.thrift.protocol.TCompactProtocol;
11 import com.facebook.thrift.protocol.THeaderProtocol;
12 import com.facebook.thrift.protocol.TProtocol;
13 import com.facebook.thrift.transport.THeaderTransport;
14 import com.facebook.thrift.transport.THttp2Client;
15 import com.facebook.thrift.transport.TSocket;
16 import com.facebook.thrift.transport.TTransport;
17 import com.facebook.thrift.transport.TTransportException;
18 import com.facebook.thrift.utils.StandardCharsets;
19 import com.google.common.base.Charsets;
20 import com.vesoft.nebula.ErrorCode;
21 import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
22 import com.vesoft.nebula.client.graph.data.HostAddress;
23 import com.vesoft.nebula.client.graph.data.SSLParam;
24 import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
25 import com.vesoft.nebula.client.graph.exception.AuthFailedException;
26 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
27 import com.vesoft.nebula.client.graph.exception.IOErrorException;
28 import com.vesoft.nebula.graph.AuthResponse;
29 import com.vesoft.nebula.graph.ExecutionResponse;
30 import com.vesoft.nebula.graph.GraphService;
31 import com.vesoft.nebula.graph.VerifyClientVersionReq;
32 import com.vesoft.nebula.graph.VerifyClientVersionResp;
33 import com.vesoft.nebula.util.SslUtil;
34 import java.io.IOException;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.Map;
38 import javax.net.ssl.SSLSocketFactory;
39 import javax.net.ssl.TrustManager;
40 import okhttp3.internal.http2.Http2Connection;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 
44 
45 public class SyncConnection extends Connection {
46 
47  private static final Logger LOGGER = LoggerFactory.getLogger(SyncConnection.class);
48 
49  protected TTransport transport = null;
50  protected TProtocol protocol = null;
51  private GraphService.Client client = null;
52  private int timeout = 0;
53  private SSLParam sslParam = null;
54  private boolean enabledSsl = false;
55  private SSLSocketFactory sslSocketFactory = null;
56  private boolean useHttp2 = false;
57 
58  private Map<String, String> headers = new HashMap<>();
59 
60  @Override
61  public void open(HostAddress address, int timeout, SSLParam sslParam)
63  this.open(address, timeout, sslParam, false, headers);
64  }
65 
66  @Override
67  public void open(HostAddress address, int timeout, SSLParam sslParam, boolean isUseHttp2,
68  Map<String, String> headers)
70  try {
71  this.serverAddr = address;
72  this.timeout = timeout <= 0 ? Integer.MAX_VALUE : timeout;
73  this.enabledSsl = true;
74  this.sslParam = sslParam;
75  this.useHttp2 = isUseHttp2;
76  this.headers = headers;
77  if (sslSocketFactory == null) {
78  if (sslParam.getSignMode() == SSLParam.SignMode.CA_SIGNED) {
79  sslSocketFactory =
80  SslUtil.getSSLSocketFactoryWithCA((CASignedSSLParam) sslParam);
81  } else {
82  sslSocketFactory =
83  SslUtil.getSSLSocketFactoryWithoutCA((SelfSignedSSLParam) sslParam);
84  }
85  }
86  if (useHttp2) {
87  getProtocolWithTlsHttp2();
88  } else {
89  getProtocolForTls();
90  }
91 
92  client = new GraphService.Client(protocol);
93 
94  // check if client version matches server version
95  VerifyClientVersionResp resp =
96  client.verifyClientVersion(new VerifyClientVersionReq());
97  if (resp.error_code != ErrorCode.SUCCEEDED) {
98  client.getInputProtocol().getTransport().close();
99  throw new ClientServerIncompatibleException(new String(resp.getError_msg(),
100  Charsets.UTF_8));
101  }
102  } catch (TException | IOException e) {
103  close();
104  throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
105  }
106  }
107 
108  @Override
109  public void open(HostAddress address, int timeout) throws IOErrorException,
111  this.open(address, timeout, false, headers);
112  }
113 
114  @Override
115  public void open(HostAddress address, int timeout,
116  boolean isUseHttp2, Map<String,String> headers)
118  try {
119  this.serverAddr = address;
120  this.timeout = timeout <= 0 ? Integer.MAX_VALUE : timeout;
121  this.useHttp2 = isUseHttp2;
122  this.headers = headers;
123  if (useHttp2) {
124  getProtocolForHttp2();
125  } else {
126  getProtocol();
127  }
128  client = new GraphService.Client(protocol);
129 
130  // check if client version matches server version
131  VerifyClientVersionResp resp =
132  client.verifyClientVersion(new VerifyClientVersionReq());
133  if (resp.error_code != ErrorCode.SUCCEEDED) {
134  client.getInputProtocol().getTransport().close();
135  throw new ClientServerIncompatibleException(new String(resp.getError_msg(),
136  Charsets.UTF_8));
137  }
138  } catch (TException e) {
139  throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
140  }
141  }
142 
146  private void getProtocolWithTlsHttp2() {
147  String url = "https://" + serverAddr.getHost() + ":" + serverAddr.getPort();
148  TrustManager trustManager;
149  if (SslUtil.getTrustManagers() == null || SslUtil.getTrustManagers().length == 0) {
150  trustManager = null;
151  } else {
152  trustManager = SslUtil.getTrustManagers()[0];
153  }
154  this.transport = new THttp2Client(url, sslSocketFactory, trustManager)
155  .setConnectTimeout(timeout)
156  .setReadTimeout(timeout)
157  .setCustomHeaders(headers);
158 
159  transport.open();
160  this.protocol = new TBinaryProtocol(transport);
161  }
162 
166  private void getProtocolForTls() throws IOException {
167  this.transport = new THeaderTransport(new TSocket(
168  sslSocketFactory.createSocket(serverAddr.getHost(),
169  serverAddr.getPort()), this.timeout, this.timeout));
170  this.protocol = new THeaderProtocol((THeaderTransport) transport);
171  }
172 
176  private void getProtocolForHttp2() {
177  String url = "http://" + serverAddr.getHost() + ":" + serverAddr.getPort();
178  this.transport = new THttp2Client(url)
179  .setConnectTimeout(timeout)
180  .setReadTimeout(timeout)
181  .setCustomHeaders(headers);
182  transport.open();
183  this.protocol = new TBinaryProtocol(transport);
184  }
185 
189  private void getProtocol() {
190  this.transport = new THeaderTransport(new TSocket(
191  serverAddr.getHost(), serverAddr.getPort(), this.timeout, this.timeout));
192  transport.open();
193  this.protocol = new THeaderProtocol((THeaderTransport) transport);
194  }
195 
196 
197  /*
198  * Because the code generated by Fbthrift does not handle the seqID,
199  * the message will be dislocation when the timeout occurs,
200  * resulting in unexpected response,
201  * so when the timeout occurs,
202  * the connection will be reopened to avoid the impact of the message.
203  * So when timeout happend need to use reopen
204  *
205  * @throws IOErrorException if io problem happen
206  */
207  @Override
208  public void reopen() throws IOErrorException, ClientServerIncompatibleException {
209  close();
210  if (enabledSsl) {
211  open(serverAddr, timeout, sslParam, useHttp2, headers);
212  } else {
213  open(serverAddr, timeout, useHttp2, headers);
214  }
215  }
216 
217  public AuthResult authenticate(String user, String password)
219  try {
220  AuthResponse resp = client.authenticate(user.getBytes(), password.getBytes());
221  if (resp.error_code != ErrorCode.SUCCEEDED) {
222  if (resp.error_msg != null) {
223  throw new AuthFailedException(new String(resp.error_msg));
224  } else {
225  throw new AuthFailedException(
226  "The error_msg is null, "
227  + "maybe the service not set or the response is disorder.");
228  }
229  }
230  return new AuthResult(resp.getSession_id(), resp.getTime_zone_offset_seconds());
231  } catch (TException e) {
232  if (e instanceof TTransportException) {
233  TTransportException te = (TTransportException) e;
234  if (te.getType() == TTransportException.END_OF_FILE) {
235  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage());
236  } else if (te.getType() == TTransportException.TIMED_OUT
237  || te.getMessage().contains("Read timed out")) {
238  reopen();
239  throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
240  } else if (te.getType() == TTransportException.NOT_OPEN) {
241  throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
242  }
243  }
244  throw new AuthFailedException(String.format("Authenticate failed: %s", e.getMessage()));
245  }
246  }
247 
248  public ExecutionResponse execute(long sessionID, String stmt)
249  throws IOErrorException {
250  return executeWithParameter(sessionID,
251  stmt, (Map<byte[], com.vesoft.nebula.Value>) Collections.EMPTY_MAP);
252  }
253 
254  public ExecutionResponse executeWithParameter(long sessionID, String stmt,
255  Map<byte[], com.vesoft.nebula.Value> parameterMap)
256  throws IOErrorException {
257  try {
258  return client.executeWithParameter(sessionID, stmt.getBytes(), parameterMap);
259  } catch (TException e) {
260  if (e instanceof TTransportException) {
261  TTransportException te = (TTransportException) e;
262  if (te.getType() == TTransportException.END_OF_FILE) {
263  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage());
264  } else if (te.getType() == TTransportException.NOT_OPEN) {
265  throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
266  } else if (te.getType() == TTransportException.TIMED_OUT
267  || te.getMessage().contains("Read timed out")) {
268  try {
269  reopen();
270  } catch (ClientServerIncompatibleException ex) {
271  LOGGER.error(ex.getMessage());
272  }
273  throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
274  }
275  }
276  throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
277  }
278  }
279 
280  public String executeJson(long sessionID, String stmt)
281  throws IOErrorException {
282  return executeJsonWithParameter(sessionID, stmt,
283  (Map<byte[], com.vesoft.nebula.Value>) Collections.EMPTY_MAP);
284  }
285 
286  public String executeJsonWithParameter(long sessionID, String stmt,
287  Map<byte[], com.vesoft.nebula.Value> parameterMap)
288  throws IOErrorException {
289  try {
290  byte[] result =
291  client.executeJsonWithParameter(sessionID, stmt.getBytes(), parameterMap);
292  return new String(result, StandardCharsets.UTF_8);
293  } catch (TException e) {
294  if (e instanceof TTransportException) {
295  TTransportException te = (TTransportException) e;
296  if (te.getType() == TTransportException.END_OF_FILE) {
297  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage());
298  } else if (te.getType() == TTransportException.NOT_OPEN) {
299  throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
300  } else if (te.getType() == TTransportException.TIMED_OUT
301  || te.getMessage().contains("Read timed out")) {
302  try {
303  reopen();
304  } catch (ClientServerIncompatibleException ex) {
305  LOGGER.error(ex.getMessage());
306  }
307  throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
308  }
309  }
310  throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
311  }
312  }
313 
314  public void signout(long sessionId) {
315  client.signout(sessionId);
316  }
317 
318  @Override
319  public boolean ping() {
320  try {
321  execute(0, "YIELD 1;");
322  return true;
323  } catch (IOErrorException e) {
324  return false;
325  }
326  }
327 
328  @Override
329  public boolean ping(long sessionID) {
330  try {
331  execute(sessionID, "YIELD 1;");
332  return true;
333  } catch (IOErrorException e) {
334  return false;
335  }
336  }
337 
338 
339  public void close() {
340  if (transport != null && transport.isOpen()) {
341  transport.close();
342  transport = null;
343  }
344  }
345 
346 }