NebulaGraph Python Client  release-3.8
Session.py
1 # --coding:utf-8--
2 #
3 # Copyright (c) 2021 vesoft inc. All rights reserved.
4 #
5 # This source code is licensed under Apache 2.0 License.
6 
7 
8 import json
9 import time
10 
11 from typing import TYPE_CHECKING
12 
13 from nebula3.Exception import (
14  IOErrorException,
15  NotValidConnectionException,
16 )
17 from nebula3.common.ttypes import ErrorCode
18 from nebula3.data.ResultSet import ResultSet
19 from nebula3.gclient.net.AuthResult import AuthResult
20 from nebula3.gclient.net.base import BaseExecutor
21 from nebula3.logger import logger
22 
23 if TYPE_CHECKING:
24  from nebula3.gclient.net.ConnectionPool import ConnectionPool
25  from nebula3.gclient.net.Connection import Connection
26 
27 
28 class Session(BaseExecutor, object):
29  def __init__(
30  self,
31  connection: "Connection",
32  auth_result: AuthResult,
33  pool: "ConnectionPool",
34  retry_connect=True,
35  execution_retry_count=0,
36  retry_interval_seconds=1,
37  ):
38  """
39  Initialize the Session object.
40 
41  :param connection: The connection object associated with the session.
42  :param auth_result: The result of the authentication process.
43  :param pool: The pool object where the session was created.
44  :param retry_connect: A boolean indicating whether to retry the connection if it fails.
45  :param execution_retry_count: The number of attempts to retry the execution upon encountering an execution error(-1005), with the default being 0 (no retries).
46  :param retry_interval_seconds: The interval between connection retries in seconds.
47  """
48  self._session_id_session_id = auth_result.get_session_id()
49  self._timezone_offset_timezone_offset = auth_result.get_timezone_offset()
50  self._connection_connection = connection
51  self._timezone_timezone = 0
52  # connection the where the session was created, if session pool was used
53  self._pool_pool = pool
54  self._retry_connect_retry_connect = retry_connect
55  self._execution_retry_count_execution_retry_count = execution_retry_count
56  self._retry_interval_seconds_retry_interval_seconds = retry_interval_seconds
57  # the time stamp when the session was added to the idle list of the session pool
58  self._idle_time_start_idle_time_start = 0
59 
60  def execute(self, stmt):
61  """execute statement
62 
63  :param stmt: the ngql
64  :return: ResultSet
65  """
66  return super().execute(stmt)
67 
68  def execute_parameter(self, stmt, params):
69  """execute statement
70  :param stmt: the ngql
71  :param params: parameter map
72  :return: ResultSet
73  """
74  if self._connection_connection is None:
75  raise RuntimeError("The session has been released")
76  try:
77  start_time = time.time()
78  resp = self._connection_connection.execute_parameter(self._session_id_session_id, stmt, params)
79  end_time = time.time()
80 
81  if (
82  self._execution_retry_count_execution_retry_count > 0
83  and resp.error_code == ErrorCode.E_EXECUTION_ERROR
84  ):
85  for retry_count in range(1, self._execution_retry_count_execution_retry_count + 1):
86  logger.warning(
87  f"Execution error, retrying {retry_count}/{self._execution_retry_count} after {self._retry_interval_seconds}s"
88  )
89  time.sleep(self._retry_interval_seconds_retry_interval_seconds)
90  resp = self._connection_connection.execute_parameter(
91  self._session_id_session_id, stmt, params
92  )
93  if resp.error_code != ErrorCode.E_EXECUTION_ERROR:
94  break
95 
96  return ResultSet(
97  resp,
98  all_latency=int((end_time - start_time) * 1000000),
99  timezone_offset=self._timezone_offset_timezone_offset,
100  )
101  except IOErrorException as ie:
102  if ie.type == IOErrorException.E_CONNECT_BROKEN:
103  self._pool_pool.update_servers_status()
104  if self._retry_connect_retry_connect:
105  if not self._reconnect_reconnect():
106  logger.warning("Retry connect failed")
107  raise IOErrorException(
108  IOErrorException.E_ALL_BROKEN, ie.message
109  )
110  resp = self._connection_connection.execute_parameter(
111  self._session_id_session_id, stmt, params
112  )
113  end_time = time.time()
114  return ResultSet(
115  resp,
116  all_latency=int((end_time - start_time) * 1000000),
117  timezone_offset=self._timezone_offset_timezone_offset,
118  )
119  raise
120  except Exception:
121  raise
122 
123  def execute_json(self, stmt):
124  """execute statement and return the result as a JSON bytes
125  Date and Datetime will be returned in UTC
126  JSON struct:
127  {
128  "results": [
129  {
130  "columns": [],
131  "data": [
132  {
133  "row": [
134  "row-data"
135  ],
136  "meta": [
137  "metadata"
138  ]
139  }
140  ],
141  "latencyInUs": 0,
142  "spaceName": "",
143  "planDesc ": {
144  "planNodeDescs": [
145  {
146  "name": "",
147  "id": 0,
148  "outputVar": "",
149  "description": {
150  "key": ""
151  },
152  "profiles": [
153  {
154  "rows": 1,
155  "execDurationInUs": 0,
156  "totalDurationInUs": 0,
157  "otherStats": {}
158  }
159  ],
160  "branchInfo": {
161  "isDoBranch": false,
162  "conditionNodeId": -1
163  },
164  "dependencies": []
165  }
166  ],
167  "nodeIndexMap": {},
168  "format": "",
169  "optimize_time_in_us": 0
170  },
171  "comment ": ""
172  }
173  ],
174  "errors": [
175  {
176  "code": 0,
177  "message": ""
178  }
179  ]
180  }
181  :param stmt: the ngql
182  :return: JSON bytes
183  """
184  return super().execute_json(stmt)
185 
186  def execute_json_with_parameter(self, stmt, params):
187  """execute statement and return the result as a JSON bytes
188  Date and Datetime will be returned in UTC
189  JSON struct:
190  {
191  "results": [
192  {
193  "columns": [],
194  "data": [
195  {
196  "row": [
197  "row-data"
198  ],
199  "meta": [
200  "metadata"
201  ]
202  }
203  ],
204  "latencyInUs": 0,
205  "spaceName": "",
206  "planDesc ": {
207  "planNodeDescs": [
208  {
209  "name": "",
210  "id": 0,
211  "outputVar": "",
212  "description": {
213  "key": ""
214  },
215  "profiles": [
216  {
217  "rows": 1,
218  "execDurationInUs": 0,
219  "totalDurationInUs": 0,
220  "otherStats": {}
221  }
222  ],
223  "branchInfo": {
224  "isDoBranch": false,
225  "conditionNodeId": -1
226  },
227  "dependencies": []
228  }
229  ],
230  "nodeIndexMap": {},
231  "format": "",
232  "optimize_time_in_us": 0
233  },
234  "comment ": ""
235  }
236  ],
237  "errors": [
238  {
239  "code": 0,
240  "message": ""
241  }
242  ]
243  }
244  :param stmt: the ngql
245  :param params: parameter map
246  :return: JSON bytes
247  """
248  if self._connection_connection is None:
249  raise RuntimeError("The session has been released")
250  try:
251  resp_json = self._connection_connection.execute_json_with_parameter(
252  self._session_id_session_id, stmt, params
253  )
254  if self._execution_retry_count_execution_retry_count > 0:
255  for retry_count in range(self._execution_retry_count_execution_retry_count):
256  if (
257  json.loads(resp_json).get("errors", [{}])[0].get("code")
258  != ErrorCode.E_EXECUTION_ERROR
259  ):
260  break
261  logger.warning(
262  "Execute failed, retry count:{}/{} in {} seconds".format(
263  retry_count + 1,
264  self._execution_retry_count_execution_retry_count,
265  self._retry_interval_seconds_retry_interval_seconds,
266  )
267  )
268  time.sleep(self._retry_interval_seconds_retry_interval_seconds)
269  resp_json = self._connection_connection.execute_json_with_parameter(
270  self._session_id_session_id, stmt, params
271  )
272  return resp_json
273 
274  except IOErrorException as ie:
275  if ie.type == IOErrorException.E_CONNECT_BROKEN:
276  self._pool_pool.update_servers_status()
277  if self._retry_connect_retry_connect:
278  if not self._reconnect_reconnect():
279  logger.warning("Retry connect failed")
280  raise IOErrorException(
281  IOErrorException.E_ALL_BROKEN, ie.message
282  )
283  resp_json = self._connection_connection.execute_json_with_parameter(
284  self._session_id_session_id, stmt, params
285  )
286  return resp_json
287  raise
288  except Exception:
289  raise
290 
291  def release(self):
292  """release the connection to pool, and the session couldn't been use again
293 
294  :return:
295  """
296  if self._connection_connection is None:
297  return
298  self._connection_connection.signout(self._session_id_session_id)
299  self._connection_connection.is_used = False
300  self._connection_connection = None
301 
302  def ping(self):
303  """ping at connection level check the connection is valid
304 
305  :return: True or False
306  """
307  if self._connection_connection is None:
308  return False
309  return self._connection_connection.ping()
310 
311  def ping_session(self):
312  """ping at session level, check whether the session is usable"""
313  resp = self.executeexecuteexecute(r'RETURN "NEBULA PYTHON SESSION PING"')
314  if resp.is_succeeded():
315  return True
316  else:
317  logger.error(
318  "failed to ping the session: error code:{}, error message:{}".format(
319  resp.error_code, resp.error_msg
320  )
321  )
322  return False
323 
324  def _reconnect(self):
325  try:
326  self._connection_connection.is_used = False
327  conn = self._pool_pool.get_connection()
328  if conn is None:
329  return False
330  self._connection_connection = conn
331  except NotValidConnectionException:
332  return False
333  return True
334 
335  def __del__(self):
336  self.releaserelease()
337 
338  def _idle_time(self):
339  """get idletime of connection
340 
341  :return: idletime
342  """
343  if self.is_used:
344  return 0
345  return (time.time() - self.start_use_time) * 1000
346 
347  def _sign_out(self):
348  """sign out the session"""
349  if self._connection_connection is None:
350  raise RuntimeError("The session has been released")
351  self._connection_connection.signout(self._session_id_session_id)
def __init__(self, "Connection" connection, AuthResult auth_result, "ConnectionPool" pool, retry_connect=True, execution_retry_count=0, retry_interval_seconds=1)
Definition: Session.py:37
def execute_json_with_parameter(self, stmt, params)
Definition: Session.py:186
def execute_parameter(self, stmt, params)
Definition: Session.py:68
ResultSet execute(self, str stmt)
Definition: base.py:36