NebulaGraph Python Client  release-3.8
__init__.py
#!/usr/bin/env python
# --coding:utf-8--

# Copyright (c) 2020 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.


import copy
import traceback

from threading import RLock, Condition

from nebula3.common.ttypes import HostAddr, ErrorCode
from nebula3.storage.ttypes import ScanCursor
from nebula3.logger import logger
16 
17 
class PartInfo(object):
    """Scan state of a single partition.

    Attributes are plain and mutable; the scan worker and the part manager
    in this module update them in place.
    """

    # The annotation is a string forward reference so that defining the class
    # does not require ``HostAddr`` to be evaluated at definition time.
    def __init__(self, part_id, leader: "HostAddr", cursor=None):
        """Create the state record for one partition.

        :param part_id: identifier of the partition
        :param leader: address of the storage host that leads the partition
        :param cursor: scan cursor to resume from, or ``None`` to start fresh
        """
        self.part_id = part_id
        self.leader = leader
        self.cursor = cursor
        # True once the partition has been scanned in the current round.
        self.has_done = False

    def __repr__(self) -> str:
        """Return a human-readable summary of all four attributes."""
        return 'PartInfo: part_id: {}, leader: {}, cursor: {}, has_done: {}'.format(
            self.part_id, self.leader, self.cursor, self.has_done
        )
29 
30 
class PartManager(object):
    """Shared bookkeeping for the partitions that scan workers process.

    All state is guarded by a single ``threading.Condition``; workers block
    on it in :meth:`wait_task` and are woken whenever part state changes.
    """

    def __init__(self, parts):
        """Create a manager.

        :param parts: dict mapping part id to its part-info object, or
            ``None`` for an empty manager. Each value must expose
            ``part_id``, ``leader`` and ``has_done`` attributes.
        """
        self._lock = RLock()
        self._condition = Condition()
        self._parts = {} if parts is None else parts
        # Number of scan jobs still outstanding in the current round.
        self._part_jobs = len(self._parts)
        self._stop = False

    def get_part(self, addr):
        """Return a part led by ``addr`` that is not yet done, else ``None``."""
        with self._condition:
            for part in self._parts.values():
                if part.leader == addr and not part.has_done:
                    return part
            return None

    def update_part_info(self, part_info, is_finished):
        """Record the outcome of one scan job and wake all waiting workers.

        Decrements the outstanding-job counter (never below zero). If the
        part is still tracked it is removed when ``is_finished`` is true,
        otherwise its entry is replaced by ``part_info``.
        """
        with self._condition:
            if self._part_jobs > 0:
                self._part_jobs -= 1
            if part_info.part_id in self._parts:
                if is_finished:
                    self._parts.pop(part_info.part_id)
                else:
                    self._parts[part_info.part_id] = part_info
            self._condition.notify_all()

    def update_part_leader(self, part_id, leader):
        """Set a new leader for ``part_id`` (if tracked) and wake one waiter."""
        with self._condition:
            if part_id in self._parts:
                self._parts[part_id].leader = leader
            self._condition.notify()

    def is_finish(self):
        """Return True when no jobs are outstanding or a stop was requested."""
        with self._condition:
            return self._part_jobs == 0 or self._stop

    def set_stop(self):
        """Request that all workers stop, and wake every waiting worker."""
        logger.debug("Stop the jobs")
        with self._condition:
            self._stop = True
            self._condition.notify_all()

    def reset_jobs(self):
        """Start a new round: re-arm the job counter and clear ``has_done``.

        Does nothing once a stop has been requested.
        """
        logger.debug("Reset the jobs' status ")
        with self._condition:
            if self._stop:
                return
            self._part_jobs = len(self._parts)
            self._reset_parts_status()

    def _reset_parts_status(self):
        """Mark every tracked part as not done (caller holds the condition)."""
        for part in self._parts.values():
            part.has_done = False

    def has_next(self):
        """Return True while parts remain and no stop has been requested."""
        with self._condition:
            if self._stop:
                return False
            return len(self._parts) != 0

    def wait_task(self, timeout=None):
        """Block until part state changes, the round finishes, or a stop is set.

        The finished/stopped state is checked under the condition's lock
        before waiting, so a worker whose wake-up was issued just before it
        got here returns immediately instead of sleeping forever.

        :param timeout: optional upper bound in seconds for the wait;
            ``None`` (the default) waits until notified.
        """
        with self._condition:
            if self._part_jobs == 0 or self._stop:
                return
            self._condition.wait(timeout)
126 
127 
def do_scan_job(
    storage_connection, parts_manager, in_req, scan_vertex=True, partial_success=False
):
    """Scan every part assigned to one storage connection for the current round.

    Loops until ``parts_manager.is_finish()`` is true. On each pass it takes
    a pending part whose leader is this connection's storage address, issues
    a scan request for it, and reports the outcome back to the manager.

    :param storage_connection: connection exposing ``storage_addr()``,
        ``scan_vertex(req)``, ``scan_edge(req)`` and
        ``update_leader_info(space_id, part_id, leader)``
    :param parts_manager: manager exposing ``is_finish``, ``get_part``,
        ``wait_task``, ``update_part_leader``, ``update_part_info`` and
        ``set_stop``
    :param in_req: scan request template; it is deep-copied, never mutated
    :param scan_vertex: call ``scan_vertex`` when true, else ``scan_edge``
    :param partial_success: when true, a failed part is logged and dropped
        instead of aborting the whole scan
    :return: ``(error, data_sets)``. ``(None, list_of_props)`` on success;
        ``(message, [])`` when a part fails and ``partial_success`` is false;
        ``(message, None)`` on empty column names or any exception
    """
    data_sets = []
    # Work on a private copy: ``req.parts`` is overwritten on every pass.
    req = copy.deepcopy(in_req)
    while not parts_manager.is_finish():
        is_finished = False  # the part without next, it means is finished
        part_info = parts_manager.get_part(storage_connection.storage_addr())
        if part_info is None:
            # No pending part for this storage host; wait for a state change.
            parts_manager.wait_task()
            continue

        # Resume from the stored cursor, or start a fresh scan of the part.
        cursor = part_info.cursor if part_info.cursor is not None else ScanCursor()
        parts = {part_info.part_id: cursor}
        req.parts = parts
        logger.debug('Scan =====> req: {}'.format(req))
        try:
            if scan_vertex:
                resp = storage_connection.scan_vertex(req)
            else:
                resp = storage_connection.scan_edge(req)
            logger.debug('Scan <==== get resp: {}'.format(resp))

            failed_parts = resp.result.failed_parts
            if len(failed_parts) != 0:
                failed = failed_parts[0]
                if failed.code == ErrorCode.E_LEADER_CHANGED:
                    if failed.leader is None:
                        logger.error('Happen leader change, but the leader is None')
                        raise RuntimeError(
                            'Happen leader change, but the leader is None'
                        )
                    parts_manager.update_part_leader(failed.part_id, failed.leader)
                    logger.warning(
                        'part_id {} has leader change, '
                        'old leader is {}, new leader is {}'.format(
                            part_info.part_id,
                            storage_connection.storage_addr(),
                            failed.leader,
                        )
                    )
                    storage_connection.update_leader_info(
                        req.space_id, failed.part_id, failed.leader
                    )
                    # NOTE(review): the ``finally`` below still counts this
                    # pass as a completed job although the part was not
                    # scanned - confirm this is intended.
                    continue

                error = 'Query storage: {}, part id: {} failed: {}'.format(
                    storage_connection.storage_addr(),
                    part_info.part_id,
                    failed.code,
                )
                logger.error(error)
                if not partial_success:
                    parts_manager.set_stop()
                    return error, []
                # Partial success allowed: give up on this part only.
                is_finished = True
                continue

            part_info.has_done = True
            resp_cursor = resp.cursors[part_info.part_id]
            part_info.cursor = cursor
            if resp_cursor.next_cursor:
                cursor.next_cursor = resp_cursor.next_cursor
                logger.debug(
                    "Get next next_cursor: {}".format(resp_cursor.next_cursor)
                )
            else:
                is_finished = True

            logger.debug("resp.props size: {}".format(len(resp.props.rows)))
            if len(resp.props.column_names) == 0:
                # NOTE(review): unlike the other error returns, this one does
                # not call set_stop() - confirm this is intended.
                return (
                    'Part id: {} return empty column names'.format(
                        part_info.part_id
                    ),
                    None,
                )
            if len(resp.props.rows) != 0:
                data_sets.append(resp.props)

        except Exception as e:
            logger.error(traceback.format_exc())
            parts_manager.set_stop()
            return str(e), None
        finally:
            # Runs on every exit from the try body (normal, continue, return).
            parts_manager.update_part_info(part_info, is_finished)
    return None, data_sets