NebulaGraph Python Client  release-3.8
ScanResult.py
1 #!/usr/bin/env python
2 # --coding:utf-8--
3 
4 # Copyright (c) 2020 vesoft inc. All rights reserved.
5 #
6 # This source code is licensed under Apache 2.0 License.
7 
8 
9 import concurrent
10 
11 from nebula3.sclient import PartManager, do_scan_job, PartInfo
12 
13 from nebula3.sclient.BaseResult import BaseResult, VertexData, EdgeData
14 from nebula3.logger import logger
15 
16 
18  def __init__(self, data_sets, decode_type='utf-8'):
19  super().__init__(data_sets=data_sets, decode_type=decode_type, is_vertex=True)
20 
21  def as_nodes(self):
22  """convert the vertexes to relationships
23 
24  :return: list<Node>
25  """
26  nodes = []
27  for data_set in self._data_sets_data_sets:
28  for row in data_set.rows:
29  vertex_data = VertexData(row, data_set.column_names, self._decode_type_decode_type)
30  nodes.append(vertex_data.as_node())
31  return nodes
32 
33 
35  def __init__(self, data_sets: list, decode_type='utf-8'):
36  super().__init__(data_sets=data_sets, decode_type=decode_type, is_vertex=False)
37 
38  def as_relationships(self):
39  """convert the edges to relationships
40 
41  :return: list<Relationship>
42  """
43  relationships = []
44  for data_set in self._data_sets_data_sets:
45  for row in data_set.rows:
46  edge_data = EdgeData(row, data_set.column_names, self._decode_type_decode_type)
47  relationships.append(edge_data.as_relationship())
48  return relationships
49 
50 
51 class ScanResult(object):
52  """the scan result"""
53 
54  def __init__(
55  self,
56  graph_storage_client,
57  req,
58  part_addrs,
59  partial_success=False,
60  is_vertex=True,
61  decode_type='utf-8',
62  ):
63  self._is_vertex_is_vertex = is_vertex
64  self._decode_type_decode_type = decode_type
65  self._data_sets_data_sets = []
66  self._graph_storage_client_graph_storage_client = graph_storage_client
67  self._partial_success_partial_success = partial_success
68  self._req_req = req
69  part_infos = {}
70  for part_id in part_addrs.keys():
71  part_infos[part_id] = PartInfo(part_id, part_addrs[part_id])
72  self._parts_manager_parts_manager = PartManager(part_infos)
73 
74  def has_next(self):
75  """whether if has data, the first call is always return True
76 
77  :return: True of False
78  """
79  return self._parts_manager_parts_manager.has_next()
80 
81  def next(self):
82  """get scan data result
83 
84  :return: VertexResult or EdgeResult
85  """
86  conns = self._graph_storage_client_graph_storage_client.get_conns()
87  num = len(conns)
88  if num == 0:
89  raise RuntimeError('There is no storage connection')
90  logger.debug('Graph storage client num: {}'.format(num))
91  exceptions = []
92  result = []
93  with concurrent.futures.ThreadPoolExecutor(num) as executor:
94  do_scan = []
95  for i, conn in enumerate(conns):
96  future = executor.submit(
97  do_scan_job,
98  conns[i],
99  self._parts_manager_parts_manager,
100  self._req_req,
101  self._is_vertex_is_vertex,
102  self._partial_success_partial_success,
103  )
104  do_scan.append(future)
105 
106  for future in concurrent.futures.as_completed(do_scan):
107  if future.exception() is not None:
108  logger.error(future.exception())
109  exceptions.append(future.exception())
110  else:
111  ret, data_sets = future.result()
112  if ret is not None:
113  logger.error('Scan failed: {}'.format(ret))
114  exceptions.append(RuntimeError('Scan failed: {}'.format(ret)))
115  continue
116  if len(data_sets) != 0:
117  result.extend(data_sets)
118  self._parts_manager_parts_manager.reset_jobs()
119  if len(exceptions) == 0:
120  if len(result) == 0:
121  logger.warning('Get empty result')
122  return None
123  else:
124  if self._is_vertex_is_vertex:
125  return VertexResult(result, self._decode_type_decode_type)
126  else:
127  return EdgeResult(result, self._decode_type_decode_type)
128  else:
129  raise exceptions[0]