11 from threading
import RLock, Condition
13 from nebula3.common.ttypes
import HostAddr, ErrorCode
14 from nebula3.storage.ttypes
import ScanCursor
15 from nebula3.logger
import logger
19 def __init__(self, part_id, leader: HostAddr, cursor=
None):
25 def __repr__(self) -> str:
26 return 'PartInfo: part_id: {}, leader: {}, cursor: {}, has_done: {}'.format(
32 def __init__(self, parts):
33 self.
_lock_lock = RLock()
36 if self.
_parts_parts
is None:
39 self.
_stop_stop =
False
41 def get_part(self, addr):
44 for part_id
in self.
_parts_parts.keys():
46 self.
_parts_parts[part_id].leader == addr
47 and not self.
_parts_parts[part_id].has_done
49 return self.
_parts_parts[part_id]
54 def update_part_info(self, part_info, is_finished):
59 if part_info.part_id
in self.
_parts_parts.keys():
61 self.
_parts_parts.pop(part_info.part_id)
63 self.
_parts_parts[part_info.part_id] = part_info
68 def update_part_leader(self, part_id, leader):
71 if part_id
in self.
_parts_parts.keys():
72 self.
_parts_parts[part_id].leader = leader
88 logger.debug(
"Stop the jobs")
91 self.
_stop_stop =
True
97 logger.debug(
"Reset the jobs' status ")
107 def _reset_parts_status(self):
108 for part
in self.
_parts_parts:
109 self.
_parts_parts[part].has_done =
False
116 return len(self.
_parts_parts) != 0
129 storage_connection, parts_manager, in_req, scan_vertex=True, partial_success=False
132 req = copy.deepcopy(in_req)
135 if parts_manager.is_finish():
137 part_info = parts_manager.get_part(storage_connection.storage_addr())
138 if part_info
is None:
139 parts_manager.wait_task()
142 if part_info.cursor
is not None:
143 parts = {part_info.part_id: part_info.cursor}
145 parts = {part_info.part_id: ScanCursor()}
148 logger.debug(
'Scan =====> req: {}'.format(req))
151 resp = storage_connection.scan_vertex(req)
153 resp = storage_connection.scan_edge(req)
154 logger.debug(
'Scan <==== get resp: {}'.format(resp))
155 if len(resp.result.failed_parts) != 0:
156 if resp.result.failed_parts[0].code == ErrorCode.E_LEADER_CHANGED:
157 if resp.result.failed_parts[0].leader
is None:
158 logger.error(
'Happen leader change, but the leader is None')
160 'Happen leader change, but the leader is None'
162 parts_manager.update_part_leader(
163 resp.result.failed_parts[0].part_id,
164 resp.result.failed_parts[0].leader,
167 'part_id {} has leader change, '
168 'old leader is {}, new leader is {}'.format(
170 storage_connection.storage_addr(),
171 resp.result.failed_parts[0].leader,
174 storage_connection.update_leader_info(
176 resp.result.failed_parts[0].part_id,
177 resp.result.failed_parts[0].leader,
180 error =
'Query storage: {}, part id: {} failed: {}'.format(
181 storage_connection.storage_addr(),
183 resp.result.failed_parts[0].code,
185 if not partial_success:
187 parts_manager.set_stop()
192 part_info.has_done =
True
193 cursor = parts[part_info.part_id]
194 resp_cursor = resp.cursors[part_info.part_id]
195 part_info.cursor = cursor
196 if resp_cursor.next_cursor:
197 cursor.next_cursor = resp_cursor.next_cursor
199 "Get next next_cursor: {}".format(resp_cursor.next_cursor)
204 logger.debug(
"resp.props size: {}".format(len(resp.props.rows)))
205 if len(resp.props.column_names) == 0:
207 'Part id: {} return empty column names'.format(
212 if len(resp.props.rows) == 0:
214 data_sets.append(resp.props)
216 except Exception
as e:
219 logger.error(traceback.format_exc())
220 parts_manager.set_stop()
223 parts_manager.update_part_info(part_info, is_finished)
224 return None, data_sets
def _reset_parts_status(self)