NebulaGraph Python Client  release-3.4
SessionPool.py
1 # --coding:utf-8--
2 #
3 # Copyright (c) 2022 vesoft inc. All rights reserved.
4 #
5 # This source code is licensed under Apache 2.0 License.
6 
7 
8 import json
9 import socket
10 
11 from threading import RLock, Timer
12 import time
13 
14 from nebula3.Exception import (
15  AuthFailedException,
16  NoValidSessionException,
17  InValidHostname,
18 )
19 
20 from nebula3.gclient.net.Session import Session
21 from nebula3.gclient.net.Connection import Connection
22 from nebula3.logger import logger
23 from nebula3.Config import SessionPoolConfig
24 
25 
class SessionPool(object):
    """A pool of authenticated sessions that are all bound to one graph space.

    Sessions are created with the username/password given to the constructor
    and switched to ``space_name`` right after authentication. Graph service
    addresses are tried in round-robin order, skipping the ones whose last
    ping failed.
    """

    # status values stored in ``_addresses_status``
    S_OK = 0
    S_BAD = 1

    def __init__(self, username, password, space_name, addresses):
        """Create the pool object; no connection is opened until ``init``.

        :param username: the username of the sessions
        :param password: the password of the sessions
        :param space_name: the space every session is bound to
        :param addresses: iterable of (host, port) pairs of the graph services
        :raises InValidHostname: if a host name cannot be resolved
        """
        # user name and password of the session
        self._username = username
        self._password = password

        # space name bonded to the session
        self._space_name = space_name

        # all addresses of servers
        self._addresses = list()

        # server's status
        self._addresses_status = dict()

        # sessions that are currently in use
        self._active_sessions = list()
        # sessions that are currently available
        self._idle_sessions = list()

        self._configs = SessionPoolConfig()
        self._ssl_configs = None
        self._lock = RLock()

        # the index of the next address to connect
        self._pos = -1

        # the flag of whether the pool is closed
        self._close = False

        # validate the addresses last: if a host name cannot be resolved the
        # exception leaves a fully initialised object behind, so __del__ ->
        # close() does not trip over missing attributes
        for address in addresses:
            try:
                ip = socket.gethostbyname(address[0])
            except Exception:
                raise InValidHostname(str(address[0]))
            ip_port = (ip, address[1])
            self._addresses.append(ip_port)
            # every server is considered bad until the first successful ping
            self._addresses_status[ip_port] = self.S_BAD

    def __del__(self):
        self.close()

    def init(self, configs):
        """init the session pool

        :param configs: the config of the pool

        :return: True when the pool is ready, False when the configs are invalid.
        :raises RuntimeError: if the pool is already closed, if any server
            cannot be reached, or if a session cannot be created
        """
        # check the configs that were passed in (not the defaults); keep the
        # previous configs when the new ones are rejected
        previous_configs = self._configs
        self._configs = configs
        try:
            self._check_configs()
        except Exception as e:
            self._configs = previous_configs
            logger.error('Invalid configs: {}'.format(e))
            return False

        if self._close:
            logger.error('The pool has init or closed.')
            raise RuntimeError('The pool has init or closed.')

        # ping all servers
        self.update_servers_status()

        # check services status in the background
        self._period_detect()

        ok_num = self.get_ok_servers_num()
        if ok_num < len(self._addresses):
            raise RuntimeError(
                'The services status exception: {}'.format(
                    self._get_services_status()
                )
            )

        # iterate all addresses and create sessions to fulfil the min_size
        for _ in range(self._configs.min_size):
            session = self._new_session()
            if session is None:
                raise RuntimeError('Get session failed')
            self._add_session_to_idle(session)

        return True

    def ping(self, address):
        """check the server is ok

        :param address: the (host, port) pair of the server to connect to
        :return: True or False
        """
        try:
            conn = Connection()
            # 1000 is the connect timeout handed to the connection
            # (presumably milliseconds -- TODO confirm against Connection.open)
            if self._ssl_configs is None:
                conn.open(address[0], address[1], 1000)
            else:
                conn.open_SSL(address[0], address[1], 1000, self._ssl_configs)
            conn.close()
            return True
        except Exception as ex:
            logger.warning(
                'Connect {}:{} failed: {}'.format(address[0], address[1], ex)
            )
            return False

    def execute(self, stmt):
        """execute the given query
        Notice there are some limitations:
        1. The query should not be a plain space switch statement, e.g. "USE test_space",
        but queries like "use space xxx; match (v) return v" are accepted.
        2. If the query contains statements like "USE <space name>", the space will be set to the
        one in the pool config after the execution of the query.
        3. The query should not change the user password nor drop a user.

        :param stmt: the query string
        :return: ResultSet
        """
        return self.execute_parameter(stmt, None)

    def execute_parameter(self, stmt, params):
        """execute statement

        :param stmt: the query string
        :param params: parameter map
        :return: ResultSet
        :raises NoValidSessionException: if the pool has reached max_size
        """
        session = self._get_idle_session()
        if session is None:
            raise RuntimeError('Get session failed')
        self._add_session_to_active(session)

        try:
            resp = session.execute_parameter(stmt, params)

            # reset the space name to the pool config
            if resp.space_name() != self._space_name:
                self._set_space_to_default(session)

            # move the session back to the idle list (a no-op when the space
            # reset above had to drop the session)
            self._return_session(session)

            return resp
        except Exception as e:
            logger.error('Execute failed: {}'.format(e))
            # remove the session from the pool if it is invalid
            self._drop_active_session(session)
            raise

    def execute_json(self, stmt):
        """execute statement and return the result as a JSON string
        Date and Datetime will be returned in UTC
        JSON struct:
        {
            "results": [
                {
                    "columns": [],
                    "data": [
                        {
                            "row": [
                                "row-data"
                            ],
                            "meta": [
                                "metadata"
                            ]
                        }
                    ],
                    "latencyInUs": 0,
                    "spaceName": "",
                    "planDesc": {
                        "planNodeDescs": [
                            {
                                "name": "",
                                "id": 0,
                                "outputVar": "",
                                "description": {
                                    "key": ""
                                },
                                "profiles": [
                                    {
                                        "rows": 1,
                                        "execDurationInUs": 0,
                                        "totalDurationInUs": 0,
                                        "otherStats": {}
                                    }
                                ],
                                "branchInfo": {
                                    "isDoBranch": false,
                                    "conditionNodeId": -1
                                },
                                "dependencies": []
                            }
                        ],
                        "nodeIndexMap": {},
                        "format": "",
                        "optimize_time_in_us": 0
                    },
                    "comment": ""
                }
            ],
            "errors": [
                {
                    "code": 0,
                    "message": ""
                }
            ]
        }
        :param stmt: the ngql
        :return: JSON string
        """
        return self.execute_json_with_parameter(stmt, None)

    def execute_json_with_parameter(self, stmt, params):
        """execute statement with parameters and return the result as a JSON string

        :param stmt: the ngql
        :param params: parameter map
        :return: JSON string
        :raises NoValidSessionException: if the pool has reached max_size
        """
        session = self._get_idle_session()
        if session is None:
            raise RuntimeError('Get session failed')
        self._add_session_to_active(session)

        try:
            resp = session.execute_json_with_parameter(stmt, params)

            # reset the space name to the pool config
            json_obj = json.loads(resp)
            if json_obj["results"][0]["spaceName"] != self._space_name:
                self._set_space_to_default(session)

            # move the session back to the idle list (a no-op when the space
            # reset above had to drop the session)
            self._return_session(session)

            return resp
        except Exception as e:
            logger.error('Execute failed: {}'.format(e))
            # remove the session from the pool if it is invalid
            self._drop_active_session(session)
            raise

    def close(self):
        """log out all sessions and close all connections

        :return: void
        """
        with self._lock:
            for session in self._idle_sessions + self._active_sessions:
                session._sign_out()
                session._connection.close()
            # forget both lists so that a second close() (e.g. from __del__)
            # does not sign the same sessions out again
            self._idle_sessions.clear()
            self._active_sessions.clear()
            self._close = True

    def get_ok_servers_num(self):
        """get the number of the ok servers

        :return: int
        """
        return sum(
            1 for status in self._addresses_status.values() if status == self.S_OK
        )

    def _get_services_status(self):
        """describe the last known status of every server

        :return: a comma separated string with one entry per address
        """
        msg_list = []
        for addr, addr_status in self._addresses_status.items():
            status = 'OK' if addr_status == self.S_OK else 'BAD'
            msg_list.append('[services: {}, status: {}]'.format(addr, status))
        return ', '.join(msg_list)

    def update_servers_status(self):
        """update the servers' status"""
        for address in self._addresses:
            if self.ping(address):
                self._addresses_status[address] = self.S_OK
            else:
                self._addresses_status[address] = self.S_BAD

    def ping_sessions(self):
        """ping all sessions in the pool"""
        with self._lock:
            for session in self._idle_sessions:
                session.execute(r'RETURN "SESSION PING"')

    def _get_idle_session(self):
        """get a valid session from the pool idle list.

        A new session is created when the idle list is empty and the number
        of active sessions is still below max_size.

        :return: Session
        :raises NoValidSessionException: if the pool has reached max_size
        """
        with self._lock:
            if len(self._idle_sessions) > 0:
                return self._idle_sessions.pop(0)
            elif len(self._active_sessions) < self._configs.max_size:
                return self._new_session()
            else:
                raise NoValidSessionException(
                    'The total number of sessions reaches the pool max size {}'.format(
                        self._configs.max_size
                    )
                )

    @staticmethod
    def _is_bad_credentials(message):
        """tell whether an auth failure message reports wrong credentials"""
        # substring tests on purpose: str.find() returns -1 (truthy) when the
        # text is absent, so it must not be used as a boolean
        return 'Invalid password' in message or 'User not exist' in message

    def _new_session(self):
        """construct a new session with the username and password in the pool.
        also, the session is bound to the space specified in the configs.

        :return: Session
        :raises RuntimeError: if SSL is configured, if the space cannot be
            selected, or if no graph service is available
        :raises AuthFailedException: if the authentication fails
        """
        if self._ssl_configs is not None:
            raise RuntimeError('SSL is not supported yet')

        address_count = len(self._addresses)
        self._pos = (self._pos + 1) % address_count
        next_addr_index = self._pos

        # try to connect with a valid service address, the worst case is to
        # iterate all addresses
        for _ in range(address_count):
            addr = self._addresses[next_addr_index]

            # if the address is bad, skip it
            if self._addresses_status[addr] == self.S_BAD:
                logger.warning('The graph service {} is not available'.format(addr))
                next_addr_index = (next_addr_index + 1) % address_count
                continue

            # connect to the valid service
            connection = Connection()
            try:
                connection.open(addr[0], addr[1], self._configs.timeout)
                auth_result = connection.authenticate(self._username, self._password)
                session = Session(connection, auth_result, self, False)

                # switch to the space specified in the configs
                resp = session.execute('USE {}'.format(self._space_name))
                if not resp.is_succeeded():
                    raise RuntimeError(
                        'Failed to get session, cannot set the session space to {} error: {} {}'.format(
                            self._space_name, resp.error_code(), resp.error_msg()
                        )
                    )
                return session
            except AuthFailedException as e:
                # if auth failed because of credentials, close the pool
                if self._is_bad_credentials(e.message):
                    logger.error(
                        'Authentication failed, because of bad credentials, close the pool {}'.format(
                            e
                        )
                    )
                    self.close()
                raise

        raise RuntimeError(
            'Failed to get a valid session, no graph service is available'
        )

    def _return_session(self, session):
        """return the session to the pool idle list when query finished.

        Nothing happens when the session is not in the active list any more,
        e.g. because it was dropped after a failed space reset.

        :param session: the session to return
        :return: void
        """
        with self._lock:
            if session not in self._active_sessions:
                return
            self._active_sessions.remove(session)
            self._idle_sessions.append(session)
            session._idle_time_start = time.time()

    def _drop_active_session(self, session):
        """forget an active session without returning it to the idle list

        :param session: the session to drop
        :return: void
        """
        with self._lock:
            if session in self._active_sessions:
                self._active_sessions.remove(session)

    def _add_session_to_idle(self, session):
        """add the session to the pool idle list

        :param session: the session to add
        :return: void
        """
        with self._lock:
            self._idle_sessions.append(session)
            session._idle_time_start = time.time()

    def _add_session_to_active(self, session):
        """add the session to the pool active list

        :param session: the session to add
        :return: void
        """
        with self._lock:
            self._active_sessions.append(session)
            # reset the idle time start
            session._idle_time_start = 0

    def _set_space_to_default(self, session):
        """set the space to the default space in the pool

        When the switch fails the connection is closed and the session is
        removed from the active list; no exception is propagated.

        :param session: the session to set
        :return: void
        """
        try:
            resp = session.execute('USE {}'.format(self._space_name))
            if not resp.is_succeeded():
                raise RuntimeError(
                    'Failed to set the session space to {}'.format(self._space_name)
                )
        except Exception:
            logger.warning(
                'Failed to set the session space to {}, the current session has been dropped'.format(
                    self._space_name
                )
            )
            session._connection.close()
            self._drop_active_session(session)

    def _remove_idle_unusable_session(self):
        """release the idle sessions whose idle time exceeds the configured idle_time

        Does nothing when idle_time is 0 or when the pool holds no more than
        min_size sessions.
        """
        if self._configs.idle_time == 0:
            return
        with self._lock:
            total_sessions = len(self._idle_sessions) + len(self._active_sessions)
            if total_sessions <= self._configs.min_size:
                return
            # iterate over a snapshot: removing from the list that is being
            # iterated would skip the element after every removal
            for session in list(self._idle_sessions):
                # calc session idle time
                idle_time = time.time() - session._idle_time_start

                # release idle session and remove from the pool
                if idle_time > self._configs.idle_time:
                    conn = session._connection
                    session.release()
                    conn.close()
                    self._idle_sessions.remove(session)

    def _period_detect(self):
        """periodically detect the services status and remove the sessions from the idle list if they expire"""
        if self._close or self._configs.interval_check < 0:
            return
        self.update_servers_status()
        self._remove_idle_unusable_session()
        # re-arm the check; a daemon timer does not keep the interpreter alive
        timer = Timer(self._configs.interval_check, self._period_detect)
        timer.daemon = True
        timer.start()

    def _check_configs(self):
        """validate the configs

        :raises RuntimeError: describing the first invalid setting found
        """
        # NOTE(review): the two size messages say "greater than 0" although a
        # value of 0 passes the check -- confirm which one is intended
        if self._configs.min_size < 0:
            raise RuntimeError('The min_size must be greater than 0')
        if self._configs.max_size < 0:
            raise RuntimeError('The max_size must be greater than 0')
        if self._configs.min_size > self._configs.max_size:
            raise RuntimeError(
                'The min_size must be less than or equal to the max_size'
            )
        if self._configs.idle_time < 0:
            raise RuntimeError('The idle_time must be greater or equal to 0')
        if self._configs.timeout < 0:
            raise RuntimeError('The timeout must be greater or equal to 0')

        if self._space_name == "":
            raise RuntimeError('The space_name must be set')
        if self._username == "":
            raise RuntimeError('The username must be set')
        if self._password == "":
            raise RuntimeError('The password must be set')
        if self._addresses is None or len(self._addresses) == 0:
            raise RuntimeError('The addresses must be set')
def execute_json_with_parameter(self, stmt, params)
Definition: SessionPool.py:242
def execute_parameter(self, stmt, params)
Definition: SessionPool.py:150