11 from threading
import RLock, Timer
12 from typing
import List, Optional
15 from nebula3.common.ttypes
import ErrorCode
16 from nebula3.Exception
import (
18 NoValidSessionException,
25 from nebula3.logger
import logger
26 from nebula3.Config
import SessionPoolConfig, SSL_config
33 def __init__(self, username, password, space_name, addresses):
48 for address
in addresses:
50 ip = socket.gethostbyname(address[0])
52 raise InValidHostname(str(address[0]))
53 ip_port = (ip, address[1])
58 self._active_sessions: List[Session] = list()
60 self._idle_sessions: List[Session] = list()
62 self.
_configs_configs = SessionPoolConfig()
64 self.
_lock_lock = RLock()
77 configs: Optional[SessionPoolConfig] =
None,
78 ssl_configs: Optional[SSL_config] =
None,
80 """init the session pool
82 :param username: the username of the session
83 :param password: the password of the session
84 :param space_name: the space name of the session
85 :param addresses: the addresses of the servers
86 :param configs: the config of the pool
88 :return: if all addresses are valid, return True else return False.
90 if configs
is not None:
92 configs, SessionPoolConfig
93 ),
"wrong type of SessionPoolConfig, try this: `from nebula3.Config import SessionPoolConfig`"
96 self.
_configs_configs = SessionPoolConfig()
101 except Exception
as e:
102 logger.error(
"Invalid configs: {}".format(e))
106 logger.error(
"The pool has init or closed.")
107 raise RuntimeError(
"The pool has init or closed.")
122 for i
in range(self.
_configs_configs.min_size):
125 raise RuntimeError(
"Get session failed")
131 """check the server is ok
133 :param address: the server address want to connect
134 :return: True or False
157 except Exception
as ex:
159 "Connect {}:{} failed: {}".format(address[0], address[1], ex)
164 """execute the given query
165 Notice there are some limitations:
166 1. The query should not be a plain space switch statement, e.g. "USE test_space",
167 but queries like "use space xxx; match (v) return v" are accepted.
168 2. If the query contains statements like "USE <space name>", the space will be set to the
169 one in the pool config after the execution of the query.
170 3. The query should not change the user password nor drop a user.
172 :param stmt: the query string
180 :param stmt: the query string
181 :param params: parameter map
186 raise RuntimeError(
"Get session failed")
190 resp = session.execute_parameter(stmt, params)
193 if resp.error_code()
in [
194 ErrorCode.E_SESSION_INVALID,
195 ErrorCode.E_SESSION_TIMEOUT,
197 self._active_sessions.remove(session)
201 "Session invalid or timeout, removed from the pool, but failed to get a new session."
204 logger.warning(
"Session invalid or timeout, session has been recycled")
209 if resp.space_name() != self.
_space_name_space_name:
216 except Exception
as e:
217 logger.error(
"Execute failed: {}".format(e))
219 self._active_sessions.remove(session)
223 """execute statement and return the result as a JSON bytes
224 Date and Datetime will be returned in UTC
254 "execDurationInUs": 0,
255 "totalDurationInUs": 0,
261 "conditionNodeId": -1
268 "optimize_time_in_us": 0
280 :param stmt: the ngql
285 def execute_json_with_parameter(self, stmt, params):
288 raise RuntimeError(
"Get session failed")
292 resp = session.execute_json_with_parameter(stmt, params)
293 json_obj = json.loads(resp)
295 if json_obj.get(
"errors", [{}])[0].get(
"code")
in [
296 ErrorCode.E_SESSION_INVALID,
297 ErrorCode.E_SESSION_TIMEOUT,
299 self._active_sessions.remove(session)
303 "Session invalid or timeout, removed from the pool, but failed to get a new session."
307 logger.warning(
"Session invalid or timeout, session has been recycled")
311 if json_obj[
"results"][0][
"spaceName"] != self.
_space_name_space_name:
318 except Exception
as e:
319 logger.error(
"Execute failed: {}".format(e))
321 self._active_sessions.remove(session)
325 """log out all sessions and close all connections
329 with self.
_lock_lock:
330 for session
in self._idle_sessions:
332 session._connection.close()
333 for session
in self._active_sessions:
335 session._connection.close()
336 self._idle_sessions.clear()
340 """get the number of the ok servers
350 def _get_services_status(self):
356 msg_list.append(
"[services: {}, status: {}]".format(addr, status))
357 return ", ".join(msg_list)
360 """update the servers' status"""
362 if self.
pingping(address):
368 """ping all sessions in the pool"""
369 with self.
_lock_lock:
370 for session
in self._idle_sessions:
371 session.execute(
r'RETURN "SESSION PING"')
373 def _get_idle_session(self):
374 """get a valid session from the pool idle list.
378 with self.
_lock_lock:
379 if len(self._idle_sessions) > 0:
380 return self._idle_sessions.pop(0)
381 elif len(self._active_sessions) < self.
_configs_configs.max_size:
384 raise NoValidSessionException(
385 "The total number of sessions reaches the pool max size {}".format(
390 def _new_session(self):
391 """construct a new session with the username and password in the pool.
392 also, the session is bound to the space specified in the configs.
397 next_addr_index = self.
_pos_pos
403 addr = self.
_addresses_addresses[next_addr_index]
407 logger.warning(
"The graph service {} is not available".format(addr))
408 retries = retries - 1
409 next_addr_index = (next_addr_index + 1) % len(self.
_addresses_addresses)
432 auth_result = connection.authenticate(self.
_username_username, self.
_password_password)
433 session =
Session(connection, auth_result, self,
False)
437 resp = session.execute(
"USE {}".format(self.
_space_name_space_name))
442 "Failed to get session, execute `use {}` failed.".format(
446 if not resp.is_succeeded():
450 "Failed to get session, cannot set the session space to {} error: {} {}".format(
451 self.
_space_name_space_name, resp.error_code(), resp.error_msg()
455 except AuthFailedException
as e:
457 if e.message.find(
"Invalid password")
or e.message.find(
461 "Authentication failed, because of bad credentials, close the pool {}".format(
474 "Failed to get a valid session, no graph service is available"
477 def _return_session(self, session):
478 """return the session to the pool idle list when query finished.
480 :param session: the session to return
483 with self.
_lock_lock:
484 self._active_sessions.remove(session)
485 self._idle_sessions.append(session)
486 session.idle_time_start = time.time()
488 def _add_session_to_idle(self, session):
489 """add the session to the pool idle list
491 :param session: the session to add
494 with self.
_lock_lock:
495 self._idle_sessions.append(session)
496 session.idle_time_start = time.time()
498 def _add_session_to_active(self, session):
499 """add the session to the pool active list
501 :param session: the session to add
504 with self.
_lock_lock:
505 self._active_sessions.append(session)
507 session.idle_time_start = 0
509 def _set_space_to_default(self, session):
510 """set the space to the default space in the pool
512 :param session: the session to set
516 resp = session.execute(
"USE {}".format(self.
_space_name_space_name))
517 if not resp.is_succeeded():
519 "Failed to set the session space to {}".format(self.
_space_name_space_name)
523 "Failed to set the session space to {}, the current session has been dropped".format(
527 session._connection.close()
528 with self.
_lock_lock:
529 self._active_sessions.remove(session)
531 def _remove_idle_unusable_session(self):
532 if self.
_configs_configs.idle_time == 0:
534 with self.
_lock_lock:
535 total_sessions = len(self._idle_sessions) + len(self._active_sessions)
536 if total_sessions <= self.
_configs_configs.min_size:
538 for session
in self._idle_sessions:
540 idle_time = time.time() - session._idle_time_start
543 if idle_time > self.
_configs_configs.idle_time:
544 conn = session._connection
547 self._idle_sessions.remove(session)
549 def _period_detect(self):
550 """periodically detect the services status and remove the sessions from the idle list if they expire"""
556 timer.setDaemon(
True)
559 def _check_configs(self):
560 """validate the configs"""
561 if self.
_configs_configs.min_size < 0:
562 raise RuntimeError(
"The min_size must be greater than 0")
563 if self.
_configs_configs.max_size < 0:
564 raise RuntimeError(
"The max_size must be greater than 0")
567 "The min_size must be less than or equal to the max_size"
569 if self.
_configs_configs.idle_time < 0:
570 raise RuntimeError(
"The idle_time must be greater or equal to 0")
571 if self.
_configs_configs.timeout < 0:
572 raise RuntimeError(
"The timeout must be greater or equal to 0")
575 raise RuntimeError(
"The space_name must be set")
577 raise RuntimeError(
"The username must be set")
579 raise RuntimeError(
"The password must be set")
581 raise RuntimeError(
"The addresses must be set")
def get_ok_servers_num(self)
def _return_session(self, session)
def _set_space_to_default(self, session)
def _add_session_to_idle(self, session)
def init(self, Optional[SessionPoolConfig] configs=None, Optional[SSL_config] ssl_configs=None)
def _get_idle_session(self)
def _remove_idle_unusable_session(self)
def _get_services_status(self)
def execute_json(self, stmt)
def update_servers_status(self)
def execute_parameter(self, stmt, params)
def _add_session_to_active(self, session)