NebulaGraph Python Client  release-3.8
Connection.py
1 # --coding:utf-8--
2 #
3 # Copyright (c) 2021 vesoft inc. All rights reserved.
4 #
5 # This source code is licensed under Apache 2.0 License.
6 
7 
8 import time
9 import ssl
10 
11 from nebula3.fbthrift.transport import (
12  TSocket,
13  TSSLSocket,
14  TTransport,
15  THeaderTransport,
16  THttp2Client,
17 )
18 from nebula3.fbthrift.transport.TTransport import TTransportException
19 from nebula3.fbthrift.protocol import THeaderProtocol, TBinaryProtocol
20 
21 from nebula3.common.ttypes import ErrorCode
22 from nebula3.graph import GraphService
23 from nebula3.graph.ttypes import VerifyClientVersionReq
24 from nebula3.logger import logger
25 
26 from nebula3.Exception import (
27  AuthFailedException,
28  IOErrorException,
29  ClientServerIncompatibleException,
30 )
31 
32 from nebula3.gclient.net.AuthResult import AuthResult
33 
34 
class Connection(object):
    """A single client connection to one graphd endpoint.

    The connection wraps a ``GraphService.Client`` built on either a Thrift
    header transport (plain or SSL socket) or an HTTP/2 transport. It
    remembers the parameters it was opened with so that it can transparently
    reopen itself after a timeout.
    """

    # Class-level default for the "in use" flag read by idle_time().
    # Presumably set per instance by the owning pool -- not shown in this file.
    is_used = False

    def __init__(self):
        self._connection = None  # GraphService.Client once opened
        self.start_use_time = time.time()  # reference point for idle_time()
        self._ip = None
        self._port = None
        self._timeout = 0
        self._ssl_conf = None
        self.use_http2 = False
        self.http_headers = None
        self._closed = True  # True until open_SSL() completes successfully

    def open(self, ip, port, timeout, use_http2=False, http_headers=None):
        """open the connection without SSL

        :param ip: the server ip
        :param port: the server port
        :param timeout: the timeout for connect and execute
        :param use_http2: use http2 or not
        :param http_headers: http headers
        :return: void
        """
        self.open_SSL(ip, port, timeout, None, use_http2, http_headers)

    def open_SSL(
        self, ip, port, timeout, ssl_config=None, use_http2=False, http_headers=None
    ):
        """open the connection, using SSL when ``ssl_config`` is given

        After the transport is opened the client version is verified against
        the server. On any failure the transport opened by this call is
        closed before the exception is re-raised.

        :param ip: the server ip
        :param port: the server port
        :param timeout: the timeout for connect and execute
        :param ssl_config: configs for SSL, or None for a plain connection
        :param use_http2: use http2 or not
        :param http_headers: http headers
        :return: void
        :raise ClientServerIncompatibleException: if the server rejects the
            client version
        """
        # Remember the parameters so _reopen() can replay them.
        self._ip = ip
        self._port = port
        self._timeout = timeout
        self._ssl_conf = ssl_config
        self.use_http2 = use_http2
        self.http_headers = http_headers

        protocol = None
        try:
            # Identity check kept on purpose: only a literal False selects
            # the header protocol, exactly as before.
            if use_http2 is False:
                protocol = self.__get_protocol(timeout, ssl_config)
            else:
                protocol = self.__get_protocal_http2(
                    timeout, ssl_config, http_headers
                )
            self._connection = GraphService.Client(protocol)
            resp = self._connection.verifyClientVersion(VerifyClientVersionReq())
            if resp.error_code != ErrorCode.SUCCEEDED:
                raise ClientServerIncompatibleException(resp.error_msg)
        except Exception:
            if protocol is None:
                # No new transport was opened by this call; fall back to the
                # regular close() for whatever was open before.
                self.close()
            else:
                # The new transport is already open but _closed may still be
                # True, in which case close() would skip it and leak the
                # socket. Close the transport of this attempt directly.
                try:
                    protocol.trans.close()
                except Exception as close_err:
                    logger.error(
                        "Close connection to {}:{} failed:{}".format(
                            self._ip, self._port, close_err
                        )
                    )
                self._closed = True
            raise
        self._closed = False

    def __get_protocol(self, timeout, ssl_config):
        """build and open a header protocol over a plain or SSL socket

        :param timeout: socket timeout; only applied when greater than 0
        :param ssl_config: configs for SSL, or None for a plain socket
        :return: an opened THeaderProtocol
        """
        if ssl_config is not None:
            sock = TSSLSocket.TSSLSocket(
                self._ip,
                self._port,
                ssl_config.unix_socket,
                ssl_config.ssl_version,
                ssl_config.cert_reqs,
                ssl_config.ca_certs,
                ssl_config.verify_name,
                ssl_config.keyfile,
                ssl_config.certfile,
                ssl_config.allow_weak_ssl_versions,
            )
        else:
            sock = TSocket.TSocket(self._ip, self._port)
        if timeout > 0:
            sock.setTimeout(timeout)

        buffered_transport = TTransport.TBufferedTransport(sock)
        header_transport = THeaderTransport.THeaderTransport(buffered_transport)
        protocol = THeaderProtocol.THeaderProtocol(header_transport)
        header_transport.open()
        return protocol

    def __get_protocal_http2(self, timeout, ssl_config, http_headers):
        """build and open a binary protocol over an HTTP/2 transport

        :param timeout: the timeout handed to the HTTP/2 client
        :param ssl_config: configs for SSL; selects https when not None
        :param http_headers: http headers
        :return: a TBinaryProtocol on an opened THttp2Client
        """
        verify, certfile, keyfile, password = None, None, None, None
        if ssl_config is not None:
            # verify could be a boolean or ssl.SSLContext in httpx.
            verify = ssl.create_default_context(cafile=ssl_config.ca_certs)
            certfile = ssl_config.certfile
            keyfile = ssl_config.keyfile
            scheme = "https"
        else:
            scheme = "http"
        url = "{}://{}:{}".format(scheme, self._ip, self._port)

        transport = THttp2Client.THttp2Client(
            url, timeout, verify, certfile, keyfile, password, http_headers
        )
        transport.open()
        return TBinaryProtocol.TBinaryProtocol(transport)

    def _reopen(self):
        """reopen the connection with the parameters of the last open

        :return:
        """
        self.close()
        # open() merely forwards to open_SSL() with ssl_config=None, so one
        # call covers both the SSL and the plain case.
        self.open_SSL(
            self._ip,
            self._port,
            self._timeout,
            self._ssl_conf,
            self.use_http2,
            self.http_headers,
        )

    def _raise_io_error(self, te):
        """translate a TTransportException into an IOErrorException

        A timeout additionally triggers a reopen of the connection. This
        method never returns normally.

        :param te: the TTransportException caught by the caller
        :raise IOErrorException: always
        """
        # message may be None; a substring test also matches at index 0,
        # which a `find(...) > 0` comparison would miss.
        if "timed out" in (te.message or ""):
            self._reopen()
            raise IOErrorException(IOErrorException.E_TIMEOUT, te.message)
        if te.type == TTransportException.END_OF_FILE:
            raise IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.message)
        if te.type == TTransportException.NOT_OPEN:
            raise IOErrorException(IOErrorException.E_NOT_OPEN, te.message)
        raise IOErrorException(IOErrorException.E_UNKNOWN, te.message)

    def authenticate(self, user_name, password):
        """authenticate to graphd

        :param user_name: the user name
        :param password: the password
        :return: AuthResult with session id and time zone information
        :raise AuthFailedException: if the server rejects the credentials
        :raise IOErrorException: if the transport fails
        """
        try:
            resp = self._connection.authenticate(user_name, password)
            if resp.error_code != ErrorCode.SUCCEEDED:
                # NOTE(review): this sets the flag on the thrift client, not
                # on this Connection -- kept as is, confirm the intent.
                self._connection.is_used = False
                raise AuthFailedException(resp.error_msg)
            return AuthResult(
                resp.session_id, resp.time_zone_offset_seconds, resp.time_zone_name
            )
        except TTransportException as te:
            # Reopen only on a real timeout. str.find() returns -1 (truthy)
            # when the text is absent, so it must not be used as a boolean.
            if "timed out" in (te.message or ""):
                self._reopen()
            if te.type == TTransportException.END_OF_FILE:
                self.close()
            raise IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.message)

    def execute(self, session_id, stmt):
        """execute interface with session_id and ngql

        :param session_id: the session id get from result of authenticate interface
        :param stmt: the ngql
        :return: ExecutionResponse
        """
        return self.execute_parameter(session_id, stmt, None)

    def execute_parameter(self, session_id, stmt, params):
        """execute interface with session_id and ngql with parameter

        :param session_id: the session id get from result of authenticate interface
        :param stmt: the ngql
        :param params: parameter map
        :return: ExecutionResponse
        :raise IOErrorException: if the transport fails
        """
        try:
            return self._connection.executeWithParameter(session_id, stmt, params)
        except TTransportException as te:
            self._raise_io_error(te)

    def execute_json(self, session_id, stmt):
        """execute_json interface with session_id and ngql

        :param session_id: the session id get from result of authenticate interface
        :param stmt: the ngql
        :return: json bytes representing the execution result
        """
        return self.execute_json_with_parameter(session_id, stmt, None)

    def execute_json_with_parameter(self, session_id, stmt, params):
        """execute_json interface with session_id and ngql with parameter

        :param session_id: the session id get from result of authenticate interface
        :param stmt: the ngql
        :param params: parameter map
        :return: json bytes representing the execution result
        :raise TypeError: if the response is not bytes
        :raise IOErrorException: if the transport fails
        """
        try:
            resp = self._connection.executeJsonWithParameter(
                session_id, stmt, params
            )
            if not isinstance(resp, bytes):
                raise TypeError("response is not bytes")
            return resp
        except TTransportException as te:
            self._raise_io_error(te)

    def signout(self, session_id):
        """tells the graphd can release the session info

        Best effort: transport errors are not propagated; the connection is
        closed when the peer has already hung up.

        :param session_id: the session id
        :return: void
        """
        try:
            self._connection.signout(session_id)
        except TTransportException as te:
            if te.type == TTransportException.END_OF_FILE:
                self.close()

    def close(self):
        """close the connection

        Does nothing when the connection is already closed. A failure while
        closing is logged and not raised.

        :return: void
        """
        try:
            if not self._closed:
                self._connection._iprot.trans.close()
                self._closed = True
        except Exception as e:
            logger.error(
                "Close connection to {}:{} failed:{}".format(self._ip, self._port, e)
            )

    def ping(self):
        """check the connection if ok

        :return: True if a trivial statement can be sent, otherwise False
        """
        try:
            self._connection.execute(0, "YIELD 1;")
            return True
        except Exception:
            return False

    def reset(self):
        """reset the idletime

        :return: void
        """
        self.start_use_time = time.time()

    def idle_time(self):
        """get idletime of connection

        :return: 0 while the connection is in use, otherwise the time since
            the last reset in milliseconds
        """
        if self.is_used:
            return 0
        return (time.time() - self.start_use_time) * 1000

    def get_address(self):
        """get the address of the connected service

        :return: (ip, port)
        """
        return (self._ip, self._port)
def open_SSL(self, ip, port, timeout, ssl_config=None, use_http2=False, http_headers=None)
Definition: Connection.py:63
def __get_protocol(self, timeout, ssl_config)
Definition: Connection.py:95
def execute(self, session_id, stmt)
Definition: Connection.py:186
def execute_parameter(self, session_id, stmt, params)
Definition: Connection.py:195
def open(self, ip, port, timeout, use_http2=False, http_headers=None)
Definition: Connection.py:49
def execute_json(self, session_id, stmt)
Definition: Connection.py:220
def execute_json_with_parameter(self, session_id, stmt, params)
Definition: Connection.py:228
def authenticate(self, user_name, password)
Definition: Connection.py:164
def __get_protocal_http2(self, timeout, ssl_config, http_headers)
Definition: Connection.py:123