11 from typing
import TYPE_CHECKING
13 from nebula3.Exception
import (
15 NotValidConnectionException,
17 from nebula3.common.ttypes
import ErrorCode
18 from nebula3.data.ResultSet
import ResultSet
21 from nebula3.logger
import logger
31 connection:
"Connection",
32 auth_result: AuthResult,
33 pool:
"ConnectionPool",
35 execution_retry_count=0,
36 retry_interval_seconds=1,
39 Initialize the Session object.
41 :param connection: The connection object associated with the session.
42 :param auth_result: The result of the authentication process.
43 :param pool: The pool object where the session was created.
44 :param retry_connect: A boolean indicating whether to retry the connection if it fails.
45 :param execution_retry_count: The number of attempts to retry the execution upon encountering an execution error(-1005), with the default being 0 (no retries).
46 :param retry_interval_seconds: The interval between connection retries in seconds.
48 self.
_session_id_session_id = auth_result.get_session_id()
53 self.
_pool_pool = pool
71 :param params: parameter map
75 raise RuntimeError(
"The session has been released")
77 start_time = time.time()
79 end_time = time.time()
83 and resp.error_code == ErrorCode.E_EXECUTION_ERROR
87 f
"Execution error, retrying {retry_count}/{self._execution_retry_count} after {self._retry_interval_seconds}s"
93 if resp.error_code != ErrorCode.E_EXECUTION_ERROR:
98 all_latency=int((end_time - start_time) * 1000000),
101 except IOErrorException
as ie:
102 if ie.type == IOErrorException.E_CONNECT_BROKEN:
103 self.
_pool_pool.update_servers_status()
106 logger.warning(
"Retry connect failed")
107 raise IOErrorException(
108 IOErrorException.E_ALL_BROKEN, ie.message
113 end_time = time.time()
116 all_latency=int((end_time - start_time) * 1000000),
124 """execute statement and return the result as a JSON bytes
125 Date and Datetime will be returned in UTC
155 "execDurationInUs": 0,
156 "totalDurationInUs": 0,
162 "conditionNodeId": -1
169 "optimize_time_in_us": 0
181 :param stmt: the ngql
187 """execute statement and return the result as a JSON bytes
188 Date and Datetime will be returned in UTC
218 "execDurationInUs": 0,
219 "totalDurationInUs": 0,
225 "conditionNodeId": -1
232 "optimize_time_in_us": 0
244 :param stmt: the ngql
245 :param params: parameter map
249 raise RuntimeError(
"The session has been released")
257 json.loads(resp_json).get(
"errors", [{}])[0].get(
"code")
258 != ErrorCode.E_EXECUTION_ERROR
262 "Execute failed, retry count:{}/{} in {} seconds".format(
274 except IOErrorException
as ie:
275 if ie.type == IOErrorException.E_CONNECT_BROKEN:
276 self.
_pool_pool.update_servers_status()
279 logger.warning(
"Retry connect failed")
280 raise IOErrorException(
281 IOErrorException.E_ALL_BROKEN, ie.message
292 """release the connection to pool, and the session couldn't been use again
303 """ping at connection level check the connection is valid
305 :return: True or False
312 """ping at session level, check whether the session is usable"""
313 resp = self.
executeexecuteexecute(
r'RETURN "NEBULA PYTHON SESSION PING"')
314 if resp.is_succeeded():
318 "failed to ping the session: error code:{}, error message:{}".format(
319 resp.error_code, resp.error_msg
324 def _reconnect(self):
327 conn = self.
_pool_pool.get_connection()
331 except NotValidConnectionException:
338 def _idle_time(self):
339 """get idletime of connection
345 return (time.time() - self.start_use_time) * 1000
348 """sign out the session"""
350 raise RuntimeError(
"The session has been released")
def __init__(self, "Connection" connection, AuthResult auth_result, "ConnectionPool" pool, retry_connect=True, execution_retry_count=0, retry_interval_seconds=1)
def execute_json_with_parameter(self, stmt, params)
def execute_json(self, stmt)
def execute_parameter(self, stmt, params)
ResultSet execute(self, str stmt)