NebulaGraph Python Client  release-3.4
ConnectionPool.py
1 # --coding:utf-8--
2 #
3 # Copyright (c) 2021 vesoft inc. All rights reserved.
4 #
5 # This source code is licensed under Apache 2.0 License.
6 
7 
8 import contextlib
9 import socket
10 
11 from collections import deque
12 from threading import RLock, Timer
13 
14 from nebula3.Exception import NotValidConnectionException, InValidHostname
15 
16 from nebula3.gclient.net.Session import Session
17 from nebula3.gclient.net.Connection import Connection
18 from nebula3.logger import logger
19 
20 
21 class ConnectionPool(object):
22  S_OK = 0
23  S_BAD = 1
24 
25  def __init__(self):
26  # all addresses of servers
27  self._addresses_addresses = list()
28 
29  # server's status
30  self._addresses_status_addresses_status = dict()
31 
32  # all connections
33  self._connections_connections = dict()
34  self._configs_configs = None
35  self._ssl_configs_ssl_configs = None
36  self._lock_lock = RLock()
37  self._pos_pos = -1
38  self._close_close = False
39 
40  def __del__(self):
41  self.closeclose()
42 
43  def init(self, addresses, configs, ssl_conf=None):
44  """init the connection pool
45 
46  :param addresses: the graphd servers' addresses
47  :param configs: the config of the pool
48  :param ssl_conf: the config of SSL socket
49  :return: if all addresses are ok, return True else return False.
50  """
51  if self._close_close:
52  logger.error('The pool has init or closed.')
53  raise RuntimeError('The pool has init or closed.')
54  self._configs_configs = configs
55  self._ssl_configs_ssl_configs = ssl_conf
56  for address in addresses:
57  if address not in self._addresses_addresses:
58  try:
59  ip = socket.gethostbyname(address[0])
60  except Exception:
61  raise InValidHostname(str(address[0]))
62  ip_port = (ip, address[1])
63  self._addresses_addresses.append(ip_port)
64  self._addresses_status_addresses_status[ip_port] = self.S_BADS_BAD
65  self._connections_connections[ip_port] = deque()
66  self._ssl_configs_ssl_configs = ssl_conf
67  self.update_servers_statusupdate_servers_status()
68 
69  # detect the services
70  self._period_detect_period_detect()
71 
72  # init min connections
73  ok_num = self.get_ok_servers_numget_ok_servers_num()
74  if ok_num < len(self._addresses_addresses):
75  raise RuntimeError(
76  'The services status exception: {}'.format(self._get_services_status_get_services_status())
77  )
78 
79  conns_per_address = int(self._configs_configs.min_connection_pool_size / ok_num)
80 
81  for addr in self._addresses_addresses:
82  for i in range(0, conns_per_address):
83  connection = Connection()
84  connection.open_SSL(
85  addr[0], addr[1], self._configs_configs.timeout, self._ssl_configs_ssl_configs
86  )
87  self._connections_connections[addr].append(connection)
88  return True
89 
90  def get_session(self, user_name, password, retry_connect=True):
91  """get session
92 
93  :param user_name: the user name to authenticate graphd
94  :param password: the password to authenticate graphd
95  :param retry_connect:
96  :return: Session
97  """
98  connection = self.get_connectionget_connection()
99  if connection is None:
100  raise NotValidConnectionException()
101  try:
102  auth_result = connection.authenticate(user_name, password)
103  return Session(connection, auth_result, self, retry_connect)
104  except Exception:
105  raise
106 
107  @contextlib.contextmanager
108  def session_context(self, *args, **kwargs):
109  """
110  session_context is to be used with a contextlib.contextmanager.
111  It returns a connection session from the pool, with same params
112  as the method get_session().
113 
114  When session_context is exited, the connection will be released.
115 
116  :param user_name: the user name to authenticate graphd
117  :param password: the password to authenticate graphd
118  :param retry_connect: if auto retry connect
119  :return: contextlib._GeneratorContextManager
120  """
121  session = None
122  try:
123  session = self.get_sessionget_session(*args, **kwargs)
124  yield session
125  except Exception:
126  raise
127  finally:
128  if session:
129  session.release()
130 
131  def get_connection(self):
132  """get available connection
133 
134  :return: Connection
135  """
136  with self._lock_lock:
137  if self._close_close:
138  logger.error('The pool is closed')
139  raise NotValidConnectionException()
140 
141  try:
142  ok_num = self.get_ok_servers_numget_ok_servers_num()
143  if ok_num == 0:
144  logging.error('No available server')
145  return None
146  max_con_per_address = int(
147  self._configs_configs.max_connection_pool_size / ok_num
148  )
149  try_count = 0
150  while try_count <= len(self._addresses_addresses):
151  self._pos_pos = (self._pos_pos + 1) % len(self._addresses_addresses)
152  addr = self._addresses_addresses[self._pos_pos]
153  if self._addresses_status_addresses_status[addr] == self.S_OKS_OK:
154  invalid_connections = list()
155 
156  # iterate all connections to find an available connection
157  for connection in self._connections_connections[addr]:
158  if not connection.is_used:
159  # ping to check the connection is valid
160  if connection.ping():
161  connection.is_used = True
162  logger.info('Get connection to {}'.format(addr))
163  return connection
164  else:
165  invalid_connections.append(connection)
166 
167  # remove invalid connections
168  for connection in invalid_connections:
169  self._connections_connections[addr].remove(connection)
170 
171  # check if the server is still alive
172  if not self.pingping(addr):
173  self._addresses_status_addresses_status[addr] = self.S_BADS_BAD
174  continue
175 
176  # create new connection if the number of connections is less than max_con_per_address
177  if len(self._connections_connections[addr]) < max_con_per_address:
178  connection = Connection()
179  connection.open_SSL(
180  addr[0],
181  addr[1],
182  self._configs_configs.timeout,
183  self._ssl_configs_ssl_configs,
184  )
185  connection.is_used = True
186  self._connections_connections[addr].append(connection)
187  logger.info('Get connection to {}'.format(addr))
188  return connection
189  else:
190  for connection in list(self._connections_connections[addr]):
191  if not connection.is_used:
192  self._connections_connections[addr].remove(connection)
193  try_count = try_count + 1
194 
195  logging.error('No available connection')
196  return None
197  except Exception as ex:
198  logger.error('Get connection failed: {}'.format(ex))
199  return None
200 
201  def ping(self, address):
202  """check the server is ok
203 
204  :param address: the server address want to connect
205  :return: True or False
206  """
207  try:
208  conn = Connection()
209  conn.open_SSL(address[0], address[1], 1000, self._ssl_configs_ssl_configs)
210  conn.close()
211  return True
212  except Exception as ex:
213  logger.warning(
214  'Connect {}:{} failed: {}'.format(address[0], address[1], ex)
215  )
216  return False
217 
218  def close(self):
219  """close all connections in pool
220 
221  :return: void
222  """
223  with self._lock_lock:
224  for addr in self._connections_connections.keys():
225  for connection in self._connections_connections[addr]:
226  if connection.is_used:
227  logger.warning('Closing a connection that is in use')
228  connection.close()
229  self._close_close = True
230 
231  def connects(self):
232  """get the number of existing connections
233 
234  :return: the number of connections
235  """
236  with self._lock_lock:
237  count = 0
238  for addr in self._connections_connections.keys():
239  count = count + len(self._connections_connections[addr])
240  return count
241 
242  def in_used_connects(self):
243  """get the number of the used connections
244 
245  :return: int
246  """
247  with self._lock_lock:
248  count = 0
249  for addr in self._connections_connections.keys():
250  for connection in self._connections_connections[addr]:
251  if connection.is_used:
252  count = count + 1
253  return count
254 
256  """get the number of the ok servers
257 
258  :return: int
259  """
260  count = 0
261  for addr in self._addresses_status_addresses_status.keys():
262  if self._addresses_status_addresses_status[addr] == self.S_OKS_OK:
263  count = count + 1
264  return count
265 
266  def _get_services_status(self):
267  msg_list = []
268  for addr in self._addresses_status_addresses_status.keys():
269  status = 'OK'
270  if self._addresses_status_addresses_status[addr] != self.S_OKS_OK:
271  status = 'BAD'
272  msg_list.append('[services: {}, status: {}]'.format(addr, status))
273  return ', '.join(msg_list)
274 
276  """update the servers' status"""
277  for address in self._addresses_addresses:
278  if self.pingping(address):
279  self._addresses_status_addresses_status[address] = self.S_OKS_OK
280  else:
281  self._addresses_status_addresses_status[address] = self.S_BADS_BAD
282 
283  def _remove_idle_unusable_connection(self):
284  if self._configs_configs.idle_time == 0:
285  return
286  with self._lock_lock:
287  for addr in self._connections_connections.keys():
288  conns = self._connections_connections[addr]
289  for connection in list(conns):
290  if not connection.is_used:
291  if not connection.ping():
292  logger.debug(
293  'Remove the unusable connection to {}'.format(
294  connection.get_address()
295  )
296  )
297  conns.remove(connection)
298  continue
299  if (
300  self._configs_configs.idle_time != 0
301  and connection.idle_time() > self._configs_configs.idle_time
302  ):
303  logger.debug(
304  'Remove the idle connection to {}'.format(
305  connection.get_address()
306  )
307  )
308  conns.remove(connection)
309 
310  def _period_detect(self):
311  if self._close_close or self._configs_configs.interval_check < 0:
312  return
313  self.update_servers_statusupdate_servers_status()
314  self._remove_idle_unusable_connection_remove_idle_unusable_connection()
315  timer = Timer(self._configs_configs.interval_check, self._period_detect_period_detect)
316  timer.setDaemon(True)
317  timer.start()
def init(self, addresses, configs, ssl_conf=None)
def get_session(self, user_name, password, retry_connect=True)