NebulaGraph Java Client  release-3.8
All Classes Functions Variables
MetaManager.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.meta;
7 
8 import com.facebook.thrift.TException;
9 import com.google.common.collect.Maps;
10 import com.vesoft.nebula.HostAddr;
11 import com.vesoft.nebula.client.graph.data.HostAddress;
12 import com.vesoft.nebula.client.graph.data.SSLParam;
13 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
14 import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
15 import com.vesoft.nebula.meta.EdgeItem;
16 import com.vesoft.nebula.meta.HostItem;
17 import com.vesoft.nebula.meta.IdName;
18 import com.vesoft.nebula.meta.SpaceItem;
19 import com.vesoft.nebula.meta.TagItem;
20 import com.vesoft.nebula.util.NetUtil;
21 import java.io.Serializable;
22 import java.net.UnknownHostException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.locks.ReentrantReadWriteLock;
31 import java.util.stream.Collectors;
32 import org.apache.commons.codec.Charsets;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 
39 public class MetaManager implements MetaCache, Serializable {
40  private class SpaceInfo {
41  private SpaceItem spaceItem = null;
42  private Map<String, TagItem> tagItems = new HashMap<>();
43  private Map<Integer, String> tagIdNames = new HashMap<>();
44  private Map<String, EdgeItem> edgeItems = new HashMap<>();
45  private Map<Integer, String> edgeTypeNames = new HashMap<>();
46  private Map<Integer, List<HostAddr>> partsAlloc = new HashMap<>();
47  }
48 
49  private Map<String, MetaManager.SpaceInfo> spacesInfo = new HashMap<>();
50  private Map<String, Map<Integer, HostAddr>> partLeaders = null;
51  private Map<HostAddr, HostAddr> storageAddressMapping = new ConcurrentHashMap<>();
52 
53  private static final Logger LOGGER = LoggerFactory.getLogger(MetaManager.class);
54 
55  private MetaClient metaClient;
56  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
57 
58  private static final int DEFAULT_TIMEOUT_MS = 1000;
59  private static final int DEFAULT_CONNECTION_RETRY_SIZE = 3;
60  private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;
61 
65  public MetaManager(List<HostAddress> address)
66  throws TException, ClientServerIncompatibleException, UnknownHostException {
67  metaClient = new MetaClient(address);
68  metaClient.connect();
69  fillMetaInfo();
70  }
71 
75  public MetaManager(List<HostAddress> address, int timeout, int connectionRetry,
76  int executionRetry, boolean enableSSL, SSLParam sslParam)
77  throws TException, ClientServerIncompatibleException, UnknownHostException {
78  metaClient = new MetaClient(address, timeout, connectionRetry, executionRetry, enableSSL,
79  sslParam);
80  metaClient.connect();
81  fillMetaInfo();
82  }
83 
90  public void addStorageAddrMapping(String sourceAddr, String targetAddr) {
91  if (sourceAddr != null && targetAddr != null) {
92  storageAddressMapping.put(NetUtil.parseHostAddr(sourceAddr),
93  NetUtil.parseHostAddr(targetAddr));
94  }
95  }
96 
102  public void addStorageAddrMapping(Map<String, String> addressMap) {
103  if (addressMap != null && !addressMap.isEmpty()) {
104  for (Map.Entry<String, String> et : addressMap.entrySet()) {
105  storageAddressMapping.put(NetUtil.parseHostAddr(et.getKey()),
106  NetUtil.parseHostAddr(et.getValue()));
107  }
108  }
109  }
110 
114  public void close() {
115  metaClient.close();
116  }
117 
118 
122  private void fillMetaInfo() {
123  try {
124  Map<String, MetaManager.SpaceInfo> tempSpacesInfo = new HashMap<>();
125  List<IdName> spaces = metaClient.getSpaces();
126  for (IdName space : spaces) {
127  SpaceInfo spaceInfo = new SpaceInfo();
128  String spaceName = new String(space.name);
129  SpaceItem spaceItem = metaClient.getSpace(spaceName);
130  spaceInfo.spaceItem = spaceItem;
131  List<TagItem> tags = metaClient.getTags(spaceName);
132  for (TagItem tag : tags) {
133  String tagName = new String(tag.tag_name);
134  if (!spaceInfo.tagItems.containsKey(tagName)
135  || spaceInfo.tagItems.get(tagName).getVersion() < tag.getVersion()) {
136  spaceInfo.tagItems.put(tagName, tag);
137  spaceInfo.tagIdNames.put(tag.tag_id, tagName);
138  }
139  }
140  List<EdgeItem> edges = metaClient.getEdges(spaceName);
141  for (EdgeItem edge : edges) {
142  String edgeName = new String(edge.edge_name);
143  if (!spaceInfo.edgeItems.containsKey(edgeName)
144  || spaceInfo.edgeItems.get(edgeName).getVersion() < edge.getVersion()) {
145  spaceInfo.edgeItems.put(edgeName, edge);
146  spaceInfo.edgeTypeNames.put(edge.edge_type, edgeName);
147  }
148  }
149  spaceInfo.partsAlloc = metaClient.getPartsAlloc(spaceName);
150  tempSpacesInfo.put(spaceName, spaceInfo);
151  }
152  try {
153  lock.writeLock().lock();
154  spacesInfo = tempSpacesInfo;
155  if (partLeaders == null) {
156  partLeaders = new HashMap<>();
157  }
158  for (HostItem hostItem : metaClient.getHostItems()) {
159  HostAddr leader = hostItem.getHostAddr();
160  for (Map.Entry<byte[], List<Integer>> spaceParts
161  : hostItem.getLeader_parts().entrySet()) {
162  String space = new String(spaceParts.getKey(), Charsets.UTF_8);
163  if (!partLeaders.containsKey(space)) {
164  partLeaders.put(space, Maps.newConcurrentMap());
165  }
166  for (int part : spaceParts.getValue()) {
167  partLeaders.get(space).put(part, leader);
168  }
169  }
170  }
171  } finally {
172  lock.writeLock().unlock();
173  }
174  } catch (TException | ExecuteFailedException e) {
175  LOGGER.error(e.getMessage());
176  }
177  }
178 
179 
186  public int getSpaceId(String spaceName) throws IllegalArgumentException {
187  return getSpace(spaceName).space_id;
188  }
189 
196  @Override
197  public SpaceItem getSpace(String spaceName) throws IllegalArgumentException {
198  if (!spacesInfo.containsKey(spaceName)) {
199  fillMetaInfo();
200  }
201  try {
202  lock.readLock().lock();
203  if (!spacesInfo.containsKey(spaceName)) {
204  throw new IllegalArgumentException("space:" + spaceName + " does not exist.");
205  }
206  return spacesInfo.get(spaceName).spaceItem;
207  } finally {
208  lock.readLock().unlock();
209  }
210  }
211 
219  public int getTagId(String spaceName, String tagName) throws IllegalArgumentException {
220  return getTag(spaceName, tagName).tag_id;
221  }
222 
230  @Override
231  public TagItem getTag(String spaceName, String tagName) throws IllegalArgumentException {
232  if (!spacesInfo.containsKey(spaceName)
233  || !spacesInfo.get(spaceName).tagItems.containsKey(tagName)) {
234  fillMetaInfo();
235  }
236  try {
237  lock.readLock().lock();
238  if (!spacesInfo.containsKey(spaceName)) {
239  throw new IllegalArgumentException("Space:" + spaceName + " does not exist.");
240  }
241  if (!spacesInfo.get(spaceName).tagItems.containsKey(tagName)) {
242  throw new IllegalArgumentException("Tag:" + tagName + " does not exist.");
243  }
244  return spacesInfo.get(spaceName).tagItems.get(tagName);
245  } finally {
246  lock.readLock().unlock();
247  }
248  }
249 
250 
258  public int getEdgeType(String spaceName, String edgeName) throws IllegalArgumentException {
259  return getEdge(spaceName, edgeName).edge_type;
260  }
261 
269  @Override
270  public EdgeItem getEdge(String spaceName, String edgeName) throws IllegalArgumentException {
271  if (!spacesInfo.containsKey(spaceName)
272  || !spacesInfo.get(spaceName).edgeItems.containsKey(edgeName)) {
273  fillMetaInfo();
274  }
275  try {
276  lock.readLock().lock();
277  if (!spacesInfo.containsKey(spaceName)) {
278  throw new IllegalArgumentException("Space:" + spaceName + " does not exist.");
279  }
280  if (!spacesInfo.get(spaceName).edgeItems.containsKey(edgeName)) {
281  throw new IllegalArgumentException("Edge:" + edgeName + " does not exist.");
282  }
283  return spacesInfo.get(spaceName).edgeItems.get(edgeName);
284  } finally {
285  lock.readLock().unlock();
286  }
287  }
288 
296  public HostAddr getLeader(String spaceName, int part) throws IllegalArgumentException {
297  if (!spacesInfo.containsKey(spaceName)) {
298  fillMetaInfo();
299  }
300  try {
301  lock.readLock().lock();
302  if (partLeaders == null) {
303  throw new IllegalArgumentException("Space:" + spaceName + " does not exist.");
304  }
305 
306  if (!partLeaders.containsKey(spaceName)) {
307  throw new IllegalArgumentException("Space:" + spaceName + " does not exist.");
308  }
309 
310  if (!partLeaders.get(spaceName).containsKey(part)) {
311  throw new IllegalArgumentException("PartId:" + part + " does not exist.");
312  }
313  HostAddr hostAddr = partLeaders.get(spaceName).get(part);
314  return storageAddressMapping.getOrDefault(hostAddr, hostAddr);
315  } finally {
316  lock.readLock().unlock();
317  }
318  }
319 
326  public List<Integer> getSpaceParts(String spaceName) throws IllegalArgumentException {
327  return new ArrayList<>(getPartsAlloc(spaceName).keySet());
328  }
329 
336  @Override
337  public Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
338  throws IllegalArgumentException {
339  if (!spacesInfo.containsKey(spaceName)) {
340  fillMetaInfo();
341  }
342  try {
343  lock.readLock().lock();
344  if (!spacesInfo.containsKey(spaceName)) {
345  throw new IllegalArgumentException("Space:" + spaceName + " does not exist.");
346  }
347  Map<Integer, List<HostAddr>> partsAlloc = spacesInfo.get(spaceName).partsAlloc;
348  if (!storageAddressMapping.isEmpty()) {
349  // transform real address to special address by mapping
350  partsAlloc.keySet().forEach(partId -> {
351  partsAlloc.computeIfPresent(partId, (k, addressList) -> addressList
352  .stream()
353  .map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
354  .collect(Collectors.toList()));
355  });
356  }
357  return partsAlloc;
358  } finally {
359  lock.readLock().unlock();
360  }
361  }
362 
370  public void updateLeader(String spaceName, int part, HostAddr newLeader)
371  throws IllegalArgumentException {
372  try {
373  lock.writeLock().lock();
374  if (partLeaders == null) {
375  throw new IllegalArgumentException("Space:" + spaceName + " does not exist.");
376  }
377 
378  if (!partLeaders.containsKey(spaceName)) {
379  throw new IllegalArgumentException("Space:" + spaceName + " does not exist.");
380  }
381 
382  if (!partLeaders.get(spaceName).containsKey(part)) {
383  throw new IllegalArgumentException("PartId:" + part + " does not exist.");
384  }
385  partLeaders.get(spaceName).put(part, newLeader);
386  } finally {
387  lock.writeLock().unlock();
388  }
389  }
390 
394  public Set<HostAddr> listHosts() {
395  Set<HostAddr> hosts = metaClient.listHosts();
396  if (!storageAddressMapping.isEmpty()) {
397  hosts = hosts
398  .stream()
399  .map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
400  .collect(Collectors.toSet());
401  }
402  return hosts;
403  }
404 
405  public int getConnectionRetry() {
406  return metaClient.getConnectionRetry();
407  }
408 
409  public int getTimeout() {
410  return metaClient.getTimeout();
411  }
412 
413  public int getExecutionRetry() {
414  return metaClient.getExecutionRetry();
415  }
416 }
synchronized Set< HostItem > getHostItems()
get the leader parts for all storaged addresses
synchronized SpaceItem getSpace(String spaceName)
get one space
synchronized Set< HostAddr > listHosts()
get all Storaged servers
synchronized List< IdName > getSpaces()
get all spaces
synchronized List< TagItem > getTags(String spaceName)
get all tags of spaceName
synchronized Map< Integer, List< HostAddr > > getPartsAlloc(String spaceName)
Get all parts and their addresses in a space. Stored in this.parts.
synchronized List< EdgeItem > getEdges(String spaceName)
get all edges of specific space
MetaManager is a manager for meta info, such as spaces, tags and edges.
Set< HostAddr > listHosts()
get all storage addresses
Map< Integer, List< HostAddr > > getPartsAlloc(String spaceName)
get all parts alloc of one space
EdgeItem getEdge(String spaceName, String edgeName)
get Edge
MetaManager(List< HostAddress > address, int timeout, int connectionRetry, int executionRetry, boolean enableSSL, SSLParam sslParam)
init the meta info cache with more config
int getTagId(String spaceName, String tagName)
get tag id
int getSpaceId(String spaceName)
get space id
TagItem getTag(String spaceName, String tagName)
get tag
void addStorageAddrMapping(String sourceAddr, String targetAddr)
Add address mapping for storage. Used to change the address of storage read from the meta server.
void addStorageAddrMapping(Map< String, String > addressMap)
Add address mapping for storage. Used to change the address of storage read from the meta server.
List< Integer > getSpaceParts(String spaceName)
get all parts of one space
int getEdgeType(String spaceName, String edgeName)
get edge type
SpaceItem getSpace(String spaceName)
get space item
MetaManager(List< HostAddress > address)
init the meta info cache
void updateLeader(String spaceName, int part, HostAddr newLeader)
cache new leader for part
HostAddr getLeader(String spaceName, int part)
get part leader