11 from threading
import RLock, Timer
14 from nebula3.Exception
import (
16 NoValidSessionException,
22 from nebula3.logger
import logger
23 from nebula3.Config
import SessionPoolConfig
30 def __init__(self, username, password, space_name, addresses):
45 for address
in addresses:
47 ip = socket.gethostbyname(address[0])
49 raise InValidHostname(str(address[0]))
50 ip_port = (ip, address[1])
59 self.
_configs_configs = SessionPoolConfig()
61 self.
_lock_lock = RLock()
73 """init the session pool
75 :param username: the username of the session
76 :param password: the password of the session
77 :param space_name: the space name of the session
78 :param addresses: the addresses of the servers
79 :param configs: the config of the pool
81 :return: if all addresses are valid, return True else return False.
86 except Exception
as e:
87 logger.error(
'Invalid configs: {}'.format(e))
91 logger.error(
'The pool has init or closed.')
92 raise RuntimeError(
'The pool has init or closed.')
108 for i
in range(self.
_configs_configs.min_size):
111 raise RuntimeError(
'Get session failed')
117 """check the server is ok
119 :param address: the server address want to connect
120 :return: True or False
125 conn.open(address[0], address[1], 1000)
127 conn.open_SSL(address[0], address[1], 1000, self.
_ssl_configs_ssl_configs)
130 except Exception
as ex:
132 'Connect {}:{} failed: {}'.format(address[0], address[1], ex)
137 """execute the given query
138 Notice there are some limitations:
139 1. The query should not be a plain space switch statement, e.g. "USE test_space",
140 but queries like "use space xxx; match (v) return v" are accepted.
141 2. If the query contains statements like "USE <space name>", the space will be set to the
142 one in the pool config after the execution of the query.
143 3. The query should not change the user password nor drop a user.
145 :param stmt: the query string
153 :param stmt: the query string
154 :param params: parameter map
159 raise RuntimeError(
'Get session failed')
163 resp = session.execute_parameter(stmt, params)
166 if resp.space_name() != self.
_space_name_space_name:
173 except Exception
as e:
174 logger.error(
'Execute failed: {}'.format(e))
180 """execute statement and return the result as a JSON string
181 Date and Datetime will be returned in UTC
211 "execDurationInUs": 0,
212 "totalDurationInUs": 0,
218 "conditionNodeId": -1
225 "optimize_time_in_us": 0
237 :param stmt: the ngql
242 def execute_json_with_parameter(self, stmt, params):
245 raise RuntimeError(
'Get session failed')
249 resp = session.execute_json_with_parameter(stmt, params)
252 json_obj = json.loads(resp)
253 if json_obj[
"results"][0][
"spaceName"] != self.
_space_name_space_name:
260 except Exception
as e:
261 logger.error(
'Execute failed: {}'.format(e))
267 """log out all sessions and close all connections
271 with self.
_lock_lock:
274 session._connection.close()
277 session._connection.close()
282 """get the number of the ok servers
292 def _get_services_status(self):
298 msg_list.append(
'[services: {}, status: {}]'.format(addr, status))
299 return ', '.join(msg_list)
302 """update the servers' status"""
304 if self.
pingping(address):
310 """ping all sessions in the pool"""
311 with self.
_lock_lock:
313 session.execute(
r'RETURN "SESSION PING"')
315 def _get_idle_session(self):
316 """get a valid session from the pool idle list.
320 with self.
_lock_lock:
326 raise NoValidSessionException(
327 'The total number of sessions reaches the pool max size {}'.format(
332 def _new_session(self):
333 """construct a new session with the username and password in the pool.
334 also, the session is bound to the space specified in the configs.
339 raise RuntimeError(
'SSL is not supported yet')
342 next_addr_index = self.
_pos_pos
348 addr = self.
_addresses_addresses[next_addr_index]
352 logger.warning(
'The graph service {} is not available'.format(addr))
353 retries = retries - 1
354 next_addr_index = (next_addr_index + 1) % len(self.
_addresses_addresses)
360 connection.open(addr[0], addr[1], self.
_configs_configs.timeout)
361 auth_result = connection.authenticate(self.
_username_username, self.
_password_password)
362 session =
Session(connection, auth_result, self,
False)
365 resp = session.execute(
'USE {}'.format(self.
_space_name_space_name))
366 if not resp.is_succeeded():
368 'Failed to get session, cannot set the session space to {} error: {} {}'.format(
369 self.
_space_name_space_name, resp.error_code(), resp.error_msg()
373 except AuthFailedException
as e:
375 if e.message.find(
"Invalid password")
or e.message.find(
379 'Authentication failed, because of bad credentials, close the pool {}'.format(
389 'Failed to get a valid session, no graph service is available'
392 def _return_session(self, session):
393 """return the session to the pool idle list when query finished.
395 :param session: the session to return
398 with self.
_lock_lock:
401 session.idle_time_start = time.time()
403 def _add_session_to_idle(self, session):
404 """add the session to the pool idle list
406 :param session: the session to add
409 with self.
_lock_lock:
411 session.idle_time_start = time.time()
413 def _add_session_to_active(self, session):
414 """add the session to the pool active list
416 :param session: the session to add
419 with self.
_lock_lock:
422 session.idle_time_start = 0
424 def _set_space_to_default(self, session):
425 """set the space to the default space in the pool
427 :param session: the session to set
431 resp = session.execute(
'USE {}'.format(self.
_space_name_space_name))
432 if not resp.is_succeeded():
434 'Failed to set the session space to {}'.format(self.
_space_name_space_name)
438 'Failed to set the session space to {}, the current session has been dropped'.format(
442 session._connection.close()
443 with self.
_lock_lock:
446 def _remove_idle_unusable_session(self):
447 if self.
_configs_configs.idle_time == 0:
449 with self.
_lock_lock:
451 if total_sessions <= self.
_configs_configs.min_size:
455 idle_time = time.time() - session._idle_time_start
458 if idle_time > self.
_configs_configs.idle_time:
459 conn = session._connection
464 def _period_detect(self):
465 """periodically detect the services status and remove the sessions from the idle list if they expire"""
471 timer.setDaemon(
True)
474 def _check_configs(self):
475 """validate the configs"""
476 if self.
_configs_configs.min_size < 0:
477 raise RuntimeError(
'The min_size must be greater than 0')
478 if self.
_configs_configs.max_size < 0:
479 raise RuntimeError(
'The max_size must be greater than 0')
482 'The min_size must be less than or equal to the max_size'
484 if self.
_configs_configs.idle_time < 0:
485 raise RuntimeError(
'The idle_time must be greater or equal to 0')
486 if self.
_configs_configs.timeout < 0:
487 raise RuntimeError(
'The timeout must be greater or equal to 0')
490 raise RuntimeError(
'The space_name must be set')
492 raise RuntimeError(
'The username must be set')
494 raise RuntimeError(
'The password must be set')
496 raise RuntimeError(
'The addresses must be set')
def get_ok_servers_num(self)
def _return_session(self, session)
def _set_space_to_default(self, session)
def _add_session_to_idle(self, session)
def _get_idle_session(self)
def _remove_idle_unusable_session(self)
def _get_services_status(self)
def execute_json_with_parameter(self, stmt, params)
def execute_json(self, stmt)
def update_servers_status(self)
def execute_parameter(self, stmt, params)
def _add_session_to_active(self, session)