10 The client to scan vertex and edge from storage,
11 the return data is from the graph database
17 from nebula3.storage.ttypes
import (
24 from nebula3.logger
import logger
34 DEFAULT_START_TIME = 0
35 DEFAULT_END_TIME = sys.maxsize
40 def __init__(self, meta_cache, storage_addrs=None, time_out=60000):
48 """set user and password for scan. only useful for enterprise
55 """get all connections which connect to storaged; the ScanResult uses them
57 :return: list<GraphStorageConnection>
65 """close the GraphStorageClient
72 except Exception
as e:
73 logger.error(
'Close connection failed: {}'.format(e))
76 def _create_connection(self):
77 """create GraphStorageConnection
79 :return: GraphStorageConnection
84 raise RuntimeError(
'Get storage address from meta cache is empty')
90 except Exception
as e:
91 logger.error(
'Create storage connection failed: {}'.format(e))
95 """get all storage addresses that manage space
97 :param space_name: the specified space name
98 :return: list<(ip, port)>
108 start_time=DEFAULT_START_TIME,
109 end_time=DEFAULT_END_TIME,
111 only_latest_version=False,
112 enable_read_from_follower=True,
113 partial_success=False,
115 """scan vertex with the specified space_name, tag_name,
116 if the prop_names is empty, will return all properties of the tag
118 :param prop_names: if given empty, return all property
119 :param tag_name: the tag name
120 :param space_name: the space name
121 :param limit: the max vertex number from one storaged
122 :param start_time: the min version of vertex
123 :param end_time: the max version of vertex
124 :param where: now is unsupported
125 :param only_latest_version: when storage enable multi versions and only_latest_version is true,
126 only return latest version.
127 when storage disable multi versions, just use the default value.
128 :param enable_read_from_follower: if set to false, forbid follower read
129 :param partial_success: if set true, when partial success, it will continue until finish
132 part_leaders = self.
_meta_cache_meta_cache.get_part_leaders(space_name)
143 enable_read_from_follower,
154 start_time=DEFAULT_START_TIME,
155 end_time=DEFAULT_END_TIME,
157 only_latest_version=False,
158 enable_read_from_follower=True,
159 partial_success=False,
161 """scan vertex with the specified space_name, partId, tag_name,
162 if the prop_names is empty, will return all properties of the tag
164 :param prop_names: if given empty, return all property
165 :param tag_name: the tag name
167 :param space_name: the space name
168 :param limit: the max vertex number from one storaged
169 :param start_time: the min version of vertex
170 :param end_time: the max version of vertex
171 :param where: now is unsupported
172 :param only_latest_version: when storage enable multi versions and only_latest_version is true,
173 only return latest version.
174 when storage disable multi versions, just use the default value.
175 :param enable_read_from_follower: if set to false, forbid follower read
176 :param partial_success: if set true, when partial success, it will continue until finish
180 part_leaders = {part: self.
_meta_cache_meta_cache.get_part_leader(space_name, part)}
191 enable_read_from_follower,
206 enable_read_from_follower,
207 partial_success=False,
209 space_id = self.
_meta_cache_meta_cache.get_space_id(space_name)
210 tag_id = self.
_meta_cache_meta_cache.get_tag_id(space_name, tag_name)
211 vertex_prop: VertexProp = VertexProp()
212 vertex_prop.tag = tag_id
213 vertex_prop.props = [kVid]
214 for prop_name
in prop_names:
215 vertex_prop.props.append(prop_name.encode(
'utf-8'))
217 if len(prop_names) == 0:
218 schema = self.
_meta_cache_meta_cache.get_tag_schema(space_name, tag_name)
219 for col
in schema.columns:
220 vertex_prop.props.append(col.name)
223 for id
in part_leaders.keys():
224 parts[id] = ScanCursor()
227 req = ScanVertexRequest()
228 req.space_id = space_id
230 req.return_columns = [vertex_prop]
232 req.start_time = start_time
233 req.end_time = end_time
235 req.only_latest_version = only_latest_version
236 req.enable_read_from_follower = enable_read_from_follower
237 req.username = self.
useruseruser.encode(
'utf-8')
239 req.need_authenticate =
True
243 part_addrs=part_leaders,
245 partial_success=partial_success,
254 start_time=DEFAULT_START_TIME,
255 end_time=DEFAULT_END_TIME,
257 only_latest_version=False,
258 enable_read_from_follower=True,
259 partial_success=False,
261 """scan edge with the specified space_name, edge_name,
262 if the prop_names is empty, will return all properties of the edge
264 :param prop_names: if given empty, return all property
265 :param edge_name: the edge name
266 :param space_name: the space name
267 :param limit: the max edge number from one storaged
268 :param start_time: the min version of edge
269 :param end_time: the max version of edge
270 :param where: now is unsupported
271 :param only_latest_version: when storage enable multi versions and only_latest_version is true,
272 only return latest version.
273 when storage disable multi versions, just use the default value.
274 :param enable_read_from_follower: if set to false, forbid follower read
275 :param partial_success: if set true, when partial success, it will continue until finish
278 part_leaders = self.
_meta_cache_meta_cache.get_part_leaders(space_name)
289 enable_read_from_follower,
300 start_time=DEFAULT_START_TIME,
301 end_time=DEFAULT_END_TIME,
303 only_latest_version=False,
304 enable_read_from_follower=True,
305 partial_success=False,
307 """scan edge with the specified space_name, partId, edge_name,
308 if the prop_names is empty, will return all properties of the edge
310 :param space_name: the space name
311 :param part: the partition num of the given space
312 :param prop_names: if given empty, return all property
313 :param edge_name: the edge name
314 :param limit: the max edge number from one storaged
315 :param start_time: the min version of edge
316 :param end_time: the max version of edge
317 :param where: now is unsupported
318 :param only_latest_version: when storage enable multi versions and only_latest_version is true,
319 only return latest version.
320 when storage disable multi versions, just use the default value.
321 :param enable_read_from_follower: if set to false, forbid follower read
322 :param partial_success: if set true, when partial success, it will continue until finish
325 part_leaders = {part: self.
_meta_cache_meta_cache.get_part_leader(space_name, part)}
336 enable_read_from_follower,
351 enable_read_from_follower,
354 space_id = self.
_meta_cache_meta_cache.get_space_id(space_name)
355 edge_type = self.
_meta_cache_meta_cache.get_edge_type(space_name, edge_name)
356 edge_prop = EdgeProp()
357 edge_prop.type = edge_type
358 edge_prop.props = [kSrc, kType, kRank, kDst]
360 for prop_name
in prop_names:
361 edge_prop.props.append(prop_name.encode(
'utf-8'))
364 if len(prop_names) == 0:
365 schema = self.
_meta_cache_meta_cache.get_edge_schema(space_name, edge_name)
366 for col
in schema.columns:
367 edge_prop.props.append(col.name)
370 for id
in part_leaders.keys():
371 parts[id] = ScanCursor()
373 req = ScanEdgeRequest()
374 req.space_id = space_id
376 req.return_columns = [edge_prop]
378 req.start_time = start_time
379 req.end_time = end_time
381 req.only_latest_version = only_latest_version
382 req.enable_read_from_follower = enable_read_from_follower
383 req.username = self.
useruseruser.encode(
'utf-8')
385 req.need_authenticate =
True
389 part_addrs=part_leaders,
391 partial_success=partial_success,
def _create_connection(self)
def get_space_addrs(self, space_name)
def scan_vertex(self, space_name, tag_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)
def _scan_vertex(self, space_name, part_leaders, tag_name, prop_names, limit, start_time, end_time, where, only_latest_version, enable_read_from_follower, partial_success=False)
def set_user_passwd(self, user, passwd)
def scan_edge(self, space_name, edge_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)
def _scan_edge(self, space_name, part_leaders, edge_name, prop_names, limit, start_time, end_time, where, only_latest_version, enable_read_from_follower, partial_success)
def scan_edge_with_part(self, space_name, part, edge_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)
def scan_vertex_with_part(self, space_name, part, tag_name, prop_names=[], limit=DEFAULT_LIMIT, start_time=DEFAULT_START_TIME, end_time=DEFAULT_END_TIME, where=None, only_latest_version=False, enable_read_from_follower=True, partial_success=False)