NebulaGraph Python Client  release-3.4
Session.py
1 # --coding:utf-8--
2 #
3 # Copyright (c) 2021 vesoft inc. All rights reserved.
4 #
5 # This source code is licensed under Apache 2.0 License.
6 
7 
8 import time
9 
10 from nebula3.Exception import (
11  IOErrorException,
12  NotValidConnectionException,
13 )
14 
15 from nebula3.data.ResultSet import ResultSet
16 from nebula3.gclient.net.AuthResult import AuthResult
17 from nebula3.logger import logger
18 
19 
20 class Session(object):
21  def __init__(self, connection, auth_result: AuthResult, pool, retry_connect=True):
22  self._session_id_session_id = auth_result.get_session_id()
23  self._timezone_offset_timezone_offset = auth_result.get_timezone_offset()
24  self._connection_connection = connection
25  self._timezone_timezone = 0
26  # connection the where the session was created, if session pool was used
27  self._pool_pool = pool
28  self._retry_connect_retry_connect = retry_connect
29  # the time stamp when the session was added to the idle list of the session pool
30  self._idle_time_start_idle_time_start = 0
31 
32  def execute_parameter(self, stmt, params):
33  """execute statement
34  :param stmt: the ngql
35  :param params: parameter map
36  :return: ResultSet
37  """
38  if self._connection_connection is None:
39  raise RuntimeError('The session has been released')
40  try:
41  start_time = time.time()
42  resp = self._connection_connection.execute_parameter(self._session_id_session_id, stmt, params)
43  end_time = time.time()
44  return ResultSet(
45  resp,
46  all_latency=int((end_time - start_time) * 1000000),
47  timezone_offset=self._timezone_offset_timezone_offset,
48  )
49  except IOErrorException as ie:
50  if ie.type == IOErrorException.E_CONNECT_BROKEN:
51  self._pool_pool.update_servers_status()
52  if self._retry_connect_retry_connect:
53  if not self._reconnect_reconnect():
54  logger.warning('Retry connect failed')
55  raise IOErrorException(
56  IOErrorException.E_ALL_BROKEN, ie.message
57  )
58  resp = self._connection_connection.execute_parameter(
59  self._session_id_session_id, stmt, params
60  )
61  end_time = time.time()
62  return ResultSet(
63  resp,
64  all_latency=int((end_time - start_time) * 1000000),
65  timezone_offset=self._timezone_offset_timezone_offset,
66  )
67  raise
68  except Exception:
69  raise
70 
71  def execute(self, stmt):
72  """execute statement
73 
74  :param stmt: the ngql
75  :return: ResultSet
76  """
77  return self.execute_parameterexecute_parameter(stmt, None)
78 
79  def execute_json(self, stmt):
80  """execute statement and return the result as a JSON string
81  Date and Datetime will be returned in UTC
82  JSON struct:
83  {
84  "results": [
85  {
86  "columns": [],
87  "data": [
88  {
89  "row": [
90  "row-data"
91  ],
92  "meta": [
93  "metadata"
94  ]
95  }
96  ],
97  "latencyInUs": 0,
98  "spaceName": "",
99  "planDesc ": {
100  "planNodeDescs": [
101  {
102  "name": "",
103  "id": 0,
104  "outputVar": "",
105  "description": {
106  "key": ""
107  },
108  "profiles": [
109  {
110  "rows": 1,
111  "execDurationInUs": 0,
112  "totalDurationInUs": 0,
113  "otherStats": {}
114  }
115  ],
116  "branchInfo": {
117  "isDoBranch": false,
118  "conditionNodeId": -1
119  },
120  "dependencies": []
121  }
122  ],
123  "nodeIndexMap": {},
124  "format": "",
125  "optimize_time_in_us": 0
126  },
127  "comment ": ""
128  }
129  ],
130  "errors": [
131  {
132  "code": 0,
133  "message": ""
134  }
135  ]
136  }
137  :param stmt: the ngql
138  :return: JSON string
139  """
140  return self.execute_json_with_parameterexecute_json_with_parameter(stmt, None)
141 
142  def execute_json_with_parameter(self, stmt, params):
143  """execute statement and return the result as a JSON string
144  Date and Datetime will be returned in UTC
145  JSON struct:
146  {
147  "results": [
148  {
149  "columns": [],
150  "data": [
151  {
152  "row": [
153  "row-data"
154  ],
155  "meta": [
156  "metadata"
157  ]
158  }
159  ],
160  "latencyInUs": 0,
161  "spaceName": "",
162  "planDesc ": {
163  "planNodeDescs": [
164  {
165  "name": "",
166  "id": 0,
167  "outputVar": "",
168  "description": {
169  "key": ""
170  },
171  "profiles": [
172  {
173  "rows": 1,
174  "execDurationInUs": 0,
175  "totalDurationInUs": 0,
176  "otherStats": {}
177  }
178  ],
179  "branchInfo": {
180  "isDoBranch": false,
181  "conditionNodeId": -1
182  },
183  "dependencies": []
184  }
185  ],
186  "nodeIndexMap": {},
187  "format": "",
188  "optimize_time_in_us": 0
189  },
190  "comment ": ""
191  }
192  ],
193  "errors": [
194  {
195  "code": 0,
196  "message": ""
197  }
198  ]
199  }
200  :param stmt: the ngql
201  :param params: parameter map
202  :return: JSON string
203  """
204  if self._connection_connection is None:
205  raise RuntimeError('The session has been released')
206  try:
207  resp_json = self._connection_connection.execute_json_with_parameter(
208  self._session_id_session_id, stmt, params
209  )
210  return resp_json
211  except IOErrorException as ie:
212  if ie.type == IOErrorException.E_CONNECT_BROKEN:
213  self._pool_pool.update_servers_status()
214  if self._retry_connect_retry_connect:
215  if not self._reconnect_reconnect():
216  logger.warning('Retry connect failed')
217  raise IOErrorException(
218  IOErrorException.E_ALL_BROKEN, ie.message
219  )
220  resp_json = self._connection_connection.execute_json_with_parameter(
221  self._session_id_session_id, stmt, params
222  )
223  return resp_json
224  raise
225  except Exception:
226  raise
227 
228  def release(self):
229  """release the connection to pool, and the session couldn't been use again
230 
231  :return:
232  """
233  if self._connection_connection is None:
234  return
235  self._connection_connection.signout(self._session_id_session_id)
236  self._connection_connection.is_used = False
237  self._connection_connection = None
238 
239  def ping(self):
240  """ping at connection level check the connection is valid
241 
242  :return: True or False
243  """
244  if self._connection_connection is None:
245  return False
246  return self._connection_connection.ping()
247 
248  def ping_session(self):
249  """ping at session level, check whether the session is usable"""
250  resp = self.executeexecute(r'RETURN "NEBULA PYTHON SESSION PING"')
251  if resp.is_succeeded():
252  return True
253  else:
254  logger.error(
255  'failed to ping the session: error code:{}, error message:{}'.format(
256  resp.error_code, resp.error_msg
257  )
258  )
259  return False
260 
261  def _reconnect(self):
262  try:
263  self._connection_connection.is_used = False
264  conn = self._pool_pool.get_connection()
265  if conn is None:
266  return False
267  self._connection_connection = conn
268  except NotValidConnectionException:
269  return False
270  return True
271 
272  def __del__(self):
273  self.releaserelease()
274 
275  def _idle_time(self):
276  """get idletime of connection
277 
278  :return: idletime
279  """
280  if self.is_used:
281  return 0
282  return (time.time() - self.start_use_time) * 1000
283 
284  def _sign_out(self):
285  """sign out the session"""
286  if self._connection_connection is None:
287  raise RuntimeError('The session has been released')
288  self._connection_connection.signout(self._session_id_session_id)
def execute_json_with_parameter(self, stmt, params)
Definition: Session.py:142
def execute_parameter(self, stmt, params)
Definition: Session.py:32