NebulaGraph Python Client  release-3.8
1 #!/usr/bin/env python
2 # --coding:utf-8--
4 # Copyright (c) 2020 vesoft inc. All rights reserved.
5 #
6 # This source code is licensed under Apache 2.0 License.
9 import copy
11 from threading import RLock, Condition
13 from nebula3.common.ttypes import HostAddr, ErrorCode
14 from nebula3.storage.ttypes import ScanCursor
15 from nebula3.logger import logger
class PartInfo(object):
    """Scan state of a single partition.

    :param part_id: identifier of the partition.
    :param leader: address of the host that currently leads the partition.
    :param cursor: cursor to resume the scan from, or ``None`` if the
        partition has not been scanned yet.
    """

    # The annotation is quoted so it is not evaluated when the class is defined.
    def __init__(self, part_id, leader: "HostAddr", cursor=None):
        self.part_id = part_id
        self.leader = leader
        self.cursor = cursor
        # Set to True by the scan job once the partition has been scanned in
        # the current round; PartManager clears it again in reset_jobs().
        self.has_done = False

    def __repr__(self) -> str:
        return 'PartInfo: part_id: {}, leader: {}, cursor: {}, has_done: {}'.format(
            self.part_id, self.leader, self.cursor, self.has_done
        )
class PartManager(object):
    """Registry of the partitions that still have data to scan.

    Every public method takes the internal condition variable, so the
    registry can be shared between the scan jobs that call it.

    :param parts: mapping ``part_id -> PartInfo``; ``None`` is treated as an
        empty mapping.
    """

    def __init__(self, parts):
        # Not used by any method of this class; kept so the attribute set
        # stays the same for code that may reach into it.
        self._lock = RLock()
        self._condition = Condition()
        self._parts = parts if parts is not None else {}
        # Number of update_part_info() calls left before is_finish() reports
        # True for the current round; re-armed by reset_jobs().
        self._part_jobs = len(self._parts)
        self._stop = False

    def get_part(self, addr):
        """Return a not-yet-done part whose leader equals ``addr``, or ``None``."""
        with self._condition:
            for part in self._parts.values():
                if part.leader == addr and not part.has_done:
                    return part
            return None

    def update_part_info(self, part_info, is_finished):
        """Record the outcome of one scan attempt and wake every waiter.

        :param part_info: the part that was just processed.
        :param is_finished: when true the part is removed from the registry,
            otherwise the stored entry is replaced by ``part_info``.
        """
        with self._condition:
            if self._part_jobs > 0:
                self._part_jobs -= 1
            if part_info.part_id in self._parts:
                if is_finished:
                    self._parts.pop(part_info.part_id)
                else:
                    self._parts[part_info.part_id] = part_info
            self._condition.notify_all()

    def update_part_leader(self, part_id, leader):
        """Point ``part_id`` at its new ``leader`` and wake the waiters."""
        with self._condition:
            if part_id in self._parts:
                self._parts[part_id].leader = leader
            # Wake everyone: a single notify() could pick a waiter that does
            # not serve the new leader, leaving the right one asleep.
            self._condition.notify_all()

    def is_finish(self):
        """Return True when the current round is exhausted or a stop was requested."""
        with self._condition:
            return self._part_jobs == 0 or self._stop

    def set_stop(self):
        """Request all jobs to stop and wake every waiter."""
        logger.debug("Stop the jobs")
        with self._condition:
            self._stop = True
            self._condition.notify_all()

    def reset_jobs(self):
        """Start a new round: re-arm the job counter and clear ``has_done`` flags.

        Does nothing once a stop has been requested.
        """
        logger.debug("Reset the jobs' status ")
        with self._condition:
            if self._stop:
                return
            self._part_jobs = len(self._parts)
            self._reset_parts_status()

    def _reset_parts_status(self):
        # Caller must already hold the condition.
        for part in self._parts.values():
            part.has_done = False

    def has_next(self):
        """Return True while parts remain and no stop has been requested."""
        with self._condition:
            if self._stop:
                return False
            return len(self._parts) != 0

    def wait_task(self, timeout=None):
        """Block until another job notifies the condition.

        :param timeout: optional upper bound in seconds; ``None`` (the
            default) waits indefinitely, as before.
        :return: the value of ``Condition.wait`` - ``False`` only when the
            timeout expired.
        """
        with self._condition:
            return self._condition.wait(timeout)
def do_scan_job(
    storage_connection, parts_manager, in_req, scan_vertex=True, partial_success=False
):
    """Scan the parts that ``parts_manager`` hands out for one storage host.

    Loops until ``parts_manager.is_finish()`` is true. Each iteration asks
    the manager for a part led by ``storage_connection.storage_addr()``,
    sends one scan request for it and reports the outcome back through
    ``parts_manager.update_part_info``.

    :param storage_connection: connection providing ``storage_addr``,
        ``scan_vertex``, ``scan_edge`` and ``update_leader_info``.
    :param parts_manager: the shared ``PartManager``.
    :param in_req: request template; it is deep-copied and never modified.
    :param scan_vertex: send ``scan_vertex`` when true, ``scan_edge`` otherwise.
    :param partial_success: when true, a failed part is logged and dropped
        instead of aborting the whole scan.
    :return: a ``(error, data_sets)`` tuple:

        * ``(None, data_sets)`` on normal completion;
        * ``(error, [])`` when a part failed and ``partial_success`` is false;
        * ``(error, None)`` when a response has no column names or an
          exception was raised while scanning.
    """
    data_sets = []
    req = copy.deepcopy(in_req)
    while True:
        is_finished = False  # the part without next, it means is finished
        if parts_manager.is_finish():
            break
        part_info = parts_manager.get_part(storage_connection.storage_addr())
        if part_info is None:
            # Nothing for this host right now; sleep until another job
            # notifies the manager, then re-check.
            parts_manager.wait_task()
            continue

        # Resume from the stored cursor, or start from a fresh one.
        if part_info.cursor is not None:
            parts = {part_info.part_id: part_info.cursor}
        else:
            parts = {part_info.part_id: ScanCursor()}

        req.parts = parts
        logger.debug('Scan =====> req: {}'.format(req))
        try:
            if scan_vertex:
                resp = storage_connection.scan_vertex(req)
            else:
                resp = storage_connection.scan_edge(req)
            logger.debug('Scan <==== get resp: {}'.format(resp))
            if len(resp.result.failed_parts) != 0:
                failed = resp.result.failed_parts[0]
                if failed.code == ErrorCode.E_LEADER_CHANGED:
                    if failed.leader is None:
                        logger.error('Happen leader change, but the leader is None')
                        # Caught by the handler below, which stops all jobs.
                        raise RuntimeError(
                            'Happen leader change, but the leader is None'
                        )
                    parts_manager.update_part_leader(failed.part_id, failed.leader)
                    logger.warning(
                        'part_id {} has leader change, '
                        'old leader is {}, new leader is {}'.format(
                            part_info.part_id,
                            storage_connection.storage_addr(),
                            failed.leader,
                        )
                    )
                    storage_connection.update_leader_info(
                        req.space_id, failed.part_id, failed.leader
                    )
                    continue
                error = 'Query storage: {}, part id: {} failed: {}'.format(
                    storage_connection.storage_addr(),
                    part_info.part_id,
                    failed.code,
                )
                logger.error(error)
                if not partial_success:
                    parts_manager.set_stop()
                    return error, []
                # Partial success: give up on this part only.
                is_finished = True
                continue

            part_info.has_done = True
            cursor = parts[part_info.part_id]
            resp_cursor = resp.cursors[part_info.part_id]
            part_info.cursor = cursor
            if resp_cursor.next_cursor:
                cursor.next_cursor = resp_cursor.next_cursor
                logger.debug(
                    "Get next next_cursor: {}".format(resp_cursor.next_cursor)
                )
            else:
                is_finished = True

            logger.debug("resp.props size: {}".format(len(resp.props.rows)))
            if len(resp.props.column_names) == 0:
                return (
                    'Part id: {} return empty column names'.format(
                        part_info.part_id
                    ),
                    None,
                )
            if len(resp.props.rows) == 0:
                continue
            data_sets.append(resp.props)

        except Exception as e:
            import traceback

            logger.error(traceback.format_exc())
            parts_manager.set_stop()
            return str(e), None
        finally:
            # Runs on every exit from the try block (continue and return
            # included), so the manager is always updated and waiters woken.
            parts_manager.update_part_info(part_info, is_finished)
    return None, data_sets