10 The client to scan vertex and edge from storage,
11 the return data is from thr graph database
17 from nebula3.storage.ttypes
import (
24 from nebula3.logger
import logger
34 DEFAULT_START_TIME = 0
35 DEFAULT_END_TIME = sys.maxsize
38 def __init__(self, meta_cache, storage_addrs=None, time_out=60000):
46 """get all connections which connect to storaged, the ScanResult use it
48 :return: list<GraphStorageConnection>
56 """close the GraphStorageClient
63 except Exception
as e:
64 logger.error(
'Close connection failed: {}'.format(e))
67 def _create_connection(self):
68 """create GraphStorageConnection
70 :return: GraphStorageConnection
75 raise RuntimeError(
'Get storage address from meta cache is empty')
81 except Exception
as e:
82 logger.error(
'Create storage connection failed: {}'.format(e))
86 """get all storage addresses that manage space
88 :param space_name: the specified space name
89 :return: list<(ip, port)>
99 start_time=DEFAULT_START_TIME,
100 end_time=DEFAULT_END_TIME,
102 only_latest_version=False,
103 enable_read_from_follower=True,
104 partial_success=False,
106 """scan vertex with the specified space_name, tag_name,
107 if the prop_names is empty, will return all properties of the tag
109 :param prop_names: if given empty, return all property
110 :param tag_name: the tag name
111 :param space_name: the space name
112 :param limit: the max vertex number from one storaged
113 :param start_time: the min version of vertex
114 :param end_time: the max version of vertex
115 :param where: now is unsupported
116 :param only_latest_version: when storage enable multi versions and only_latest_version is true,
117 only return latest version.
118 when storage disable multi versions, just use the default value.
119 :param enable_read_from_follower: if set to false, forbid follower read
120 :param partial_success: if set true, when partial success, it will continue until finish
123 part_leaders = self.
_meta_cache_meta_cache.get_part_leaders(space_name)
134 enable_read_from_follower,
145 start_time=DEFAULT_START_TIME,
146 end_time=DEFAULT_END_TIME,
148 only_latest_version=False,
149 enable_read_from_follower=True,
150 partial_success=False,
152 """scan vertex with the specified space_name, partId, tag_name,
153 if the prop_names is empty, will return all properties of the tag
155 :param prop_names: if given empty, return all property
156 :param tag_name: the tag name
158 :param space_name: the space name
159 :param limit: the max vertex number from one storaged
160 :param start_time: the min version of vertex
161 :param end_time: the max version of vertex
162 :param where: now is unsupported
163 :param only_latest_version: when storage enable multi versions and only_latest_version is true,
164 only return latest version.
165 when storage disable multi versions, just use the default value.
166 :param enable_read_from_follower: if set to false, forbid follower read
167 :param partial_success: if set true, when partial success, it will continue until finish
171 part_leaders = {part: self.
_meta_cache_meta_cache.get_part_leader(space_name, part)}
182 enable_read_from_follower,
197 enable_read_from_follower,
198 partial_success=False,
200 space_id = self.
_meta_cache_meta_cache.get_space_id(space_name)
201 tag_id = self.
_meta_cache_meta_cache.get_tag_id(space_name, tag_name)
202 vertex_prop: VertexProp = VertexProp()
203 vertex_prop.tag = tag_id
204 vertex_prop.props = [kVid]
205 for prop_name
in prop_names:
206 vertex_prop.props.append(prop_name.encode(
'utf-8'))
208 if len(prop_names) == 0:
209 schema = self.
_meta_cache_meta_cache.get_tag_schema(space_name, tag_name)
210 for col
in schema.columns:
211 vertex_prop.props.append(col.name)
214 for id
in part_leaders.keys():
215 parts[id] = ScanCursor()
218 req = ScanVertexRequest()
219 req.space_id = space_id
221 req.return_columns = [vertex_prop]
223 req.start_time = start_time
224 req.end_time = end_time
226 req.only_latest_version = only_latest_version
227 req.enable_read_from_follower = enable_read_from_follower
231 part_addrs=part_leaders,
233 partial_success=partial_success,
242 start_time=DEFAULT_START_TIME,
243 end_time=DEFAULT_END_TIME,
245 only_latest_version=False,
246 enable_read_from_follower=True,
247 partial_success=False,
249 """scan edge with the specified space_name, edge_name,
250 if the prop_names is empty, will return all properties of the edge
252 :param prop_names: if given empty, return all property
253 :param edge_name: the edge name
254 :param space_name: the space name
255 :param limit: the max vertex number from one storaged
256 :param start_time: the min version of vertex
257 :param end_time: the max version of vertex
258 :param where: now is unsupported
259 :param only_latest_version: when storage enable multi versions and only_latest_version is true,
260 only return latest version.
261 when storage disable multi versions, just use the default value.
262 :param enable_read_from_follower: if set to false, forbid follower read
263 :param partial_success: if set true, when partial success, it will continue until finish
266 part_leaders = self.
_meta_cache_meta_cache.get_part_leaders(space_name)
277 enable_read_from_follower,
288 start_time=DEFAULT_START_TIME,
289 end_time=DEFAULT_END_TIME,
291 only_latest_version=False,
292 enable_read_from_follower=True,
293 partial_success=False,
295 """scan edge with the specified space_name, partId, edge_name,
296 if the prop_names is empty, will return all properties of the edge
298 :param space_name: the space name
299 :param part: the partition num of the given space
300 :type prop_names: if given empty, return all property
301 :param edge_name: the edge name
302 :param limit: the max vertex number from one storaged
303 :param start_time: the min version of edge
304 :param end_time: the max version of edge
305 :param where: now is unsupported
306 :param only_latest_version: when storage enable multi versions and only_latest_version is true,
307 only return latest version.
308 when storage disable multi versions, just use the default value.
309 :param enable_read_from_follower: if set to false, forbid follower read
310 :param partial_success: if set true, when partial success, it will continue until finish
313 part_leaders = {part: self.
_meta_cache_meta_cache.get_part_leader(space_name, part)}
324 enable_read_from_follower,
339 enable_read_from_follower,
342 space_id = self.
_meta_cache_meta_cache.get_space_id(space_name)
343 edge_type = self.
_meta_cache_meta_cache.get_edge_type(space_name, edge_name)
344 edge_prop = EdgeProp()
345 edge_prop.type = edge_type
346 edge_prop.props = [kSrc, kType, kRank, kDst]
348 for prop_name
in prop_names:
349 edge_prop.props.append(prop_name.encode(
'utf-8'))
352 if len(prop_names) == 0:
353 schema = self.
_meta_cache_meta_cache.get_edge_schema(space_name, edge_name)
354 for col
in schema.columns:
355 edge_prop.props.append(col.name)
358 for id
in part_leaders.keys():
359 parts[id] = ScanCursor()
361 req = ScanEdgeRequest()
362 req.space_id = space_id
364 req.return_columns = [edge_prop]
366 req.start_time = start_time
367 req.end_time = end_time
369 req.only_latest_version = only_latest_version
370 req.enable_read_from_follower = enable_read_from_follower
374 part_addrs=part_leaders,
376 partial_success=partial_success,
def _create_connection(self)
def get_space_addrs(self, space_name)
def scan_vertex(self, space_name, tag_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)
def _scan_vertex(self, space_name, part_leaders, tag_name, prop_names, limit, start_time, end_time, where, only_latest_version, enable_read_from_follower, partial_success=False)
def scan_edge(self, space_name, edge_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)
def _scan_edge(self, space_name, part_leaders, edge_name, prop_names, limit, start_time, end_time, where, only_latest_version, enable_read_from_follower, partial_success)
def scan_edge_with_part(self, space_name, part, edge_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)
def scan_vertex_with_part(self, space_name, part, tag_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)