11 from collections
import deque
12 from threading
import RLock, Timer
14 from nebula3.Exception
import NotValidConnectionException, InValidHostname
18 from nebula3.Config
import Config
19 from nebula3.logger
import logger
20 from typing
import Dict, List, Tuple
29 self._addresses: List[Tuple[str, int]] = list()
35 self._connections: Dict[Tuple[str, int], List[Connection]] = dict()
38 self.
_lock_lock = RLock()
45 def init(self, addresses, configs=None, ssl_conf=None):
46 """init the connection pool
48 :param addresses: the graphd servers' addresses
49 :param configs: the config of the pool
50 :param ssl_conf: the config of SSL socket
51 :return: if all addresses are ok, return True else return False.
54 logger.error(
"The pool has init or closed.")
55 raise RuntimeError(
"The pool has init or closed.")
61 ),
"wrong type of Config, try this: `from nebula3.Config import Config`"
64 for address
in addresses:
65 if address
not in self._addresses:
67 ip = socket.gethostbyname(address[0])
69 raise InValidHostname(str(address[0]))
70 ip_port = (ip, address[1])
71 self._addresses.append(ip_port)
73 self._connections[ip_port] = deque()
82 if ok_num < len(self._addresses):
87 conns_per_address = int(self.
_configs_configs.min_connection_pool_size / ok_num)
89 for addr
in self._addresses:
90 for i
in range(0, conns_per_address):
100 self._connections[addr].append(connection)
106 :param user_name: the user name to authenticate graphd
107 :param password: the password to authenticate graphd
108 :param retry_connect:
112 if connection
is None:
113 raise NotValidConnectionException()
115 auth_result = connection.authenticate(user_name, password)
116 return Session(connection, auth_result, self, retry_connect)
120 @contextlib.contextmanager
123 session_context is to be used with a contextlib.contextmanager.
124 It returns a connection session from the pool, with same params
125 as the method get_session().
127 When session_context is exited, the connection will be released.
129 :param user_name: the user name to authenticate graphd
130 :param password: the password to authenticate graphd
131 :param retry_connect: if auto retry connect
132 :return: contextlib._GeneratorContextManager
136 session = self.
get_sessionget_session(*args, **kwargs)
145 """get available connection
149 with self.
_lock_lock:
151 logger.error(
"The pool is closed")
152 raise NotValidConnectionException()
157 logger.error(
"No available server")
159 max_con_per_address = int(
160 self.
_configs_configs.max_connection_pool_size / ok_num
163 while try_count <= len(self._addresses):
164 self.
_pos_pos = (self.
_pos_pos + 1) % len(self._addresses)
165 addr = self._addresses[self.
_pos_pos]
167 invalid_connections = list()
170 for connection
in self._connections[addr]:
171 if not connection.is_used:
173 if connection.ping():
174 connection.is_used =
True
175 logger.info(
"Get connection to {}".format(addr))
178 invalid_connections.append(connection)
181 for connection
in invalid_connections:
182 self._connections[addr].remove(connection)
185 if not self.
pingping(addr):
190 if len(self._connections[addr]) < max_con_per_address:
200 connection.is_used =
True
201 self._connections[addr].append(connection)
202 logger.info(
"Get connection to {}".format(addr))
205 for connection
in list(self._connections[addr]):
206 if not connection.is_used:
207 self._connections[addr].remove(connection)
208 try_count = try_count + 1
210 logger.error(
"No available connection")
212 except Exception
as ex:
213 logger.error(
"Get connection failed: {}".format(ex))
217 """check the server is ok
219 :param address: the server address want to connect
220 :return: True or False
237 except Exception
as ex:
239 "Connect {}:{} failed: {}".format(address[0], address[1], ex)
244 """close all connections in pool
248 with self.
_lock_lock:
249 for addr
in self._connections.keys():
250 for connection
in self._connections[addr]:
251 if connection.is_used:
252 logger.warning(
"Closing a connection that is in use")
257 """get the number of existing connections
259 :return: the number of connections
261 with self.
_lock_lock:
263 for addr
in self._connections.keys():
264 count = count + len(self._connections[addr])
268 """get the number of the used connections
272 with self.
_lock_lock:
274 for addr
in self._connections.keys():
275 for connection
in self._connections[addr]:
276 if connection.is_used:
281 """get the number of the ok servers
291 def _get_services_status(self):
297 msg_list.append(
"[services: {}, status: {}]".format(addr, status))
298 return ", ".join(msg_list)
301 """update the servers' status"""
302 for address
in self._addresses:
303 if self.
pingping(address):
308 def _remove_idle_unusable_connection(self):
309 if self.
_configs_configs.idle_time == 0:
311 with self.
_lock_lock:
312 for addr
in self._connections.keys():
313 conns = self._connections[addr]
314 for connection
in list(conns):
315 if not connection.is_used:
316 if not connection.ping():
318 "Remove the unusable connection to {}".format(
319 connection.get_address()
322 conns.remove(connection)
325 self.
_configs_configs.idle_time != 0
326 and connection.idle_time() > self.
_configs_configs.idle_time
329 "Remove the idle connection to {}".format(
330 connection.get_address()
333 conns.remove(connection)
335 def _period_detect(self):
def get_ok_servers_num(self)
def init(self, addresses, configs=None, ssl_conf=None)
def _get_services_status(self)
def update_servers_status(self)
def session_context(self, *args, **kwargs)
def get_session(self, user_name, password, retry_connect=True)
def in_used_connects(self)
def _remove_idle_unusable_connection(self)