11 from collections
import deque
12 from threading
import RLock, Timer
14 from nebula3.Exception
import NotValidConnectionException, InValidHostname
18 from nebula3.logger
import logger
36 self.
_lock_lock = RLock()
43 def init(self, addresses, configs, ssl_conf=None):
44 """init the connection pool
46 :param addresses: the graphd servers' addresses
47 :param configs: the config of the pool
48 :param ssl_conf: the config of SSL socket
49 :return: if all addresses are ok, return True else return False.
52 logger.error(
'The pool has init or closed.')
53 raise RuntimeError(
'The pool has init or closed.')
56 for address
in addresses:
59 ip = socket.gethostbyname(address[0])
61 raise InValidHostname(str(address[0]))
62 ip_port = (ip, address[1])
79 conns_per_address = int(self.
_configs_configs.min_connection_pool_size / ok_num)
82 for i
in range(0, conns_per_address):
90 def get_session(self, user_name, password, retry_connect=True):
93 :param user_name: the user name to authenticate graphd
94 :param password: the password to authenticate graphd
99 if connection
is None:
100 raise NotValidConnectionException()
102 auth_result = connection.authenticate(user_name, password)
103 return Session(connection, auth_result, self, retry_connect)
107 @contextlib.contextmanager
110 session_context is to be used with a contextlib.contextmanager.
111 It returns a connection session from the pool, with same params
112 as the method get_session().
114 When session_context is exited, the connection will be released.
116 :param user_name: the user name to authenticate graphd
117 :param password: the password to authenticate graphd
118 :param retry_connect: if auto retry connect
119 :return: contextlib._GeneratorContextManager
123 session = self.
get_sessionget_session(*args, **kwargs)
132 """get available connection
136 with self.
_lock_lock:
138 logger.error(
'The pool is closed')
139 raise NotValidConnectionException()
144 logging.error(
'No available server')
146 max_con_per_address = int(
147 self.
_configs_configs.max_connection_pool_size / ok_num
150 while try_count <= len(self.
_addresses_addresses):
154 invalid_connections = list()
158 if not connection.is_used:
160 if connection.ping():
161 connection.is_used =
True
162 logger.info(
'Get connection to {}'.format(addr))
165 invalid_connections.append(connection)
168 for connection
in invalid_connections:
172 if not self.
pingping(addr):
177 if len(self.
_connections_connections[addr]) < max_con_per_address:
185 connection.is_used =
True
187 logger.info(
'Get connection to {}'.format(addr))
190 for connection
in list(self.
_connections_connections[addr]):
191 if not connection.is_used:
193 try_count = try_count + 1
195 logging.error(
'No available connection')
197 except Exception
as ex:
198 logger.error(
'Get connection failed: {}'.format(ex))
202 """check the server is ok
204 :param address: the server address want to connect
205 :return: True or False
209 conn.open_SSL(address[0], address[1], 1000, self.
_ssl_configs_ssl_configs)
212 except Exception
as ex:
214 'Connect {}:{} failed: {}'.format(address[0], address[1], ex)
219 """close all connections in pool
223 with self.
_lock_lock:
226 if connection.is_used:
227 logger.warning(
'Closing a connection that is in use')
232 """get the number of existing connections
234 :return: the number of connections
236 with self.
_lock_lock:
239 count = count + len(self.
_connections_connections[addr])
243 """get the number of the used connections
247 with self.
_lock_lock:
251 if connection.is_used:
256 """get the number of the ok servers
266 def _get_services_status(self):
272 msg_list.append(
'[services: {}, status: {}]'.format(addr, status))
273 return ', '.join(msg_list)
276 """update the servers' status"""
278 if self.
pingping(address):
283 def _remove_idle_unusable_connection(self):
284 if self.
_configs_configs.idle_time == 0:
286 with self.
_lock_lock:
289 for connection
in list(conns):
290 if not connection.is_used:
291 if not connection.ping():
293 'Remove the unusable connection to {}'.format(
294 connection.get_address()
297 conns.remove(connection)
300 self.
_configs_configs.idle_time != 0
301 and connection.idle_time() > self.
_configs_configs.idle_time
304 'Remove the idle connection to {}'.format(
305 connection.get_address()
308 conns.remove(connection)
310 def _period_detect(self):
316 timer.setDaemon(
True)
def get_ok_servers_num(self)
def _get_services_status(self)
def init(self, addresses, configs, ssl_conf=None)
def update_servers_status(self)
def session_context(self, *args, **kwargs)
def get_session(self, user_name, password, retry_connect=True)
def in_used_connects(self)
def _remove_idle_unusable_connection(self)