NebulaGraph Python Client  release-3.8
ConnectionPool.py
1 # --coding:utf-8--
2 #
3 # Copyright (c) 2021 vesoft inc. All rights reserved.
4 #
5 # This source code is licensed under Apache 2.0 License.
6 
7 
8 import contextlib
9 import socket
10 
11 from collections import deque
12 from threading import RLock, Timer
13 
14 from nebula3.Exception import NotValidConnectionException, InValidHostname
15 
16 from nebula3.gclient.net.Session import Session
17 from nebula3.gclient.net.Connection import Connection
18 from nebula3.Config import Config
19 from nebula3.logger import logger
20 from typing import Dict, List, Tuple
21 
22 
23 class ConnectionPool(object):
24  S_OK = 0
25  S_BAD = 1
26 
27  def __init__(self):
28  # all addresses of servers
29  self._addresses: List[Tuple[str, int]] = list()
30 
31  # server's status
32  self._addresses_status_addresses_status = dict()
33 
34  # all connections
35  self._connections: Dict[Tuple[str, int], List[Connection]] = dict()
36  self._configs_configs = None
37  self._ssl_configs_ssl_configs = None
38  self._lock_lock = RLock()
39  self._pos_pos = -1
40  self._close_close = False
41 
42  def __del__(self):
43  self.closeclose()
44 
45  def init(self, addresses, configs=None, ssl_conf=None):
46  """init the connection pool
47 
48  :param addresses: the graphd servers' addresses
49  :param configs: the config of the pool
50  :param ssl_conf: the config of SSL socket
51  :return: if all addresses are ok, return True else return False.
52  """
53  if self._close_close:
54  logger.error("The pool has init or closed.")
55  raise RuntimeError("The pool has init or closed.")
56  if configs is None:
57  self._configs_configs = Config()
58  else:
59  assert isinstance(
60  configs, Config
61  ), "wrong type of Config, try this: `from nebula3.Config import Config`"
62  self._configs_configs = configs
63  self._ssl_configs_ssl_configs = ssl_conf
64  for address in addresses:
65  if address not in self._addresses:
66  try:
67  ip = socket.gethostbyname(address[0])
68  except Exception:
69  raise InValidHostname(str(address[0]))
70  ip_port = (ip, address[1])
71  self._addresses.append(ip_port)
72  self._addresses_status_addresses_status[ip_port] = self.S_BADS_BAD
73  self._connections[ip_port] = deque()
74  self._ssl_configs_ssl_configs = ssl_conf
75  self.update_servers_statusupdate_servers_status()
76 
77  # detect the services
78  self._period_detect_period_detect()
79 
80  # init min connections
81  ok_num = self.get_ok_servers_numget_ok_servers_num()
82  if ok_num < len(self._addresses):
83  raise RuntimeError(
84  "The services status exception: {}".format(self._get_services_status_get_services_status())
85  )
86 
87  conns_per_address = int(self._configs_configs.min_connection_pool_size / ok_num)
88 
89  for addr in self._addresses:
90  for i in range(0, conns_per_address):
91  connection = Connection()
92  connection.open_SSL(
93  addr[0],
94  addr[1],
95  self._configs_configs.timeout,
96  self._ssl_configs_ssl_configs,
97  self._configs_configs.use_http2,
98  self._configs_configs.http_headers,
99  )
100  self._connections[addr].append(connection)
101  return True
102 
103  def get_session(self, user_name, password, retry_connect=True):
104  """get session
105 
106  :param user_name: the user name to authenticate graphd
107  :param password: the password to authenticate graphd
108  :param retry_connect:
109  :return: Session
110  """
111  connection = self.get_connectionget_connection()
112  if connection is None:
113  raise NotValidConnectionException()
114  try:
115  auth_result = connection.authenticate(user_name, password)
116  return Session(connection, auth_result, self, retry_connect)
117  except Exception:
118  raise
119 
120  @contextlib.contextmanager
121  def session_context(self, *args, **kwargs):
122  """
123  session_context is to be used with a contextlib.contextmanager.
124  It returns a connection session from the pool, with same params
125  as the method get_session().
126 
127  When session_context is exited, the connection will be released.
128 
129  :param user_name: the user name to authenticate graphd
130  :param password: the password to authenticate graphd
131  :param retry_connect: if auto retry connect
132  :return: contextlib._GeneratorContextManager
133  """
134  session = None
135  try:
136  session = self.get_sessionget_session(*args, **kwargs)
137  yield session
138  except Exception:
139  raise
140  finally:
141  if session:
142  session.release()
143 
144  def get_connection(self):
145  """get available connection
146 
147  :return: Connection
148  """
149  with self._lock_lock:
150  if self._close_close:
151  logger.error("The pool is closed")
152  raise NotValidConnectionException()
153 
154  try:
155  ok_num = self.get_ok_servers_numget_ok_servers_num()
156  if ok_num == 0:
157  logger.error("No available server")
158  return None
159  max_con_per_address = int(
160  self._configs_configs.max_connection_pool_size / ok_num
161  )
162  try_count = 0
163  while try_count <= len(self._addresses):
164  self._pos_pos = (self._pos_pos + 1) % len(self._addresses)
165  addr = self._addresses[self._pos_pos]
166  if self._addresses_status_addresses_status[addr] == self.S_OKS_OK:
167  invalid_connections = list()
168 
169  # iterate all connections to find an available connection
170  for connection in self._connections[addr]:
171  if not connection.is_used:
172  # ping to check the connection is valid
173  if connection.ping():
174  connection.is_used = True
175  logger.info("Get connection to {}".format(addr))
176  return connection
177  else:
178  invalid_connections.append(connection)
179 
180  # remove invalid connections
181  for connection in invalid_connections:
182  self._connections[addr].remove(connection)
183 
184  # check if the server is still alive
185  if not self.pingping(addr):
186  self._addresses_status_addresses_status[addr] = self.S_BADS_BAD
187  continue
188 
189  # create new connection if the number of connections is less than max_con_per_address
190  if len(self._connections[addr]) < max_con_per_address:
191  connection = Connection()
192  connection.open_SSL(
193  addr[0],
194  addr[1],
195  self._configs_configs.timeout,
196  self._ssl_configs_ssl_configs,
197  self._configs_configs.use_http2,
198  self._configs_configs.http_headers,
199  )
200  connection.is_used = True
201  self._connections[addr].append(connection)
202  logger.info("Get connection to {}".format(addr))
203  return connection
204  else:
205  for connection in list(self._connections[addr]):
206  if not connection.is_used:
207  self._connections[addr].remove(connection)
208  try_count = try_count + 1
209 
210  logger.error("No available connection")
211  return None
212  except Exception as ex:
213  logger.error("Get connection failed: {}".format(ex))
214  return None
215 
216  def ping(self, address):
217  """check the server is ok
218 
219  :param address: the server address want to connect
220  :return: True or False
221  """
222  try:
223  conn = Connection()
224  # support ping before self.init()
225  if self._configs_configs is None:
226  self._configs_configs = Config()
227  conn.open_SSL(
228  address[0],
229  address[1],
230  1000,
231  self._ssl_configs_ssl_configs,
232  self._configs_configs.use_http2,
233  self._configs_configs.http_headers,
234  )
235  conn.close()
236  return True
237  except Exception as ex:
238  logger.warning(
239  "Connect {}:{} failed: {}".format(address[0], address[1], ex)
240  )
241  return False
242 
243  def close(self):
244  """close all connections in pool
245 
246  :return: void
247  """
248  with self._lock_lock:
249  for addr in self._connections.keys():
250  for connection in self._connections[addr]:
251  if connection.is_used:
252  logger.warning("Closing a connection that is in use")
253  connection.close()
254  self._close_close = True
255 
256  def connects(self):
257  """get the number of existing connections
258 
259  :return: the number of connections
260  """
261  with self._lock_lock:
262  count = 0
263  for addr in self._connections.keys():
264  count = count + len(self._connections[addr])
265  return count
266 
267  def in_used_connects(self):
268  """get the number of the used connections
269 
270  :return: int
271  """
272  with self._lock_lock:
273  count = 0
274  for addr in self._connections.keys():
275  for connection in self._connections[addr]:
276  if connection.is_used:
277  count = count + 1
278  return count
279 
281  """get the number of the ok servers
282 
283  :return: int
284  """
285  count = 0
286  for addr in self._addresses_status_addresses_status.keys():
287  if self._addresses_status_addresses_status[addr] == self.S_OKS_OK:
288  count = count + 1
289  return count
290 
291  def _get_services_status(self):
292  msg_list = []
293  for addr in self._addresses_status_addresses_status.keys():
294  status = "OK"
295  if self._addresses_status_addresses_status[addr] != self.S_OKS_OK:
296  status = "BAD"
297  msg_list.append("[services: {}, status: {}]".format(addr, status))
298  return ", ".join(msg_list)
299 
301  """update the servers' status"""
302  for address in self._addresses:
303  if self.pingping(address):
304  self._addresses_status_addresses_status[address] = self.S_OKS_OK
305  else:
306  self._addresses_status_addresses_status[address] = self.S_BADS_BAD
307 
308  def _remove_idle_unusable_connection(self):
309  if self._configs_configs.idle_time == 0:
310  return
311  with self._lock_lock:
312  for addr in self._connections.keys():
313  conns = self._connections[addr]
314  for connection in list(conns):
315  if not connection.is_used:
316  if not connection.ping():
317  logger.debug(
318  "Remove the unusable connection to {}".format(
319  connection.get_address()
320  )
321  )
322  conns.remove(connection)
323  continue
324  if (
325  self._configs_configs.idle_time != 0
326  and connection.idle_time() > self._configs_configs.idle_time
327  ):
328  logger.debug(
329  "Remove the idle connection to {}".format(
330  connection.get_address()
331  )
332  )
333  conns.remove(connection)
334 
335  def _period_detect(self):
336  if self._close_close or self._configs_configs.interval_check < 0:
337  return
338  self.update_servers_statusupdate_servers_status()
339  self._remove_idle_unusable_connection_remove_idle_unusable_connection()
340  timer = Timer(self._configs_configs.interval_check, self._period_detect_period_detect)
341  timer.setDaemon(True)
342  timer.start()
def init(self, addresses, configs=None, ssl_conf=None)
def get_session(self, user_name, password, retry_connect=True)