NebulaGraph Python Client  release-3.8
__init__.py
1 #!/usr/bin/env python
2 # --coding:utf-8--
3 
4 # Copyright (c) 2020 vesoft inc. All rights reserved.
5 #
6 # This source code is licensed under Apache 2.0 License.
7 
8 
9 import socket
10 
11 from _thread import RLock
12 from nebula3.Exception import (
13  InValidHostname,
14  PartNotFoundException,
15  SpaceNotFoundException,
16  TagNotFoundException,
17  EdgeNotFoundException,
18 )
19 from nebula3.common.ttypes import HostAddr, ErrorCode
20 from nebula3.meta.ttypes import (
21  HostStatus,
22  ListTagsReq,
23  ListEdgesReq,
24  ListSpacesReq,
25  GetPartsAllocReq,
26  ListHostsReq,
27  HostRole,
28 )
29 from nebula3.meta import ttypes, MetaService
30 
31 from nebula3.fbthrift.transport import TSocket, TTransport
32 from nebula3.fbthrift.protocol import TBinaryProtocol
33 from nebula3.logger import logger
34 
35 
class MetaClient(object):
    """Client for the NebulaGraph meta service.

    The client keeps one thrift connection to the address it currently
    considers the meta leader.  Every request is serialised through a
    re-entrant lock, and a request answered with ``E_LEADER_CHANGED`` is
    retried (up to ``_retry_count`` times) against the leader reported in
    the response.
    """

    def __init__(self, addresses, timeout):
        """Validate the meta addresses and prepare an unopened client.

        :param addresses: non-empty sequence of ``(host, port)`` pairs; the
            first entry is used as the initial leader
        :param timeout: timeout handed to the thrift socket; it is applied
            only when greater than zero
        :raises RuntimeError: if ``addresses`` is empty
        :raises InValidHostname: if a host name cannot be resolved
        """
        # Assigned before any validation so that close()/__del__ stay safe
        # when the constructor raises half-way through.
        self._connection = None
        if not addresses:
            raise RuntimeError('Input empty addresses')
        self._timeout = timeout
        self._retry_count = 3
        self._addresses = addresses
        for address in addresses:
            try:
                socket.gethostbyname(address[0])
            except Exception as e:
                # Keep the resolver error as the cause for easier debugging.
                raise InValidHostname(str(address[0])) from e
        self._leader = self._addresses[0]
        self._lock = RLock()

    def open(self):
        """open the connection to connect meta service

        Any previously opened connection is closed first.  Errors raised
        while closing or connecting propagate to the caller.

        :return: void
        """
        self.close()
        s = TSocket.TSocket(self._leader[0], self._leader[1])
        if self._timeout > 0:
            s.setTimeout(self._timeout)
        transport = TTransport.TBufferedTransport(s)
        protocol = TBinaryProtocol.TBinaryProtocol(transport)
        transport.open()
        self._connection = MetaService.Client(protocol)

    def _ensure_open(self):
        """Raise ``RuntimeError`` when there is no usable connection."""
        if self._connection is None:
            raise RuntimeError('The connection is no open')

    def _call_with_retry(self, rpc_name, req, failure_prefix):
        """Send ``req`` through the RPC ``rpc_name``, following leader changes.

        Must be called with ``self._lock`` held.

        :param rpc_name: name of the method to invoke on the thrift client
        :param req: the request object passed to that method
        :param failure_prefix: leading part of the error message used when
            the request finally fails
        :return: the response whose code is ``ErrorCode.SUCCEEDED``
        :raises RuntimeError: if the service answers with any other error,
            if the leader keeps changing for ``_retry_count`` attempts, or
            if a leader switch left the client without a connection
        """
        resp = None
        for _ in range(self._retry_count):
            # update_leader() may have failed to reconnect on the previous
            # round, so the connection has to be re-checked every time.
            self._ensure_open()
            # Looked up per attempt: the connection object is replaced when
            # the leader changes.
            resp = getattr(self._connection, rpc_name)(req)
            if resp.code == ErrorCode.SUCCEEDED:
                return resp
            if resp.code != ErrorCode.E_LEADER_CHANGED:
                break
            self.update_leader(resp.leader)
        code = None if resp is None else resp.code
        raise RuntimeError(
            "{}, error code: {}".format(
                failure_prefix, ErrorCode._VALUES_TO_NAMES.get(code)
            )
        )

    def list_tags(self, space_id):
        """get all version tags

        :param space_id: the specified space id
        :return: list<TagItem>
        :raises RuntimeError: if the connection is not open or the request fails
        """
        with self._lock:
            self._ensure_open()
            req = ListTagsReq()
            req.space_id = space_id
            resp = self._call_with_retry(
                'listTags',
                req,
                "List tags from space id:{} failed".format(space_id),
            )
            return resp.tags

    def list_edges(self, space_id):
        """get all version edge

        :param space_id: the specified space id
        :return: list<EdgeItem>
        :raises RuntimeError: if the connection is not open or the request fails
        """
        with self._lock:
            self._ensure_open()
            req = ListEdgesReq()
            req.space_id = space_id
            resp = self._call_with_retry(
                'listEdges',
                req,
                "List edges from space id:{} failed".format(space_id),
            )
            return resp.edges

    def list_spaces(self):
        """get all spaces info

        :return: list<IdName>
        :raises RuntimeError: if the connection is not open or the request fails
        """
        with self._lock:
            self._ensure_open()
            req = ListSpacesReq()
            resp = self._call_with_retry('listSpaces', req, "List spaces failed")
            return resp.spaces

    def list_hosts(self):
        """get all online hosts info

        Only hosts with the storage role are requested, and only those
        reported as ``HostStatus.ONLINE`` are returned.

        :return: list<HostItem>
        :raises RuntimeError: if the connection is not open or the request fails
        """
        with self._lock:
            self._ensure_open()
            req = ListHostsReq()
            req.role = HostRole.STORAGE
            resp = self._call_with_retry('listHosts', req, "List hosts failed")
            return [host for host in resp.hosts if host.status == HostStatus.ONLINE]

    def get_parts_alloc(self, space_id):
        """get all parts info of the specified space id

        :param space_id: the specified space id
        :return: map<PartitionID, list<HostAddr>>
        :raises RuntimeError: if the connection is not open or the request fails
        """
        with self._lock:
            self._ensure_open()
            req = GetPartsAllocReq()
            req.space_id = space_id
            resp = self._call_with_retry(
                'getPartsAlloc',
                req,
                "List parts from space id:{} failed".format(space_id),
            )
            return resp.parts

    def close(self):
        """close the connection

        Does nothing when no connection is open.  The connection reference
        is dropped even if closing the transport raises, so a closed
        connection is never reused.

        :return: void
        """
        # getattr keeps this safe for partially constructed instances.
        connection = getattr(self, '_connection', None)
        if connection is None:
            return
        try:
            connection._iprot.trans.close()
        finally:
            self._connection = None

    def update_leader(self, leader):
        """update the leader meta info when happen leader change

        Reconnects to the new leader.  Failures are logged rather than
        raised, so the caller's retry loop decides what happens next.

        :param leader: the address of meta leader (an object exposing
            ``host`` and ``port``)
        :return: void
        """
        try:
            self._leader = (leader.host, leader.port)
            self.open()
        except Exception as e:
            logger.error(e)

    def __del__(self):
        self.close()
250 
251 
class MetaCache(object):
    """In-memory cache of the schema and partition layout held by meta.

    The cache is filled once on construction and reloaded on demand when a
    lookup misses.  For every space it keeps the newest version of each
    tag and edge, the partition allocation, and the storage host currently
    believed to be the leader of each partition.
    """

    class SpaceCache:
        """Cached information about a single space."""

        def __init__(self):
            self.space_id = 0
            self.space_name = ''
            # tag name -> newest TagItem
            self.tag_items = {}
            # edge name -> newest EdgeItem
            self.edge_items = {}
            # part id -> list of hosts holding that part
            self.parts_alloc = {}

        def __repr__(self):
            return 'space_id: {}, space_name: {}, tag_items: {}, edge_items: {}, parts_alloc: {}'.format(
                self.space_id,
                self.space_name,
                self.tag_items,
                self.edge_items,
                self.parts_alloc,
            )

    def __init__(self, meta_addrs, timeout=2000, load_period=10, decode_type='utf-8'):
        """Connect to the meta service and load all metadata once.

        :param meta_addrs: meta service addresses, passed to ``MetaClient``
        :param timeout: timeout passed to ``MetaClient``
        :param load_period: stored on the instance; not used by this class
        :param decode_type: codec used to decode space, tag and edge names
        """
        self._decode_type = decode_type
        self._load_period = load_period
        self._lock = RLock()
        self._space_caches = {}
        self._space_id_names = {}
        self._storage_addrs = []
        self._storage_leader = {}
        self._close = False
        # Pre-set so close()/__del__ work if MetaClient construction raises.
        self._meta_client = None
        self._meta_client = MetaClient(meta_addrs, timeout)
        self._meta_client.open()

        # load meta data
        self._load_all()

    def close(self):
        """close the metaClient

        Safe to call more than once.

        :return: void
        """
        # getattr keeps this safe for partially constructed instances.
        if getattr(self, '_close', False):
            return
        self._close = True
        meta_client = getattr(self, '_meta_client', None)
        if meta_client is not None:
            meta_client.close()

    def __del__(self):
        self.close()

    def _latest_versions(self, items, name_attr):
        """Map each decoded name to the item carrying the highest version.

        :param items: iterable of schema items exposing ``version`` and the
            bytes attribute named by ``name_attr``
        :param name_attr: ``'tag_name'`` or ``'edge_name'``
        :return: dict of decoded name -> newest item
        """
        latest = {}
        for item in items:
            name = getattr(item, name_attr).decode(self._decode_type)
            current = latest.get(name)
            if current is None or current.version < item.version:
                latest[name] = item
        return latest

    def _load_all(self):
        """load all space info and schema info from meta services

        Failures are logged and swallowed, leaving the previous cache in
        place; callers detect a failed reload through their own lookups.

        :return: void
        """
        try:
            space_caches = {}
            space_id_names = {}
            for space in self._meta_client.list_spaces():
                space_id = space.id.get_space_id()
                space_cache = MetaCache.SpaceCache()
                space_cache.space_id = space_id
                # Decoded once with the configured codec so that the cache
                # key and the stored name can never disagree.
                space_cache.space_name = space.name.decode(self._decode_type)
                tags = self._meta_client.list_tags(space_id)
                edges = self._meta_client.list_edges(space_id)
                parts_alloc = self._meta_client.get_parts_alloc(space_id)
                space_cache.tag_items = self._latest_versions(tags, 'tag_name')
                space_cache.edge_items = self._latest_versions(edges, 'edge_name')
                space_cache.parts_alloc = parts_alloc
                space_id_names[space_id] = space_cache.space_name
                space_caches[space_cache.space_name] = space_cache

            storage_addrs = [
                host_item.hostAddr for host_item in self._meta_client.list_hosts()
            ]

            with self._lock:
                self._storage_addrs = storage_addrs
                self._space_caches = space_caches
                self._space_id_names = space_id_names
                for space_name, space_cache in space_caches.items():
                    # Leaders already learned for a space are kept; only
                    # newly seen spaces are seeded from the allocation.
                    if space_name in self._storage_leader:
                        continue
                    parts_alloc = space_cache.parts_alloc
                    # Seed with the first host of each part; parts without
                    # any host are skipped instead of aborting the load.
                    self._storage_leader[space_name] = {
                        part_id: parts_alloc[part_id][0]
                        for part_id in parts_alloc
                        if parts_alloc[part_id]
                    }
        except Exception as x:
            logger.error('Update meta data failed: {}'.format(x))
            import traceback

            logger.error(traceback.format_exc())

    def get_all_storage_addrs(self):
        """get all storage address

        :return: list[HostAddr]
        """
        return self._storage_addrs

    def get_tag_id(self, space_name, tag_name):
        """get tag id

        :param space_name:
        :param tag_name:
        :return: tag_id
        :raises SpaceNotFoundException: if the space is unknown after a reload
        :raises TagNotFoundException: if the tag is unknown after a reload
        """
        with self._lock:
            tag_item = self._get_tag_item(space_name, tag_name)
            return tag_item.tag_id

    def get_edge_type(self, space_name, edge_name):
        """get edge type

        :param space_name:
        :param edge_name:
        :return: edge_type
        :raises SpaceNotFoundException: if the space is unknown after a reload
        :raises EdgeNotFoundException: if the edge is unknown after a reload
        """
        with self._lock:
            edge_item = self._get_edge_item(space_name, edge_name)
            return edge_item.edge_type

    def _get_space_cache(self, space_name):
        """Return the cache entry of ``space_name``, reloading once on a miss.

        :raises SpaceNotFoundException: if the space is still unknown
        """
        with self._lock:
            if space_name not in self._space_caches:
                self._load_all()
            if space_name not in self._space_caches:
                raise SpaceNotFoundException(space_name)
            return self._space_caches[space_name]

    def get_space_id(self, space_name):
        """get space id

        :param space_name:
        :return: space_id
        :raises SpaceNotFoundException: if the space is unknown after a reload
        """
        return self._get_space_cache(space_name).space_id

    def get_tag_schema(self, space_name, tag_name):
        """get tag schema

        :param space_name:
        :param tag_name:
        :return: schema
        """
        tag_item = self._get_tag_item(space_name, tag_name)
        return tag_item.schema

    def get_edge_schema(self, space_name, edge_name):
        """get edge schema

        :param space_name:
        :param edge_name:
        :return: schema
        """
        edge_item = self._get_edge_item(space_name, edge_name)
        return edge_item.schema

    def get_part_leader(self, space_name, part_id):
        """get the leader of one part

        :param space_name:
        :param part_id:
        :return: storage ip port: HostAddr
        :raises SpaceNotFoundException: if the space is unknown after a reload
        :raises PartNotFoundException: if the part is unknown in that space
        """
        part_leaders = self.get_part_leaders(space_name)
        if part_id not in part_leaders:
            raise PartNotFoundException(part_id)
        return part_leaders[part_id]

    def get_part_leaders(self, space_name):
        """get all part leader info of the space

        :param space_name: space name
        :return: map<PartitionID, HostAddr>
        :raises SpaceNotFoundException: if the space is unknown after a reload
        """
        with self._lock:
            if space_name not in self._storage_leader:
                self._load_all()
            if space_name not in self._storage_leader:
                raise SpaceNotFoundException(space_name)
            return self._storage_leader[space_name]

    def get_part_alloc(self, space_name):
        """get all part info of the space

        :param space_name: space name
        :return: map<PartitionID, list<HostAddr>>
        :raises SpaceNotFoundException: if the space is unknown after a reload
        """
        return self._get_space_cache(space_name).parts_alloc

    def _get_tag_item(self, space_name, tag_name):
        """Return the newest cached item of a tag, reloading once on a miss.

        :raises SpaceNotFoundException: if the space is unknown after a reload
        :raises TagNotFoundException: if the tag is unknown after a reload
        """
        with self._lock:
            space_info = self._space_caches.get(space_name)
            if space_info is None or tag_name not in space_info.tag_items:
                self._load_all()
                # A reload replaces the cache objects, so the entry has to
                # be fetched again; the old one would still be stale.
                space_info = self._space_caches.get(space_name)
            if space_info is None:
                raise SpaceNotFoundException(space_name)
            if tag_name not in space_info.tag_items:
                raise TagNotFoundException(tag_name)
            return space_info.tag_items[tag_name]

    def _get_edge_item(self, space_name, edge_name):
        """Return the newest cached item of an edge, reloading once on a miss.

        :raises SpaceNotFoundException: if the space is unknown after a reload
        :raises EdgeNotFoundException: if the edge is unknown after a reload
        """
        with self._lock:
            space_info = self._space_caches.get(space_name)
            if space_info is None or edge_name not in space_info.edge_items:
                self._load_all()
                # A reload replaces the cache objects, so the entry has to
                # be fetched again; the old one would still be stale.
                space_info = self._space_caches.get(space_name)
            if space_info is None:
                raise SpaceNotFoundException(space_name)
            if edge_name not in space_info.edge_items:
                raise EdgeNotFoundException(edge_name)
            return space_info.edge_items[edge_name]

    # The annotation is a forward reference, so defining the class does not
    # require HostAddr to be resolvable at that moment.
    def update_storage_leader(self, space_id, part_id, address: 'HostAddr'):
        """if the storage leader change, storage client need to call this function

        :param space_id:
        :param part_id:
        :param address: HostAddr, if the address is None, it means the leader can't connect,
        choose the peer as leader
        :return: void
        """
        with self._lock:
            space_name = self._space_id_names.get(space_id)
            if space_name is None:
                logger.error("Space id:{} is not found".format(space_id))
                return
            leaders = self._storage_leader.get(space_name)
            if leaders is None or part_id not in leaders:
                logger.error("part_id:{} is not found".format(part_id))
                return
            if address is not None:
                leaders[part_id] = address
                return
            space_cache = self._space_caches.get(space_name)
            peers = None if space_cache is None else space_cache.parts_alloc.get(part_id)
            if not peers:
                logger.error("part_id:{} has no peers".format(part_id))
                return
            # The current leader is unreachable: move on to the peer that
            # follows it in the allocation list, so that repeated failures
            # cycle through every replica instead of sticking to one host.
            try:
                position = peers.index(leaders[part_id])
            except ValueError:
                position = -1
            leaders[part_id] = peers[(position + 1) % len(peers)]
def get_edge_type(self, space_name, edge_name)
Definition: __init__.py:379
def update_storage_leader(self, space_id, part_id, HostAddr address)
Definition: __init__.py:487
def get_all_storage_addrs(self)
Definition: __init__.py:361
def get_tag_id(self, space_name, tag_name)
Definition: __init__.py:368
def get_part_alloc(self, space_name)
Definition: __init__.py:448
def _get_edge_item(self, space_name, edge_name)
Definition: __init__.py:474
def get_edge_schema(self, space_name, edge_name)
Definition: __init__.py:413
def get_space_id(self, space_name)
Definition: __init__.py:390
def get_part_leaders(self, space_name)
Definition: __init__.py:435
def _get_tag_item(self, space_name, tag_name)
Definition: __init__.py:461
def get_tag_schema(self, space_name, tag_name)
Definition: __init__.py:403
def get_part_leader(self, space_name, part_id)
Definition: __init__.py:423
def update_leader(self, leader)
Definition: __init__.py:236
def list_edges(self, space_id)
Definition: __init__.py:100
def get_parts_alloc(self, space_id)
Definition: __init__.py:194
def list_tags(self, space_id)
Definition: __init__.py:69