NebulaGraph Python Client  release-3.8
GraphStorageClient.py
1 #!/usr/bin/env python
2 # --coding:utf-8--
3 
4 # Copyright (c) 2020 vesoft inc. All rights reserved.
5 #
6 # This source code is licensed under Apache 2.0 License.
7 
8 
9 """
10 The client to scan vertex and edge from storage,
11 the return data is from the graph database
12 """
13 import sys
14 
15 from nebula3.sclient.ScanResult import ScanResult
16 from nebula3.sclient.net import GraphStorageConnection
17 from nebula3.storage.ttypes import (
18  ScanCursor,
19  ScanEdgeRequest,
20  ScanVertexRequest,
21  VertexProp,
22  EdgeProp,
23 )
24 from nebula3.logger import logger
25 
# Reserved property names requested on every scan, in addition to the
# user-selected properties: the vertex id for vertex scans, and the
# source / edge type / rank / destination for edge scans.
kVid = b'_vid'
kSrc = b'_src'
kType = b'_type'
kRank = b'_rank'
kDst = b'_dst'
31 
32 
class GraphStorageClient(object):
    """Client that scans vertices and edges directly from the storage service.

    On construction it opens one GraphStorageConnection per storage address
    (taken from ``storage_addrs`` or, when that is None, from the meta cache).
    The ``scan_*`` methods build a scan request and wrap it in a ScanResult,
    which reaches the connections through :meth:`get_conns`.
    """

    # Defaults applied to every scan request unless the caller overrides them.
    DEFAULT_START_TIME = 0
    DEFAULT_END_TIME = sys.maxsize
    DEFAULT_LIMIT = 1000
    # Credentials sent with each scan request; empty until set_user_passwd()
    # is called on the instance.
    user = ""
    passwd = ""

    def __init__(self, meta_cache, storage_addrs=None, time_out=60000):
        """Create the client and open a connection to every storage address

        :param meta_cache: the meta cache used to resolve spaces, schemas,
                           part leaders and (if needed) storage addresses
        :param storage_addrs: the storage addresses to connect to, if None
                              they are fetched from the meta cache
        :param time_out: the timeout handed to each GraphStorageConnection
                         (presumably milliseconds, confirm against the connection)
        """
        self._meta_cache = meta_cache
        self._storage_addrs = storage_addrs
        self._time_out = time_out
        self._connections = []
        self._create_connection()

    def set_user_passwd(self, user, passwd):
        """set user and password for scan. only useful for enterprise

        :param user: the user name sent with every scan request
        :param passwd: the password sent with every scan request
        :return:
        """
        self.user = user
        self.passwd = passwd

    def get_conns(self):
        """get all connections which connect to storaged, the ScanResult use it

        :return: list<GraphStorageConnection>
        """
        return self._connections

    def __del__(self):
        # A finalizer must never propagate: close() has already logged any
        # failure, and during interpreter shutdown module globals may be gone.
        try:
            self.close()
        except Exception:
            pass

    def close(self):
        """close the GraphStorageClient

        Every connection is closed even if closing an earlier one fails.
        Each failure is logged and the first one is re-raised once all
        connections have been attempted. Calling close again is a no-op.

        :return:
        """
        # getattr keeps this safe on a partially constructed instance.
        connections = getattr(self, '_connections', None) or []
        # Detach the list first so a repeated close (e.g. from __del__)
        # does not close the same connections twice.
        self._connections = []
        first_error = None
        for conn in connections:
            try:
                conn.close()
            except Exception as e:
                logger.error('Close connection failed: %s', e)
                if first_error is None:
                    first_error = e
        if first_error is not None:
            raise first_error

    def _create_connection(self):
        """create and open one GraphStorageConnection per storage address

        The opened connections are appended to the list returned by get_conns.

        :raise RuntimeError: if there is no storage address to connect to
        :return:
        """
        if self._storage_addrs is None:
            self._storage_addrs = self._meta_cache.get_all_storage_addrs()
        if not self._storage_addrs:
            raise RuntimeError('Get storage address from meta cache is empty')
        try:
            for addr in self._storage_addrs:
                conn = GraphStorageConnection(addr, self._time_out, self._meta_cache)
                conn.open()
                self._connections.append(conn)
        except Exception as e:
            logger.error('Create storage connection failed: %s', e)
            raise

    def get_space_addrs(self, space_name):
        """get all storage addresses that manage space

        :param space_name: the specified space name
        :return: list<(ip, port)>
        """
        return self._meta_cache.get_space_addrs(space_name)

    def scan_vertex(
        self,
        space_name,
        tag_name,
        prop_names=None,
        limit=DEFAULT_LIMIT,
        start_time=DEFAULT_START_TIME,
        end_time=DEFAULT_END_TIME,
        where=None,
        only_latest_version=False,
        enable_read_from_follower=True,
        partial_success=False,
    ):
        """scan vertex with the specified space_name, tag_name,
        if the prop_names is empty, will return all properties of the tag

        :param space_name: the space name
        :param tag_name: the tag name
        :param prop_names: if given empty or None, return all property
        :param limit: the max vertex number from one storaged
        :param start_time: the min version of vertex
        :param end_time: the max version of vertex
        :param where: now is unsupported
        :param only_latest_version: when storage enable multi versions and only_latest_version is true,
                                    only return latest version.
                                    when storage disable multi versions, just use the default value.
        :param enable_read_from_follower: if set to false, forbid follower read
        :param partial_success: if set true, when partial success, it will continue until finish
        :return: ScanResult
        """
        # Scan every part of the space.
        part_leaders = self._meta_cache.get_part_leaders(space_name)
        return self._scan_vertex(
            space_name,
            part_leaders,
            tag_name,
            prop_names,
            limit,
            start_time,
            end_time,
            where,
            only_latest_version,
            enable_read_from_follower,
            partial_success,
        )

    def scan_vertex_with_part(
        self,
        space_name,
        part,
        tag_name,
        prop_names=None,
        limit=DEFAULT_LIMIT,
        start_time=DEFAULT_START_TIME,
        end_time=DEFAULT_END_TIME,
        where=None,
        only_latest_version=False,
        enable_read_from_follower=True,
        partial_success=False,
    ):
        """scan vertex with the specified space_name, partId, tag_name,
        if the prop_names is empty, will return all properties of the tag

        :param space_name: the space name
        :param part: the part id
        :param tag_name: the tag name
        :param prop_names: if given empty or None, return all property
        :param limit: the max vertex number from one storaged
        :param start_time: the min version of vertex
        :param end_time: the max version of vertex
        :param where: now is unsupported
        :param only_latest_version: when storage enable multi versions and only_latest_version is true,
                                    only return latest version.
                                    when storage disable multi versions, just use the default value.
        :param enable_read_from_follower: if set to false, forbid follower read
        :param partial_success: if set true, when partial success, it will continue until finish
        :return: ScanResult
        """
        # Restrict the scan to the single requested part.
        part_leaders = {part: self._meta_cache.get_part_leader(space_name, part)}
        return self._scan_vertex(
            space_name,
            part_leaders,
            tag_name,
            prop_names,
            limit,
            start_time,
            end_time,
            where,
            only_latest_version,
            enable_read_from_follower,
            partial_success,
        )

    def _scan_vertex(
        self,
        space_name,
        part_leaders,
        tag_name,
        prop_names,
        limit,
        start_time,
        end_time,
        where,
        only_latest_version,
        enable_read_from_follower,
        partial_success=False,
    ):
        """build the ScanVertexRequest for the given parts and wrap it in a ScanResult

        :param part_leaders: mapping whose keys are the part ids to scan,
                             passed on to ScanResult as part_addrs
        :return: ScanResult
        """
        prop_names = prop_names or []
        space_id = self._meta_cache.get_space_id(space_name)
        tag_id = self._meta_cache.get_tag_id(space_name, tag_name)

        vertex_prop = VertexProp()
        vertex_prop.tag = tag_id
        # The vertex id is always requested, followed by the chosen properties.
        vertex_prop.props = [kVid]
        vertex_prop.props.extend(name.encode('utf-8') for name in prop_names)
        if not prop_names:
            # No explicit selection: request every column of the tag schema.
            schema = self._meta_cache.get_tag_schema(space_name, tag_name)
            vertex_prop.props.extend(col.name for col in schema.columns)

        req = self._fill_scan_request(
            ScanVertexRequest(),
            space_id,
            part_leaders,
            vertex_prop,
            limit,
            start_time,
            end_time,
            where,
            only_latest_version,
            enable_read_from_follower,
        )
        return ScanResult(
            self,
            req=req,
            part_addrs=part_leaders,
            is_vertex=True,
            partial_success=partial_success,
        )

    def scan_edge(
        self,
        space_name,
        edge_name,
        prop_names=None,
        limit=DEFAULT_LIMIT,
        start_time=DEFAULT_START_TIME,
        end_time=DEFAULT_END_TIME,
        where=None,
        only_latest_version=False,
        enable_read_from_follower=True,
        partial_success=False,
    ):
        """scan edge with the specified space_name, edge_name,
        if the prop_names is empty, will return all properties of the edge

        :param space_name: the space name
        :param edge_name: the edge name
        :param prop_names: if given empty or None, return all property
        :param limit: the max edge number from one storaged
        :param start_time: the min version of edge
        :param end_time: the max version of edge
        :param where: now is unsupported
        :param only_latest_version: when storage enable multi versions and only_latest_version is true,
                                    only return latest version.
                                    when storage disable multi versions, just use the default value.
        :param enable_read_from_follower: if set to false, forbid follower read
        :param partial_success: if set true, when partial success, it will continue until finish
        :return: ScanResult
        """
        # Scan every part of the space.
        part_leaders = self._meta_cache.get_part_leaders(space_name)
        return self._scan_edge(
            space_name,
            part_leaders,
            edge_name,
            prop_names,
            limit,
            start_time,
            end_time,
            where,
            only_latest_version,
            enable_read_from_follower,
            partial_success,
        )

    def scan_edge_with_part(
        self,
        space_name,
        part,
        edge_name,
        prop_names=None,
        limit=DEFAULT_LIMIT,
        start_time=DEFAULT_START_TIME,
        end_time=DEFAULT_END_TIME,
        where=None,
        only_latest_version=False,
        enable_read_from_follower=True,
        partial_success=False,
    ):
        """scan edge with the specified space_name, partId, edge_name,
        if the prop_names is empty, will return all properties of the edge

        :param space_name: the space name
        :param part: the partition num of the given space
        :param edge_name: the edge name
        :param prop_names: if given empty or None, return all property
        :param limit: the max edge number from one storaged
        :param start_time: the min version of edge
        :param end_time: the max version of edge
        :param where: now is unsupported
        :param only_latest_version: when storage enable multi versions and only_latest_version is true,
                                    only return latest version.
                                    when storage disable multi versions, just use the default value.
        :param enable_read_from_follower: if set to false, forbid follower read
        :param partial_success: if set true, when partial success, it will continue until finish
        :return: ScanResult
        """
        # Restrict the scan to the single requested part.
        part_leaders = {part: self._meta_cache.get_part_leader(space_name, part)}
        return self._scan_edge(
            space_name,
            part_leaders,
            edge_name,
            prop_names,
            limit,
            start_time,
            end_time,
            where,
            only_latest_version,
            enable_read_from_follower,
            partial_success,
        )

    def _scan_edge(
        self,
        space_name,
        part_leaders,
        edge_name,
        prop_names,
        limit,
        start_time,
        end_time,
        where,
        only_latest_version,
        enable_read_from_follower,
        partial_success=False,
    ):
        """build the ScanEdgeRequest for the given parts and wrap it in a ScanResult

        :param part_leaders: mapping whose keys are the part ids to scan,
                             passed on to ScanResult as part_addrs
        :return: ScanResult
        """
        prop_names = prop_names or []
        space_id = self._meta_cache.get_space_id(space_name)
        edge_type = self._meta_cache.get_edge_type(space_name, edge_name)

        edge_prop = EdgeProp()
        edge_prop.type = edge_type
        # src / type / rank / dst are always requested, followed by the
        # chosen properties.
        edge_prop.props = [kSrc, kType, kRank, kDst]
        edge_prop.props.extend(name.encode('utf-8') for name in prop_names)
        if not prop_names:
            # No explicit selection: request every column of the edge schema.
            # Original author's note: this lookup can be deleted once storage
            # returns the column names itself.
            schema = self._meta_cache.get_edge_schema(space_name, edge_name)
            edge_prop.props.extend(col.name for col in schema.columns)

        req = self._fill_scan_request(
            ScanEdgeRequest(),
            space_id,
            part_leaders,
            edge_prop,
            limit,
            start_time,
            end_time,
            where,
            only_latest_version,
            enable_read_from_follower,
        )
        return ScanResult(
            self,
            req=req,
            part_addrs=part_leaders,
            is_vertex=False,
            partial_success=partial_success,
        )

    def _fill_scan_request(
        self,
        req,
        space_id,
        part_leaders,
        return_column,
        limit,
        start_time,
        end_time,
        where,
        only_latest_version,
        enable_read_from_follower,
    ):
        """populate the fields shared by ScanVertexRequest and ScanEdgeRequest

        :param req: a freshly created ScanVertexRequest or ScanEdgeRequest
        :param return_column: the VertexProp or EdgeProp describing what to return
        :return: the same req object, filled in
        """
        req.space_id = space_id
        # Every requested part starts with a fresh cursor.
        req.parts = {part_id: ScanCursor() for part_id in part_leaders}
        req.return_columns = [return_column]
        req.limit = limit
        req.start_time = start_time
        req.end_time = end_time
        req.filter = where
        req.only_latest_version = only_latest_version
        req.enable_read_from_follower = enable_read_from_follower
        req.username = self.user.encode('utf-8')
        req.password = self.passwd.encode('utf-8')
        req.need_authenticate = True
        return req
def scan_vertex(self, space_name, tag_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)
def _scan_vertex(self, space_name, part_leaders, tag_name, prop_names, limit, start_time, end_time, where, only_latest_version, enable_read_from_follower, partial_success=False)
def scan_edge(self, space_name, edge_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)
def _scan_edge(self, space_name, part_leaders, edge_name, prop_names, limit, start_time, end_time, where, only_latest_version, enable_read_from_follower, partial_success)
def scan_edge_with_part(self, space_name, part, edge_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)
def scan_vertex_with_part(self, space_name, part, tag_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)