NebulaGraph Python Client  release-3.4
GraphStorageClient.py
1 #!/usr/bin/env python
2 # --coding:utf-8--
3 
4 # Copyright (c) 2020 vesoft inc. All rights reserved.
5 #
6 # This source code is licensed under Apache 2.0 License.
7 
8 
9 """
10 The client to scan vertices and edges from storage;
11 the returned data is read from the graph database
12 """
13 import sys
14 
15 from nebula3.sclient.ScanResult import ScanResult
16 from nebula3.sclient.net import GraphStorageConnection
17 from nebula3.storage.ttypes import (
18  ScanCursor,
19  ScanEdgeRequest,
20  ScanVertexRequest,
21  VertexProp,
22  EdgeProp,
23 )
24 from nebula3.logger import logger
25 
# Column names the scan requests always place ahead of the user-selected
# properties. They are bytes because every requested prop name is bytes.

# Vertex scans: the vertex id column.
kVid = b'_vid'

# Edge scans: source vertex, edge type, rank and destination vertex columns.
kSrc = b'_src'
kType = b'_type'
kRank = b'_rank'
kDst = b'_dst'
31 
32 
class GraphStorageClient(object):
    """Client that scans vertices and edges directly from storaged.

    One GraphStorageConnection is opened per storage address when the client
    is constructed. Each scan method builds a scan request and returns a
    ScanResult. The ScanResult is given this client, and get_conns() exposes
    the connections for it.
    """

    DEFAULT_START_TIME = 0
    DEFAULT_END_TIME = sys.maxsize
    DEFAULT_LIMIT = 1000

    def __init__(self, meta_cache, storage_addrs=None, time_out=60000):
        """Create the client and open a connection to every storage address.

        :param meta_cache: meta cache used to resolve space ids, tag ids, edge
            types, schemas and part leaders. When ``storage_addrs`` is None, it
            also supplies the storage addresses.
        :param storage_addrs: storage addresses to connect to. When None, they
            are fetched from ``meta_cache.get_all_storage_addrs()``.
        :param time_out: passed unchanged to every GraphStorageConnection
            (presumably milliseconds -- TODO confirm against the connection class)
        :raises RuntimeError: if the meta cache returns no storage address
        """
        self._meta_cache = meta_cache
        self._storage_addrs = storage_addrs
        self._time_out = time_out
        # Assigned before _create_connection() so close()/__del__ can always
        # iterate it, even when opening a connection fails part-way through.
        self._connections = []
        self._create_connection()

    def get_conns(self):
        """Get all connections to storaged; ScanResult uses them.

        :return: list<GraphStorageConnection>
        """
        return self._connections

    def __del__(self):
        # getattr guards instances whose __init__ never ran (e.g. built via __new__).
        if not getattr(self, '_connections', None):
            return
        try:
            self.close()
        except Exception:
            # Exceptions cannot propagate out of __del__ (Python only prints
            # "Exception ignored"), and close() has already logged the failure.
            pass

    def close(self):
        """Close every storage connection.

        Every connection gets a close attempt, even if an earlier one fails.
        Each failure is logged, and the first failure is re-raised once all
        connections have been tried.

        :return: None
        """
        errors = []
        for conn in self._connections:
            try:
                conn.close()
            except Exception as e:
                logger.error('Close connection failed: {}'.format(e))
                errors.append(e)
        if errors:
            raise errors[0]

    def _create_connection(self):
        """Open one GraphStorageConnection per storage address.

        If no addresses were given to the constructor, they are fetched from
        the meta cache first. Each opened connection is appended to
        ``self._connections``. Nothing is returned.

        :raises RuntimeError: if the meta cache returns no storage address
        :raises Exception: any error from creating or opening a connection is
            logged and re-raised. Connections opened before the failure remain
            in ``self._connections``, so close() can still release them.
        """
        if self._storage_addrs is None:
            self._storage_addrs = self._meta_cache.get_all_storage_addrs()
            # A falsy check also covers a None result, not only an empty list.
            if not self._storage_addrs:
                raise RuntimeError('Get storage address from meta cache is empty')
        try:
            for addr in self._storage_addrs:
                conn = GraphStorageConnection(addr, self._time_out, self._meta_cache)
                conn.open()
                self._connections.append(conn)
        except Exception as e:
            logger.error('Create storage connection failed: {}'.format(e))
            raise

    def get_space_addrs(self, space_name):
        """Get all storage addresses that manage the space.

        :param space_name: the specified space name
        :return: list<(ip, port)>
        """
        # The meta cache is stored as ``_meta_cache``; the previous
        # ``self.meta_cache`` attribute never existed and raised AttributeError.
        return self._meta_cache.get_space_addrs(space_name)

    @staticmethod
    def _encode_prop_name(prop_name):
        """Return *prop_name* as bytes: str is UTF-8 encoded, bytes pass through."""
        if isinstance(prop_name, bytes):
            return prop_name
        return prop_name.encode('utf-8')

    def scan_vertex(
        self,
        space_name,
        tag_name,
        prop_names=[],
        limit=DEFAULT_LIMIT,
        start_time=DEFAULT_START_TIME,
        end_time=DEFAULT_END_TIME,
        where=None,
        only_latest_version=False,
        enable_read_from_follower=True,
        partial_success=False,
    ):
        """Scan vertices of *tag_name* across every part of *space_name*.

        If prop_names is empty, all properties of the tag are returned.

        :param space_name: the space name
        :param tag_name: the tag name
        :param prop_names: property names (str or bytes); empty means all
            properties of the tag. The sequence is only read, never modified.
        :param limit: the max vertex number from one storaged
        :param start_time: the min version of vertex
        :param end_time: the max version of vertex
        :param where: sent as the request's ``filter``; currently unsupported
        :param only_latest_version: when storage enables multiple versions and
            this is true, only the latest version is returned. When storage
            disables multiple versions, keep the default value.
        :param enable_read_from_follower: if set to false, forbid follower read
        :param partial_success: if set true, a partial success does not stop
            the scan; it continues until it finishes
        :return: ScanResult
        """
        part_leaders = self._meta_cache.get_part_leaders(space_name)
        return self._scan_vertex(
            space_name,
            part_leaders,
            tag_name,
            prop_names,
            limit,
            start_time,
            end_time,
            where,
            only_latest_version,
            enable_read_from_follower,
            partial_success,
        )

    def scan_vertex_with_part(
        self,
        space_name,
        part,
        tag_name,
        prop_names=[],
        limit=DEFAULT_LIMIT,
        start_time=DEFAULT_START_TIME,
        end_time=DEFAULT_END_TIME,
        where=None,
        only_latest_version=False,
        enable_read_from_follower=True,
        partial_success=False,
    ):
        """Scan vertices of *tag_name* in a single part of *space_name*.

        If prop_names is empty, all properties of the tag are returned.

        :param space_name: the space name
        :param part: the part id to scan
        :param tag_name: the tag name
        :param prop_names: property names (str or bytes); empty means all
            properties of the tag
        :param limit: the max vertex number from one storaged
        :param start_time: the min version of vertex
        :param end_time: the max version of vertex
        :param where: sent as the request's ``filter``; currently unsupported
        :param only_latest_version: when storage enables multiple versions and
            this is true, only the latest version is returned. When storage
            disables multiple versions, keep the default value.
        :param enable_read_from_follower: if set to false, forbid follower read
        :param partial_success: if set true, a partial success does not stop
            the scan; it continues until it finishes
        :return: ScanResult
        """
        part_leaders = {part: self._meta_cache.get_part_leader(space_name, part)}
        return self._scan_vertex(
            space_name,
            part_leaders,
            tag_name,
            prop_names,
            limit,
            start_time,
            end_time,
            where,
            only_latest_version,
            enable_read_from_follower,
            partial_success,
        )

    def _scan_vertex(
        self,
        space_name,
        part_leaders,
        tag_name,
        prop_names,
        limit,
        start_time,
        end_time,
        where,
        only_latest_version,
        enable_read_from_follower,
        partial_success=False,
    ):
        """Build a ScanVertexRequest over the parts in *part_leaders*.

        :param part_leaders: dict keyed by part id, with the part leader
            address as value. Every key gets a fresh ScanCursor, and the dict
            is passed to ScanResult as ``part_addrs``.
        Remaining parameters are as documented on scan_vertex.
        :return: ScanResult
        """
        space_id = self._meta_cache.get_space_id(space_name)
        tag_id = self._meta_cache.get_tag_id(space_name, tag_name)

        vertex_prop: VertexProp = VertexProp()
        vertex_prop.tag = tag_id
        # The vertex id column always comes first.
        vertex_prop.props = [kVid]
        if prop_names:
            vertex_prop.props.extend(
                self._encode_prop_name(prop_name) for prop_name in prop_names
            )
        else:
            # No explicit props: request every column of the tag schema.
            schema = self._meta_cache.get_tag_schema(space_name, tag_name)
            vertex_prop.props.extend(col.name for col in schema.columns)

        req = ScanVertexRequest()
        req.space_id = space_id
        # Each part starts from an empty cursor.
        req.parts = {part_id: ScanCursor() for part_id in part_leaders}
        req.return_columns = [vertex_prop]
        req.limit = limit
        req.start_time = start_time
        req.end_time = end_time
        req.filter = where
        req.only_latest_version = only_latest_version
        req.enable_read_from_follower = enable_read_from_follower
        return ScanResult(
            self,
            req=req,
            part_addrs=part_leaders,
            is_vertex=True,
            partial_success=partial_success,
        )

    def scan_edge(
        self,
        space_name,
        edge_name,
        prop_names=[],
        limit=DEFAULT_LIMIT,
        start_time=DEFAULT_START_TIME,
        end_time=DEFAULT_END_TIME,
        where=None,
        only_latest_version=False,
        enable_read_from_follower=True,
        partial_success=False,
    ):
        """Scan edges of *edge_name* across every part of *space_name*.

        If prop_names is empty, all properties of the edge are returned.

        :param space_name: the space name
        :param edge_name: the edge name
        :param prop_names: property names (str or bytes); empty means all
            properties of the edge. The sequence is only read, never modified.
        :param limit: the max edge number from one storaged
        :param start_time: the min version of edge
        :param end_time: the max version of edge
        :param where: sent as the request's ``filter``; currently unsupported
        :param only_latest_version: when storage enables multiple versions and
            this is true, only the latest version is returned. When storage
            disables multiple versions, keep the default value.
        :param enable_read_from_follower: if set to false, forbid follower read
        :param partial_success: if set true, a partial success does not stop
            the scan; it continues until it finishes
        :return: ScanResult
        """
        part_leaders = self._meta_cache.get_part_leaders(space_name)
        return self._scan_edge(
            space_name,
            part_leaders,
            edge_name,
            prop_names,
            limit,
            start_time,
            end_time,
            where,
            only_latest_version,
            enable_read_from_follower,
            partial_success,
        )

    def scan_edge_with_part(
        self,
        space_name,
        part,
        edge_name,
        prop_names=[],
        limit=DEFAULT_LIMIT,
        start_time=DEFAULT_START_TIME,
        end_time=DEFAULT_END_TIME,
        where=None,
        only_latest_version=False,
        enable_read_from_follower=True,
        partial_success=False,
    ):
        """Scan edges of *edge_name* in a single part of *space_name*.

        If prop_names is empty, all properties of the edge are returned.

        :param space_name: the space name
        :param part: the part id to scan
        :param edge_name: the edge name
        :param prop_names: property names (str or bytes); empty means all
            properties of the edge
        :param limit: the max edge number from one storaged
        :param start_time: the min version of edge
        :param end_time: the max version of edge
        :param where: sent as the request's ``filter``; currently unsupported
        :param only_latest_version: when storage enables multiple versions and
            this is true, only the latest version is returned. When storage
            disables multiple versions, keep the default value.
        :param enable_read_from_follower: if set to false, forbid follower read
        :param partial_success: if set true, a partial success does not stop
            the scan; it continues until it finishes
        :return: ScanResult
        """
        part_leaders = {part: self._meta_cache.get_part_leader(space_name, part)}
        return self._scan_edge(
            space_name,
            part_leaders,
            edge_name,
            prop_names,
            limit,
            start_time,
            end_time,
            where,
            only_latest_version,
            enable_read_from_follower,
            partial_success,
        )

    def _scan_edge(
        self,
        space_name,
        part_leaders,
        edge_name,
        prop_names,
        limit,
        start_time,
        end_time,
        where,
        only_latest_version,
        enable_read_from_follower,
        partial_success,
    ):
        """Build a ScanEdgeRequest over the parts in *part_leaders*.

        :param part_leaders: dict keyed by part id, with the part leader
            address as value. Every key gets a fresh ScanCursor, and the dict
            is passed to ScanResult as ``part_addrs``.
        Remaining parameters are as documented on scan_edge.
        :return: ScanResult
        """
        space_id = self._meta_cache.get_space_id(space_name)
        edge_type = self._meta_cache.get_edge_type(space_name, edge_name)

        edge_prop = EdgeProp()
        edge_prop.type = edge_type
        # Edge identity columns always come first.
        edge_prop.props = [kSrc, kType, kRank, kDst]
        if prop_names:
            edge_prop.props.extend(
                self._encode_prop_name(prop_name) for prop_name in prop_names
            )
        else:
            # No explicit props: request every column of the edge schema.
            # Upstream note: this fallback can be removed once storage returns
            # column names itself.
            schema = self._meta_cache.get_edge_schema(space_name, edge_name)
            edge_prop.props.extend(col.name for col in schema.columns)

        req = ScanEdgeRequest()
        req.space_id = space_id
        # Each part starts from an empty cursor.
        req.parts = {part_id: ScanCursor() for part_id in part_leaders}
        req.return_columns = [edge_prop]
        req.limit = limit
        req.start_time = start_time
        req.end_time = end_time
        req.filter = where
        req.only_latest_version = only_latest_version
        req.enable_read_from_follower = enable_read_from_follower
        return ScanResult(
            self,
            req=req,
            part_addrs=part_leaders,
            is_vertex=False,
            partial_success=partial_success,
        )
def scan_vertex(self, space_name, tag_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)
def _scan_vertex(self, space_name, part_leaders, tag_name, prop_names, limit, start_time, end_time, where, only_latest_version, enable_read_from_follower, partial_success=False)
def scan_edge(self, space_name, edge_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)
def _scan_edge(self, space_name, part_leaders, edge_name, prop_names, limit, start_time, end_time, where, only_latest_version, enable_read_from_follower, partial_success)
def scan_edge_with_part(self, space_name, part, edge_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)
def scan_vertex_with_part(self, space_name, part, tag_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)