11 from _thread
import RLock
12 from nebula3.Exception
import (
14 PartNotFoundException,
15 SpaceNotFoundException,
17 EdgeNotFoundException,
19 from nebula3.common.ttypes
import HostAddr, ErrorCode
20 from nebula3.meta.ttypes
import (
29 from nebula3.meta
import ttypes, MetaService
31 from nebula3.fbthrift.transport
import TSocket, TTransport
32 from nebula3.fbthrift.protocol
import TBinaryProtocol
33 from nebula3.logger
import logger
37 def __init__(self, addresses, timeout):
38 if len(addresses) == 0:
39 raise RuntimeError(
'Input empty addresses')
44 for address
in addresses:
46 socket.gethostbyname(address[0])
48 raise InValidHostname(str(address[0]))
50 self.
_lock_lock = RLock()
53 """open the connection to connect meta service
59 s = TSocket.TSocket(self.
_leader_leader[0], self.
_leader_leader[1])
62 transport = TTransport.TBufferedTransport(s)
63 protocol = TBinaryProtocol.TBinaryProtocol(transport)
65 self.
_connection_connection = MetaService.Client(protocol)
70 """get all version tags
72 :param space_id: the specified space id
77 raise RuntimeError(
'The connection is no open')
79 req.space_id = space_id
83 if resp.code != ErrorCode.SUCCEEDED:
84 if resp.code == ErrorCode.E_LEADER_CHANGED:
89 "List tags from space id:{} failed, error code: {}".format(
90 space_id, ErrorCode._VALUES_TO_NAMES.get(resp.code)
95 "List tags from space id:{} failed, error code: {}".format(
96 space_id, ErrorCode._VALUES_TO_NAMES.get(resp.code)
101 """get all version edge
103 :param space_id: the specified space id
104 :return: list<EdgeItem>
106 with self.
_lock_lock:
108 raise RuntimeError(
'The connection is no open')
110 req.space_id = space_id
114 if resp.code != ErrorCode.SUCCEEDED:
115 if resp.code == ErrorCode.E_LEADER_CHANGED:
120 "List edges from space id:{} failed, error code: {}".format(
121 space_id, ErrorCode._VALUES_TO_NAMES.get(resp.code)
126 "List edges from space id:{} failed, error code: {}".format(
127 space_id, ErrorCode._VALUES_TO_NAMES.get(resp.code)
132 """get all spaces info
136 with self.
_lock_lock:
138 raise RuntimeError(
'The connection is no open')
139 req = ListSpacesReq()
143 if resp.code != ErrorCode.SUCCEEDED:
144 if resp.code == ErrorCode.E_LEADER_CHANGED:
149 "List spaces failed, error code: {}".format(
150 ErrorCode._VALUES_TO_NAMES.get(resp.code)
155 "List spaces failed, error code: {}".format(
156 ErrorCode._VALUES_TO_NAMES.get(resp.code)
161 """get all online hosts info
163 :eturn: list<HostItem>
165 with self.
_lock_lock:
167 raise RuntimeError(
'The connection is no open')
169 req.role = HostRole.STORAGE
173 if resp.code != ErrorCode.SUCCEEDED:
174 if resp.code == ErrorCode.E_LEADER_CHANGED:
179 "List spaces failed, error code: {}".format(
180 ErrorCode._VALUES_TO_NAMES.get(resp.code)
184 for host
in resp.hosts:
185 if host.status == HostStatus.ONLINE:
186 valid_hosts.append(host)
189 "List spaces failed, error code: {}".format(
190 ErrorCode._VALUES_TO_NAMES.get(resp.code)
195 """get all parts info of the specified space id
198 :eturn: map<PartitionID, list<HostAddr>>
200 with self.
_lock_lock:
202 raise RuntimeError(
'The connection is no open')
203 req = GetPartsAllocReq()
204 req.space_id = space_id
207 resp = self.
_connection_connection.getPartsAlloc(req)
208 if resp.code != ErrorCode.SUCCEEDED:
209 if resp.code == ErrorCode.E_LEADER_CHANGED:
214 "List parts from space id:{} failed, error code: {}".format(
215 space_id, ErrorCode._VALUES_TO_NAMES.get(resp.code)
220 "List parts from space id:{} failed, error code: {}".format(
221 space_id, ErrorCode._VALUES_TO_NAMES.get(resp.code)
226 """close the connection
237 """update the leader meta info when happen leader change
239 :param leader: the address of meta leader
243 self.
_leader_leader = (leader.host, leader.port)
245 except Exception
as e:
262 return 'space_id: {}, space_name: {}, tag_items: {}, edge_items: {}, parts_alloc: {}'.format(
270 def __init__(self, meta_addrs, timeout=2000, load_period=10, decode_type='utf-8'):
271 self._decode_type = decode_type
272 self._load_period = load_period
274 self._space_caches = {}
275 self._space_id_names = {}
276 self._storage_addrs = []
277 self._storage_leader = {}
279 self._meta_client =
MetaClient(meta_addrs, timeout)
280 self._meta_client.open()
286 """close the metaClient
300 """load all space info and schema info from meta services
309 space_id = space.id.get_space_id()
311 space_cache.space_id = space_id
312 space_cache.space_name = space.name.decode(
'utf-8')
313 space_id_names[space_id] = space_cache.space_name
314 tags = self.
_meta_client_meta_client.list_tags(space_id)
315 edges = self.
_meta_client_meta_client.list_edges(space_id)
316 parts_alloc = self.
_meta_client_meta_client.get_parts_alloc(space_id)
318 tag_name = tag.tag_name.decode(self.
_decode_type_decode_type)
319 if tag_name
not in space_cache.tag_items.keys():
320 space_cache.tag_items[tag_name] = tag
322 if space_cache.tag_items[tag_name].version < tag.version:
323 space_cache.tag_items[tag_name] = tag
325 edge_name = edge.edge_name.decode(self.
_decode_type_decode_type)
326 if edge_name
not in space_cache.edge_items.keys():
327 space_cache.edge_items[edge_name] = edge
329 if space_cache.edge_items[edge_name].version < edge.version:
330 space_cache.edge_items[edge_name] = edge
331 space_cache.edge_items[
334 space_cache.parts_alloc = parts_alloc
335 space_caches[space.name.decode(self.
_decode_type_decode_type)] = space_cache
339 for host_item
in hosts:
340 storage_addrs.append(host_item.hostAddr)
342 with self.
_lock_lock:
349 parts_alloc = self.
_space_caches_space_caches[space_name].parts_alloc
351 for part_id
in parts_alloc:
352 self.
_storage_leader_storage_leader[space_name][part_id] = parts_alloc[
355 except Exception
as x:
356 logger.error(
'Update meta data failed: {}'.format(x))
359 logger.error(traceback.format_exc())
362 """get all storage address
364 :return: list[HostAddr]
375 with self.
_lock_lock:
376 tag_item = self.
_get_tag_item_get_tag_item(space_name, tag_name)
377 return tag_item.tag_id
386 with self.
_lock_lock:
387 edge_item = self.
_get_edge_item_get_edge_item(space_name, edge_name)
388 return edge_item.edge_type
396 with self.
_lock_lock:
400 raise SpaceNotFoundException(space_name)
410 tag_item = self.
_get_tag_item_get_tag_item(space_name, tag_name)
411 return tag_item.schema
420 edge_item = self.
_get_edge_item_get_edge_item(space_name, edge_name)
421 return edge_item.schema
428 :return: storage ip port: HostAddr
431 if part_id
not in part_leaders.keys():
432 raise PartNotFoundException(part_id)
433 return part_leaders[part_id]
436 """get all part leader info of the space
438 :param space_name: space name
439 :eturn: map<PartitionID, HostAddr>
441 with self.
_lock_lock:
445 raise SpaceNotFoundException(space_name)
449 """get all part info of the space
451 :param space_name: space name
452 :eturn: map<PartitionID, list<HostAddr>>
454 with self.
_lock_lock:
458 raise SpaceNotFoundException(space_name)
459 return self.
_space_caches_space_caches[space_name].parts_alloc
461 def _get_tag_item(self, space_name, tag_name):
462 with self.
_lock_lock:
466 raise SpaceNotFoundException(space_name)
468 if tag_name
not in space_info.tag_items.keys():
470 if tag_name
not in space_info.tag_items.keys():
471 raise TagNotFoundException(tag_name)
472 return space_info.tag_items[tag_name]
474 def _get_edge_item(self, space_name, edge_name):
475 with self.
_lock_lock:
479 raise SpaceNotFoundException(space_name)
481 if edge_name
not in space_info.edge_items.keys():
483 if edge_name
not in space_info.edge_items.keys():
484 raise EdgeNotFoundException(edge_name)
485 return space_info.edge_items[edge_name]
488 """if the storage leader change, storage client need to call this function
492 :param address: HostAddr, if the address is None, it means the leader can't connect,
493 choose the peer as leader
496 with self.
_lock_lock:
498 logger.error(
"Space name:{} is not found".format(space_id))
501 if part_id
not in self.
_storage_leader_storage_leader[space_name].keys():
502 logger.error(
"part_id:{} is not found".format(space_name))
504 if address
is not None:
507 part_addresses = self.
_space_caches_space_caches[space_name].parts_alloc.get(part_id)
508 for part_addr
in part_addresses:
509 if part_addr == address: