14 from nebula3.logger
import logger
def __init__(self, data_sets, decode_type='utf-8'):
    """Initialise the result as a vertex result.

    Passes ``data_sets`` and ``decode_type`` through to the parent
    initialiser unchanged and fixes ``is_vertex`` to ``True``.

    :param data_sets: the data sets handed on to the parent class
    :param decode_type: decode type handed on to the parent class
        (defaults to ``'utf-8'``)
    """
    parent_kwargs = {
        'data_sets': data_sets,
        'decode_type': decode_type,
        'is_vertex': True,
    }
    super().__init__(**parent_kwargs)
22 """convert the vertexes to nodes
28 for row
in data_set.rows:
30 nodes.append(vertex_data.as_node())
def __init__(self, data_sets: list, decode_type='utf-8'):
    """Initialise the result as an edge (non-vertex) result.

    Passes ``data_sets`` and ``decode_type`` through to the parent
    initialiser unchanged and fixes ``is_vertex`` to ``False``.

    :param data_sets: list of data sets handed on to the parent class
    :param decode_type: decode type handed on to the parent class
        (defaults to ``'utf-8'``)
    """
    parent_kwargs = {
        'data_sets': data_sets,
        'decode_type': decode_type,
        'is_vertex': False,
    }
    super().__init__(**parent_kwargs)
39 """convert the edges to relationships
41 :return: list<Relationship>
45 for row
in data_set.rows:
47 relationships.append(edge_data.as_relationship())
59 partial_success=False,
70 for part_id
in part_addrs.keys():
71 part_infos[part_id] =
PartInfo(part_id, part_addrs[part_id])
75 """whether there is more data; the first call always returns True
77 :return: True or False
82 """get scan data result
84 :return: VertexResult or EdgeResult
89 raise RuntimeError(
'There is no storage connection')
90 logger.debug(
'Graph storage client num: {}'.format(num))
93 with concurrent.futures.ThreadPoolExecutor(num)
as executor:
95 for i, conn
in enumerate(conns):
96 future = executor.submit(
104 do_scan.append(future)
106 for future
in concurrent.futures.as_completed(do_scan):
107 if future.exception()
is not None:
108 logger.error(future.exception())
109 exceptions.append(future.exception())
111 ret, data_sets = future.result()
113 logger.error(
'Scan failed: {}'.format(ret))
114 exceptions.append(RuntimeError(
'Scan failed: {}'.format(ret)))
116 if len(data_sets) != 0:
117 result.extend(data_sets)
119 if len(exceptions) == 0:
121 logger.warning(
'Get empty result')
def as_relationships(self)