6 package com.vesoft.nebula.client.meta;
8 import com.facebook.thrift.TException;
9 import com.google.common.collect.Maps;
10 import com.vesoft.nebula.HostAddr;
11 import com.vesoft.nebula.client.graph.data.HostAddress;
12 import com.vesoft.nebula.client.graph.data.SSLParam;
13 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
14 import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
15 import com.vesoft.nebula.meta.EdgeItem;
16 import com.vesoft.nebula.meta.HostItem;
17 import com.vesoft.nebula.meta.IdName;
18 import com.vesoft.nebula.meta.SpaceItem;
19 import com.vesoft.nebula.meta.TagItem;
20 import com.vesoft.nebula.util.NetUtil;
21 import java.io.Serializable;
22 import java.net.UnknownHostException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.locks.ReentrantReadWriteLock;
31 import java.util.stream.Collectors;
32 import org.apache.commons.codec.Charsets;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
/**
 * Cached metadata for a single space: the space item itself plus lookup tables for its tags,
 * edges and partition allocation.
 *
 * <p>NOTE(review): this is declared as a non-static inner class; none of the fields visible
 * here use the enclosing instance, so it looks like it could be a static nested class —
 * confirm against the full file before changing.
 */
40 private class SpaceInfo {
// The space's own SpaceItem; assigned by fillMetaInfo().
41 private SpaceItem spaceItem =
null;
// Tag name -> TagItem. fillMetaInfo() replaces an entry only with a higher-version item.
42 private Map<String, TagItem> tagItems =
new HashMap<>();
// tag_id -> tag name, written alongside tagItems.
43 private Map<Integer, String> tagIdNames =
new HashMap<>();
// Edge name -> EdgeItem. fillMetaInfo() replaces an entry only with a higher-version item.
44 private Map<String, EdgeItem> edgeItems =
new HashMap<>();
// edge_type -> edge name, written alongside edgeItems.
45 private Map<Integer, String> edgeTypeNames =
new HashMap<>();
// Integer key -> list of host addresses (presumably part id -> hosts; TODO confirm).
// Where this map is populated is not visible in this excerpt.
46 private Map<Integer, List<HostAddr>> partsAlloc =
new HashMap<>();
// Space name -> (part id -> leader address). Starts out null; fillMetaInfo() creates the
// outer map the first time it finds it null.
50 private Map<String, Map<Integer, HostAddr>> partLeaders =
null;
// Address translation table: when an address is handed out it is looked up here with
// getOrDefault, so an unmapped address is returned unchanged.
51 private Map<HostAddr, HostAddr> storageAddressMapping =
new ConcurrentHashMap<>();
53 private static final Logger LOGGER = LoggerFactory.getLogger(
MetaManager.class);
// Write lock: taken by fillMetaInfo() when publishing a new cache and by updateLeader().
// Read lock: taken by the lookup methods around their validated reads.
56 private final ReentrantReadWriteLock lock =
new ReentrantReadWriteLock();
// Defaults below are not referenced in the lines visible here; presumably they feed a
// convenience constructor — TODO confirm.
58 private static final int DEFAULT_TIMEOUT_MS = 1000;
59 private static final int DEFAULT_CONNECTION_RETRY_SIZE = 3;
60 private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;
/**
 * Creates a manager whose {@code metaClient} is built from the supplied connection settings.
 *
 * <p>Only the start of the body is visible in this excerpt; the {@code MetaClient} call is
 * cut off after {@code enableSSL}, so whether {@code sslParam} is forwarded and what else the
 * constructor does (e.g. an initial cache fill) must be confirmed against the full file.
 *
 * @param address         meta service addresses passed to {@code MetaClient}
 * @param timeout         timeout passed to {@code MetaClient} (unit not shown here;
 *                        presumably milliseconds — TODO confirm)
 * @param connectionRetry connection retry count passed to {@code MetaClient}
 * @param executionRetry  execution retry count passed to {@code MetaClient}
 * @param enableSSL       SSL flag passed to {@code MetaClient}
 * @param sslParam        SSL parameters (use not visible in this excerpt)
 */
75 public MetaManager(List<HostAddress> address,
int timeout,
int connectionRetry,
76 int executionRetry,
boolean enableSSL,
SSLParam sslParam)
78 metaClient =
new MetaClient(address, timeout, connectionRetry, executionRetry, enableSSL,
91 if (sourceAddr !=
null && targetAddr !=
null) {
92 storageAddressMapping.put(NetUtil.parseHostAddr(sourceAddr),
93 NetUtil.parseHostAddr(targetAddr));
103 if (addressMap !=
null && !addressMap.isEmpty()) {
104 for (Map.Entry<String, String> et : addressMap.entrySet()) {
105 storageAddressMapping.put(NetUtil.parseHostAddr(et.getKey()),
106 NetUtil.parseHostAddr(et.getValue()));
/**
 * Rebuilds the metadata cache from the meta service.
 *
 * <p>For every space returned by {@code metaClient.getSpaces()} a fresh {@code SpaceInfo} is
 * filled with the space item, its tags and its edges, and stored in {@code tempSpacesInfo}
 * (declared on a line not visible in this excerpt). Afterwards, with the write lock held,
 * {@code spacesInfo} is pointed at the new map and {@code partLeaders} is filled from each
 * host item's leader parts.
 *
 * <p>{@code TException} and {@code ExecuteFailedException} are caught and logged, not
 * rethrown.
 */
122 private void fillMetaInfo() {
125 List<IdName> spaces = metaClient.
getSpaces();
126 for (IdName space : spaces) {
127 SpaceInfo spaceInfo =
new SpaceInfo();
// NOTE(review): this and the tag/edge name conversions below call new String(...) without
// a charset, while the leader-parts key further down is decoded with Charsets.UTF_8. If
// these names are byte[] (presumably — TODO confirm), they are decoded with the platform
// default charset, which is inconsistent.
128 String spaceName =
new String(space.name);
129 SpaceItem spaceItem = metaClient.
getSpace(spaceName);
130 spaceInfo.spaceItem = spaceItem;
131 List<TagItem> tags = metaClient.
getTags(spaceName);
132 for (TagItem tag : tags) {
133 String tagName =
new String(tag.tag_name);
// Keep a tag only if its name is new or its version is higher than the cached one.
134 if (!spaceInfo.tagItems.containsKey(tagName)
135 || spaceInfo.tagItems.get(tagName).getVersion() < tag.getVersion()) {
136 spaceInfo.tagItems.put(tagName, tag);
137 spaceInfo.tagIdNames.put(tag.tag_id, tagName);
140 List<EdgeItem> edges = metaClient.
getEdges(spaceName);
141 for (EdgeItem edge : edges) {
142 String edgeName =
new String(edge.edge_name);
// Same highest-version-wins rule as for tags.
143 if (!spaceInfo.edgeItems.containsKey(edgeName)
144 || spaceInfo.edgeItems.get(edgeName).getVersion() < edge.getVersion()) {
145 spaceInfo.edgeItems.put(edgeName, edge);
146 spaceInfo.edgeTypeNames.put(edge.edge_type, edgeName);
150 tempSpacesInfo.put(spaceName, spaceInfo);
// Publish the rebuilt cache and refresh the leader table while holding the write lock.
153 lock.writeLock().lock();
154 spacesInfo = tempSpacesInfo;
// partLeaders is created on first use; an existing map is kept and updated in place.
155 if (partLeaders ==
null) {
156 partLeaders =
new HashMap<>();
// hostItem comes from a loop header on a line not visible in this excerpt.
159 HostAddr leader = hostItem.getHostAddr();
160 for (Map.Entry<
byte[], List<Integer>> spaceParts
161 : hostItem.getLeader_parts().entrySet()) {
162 String space =
new String(spaceParts.getKey(), Charsets.UTF_8);
163 if (!partLeaders.containsKey(space)) {
164 partLeaders.put(space, Maps.newConcurrentMap());
// Record this host as the leader of every part it reports for the space.
166 for (
int part : spaceParts.getValue()) {
167 partLeaders.get(space).put(part, leader);
// Whether this unlock sits in a finally block is not visible in this excerpt.
172 lock.writeLock().unlock();
174 }
catch (TException | ExecuteFailedException e) {
// NOTE(review): only the message is logged; the exception itself (and its stack trace) is
// not passed to the logger, and the failure is not propagated to the caller.
175 LOGGER.error(e.getMessage());
/**
 * Returns the {@code space_id} of the cached space item for the given space name.
 *
 * @param spaceName name of the space
 * @return the {@code space_id} field of the item returned by {@link #getSpace(String)}
 * @throws IllegalArgumentException propagated from {@link #getSpace(String)} when the space
 *                                  is not in the cache
 */
186 public int getSpaceId(String spaceName)
throws IllegalArgumentException {
187 return getSpace(spaceName).space_id;
/**
 * Looks up the cached {@code SpaceItem} for a space.
 *
 * <p>The first {@code containsKey} test runs before the read lock is taken; what happens on a
 * miss is on lines not visible in this excerpt (presumably a cache refresh — TODO confirm).
 * The second test, after the read lock is taken, is the one that rejects unknown spaces.
 *
 * @param spaceName name of the space
 * @return the {@code spaceItem} of the cache entry stored under {@code spaceName}
 * @throws IllegalArgumentException if {@code spaceName} is not a key of {@code spacesInfo}
 */
197 public SpaceItem
getSpace(String spaceName)
throws IllegalArgumentException {
198 if (!spacesInfo.containsKey(spaceName)) {
202 lock.readLock().lock();
203 if (!spacesInfo.containsKey(spaceName)) {
204 throw new IllegalArgumentException(
"space:" + spaceName +
" does not exist.");
206 return spacesInfo.get(spaceName).spaceItem;
// Whether this unlock sits in a finally block is not visible in this excerpt.
208 lock.readLock().unlock();
/**
 * Returns the {@code tag_id} of a tag in a space.
 *
 * @param spaceName name of the space
 * @param tagName   name of the tag
 * @return the {@code tag_id} field of the item returned by {@link #getTag(String, String)}
 * @throws IllegalArgumentException propagated from {@link #getTag(String, String)} when the
 *                                  space or the tag is not in the cache
 */
219 public int getTagId(String spaceName, String tagName)
throws IllegalArgumentException {
220 return getTag(spaceName, tagName).tag_id;
/**
 * Looks up the cached {@code TagItem} for a tag in a space.
 *
 * <p>The first test (space or tag missing) runs before the read lock is taken; its body is on
 * lines not visible in this excerpt (presumably a cache refresh — TODO confirm). The checks
 * made after the read lock is taken decide which exception message is produced.
 *
 * @param spaceName name of the space
 * @param tagName   name of the tag
 * @return the entry stored under {@code tagName} in the space's {@code tagItems}
 * @throws IllegalArgumentException if the space is not cached, or the space is cached but
 *                                  has no tag named {@code tagName}
 */
231 public TagItem
getTag(String spaceName, String tagName)
throws IllegalArgumentException {
232 if (!spacesInfo.containsKey(spaceName)
233 || !spacesInfo.get(spaceName).tagItems.containsKey(tagName)) {
237 lock.readLock().lock();
// Unknown space is reported first, then unknown tag.
238 if (!spacesInfo.containsKey(spaceName)) {
239 throw new IllegalArgumentException(
"Space:" + spaceName +
" does not exist.");
241 if (!spacesInfo.get(spaceName).tagItems.containsKey(tagName)) {
242 throw new IllegalArgumentException(
"Tag:" + tagName +
" does not exist.");
244 return spacesInfo.get(spaceName).tagItems.get(tagName);
// Whether this unlock sits in a finally block is not visible in this excerpt.
246 lock.readLock().unlock();
/**
 * Returns the {@code edge_type} of an edge in a space.
 *
 * @param spaceName name of the space
 * @param edgeName  name of the edge
 * @return the {@code edge_type} field of the item returned by
 *         {@link #getEdge(String, String)}
 * @throws IllegalArgumentException propagated from {@link #getEdge(String, String)} when the
 *                                  space or the edge is not in the cache
 */
258 public int getEdgeType(String spaceName, String edgeName)
throws IllegalArgumentException {
259 return getEdge(spaceName, edgeName).edge_type;
/**
 * Looks up the cached {@code EdgeItem} for an edge in a space.
 *
 * <p>The first test (space or edge missing) runs before the read lock is taken; its body is
 * on lines not visible in this excerpt (presumably a cache refresh — TODO confirm). The
 * checks made after the read lock is taken decide which exception message is produced.
 *
 * @param spaceName name of the space
 * @param edgeName  name of the edge
 * @return the entry stored under {@code edgeName} in the space's {@code edgeItems}
 * @throws IllegalArgumentException if the space is not cached, or the space is cached but
 *                                  has no edge named {@code edgeName}
 */
270 public EdgeItem
getEdge(String spaceName, String edgeName)
throws IllegalArgumentException {
271 if (!spacesInfo.containsKey(spaceName)
272 || !spacesInfo.get(spaceName).edgeItems.containsKey(edgeName)) {
276 lock.readLock().lock();
// Unknown space is reported first, then unknown edge.
277 if (!spacesInfo.containsKey(spaceName)) {
278 throw new IllegalArgumentException(
"Space:" + spaceName +
" does not exist.");
280 if (!spacesInfo.get(spaceName).edgeItems.containsKey(edgeName)) {
281 throw new IllegalArgumentException(
"Edge:" + edgeName +
" does not exist.");
283 return spacesInfo.get(spaceName).edgeItems.get(edgeName);
// Whether this unlock sits in a finally block is not visible in this excerpt.
285 lock.readLock().unlock();
/**
 * Returns the leader address recorded for a part of a space, translated through
 * {@code storageAddressMapping}.
 *
 * <p>The first {@code spacesInfo} test runs before the read lock is taken; its body is on
 * lines not visible in this excerpt (presumably a cache refresh — TODO confirm).
 *
 * @param spaceName name of the space
 * @param part      part id within the space
 * @return the mapped address if the recorded leader has an entry in
 *         {@code storageAddressMapping}, otherwise the recorded leader itself
 * @throws IllegalArgumentException if {@code partLeaders} is still null, has no entry for
 *                                  {@code spaceName}, or that entry has no {@code part}
 */
296 public HostAddr
getLeader(String spaceName,
int part)
throws IllegalArgumentException {
297 if (!spacesInfo.containsKey(spaceName)) {
301 lock.readLock().lock();
// A leader table that was never built is reported with the same message as an unknown space.
302 if (partLeaders ==
null) {
303 throw new IllegalArgumentException(
"Space:" + spaceName +
" does not exist.");
306 if (!partLeaders.containsKey(spaceName)) {
307 throw new IllegalArgumentException(
"Space:" + spaceName +
" does not exist.");
310 if (!partLeaders.get(spaceName).containsKey(part)) {
311 throw new IllegalArgumentException(
"PartId:" + part +
" does not exist.");
313 HostAddr hostAddr = partLeaders.get(spaceName).get(part);
// Fall back to the recorded address when no mapping is configured for it.
314 return storageAddressMapping.getOrDefault(hostAddr, hostAddr);
// Whether this unlock sits in a finally block is not visible in this excerpt.
316 lock.readLock().unlock();
326 public List<Integer>
getSpaceParts(String spaceName)
throws IllegalArgumentException {
338 throws IllegalArgumentException {
339 if (!spacesInfo.containsKey(spaceName)) {
343 lock.readLock().lock();
344 if (!spacesInfo.containsKey(spaceName)) {
345 throw new IllegalArgumentException(
"Space:" + spaceName +
" does not exist.");
347 Map<Integer, List<HostAddr>> partsAlloc = spacesInfo.get(spaceName).partsAlloc;
348 if (!storageAddressMapping.isEmpty()) {
350 partsAlloc.keySet().forEach(partId -> {
351 partsAlloc.computeIfPresent(partId, (k, addressList) -> addressList
353 .map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
354 .collect(Collectors.toList()));
359 lock.readLock().unlock();
/**
 * Overwrites the recorded leader of an existing part with {@code newLeader}.
 *
 * <p>The update is made with the write lock held. Only parts that already have an entry can
 * be updated; nothing is created here. {@code newLeader} is stored as given — no lookup in
 * {@code storageAddressMapping} is done in the visible lines.
 *
 * @param spaceName name of the space
 * @param part      part id within the space
 * @param newLeader address to record as the part's leader
 * @throws IllegalArgumentException if {@code partLeaders} is still null, has no entry for
 *                                  {@code spaceName}, or that entry has no {@code part}
 */
370 public void updateLeader(String spaceName,
int part, HostAddr newLeader)
371 throws IllegalArgumentException {
373 lock.writeLock().lock();
// A leader table that was never built is reported with the same message as an unknown space.
374 if (partLeaders ==
null) {
375 throw new IllegalArgumentException(
"Space:" + spaceName +
" does not exist.");
378 if (!partLeaders.containsKey(spaceName)) {
379 throw new IllegalArgumentException(
"Space:" + spaceName +
" does not exist.");
382 if (!partLeaders.get(spaceName).containsKey(part)) {
383 throw new IllegalArgumentException(
"PartId:" + part +
" does not exist.");
385 partLeaders.get(spaceName).put(part, newLeader);
// Whether this unlock sits in a finally block is not visible in this excerpt.
387 lock.writeLock().unlock();
395 Set<HostAddr> hosts = metaClient.
listHosts();
396 if (!storageAddressMapping.isEmpty()) {
399 .map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
400 .collect(Collectors.toSet());
/**
 * Returns the connection retry count reported by the underlying {@code metaClient}.
 *
 * @return {@code metaClient.getConnectionRetry()}
 */
405 public int getConnectionRetry() {
406 return metaClient.getConnectionRetry();
/**
 * Returns the timeout reported by the underlying {@code metaClient}.
 *
 * @return {@code metaClient.getTimeout()}
 */
409 public int getTimeout() {
410 return metaClient.getTimeout();
/**
 * Returns the execution retry count reported by the underlying {@code metaClient}.
 *
 * @return {@code metaClient.getExecutionRetry()}
 */
413 public int getExecutionRetry() {
414 return metaClient.getExecutionRetry();