NebulaGraph Python Client  release-3.4
Connection.py
1 # --coding:utf-8--
2 #
3 # Copyright (c) 2021 vesoft inc. All rights reserved.
4 #
5 # This source code is licensed under Apache 2.0 License.
6 
7 
8 import time
9 
10 from nebula3.fbthrift.transport import (
11  TSocket,
12  TSSLSocket,
13  TTransport,
14  THeaderTransport,
15 )
16 from nebula3.fbthrift.transport.TTransport import TTransportException
17 from nebula3.fbthrift.protocol import THeaderProtocol
18 
19 from nebula3.common.ttypes import ErrorCode
20 from nebula3.graph import GraphService
21 from nebula3.graph.ttypes import VerifyClientVersionReq
22 from nebula3.logger import logger
23 
24 from nebula3.Exception import (
25  AuthFailedException,
26  IOErrorException,
27  ClientServerIncompatibleException,
28 )
29 
30 from nebula3.gclient.net.AuthResult import AuthResult
31 
32 
33 class Connection(object):
34  is_used = False
35 
36  def __init__(self):
37  self._connection_connection = None
38  self.start_use_timestart_use_time = time.time()
39  self._ip_ip = None
40  self._port_port = None
41  self._timeout_timeout = 0
42  self._ssl_conf_ssl_conf = None
43 
44  def open(self, ip, port, timeout):
45  """open the connection
46 
47  :param ip: the server ip
48  :param port: the server port
49  :param timeout: the timeout for connect and execute
50  :return: void
51  """
52  self.open_SSLopen_SSL(ip, port, timeout, None)
53 
54  def open_SSL(self, ip, port, timeout, ssl_config=None):
55  """open the SSL connection
56 
57  :param ip: the server ip
58  :param port: the server port
59  :param timeout: the timeout for connect and execute
60  :ssl_config: configs for SSL
61  :return: void
62  """
63  self._ip_ip = ip
64  self._port_port = port
65  self._timeout_timeout = timeout
66  self._ssl_conf_ssl_conf = ssl_config
67  try:
68  if ssl_config is not None:
69  s = TSSLSocket.TSSLSocket(
70  self._ip_ip,
71  self._port_port,
72  ssl_config.unix_socket,
73  ssl_config.ssl_version,
74  ssl_config.cert_reqs,
75  ssl_config.ca_certs,
76  ssl_config.verify_name,
77  ssl_config.keyfile,
78  ssl_config.certfile,
79  ssl_config.allow_weak_ssl_versions,
80  )
81  else:
82  s = TSocket.TSocket(self._ip_ip, self._port_port)
83  if timeout > 0:
84  s.setTimeout(timeout)
85 
86  buffered_transport = TTransport.TBufferedTransport(s)
87  header_transport = THeaderTransport.THeaderTransport(buffered_transport)
88  protocol = THeaderProtocol.THeaderProtocol(header_transport)
89  header_transport.open()
90 
91  self._connection_connection = GraphService.Client(protocol)
92  resp = self._connection_connection.verifyClientVersion(VerifyClientVersionReq())
93  if resp.error_code != ErrorCode.SUCCEEDED:
94  self._connection_connection._iprot.trans.close()
95  raise ClientServerIncompatibleException(resp.error_msg)
96  except Exception:
97  raise
98 
99  def _reopen(self):
100  """reopen the connection
101 
102  :return:
103  """
104  self.closeclose()
105  if self._ssl_conf_ssl_conf is not None:
106  self.open_SSLopen_SSL(self._ip_ip, self._port_port, self._timeout_timeout, self._ssl_conf_ssl_conf)
107  else:
108  self.openopen(self._ip_ip, self._port_port, self._timeout_timeout)
109 
110  def authenticate(self, user_name, password):
111  """authenticate to graphd
112 
113  :param user_name: the user name
114  :param password: the password
115  :return:
116  """
117  try:
118  resp = self._connection_connection.authenticate(user_name, password)
119  if resp.error_code != ErrorCode.SUCCEEDED:
120  raise AuthFailedException(resp.error_msg)
121  return AuthResult(
122  resp.session_id, resp.time_zone_offset_seconds, resp.time_zone_name
123  )
124  except TTransportException as te:
125  if te.message.find("timed out"):
126  self._reopen_reopen()
127  if te.type == TTransportException.END_OF_FILE:
128  self.closeclose()
129  raise IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.message)
130 
131  def execute(self, session_id, stmt):
132  """execute interface with session_id and ngql
133 
134  :param session_id: the session id get from result of authenticate interface
135  :param stmt: the ngql
136  :return: ExecutionResponse
137  """
138  return self.execute_parameterexecute_parameter(session_id, stmt, None)
139 
140  def execute_parameter(self, session_id, stmt, params):
141  """execute interface with session_id and ngql
142  :param session_id: the session id get from result of authenticate interface
143  :param stmt: the ngql
144  :param params: parameter map
145  :return: ExecutionResponse
146  """
147  try:
148  resp = self._connection_connection.executeWithParameter(session_id, stmt, params)
149  return resp
150  except Exception as te:
151  if isinstance(te, TTransportException):
152  if te.message.find("timed out") > 0:
153  self._reopen_reopen()
154  raise IOErrorException(IOErrorException.E_TIMEOUT, te.message)
155  elif te.type == TTransportException.END_OF_FILE:
156  raise IOErrorException(
157  IOErrorException.E_CONNECT_BROKEN, te.message
158  )
159  elif te.type == TTransportException.NOT_OPEN:
160  raise IOErrorException(IOErrorException.E_NOT_OPEN, te.message)
161  else:
162  raise IOErrorException(IOErrorException.E_UNKNOWN, te.message)
163  raise
164 
165  def execute_json(self, session_id, stmt):
166  """execute_json interface with session_id and ngql
167  :param session_id: the session id get from result of authenticate interface
168  :param stmt: the ngql
169  :return: string json representing the execution result
170  """
171  return self.execute_json_with_parameterexecute_json_with_parameter(session_id, stmt, None)
172 
173  def execute_json_with_parameter(self, session_id, stmt, params):
174  """execute_json interface with session_id and ngql with parameter
175  :param session_id: the session id get from result of authenticate interface
176  :param stmt: the ngql
177  :param params: parameter map
178  :return: string json representing the execution result
179  """
180  try:
181  resp = self._connection_connection.executeJsonWithParameter(session_id, stmt, params)
182  return resp
183  except Exception as te:
184  if isinstance(te, TTransportException):
185  if te.message.find("timed out") > 0:
186  self._reopen_reopen()
187  raise IOErrorException(IOErrorException.E_TIMEOUT, te.message)
188  elif te.type == TTransportException.END_OF_FILE:
189  raise IOErrorException(
190  IOErrorException.E_CONNECT_BROKEN, te.message
191  )
192  elif te.type == TTransportException.NOT_OPEN:
193  raise IOErrorException(IOErrorException.E_NOT_OPEN, te.message)
194  else:
195  raise IOErrorException(IOErrorException.E_UNKNOWN, te.message)
196  raise
197 
198  def signout(self, session_id):
199  """tells the graphd can release the session info
200 
201  :param session_id:the session id
202  :return: void
203  """
204  try:
205  self._connection_connection.signout(session_id)
206  except TTransportException as te:
207  if te.type == TTransportException.END_OF_FILE:
208  self.closeclose()
209 
210  def close(self):
211  """close the connection
212 
213  :return: void
214  """
215  try:
216  self._connection_connection._iprot.trans.close()
217  except Exception as e:
218  logger.error(
219  'Close connection to {}:{} failed:{}'.format(self._ip_ip, self._port_port, e)
220  )
221 
222  def ping(self):
223  """check the connection if ok
224  :return: True or False
225  """
226  try:
227  resp = self._connection_connection.execute(0, 'YIELD 1;')
228  return True
229  except Exception:
230  return False
231 
232  def reset(self):
233  """reset the idletime
234 
235  :return: void
236  """
237  self.start_use_timestart_use_time = time.time()
238 
239  def idle_time(self):
240  """get idletime of connection
241 
242  :return: idletime
243  """
244  if self.is_usedis_used:
245  return 0
246  return (time.time() - self.start_use_timestart_use_time) * 1000
247 
248  def get_address(self):
249  """get the address of the connected service
250 
251  :return: (ip, port)
252  """
253  return (self._ip_ip, self._port_port)
def open(self, ip, port, timeout)
Definition: Connection.py:44
def open_SSL(self, ip, port, timeout, ssl_config=None)
Definition: Connection.py:54
def execute(self, session_id, stmt)
Definition: Connection.py:131
def execute_parameter(self, session_id, stmt, params)
Definition: Connection.py:140
def execute_json(self, session_id, stmt)
Definition: Connection.py:165
def execute_json_with_parameter(self, session_id, stmt, params)
Definition: Connection.py:173
def authenticate(self, user_name, password)
Definition: Connection.py:110