NebulaGraph Python Client  release-3.8
1 # --coding:utf-8--
2 #
3 # Copyright (c) 2021 vesoft inc. All rights reserved.
4 #
5 # This source code is licensed under Apache 2.0 License.
8 import json
9 import time
11 from typing import TYPE_CHECKING
13 from nebula3.Exception import (
14  IOErrorException,
15  NotValidConnectionException,
16 )
17 from nebula3.common.ttypes import ErrorCode
18 from import ResultSet
19 from import AuthResult
20 from import BaseExecutor
21 from nebula3.logger import logger
24  from import ConnectionPool
25  from import Connection
class Session(BaseExecutor, object):
    """A graph session bound to one connection taken from a connection pool.

    The session forwards statements to its connection together with the
    session id obtained from the authentication result. It can optionally:

    * re-send a statement when the server answers with
      ``ErrorCode.E_EXECUTION_ERROR`` (``execution_retry_count``), and
    * fetch a fresh connection from the pool and re-send the statement once
      when the current connection is reported broken (``retry_connect``).
    """

    def __init__(
        self,
        connection: "Connection",
        auth_result: AuthResult,
        pool: "ConnectionPool",
        retry_connect=True,
        execution_retry_count=0,
        retry_interval_seconds=1,
    ):
        """
        Initialize the Session object.

        :param connection: The connection object associated with the session.
        :param auth_result: The result of the authentication process.
        :param pool: The pool object where the session was created.
        :param retry_connect: A boolean indicating whether to retry the connection if it fails.
        :param execution_retry_count: The number of attempts to retry the execution upon
            encountering an execution error(-1005), with the default being 0 (no retries).
        :param retry_interval_seconds: The number of seconds to sleep before each
            execution retry.
        """
        self._session_id = auth_result.get_session_id()
        self._timezone_offset = auth_result.get_timezone_offset()
        self._connection = connection
        self._timezone = 0
        # the pool this session's connection came from; also used to obtain a
        # replacement connection when the current one is broken
        self._pool = pool
        self._retry_connect = retry_connect
        self._execution_retry_count = execution_retry_count
        self._retry_interval_seconds = retry_interval_seconds
        # the time stamp when the session was added to the idle list of the session pool
        self._idle_time_start = 0

    def execute(self, stmt):
        """execute statement

        :param stmt: the ngql
        :return: ResultSet
        """
        return super().execute(stmt)

    def execute_parameter(self, stmt, params):
        """execute statement with parameters

        The statement is re-sent up to ``execution_retry_count`` times while
        the server keeps answering with ``ErrorCode.E_EXECUTION_ERROR``. If
        the connection is broken and ``retry_connect`` is enabled, a new
        connection is taken from the pool and the statement is sent once more.

        :param stmt: the ngql
        :param params: parameter map
        :return: ResultSet; its ``all_latency`` is the client-side wall-clock
            time in microseconds, including any retries
        :raise RuntimeError: if the session has already been released
        :raise IOErrorException: if the request fails and cannot be recovered;
            the type is ``E_ALL_BROKEN`` when reconnecting failed
        """
        if self._connection is None:
            raise RuntimeError("The session has been released")

        start_time = time.time()
        try:
            resp = self._connection.execute_parameter(self._session_id, stmt, params)
            for retry_count in range(1, self._execution_retry_count + 1):
                if resp.error_code != ErrorCode.E_EXECUTION_ERROR:
                    break
                logger.warning(
                    "Execution error, retrying %s/%s after %ss",
                    retry_count,
                    self._execution_retry_count,
                    self._retry_interval_seconds,
                )
                time.sleep(self._retry_interval_seconds)
                resp = self._connection.execute_parameter(
                    self._session_id, stmt, params
                )
        except IOErrorException as ie:
            if not self._recover_from_io_error(ie):
                raise
            # the connection was replaced: send the statement once more
            resp = self._connection.execute_parameter(self._session_id, stmt, params)

        # measured after the last attempt so that retries are included
        end_time = time.time()
        return ResultSet(
            resp,
            all_latency=int((end_time - start_time) * 1000000),
            timezone_offset=self._timezone_offset,
        )

    def execute_json(self, stmt):
        """execute statement and return the result as a JSON bytes
        Date and Datetime will be returned in UTC
        JSON struct:
        {
            "results": [
                {
                    "columns": [],
                    "data": [
                        {
                            "row": [
                                "row-data"
                            ],
                            "meta": [
                                "metadata"
                            ]
                        }
                    ],
                    "latencyInUs": 0,
                    "spaceName": "",
                    "planDesc": {
                        "planNodeDescs": [
                            {
                                "name": "",
                                "id": 0,
                                "outputVar": "",
                                "description": {
                                    "key": ""
                                },
                                "profiles": [
                                    {
                                        "rows": 1,
                                        "execDurationInUs": 0,
                                        "totalDurationInUs": 0,
                                        "otherStats": {}
                                    }
                                ],
                                "branchInfo": {
                                    "isDoBranch": false,
                                    "conditionNodeId": -1
                                },
                                "dependencies": []
                            }
                        ],
                        "nodeIndexMap": {},
                        "format": "",
                        "optimize_time_in_us": 0
                    },
                    "comment": ""
                }
            ],
            "errors": [
                {
                    "code": 0,
                    "message": ""
                }
            ]
        }
        :param stmt: the ngql
        :return: JSON bytes
        """
        return super().execute_json(stmt)

    def execute_json_with_parameter(self, stmt, params):
        """execute statement and return the result as a JSON bytes
        Date and Datetime will be returned in UTC
        JSON struct:
        {
            "results": [
                {
                    "columns": [],
                    "data": [
                        {
                            "row": [
                                "row-data"
                            ],
                            "meta": [
                                "metadata"
                            ]
                        }
                    ],
                    "latencyInUs": 0,
                    "spaceName": "",
                    "planDesc": {
                        "planNodeDescs": [
                            {
                                "name": "",
                                "id": 0,
                                "outputVar": "",
                                "description": {
                                    "key": ""
                                },
                                "profiles": [
                                    {
                                        "rows": 1,
                                        "execDurationInUs": 0,
                                        "totalDurationInUs": 0,
                                        "otherStats": {}
                                    }
                                ],
                                "branchInfo": {
                                    "isDoBranch": false,
                                    "conditionNodeId": -1
                                },
                                "dependencies": []
                            }
                        ],
                        "nodeIndexMap": {},
                        "format": "",
                        "optimize_time_in_us": 0
                    },
                    "comment": ""
                }
            ],
            "errors": [
                {
                    "code": 0,
                    "message": ""
                }
            ]
        }
        :param stmt: the ngql
        :param params: parameter map
        :return: JSON bytes
        :raise RuntimeError: if the session has already been released
        :raise IOErrorException: if the request fails and cannot be recovered;
            the type is ``E_ALL_BROKEN`` when reconnecting failed
        """
        if self._connection is None:
            raise RuntimeError("The session has been released")

        try:
            resp_json = self._connection.execute_json_with_parameter(
                self._session_id, stmt, params
            )
            for retry_count in range(1, self._execution_retry_count + 1):
                if self._json_error_code(resp_json) != ErrorCode.E_EXECUTION_ERROR:
                    break
                logger.warning(
                    "Execute failed, retry count:{}/{} in {} seconds".format(
                        retry_count,
                        self._execution_retry_count,
                        self._retry_interval_seconds,
                    )
                )
                time.sleep(self._retry_interval_seconds)
                resp_json = self._connection.execute_json_with_parameter(
                    self._session_id, stmt, params
                )
            return resp_json
        except IOErrorException as ie:
            if not self._recover_from_io_error(ie):
                raise
            # the connection was replaced: send the statement once more
            return self._connection.execute_json_with_parameter(
                self._session_id, stmt, params
            )

    @staticmethod
    def _json_error_code(resp_json):
        """Return the ``code`` of the first entry in the response's ``errors`` list.

        :param resp_json: the JSON document returned by the connection
        :return: the error code, or None when ``errors`` is missing, empty,
            or its first entry has no ``code``
        """
        # ``or [{}]`` also covers an explicit empty list, which would otherwise
        # make the ``[0]`` lookup raise IndexError
        errors = json.loads(resp_json).get("errors") or [{}]
        return errors[0].get("code")

    def _recover_from_io_error(self, ie):
        """Try to replace a broken connection after an IOErrorException.

        :param ie: the IOErrorException raised by the connection
        :return: True if a new connection was obtained and the caller may
            re-send the statement; False if the caller should re-raise ``ie``
            (the error is not ``E_CONNECT_BROKEN`` or ``retry_connect`` is off)
        :raise IOErrorException: with type ``E_ALL_BROKEN`` when no
            replacement connection could be obtained
        """
        if ie.type != IOErrorException.E_CONNECT_BROKEN:
            return False
        self._pool.update_servers_status()
        if not self._retry_connect:
            return False
        if not self._reconnect():
            logger.warning("Retry connect failed")
            raise IOErrorException(IOErrorException.E_ALL_BROKEN, ie.message) from ie
        return True

    def release(self):
        """release the connection to pool, and the session couldn't been use again

        Calling it on an already released session is a no-op.

        :return:
        """
        # getattr: release() is also reached from __del__, which may run on an
        # object whose __init__ failed before ``_connection`` was assigned
        connection = getattr(self, "_connection", None)
        if connection is None:
            return
        try:
            connection.signout(self._session_id)
        finally:
            # always detach, even if signout raised, so the connection is not
            # left marked as used
            connection.is_used = False
            self._connection = None

    def ping(self):
        """ping at connection level check the connection is valid

        :return: True or False
        """
        if self._connection is None:
            return False
        # NOTE(review): delegates to the connection's own ping(); confirm that
        # Connection.ping() exists and returns a bool
        return self._connection.ping()

    def ping_session(self):
        """ping at session level, check whether the session is usable

        :return: True if the ping statement succeeded, otherwise False
        """
        resp = self.execute(r'RETURN "NEBULA PYTHON SESSION PING"')
        if resp.is_succeeded():
            return True
        logger.error(
            "failed to ping the session: error code:{}, error message:{}".format(
                resp.error_code, resp.error_msg
            )
        )
        return False

    def _reconnect(self):
        """Give up the current connection and take a new one from the pool.

        :return: True if a new connection was obtained, False if the pool
            returned None or raised NotValidConnectionException
        """
        try:
            self._connection.is_used = False
            conn = self._pool.get_connection()
            if conn is None:
                return False
            self._connection = conn
        except NotValidConnectionException:
            return False
        return True

    def __del__(self):
        # make sure the connection is handed back when the session is collected
        self.release()

    def _idle_time(self):
        """get idletime of connection

        :return: idletime
        """
        # NOTE(review): ``is_used`` and ``start_use_time`` are not assigned in
        # Session.__init__; unless BaseExecutor provides them this raises
        # AttributeError. Presumably the idle time should be derived from
        # ``_idle_time_start`` or from the connection - confirm and fix.
        if self.is_used:
            return 0
        return (time.time() - self.start_use_time) * 1000

    def _sign_out(self):
        """sign out the session

        :raise RuntimeError: if the session has already been released
        """
        if self._connection is None:
            raise RuntimeError("The session has been released")
        self._connection.signout(self._session_id)
def __init__(self, "Connection" connection, AuthResult auth_result, "ConnectionPool" pool, retry_connect=True, execution_retry_count=0, retry_interval_seconds=1)
def execute_json_with_parameter(self, stmt, params)
def execute_parameter(self, stmt, params)
ResultSet execute(self, str stmt)