NebulaGraph Python Client  release-3.8
SessionPool.py
1 # --coding:utf-8--
2 #
3 # Copyright (c) 2022 vesoft inc. All rights reserved.
4 #
5 # This source code is licensed under Apache 2.0 License.
6 
7 
8 import json
9 import socket
10 
11 from threading import RLock, Timer
12 from typing import List, Optional
13 import time
14 
15 from nebula3.common.ttypes import ErrorCode
16 from nebula3.Exception import (
17  AuthFailedException,
18  NoValidSessionException,
19  InValidHostname,
20 )
21 
22 from nebula3.gclient.net.Session import Session
23 from nebula3.gclient.net.Connection import Connection
24 from nebula3.gclient.net.base import BaseExecutor
25 from nebula3.logger import logger
26 from nebula3.Config import SessionPoolConfig, SSL_config
27 
28 
class SessionPool(BaseExecutor, object):
    """A pool of sessions that share one user, one password and one space.

    Every query borrows a session from the idle list (creating one on demand
    while the pool is below ``max_size``), runs on it, and hands it back.
    Sessions that the server reports as invalid/timed out, or that fail while
    executing, are dropped from the pool and their connection is closed.
    """

    # status values stored in ``_addresses_status``
    S_OK = 0
    S_BAD = 1

    def __init__(self, username, password, space_name, addresses):
        """create the pool object; no connection is opened until :meth:`init`.

        :param username: the username of the sessions
        :param password: the password of the sessions
        :param space_name: the space every session is bound to
        :param addresses: iterable of ``(host, port)`` pairs of the servers
        :raises InValidHostname: if a host name cannot be resolved
        """
        # Lifecycle state is set up first: __del__ calls close(), which needs
        # the lock and the session lists even when validation below raises.
        self._lock = RLock()
        # the flag of whether the pool is closed
        self._close = False
        # sessions that are currently in use
        self._active_sessions: List[Session] = list()
        # sessions that are currently available
        self._idle_sessions: List[Session] = list()

        # user name and password of the session
        self._username = username
        self._password = password

        # space name bonded to the session
        self._space_name = space_name

        self._configs = SessionPoolConfig()
        self._ssl_configs = None

        # the index of the next address to connect
        self._pos = -1

        # all addresses of servers, stored as resolved (ip, port) pairs
        self._addresses = list()

        # server's status, keyed by (ip, port)
        self._addresses_status = dict()

        # validate the addresses
        for address in addresses:
            try:
                ip = socket.gethostbyname(address[0])
            except Exception as ex:
                raise InValidHostname(str(address[0])) from ex
            ip_port = (ip, address[1])
            self._addresses.append(ip_port)
            # every server is considered bad until it has been pinged
            self._addresses_status[ip_port] = self.S_BAD

    def __del__(self):
        self.close()

    def init(
        self,
        configs: Optional[SessionPoolConfig] = None,
        ssl_configs: Optional[SSL_config] = None,
    ):
        """init the session pool

        Pings every server, starts the periodic background check and creates
        ``min_size`` sessions.

        :param configs: the config of the pool, defaults to ``SessionPoolConfig()``
        :param ssl_configs: the SSL config of the connections, None for plain connections
        :return: True on success, False if the configs are invalid
        :raises RuntimeError: if the pool is closed, if any server is
            unreachable, or if a session cannot be created
        """
        if configs is not None:
            assert isinstance(
                configs, SessionPoolConfig
            ), "wrong type of SessionPoolConfig, try this: `from nebula3.Config import SessionPoolConfig`"
            self._configs = configs
        else:
            self._configs = SessionPoolConfig()
        self._ssl_configs = ssl_configs

        # check configs
        try:
            self._check_configs()
        except Exception as e:
            logger.error("Invalid configs: {}".format(e))
            return False

        if self._close:
            logger.error("The pool has init or closed.")
            raise RuntimeError("The pool has init or closed.")

        # ping all servers
        self.update_servers_status()

        # check services status in the background
        self._period_detect()

        ok_num = self.get_ok_servers_num()
        if ok_num < len(self._addresses):
            raise RuntimeError(
                "The services status exception: {}".format(
                    self._get_services_status()
                )
            )

        # iterate all addresses and create sessions to fulfil the min_size
        for _ in range(self._configs.min_size):
            session = self._new_session()
            if session is None:
                raise RuntimeError("Get session failed")
            self._add_session_to_idle(session)

        return True

    def ping(self, address):
        """check the server is ok

        :param address: the server address want to connect
        :return: True or False
        """
        try:
            conn = Connection()
            # a fixed 1000 timeout is used for the probe, not the pool timeout
            self._open_connection(conn, address, 1000)
            conn.close()
            return True
        except Exception as ex:
            logger.warning(
                "Connect {}:{} failed: {}".format(address[0], address[1], ex)
            )
            return False

    def execute(self, stmt):
        """execute the given query
        Notice there are some limitations:
        1. The query should not be a plain space switch statement, e.g. "USE test_space",
        but queries like "use space xxx; match (v) return v" are accepted.
        2. If the query contains statements like "USE <space name>", the space will be set to the
        one in the pool config after the execution of the query.
        3. The query should not change the user password nor drop a user.

        :param stmt: the query string
        :return: ResultSet
        """
        return super().execute(stmt)

    def execute_parameter(self, stmt, params):
        """execute statement

        :param stmt: the query string
        :param params: parameter map
        :return: ResultSet
        :raises RuntimeError: if no session could be obtained
        """
        session = self._get_idle_session()
        if session is None:
            raise RuntimeError("Get session failed")
        self._add_session_to_active(session)

        try:
            resp = session.execute_parameter(stmt, params)

            # Check for session validity based on error code
            if self._is_session_gone(resp.error_code()):
                self._replace_invalid_session(session)
                return resp

            # reset the space name to the pool config; when that fails the
            # session has already been dropped and must not go back to idle
            if resp.space_name() != self._space_name:
                if not self._set_space_to_default(session):
                    return resp

            # move the session back to the idle list
            self._return_session(session)
            return resp
        except Exception as e:
            logger.error("Execute failed: {}".format(e))
            # remove the session from the pool if it is invalid; this is a
            # no-op when it was already dropped, so the original error is kept
            self._discard_session(session)
            raise

    def execute_json(self, stmt):
        """execute statement and return the result as a JSON bytes
        Date and Datetime will be returned in UTC
            JSON struct:
            {
                "results": [
                {
                    "columns": [],
                    "data": [
                    {
                        "row": [
                        "row-data"
                        ],
                        "meta": [
                        "metadata"
                        ]
                    }
                    ],
                    "latencyInUs": 0,
                    "spaceName": "",
                    "planDesc ": {
                    "planNodeDescs": [
                        {
                        "name": "",
                        "id": 0,
                        "outputVar": "",
                        "description": {
                            "key": ""
                        },
                        "profiles": [
                            {
                            "rows": 1,
                            "execDurationInUs": 0,
                            "totalDurationInUs": 0,
                            "otherStats": {}
                            }
                        ],
                        "branchInfo": {
                            "isDoBranch": false,
                            "conditionNodeId": -1
                        },
                        "dependencies": []
                        }
                    ],
                    "nodeIndexMap": {},
                    "format": "",
                    "optimize_time_in_us": 0
                    },
                    "comment ": ""
                }
                ],
                "errors": [
                {
                    "code": 0,
                    "message": ""
                }
                ]
            }
        :param stmt: the ngql
        :return: JSON bytes
        """
        return super().execute_json(stmt)

    def execute_json_with_parameter(self, stmt, params):
        """execute statement with parameters and return the result as JSON

        :param stmt: the ngql
        :param params: parameter map
        :return: the JSON payload exactly as returned by the session
        :raises RuntimeError: if no session could be obtained
        """
        session = self._get_idle_session()
        if session is None:
            raise RuntimeError("Get session failed")
        self._add_session_to_active(session)

        try:
            resp = session.execute_json_with_parameter(stmt, params)
            json_obj = json.loads(resp)

            # Check for session validity based on error code; a missing or
            # empty "errors" list is treated as "no error code"
            errors = json_obj.get("errors") or [{}]
            if self._is_session_gone(errors[0].get("code")):
                self._replace_invalid_session(session)
                return resp

            # reset the space name to the pool config; a payload without
            # "results"/"spaceName" is treated as "not in the pool space"
            results = json_obj.get("results") or [{}]
            if results[0].get("spaceName") != self._space_name:
                if not self._set_space_to_default(session):
                    return resp

            # move the session back to the idle list
            self._return_session(session)
            return resp
        except Exception as e:
            logger.error("Execute failed: {}".format(e))
            # remove the session from the pool if it is invalid
            self._discard_session(session)
            raise

    def close(self):
        """log out all sessions and close all connections

        Cleanup is best effort: a session that fails to sign out is logged
        and does not prevent the remaining sessions from being closed.

        :return: void
        """
        with self._lock:
            for session in self._idle_sessions + self._active_sessions:
                try:
                    session._sign_out()
                    session._connection.close()
                except Exception as e:
                    logger.warning("Failed to close a session: {}".format(e))
            self._idle_sessions.clear()
            # also forget the active ones so a second close() (e.g. from
            # __del__) does not sign them out again
            self._active_sessions.clear()
            self._close = True

    def get_ok_servers_num(self):
        """get the number of the ok servers

        :return: int
        """
        return sum(
            1 for status in self._addresses_status.values() if status == self.S_OK
        )

    def _get_services_status(self):
        """describe the status of every server as a human readable string"""
        msg_list = []
        for addr, status_code in self._addresses_status.items():
            status = "OK" if status_code == self.S_OK else "BAD"
            msg_list.append("[services: {}, status: {}]".format(addr, status))
        return ", ".join(msg_list)

    def update_servers_status(self):
        """update the servers' status"""
        for address in self._addresses:
            if self.ping(address):
                self._addresses_status[address] = self.S_OK
            else:
                self._addresses_status[address] = self.S_BAD

    def ping_sessions(self):
        """ping all sessions in the pool"""
        with self._lock:
            for session in self._idle_sessions:
                session.execute(r'RETURN "SESSION PING"')

    def _open_connection(self, connection, address, timeout):
        """open ``connection`` to ``address``, using SSL when it is configured"""
        if self._ssl_configs is None:
            connection.open(
                address[0],
                address[1],
                timeout,
                self._configs.use_http2,
                self._configs.http_headers,
            )
        else:
            connection.open_SSL(
                address[0],
                address[1],
                timeout,
                self._ssl_configs,
                self._configs.use_http2,
                self._configs.http_headers,
            )

    @staticmethod
    def _is_session_gone(error_code):
        """tell whether the error code means the session no longer exists"""
        return error_code in (
            ErrorCode.E_SESSION_INVALID,
            ErrorCode.E_SESSION_TIMEOUT,
        )

    def _discard_session(self, session):
        """drop an active session from the pool and close its connection

        Does nothing when the session is not (or no longer) in the active
        list, so it is safe to call from error handlers.

        :param session: the session to drop
        :return: void
        """
        with self._lock:
            if session not in self._active_sessions:
                return
            self._active_sessions.remove(session)
        try:
            session._connection.close()
        except Exception as e:
            logger.warning(
                "Failed to close the connection of a dropped session: {}".format(e)
            )

    def _replace_invalid_session(self, session):
        """drop a session the server rejected and put a fresh one into idle

        :param session: the invalid or timed out session
        :return: void
        """
        self._discard_session(session)
        replacement = self._get_idle_session()
        if replacement is None:
            logger.warning(
                "Session invalid or timeout, removed from the pool, but failed to get a new session."
            )
            return
        logger.warning("Session invalid or timeout, session has been recycled")
        self._add_session_to_idle(replacement)

    def _get_idle_session(self):
        """get a valid session from the pool idle list.

        :return: Session
        :raises NoValidSessionException: if the pool is exhausted
        """
        with self._lock:
            if len(self._idle_sessions) > 0:
                return self._idle_sessions.pop(0)
            elif len(self._active_sessions) < self._configs.max_size:
                return self._new_session()
            else:
                raise NoValidSessionException(
                    "The total number of sessions reaches the pool max size {}".format(
                        self._configs.max_size
                    )
                )

    def _new_session(self):
        """construct a new session with the username and password in the pool.
        also, the session is bound to the space specified in the configs.

        :return: Session
        :raises AuthFailedException: if authentication fails; on bad
            credentials the whole pool is closed first
        :raises RuntimeError: if the space cannot be selected or no server
            is available
        """
        self._pos = (self._pos + 1) % len(self._addresses)
        next_addr_index = self._pos

        # try to connect with a valid service address, the worst case it to iterate all addresses
        retries = len(self._addresses)

        while retries > 0:
            addr = self._addresses[next_addr_index]

            # if the address is bad, skip it
            if self._addresses_status[addr] == self.S_BAD:
                logger.warning("The graph service {} is not available".format(addr))
                retries = retries - 1
                next_addr_index = (next_addr_index + 1) % len(self._addresses)
                continue

            # connect to the valid service
            connection = Connection()
            try:
                self._open_connection(connection, addr, self._configs.timeout)
                auth_result = connection.authenticate(self._username, self._password)
                session = Session(connection, auth_result, self, False)

                # switch to the space specified in the configs; the connection
                # itself is closed by the generic handler below
                try:
                    resp = session.execute("USE {}".format(self._space_name))
                except Exception as ex:
                    session.release()
                    raise RuntimeError(
                        "Failed to get session, execute `use {}` failed.".format(
                            self._space_name
                        )
                    ) from ex
                if not resp.is_succeeded():
                    session.release()
                    raise RuntimeError(
                        "Failed to get session, cannot set the session space to {} error: {} {}".format(
                            self._space_name, resp.error_code(), resp.error_msg()
                        )
                    )
                return session
            except AuthFailedException as e:
                # the connection is useless either way
                connection.close()
                # if auth failed because of credentials, close the pool.
                # `in` is required here: str.find() returns -1 (truthy) when
                # the text is absent and 0 (falsy) when it is a prefix.
                if "Invalid password" in e.message or "User not exist" in e.message:
                    logger.error(
                        "Authentication failed, because of bad credentials, close the pool {}".format(
                            e
                        )
                    )
                    self.close()
                raise
            except Exception:
                connection.close()
                raise

        raise RuntimeError(
            "Failed to get a valid session, no graph service is available"
        )

    def _return_session(self, session):
        """return the session to the pool idle list when query finished.

        A session that is no longer in the active list (it was dropped, or
        the pool was closed meanwhile) is ignored.

        :param session: the session to return
        :return: void
        """
        with self._lock:
            if session not in self._active_sessions:
                return
            self._active_sessions.remove(session)
            self._idle_sessions.append(session)
            session.idle_time_start = time.time()

    def _add_session_to_idle(self, session):
        """add the session to the pool idle list

        :param session: the session to add
        :return: void
        """
        with self._lock:
            self._idle_sessions.append(session)
            session.idle_time_start = time.time()

    def _add_session_to_active(self, session):
        """add the session to the pool active list

        :param session: the session to add
        :return: void
        """
        with self._lock:
            self._active_sessions.append(session)
            # reset the idle time start
            session.idle_time_start = 0

    def _set_space_to_default(self, session):
        """set the space to the default space in the pool

        On failure the session is dropped from the pool and its connection
        is closed.

        :param session: the session to set
        :return: True if the session is usable again, False if it was dropped
        """
        try:
            resp = session.execute("USE {}".format(self._space_name))
            if not resp.is_succeeded():
                raise RuntimeError(
                    "Failed to set the session space to {}".format(self._space_name)
                )
            return True
        except Exception:
            logger.warning(
                "Failed to set the session space to {}, the current session has been dropped".format(
                    self._space_name
                )
            )
            self._discard_session(session)
            return False

    def _remove_idle_unusable_session(self):
        """release idle sessions whose idle time exceeds ``idle_time``

        Nothing is removed when ``idle_time`` is 0 or when the pool holds no
        more than ``min_size`` sessions.
        """
        if self._configs.idle_time == 0:
            return
        with self._lock:
            total_sessions = len(self._idle_sessions) + len(self._active_sessions)
            if total_sessions <= self._configs.min_size:
                return
            # iterate over a snapshot: removing from the list being iterated
            # would skip the element following every removed one
            for session in list(self._idle_sessions):
                # calc session idle time
                # NOTE(review): the pool stamps `idle_time_start` (no leading
                # underscore) everywhere else, so that is what is read here;
                # confirm against the Session class.
                idle_time = time.time() - session.idle_time_start

                # release idle session and remove from the pool
                if idle_time > self._configs.idle_time:
                    conn = session._connection
                    session.release()
                    conn.close()
                    self._idle_sessions.remove(session)

    def _period_detect(self):
        """periodically detect the services status and remove the sessions from the idle list if they expire"""
        if self._close or self._configs.interval_check < 0:
            return
        self.update_servers_status()
        self._remove_idle_unusable_session()
        # re-arm: each run schedules the next one until the pool is closed
        timer = Timer(self._configs.interval_check, self._period_detect)
        # daemon thread so a pending check never keeps the interpreter alive
        timer.daemon = True
        timer.start()

    def _check_configs(self):
        """validate the configs

        :raises RuntimeError: describing the first invalid setting found
        """
        if self._configs.min_size < 0:
            raise RuntimeError("The min_size must be greater than 0")
        if self._configs.max_size < 0:
            raise RuntimeError("The max_size must be greater than 0")
        if self._configs.min_size > self._configs.max_size:
            raise RuntimeError(
                "The min_size must be less than or equal to the max_size"
            )
        if self._configs.idle_time < 0:
            raise RuntimeError("The idle_time must be greater or equal to 0")
        if self._configs.timeout < 0:
            raise RuntimeError("The timeout must be greater or equal to 0")

        if self._space_name == "":
            raise RuntimeError("The space_name must be set")
        if self._username == "":
            raise RuntimeError("The username must be set")
        if self._password == "":
            raise RuntimeError("The password must be set")
        if self._addresses is None or len(self._addresses) == 0:
            raise RuntimeError("The addresses must be set")
def init(self, Optional[SessionPoolConfig] configs=None, Optional[SSL_config] ssl_configs=None)
Definition: SessionPool.py:79
def execute_parameter(self, stmt, params)
Definition: SessionPool.py:177