6 package com.vesoft.nebula.client.storage;
8 import com.vesoft.nebula.HostAddr;
9 import com.vesoft.nebula.client.graph.data.HostAddress;
10 import com.vesoft.nebula.client.graph.data.SSLParam;
11 import com.vesoft.nebula.client.meta.MetaManager;
12 import com.vesoft.nebula.client.storage.scan.PartScanInfo;
13 import com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator;
14 import com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator;
15 import com.vesoft.nebula.meta.ColumnDef;
16 import com.vesoft.nebula.meta.Schema;
17 import com.vesoft.nebula.storage.EdgeProp;
18 import com.vesoft.nebula.storage.ScanEdgeRequest;
19 import com.vesoft.nebula.storage.ScanVertexRequest;
20 import com.vesoft.nebula.storage.VertexProp;
21 import java.io.Serializable;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.HashSet;
25 import java.util.List;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
// SLF4J logger bound to StorageClient.
32 private static final Logger LOGGER = LoggerFactory.getLogger(
StorageClient.class);
// Server addresses supplied through the constructors. The first entry is used to open
// the connection and the whole list is handed to MetaManager.
37 private final List<HostAddress> addresses;
// Timeout forwarded to connection.open(...) and to MetaManager; defaults to 10000.
// NOTE(review): unit is presumably milliseconds -- confirm against the connection API.
38 private int timeout = 10000;
// Retry count for establishing connections, forwarded to MetaManager; defaults to 3.
39 private int connectionRetry = 3;
// Retry count for executing requests, forwarded to MetaManager; defaults to 1.
40 private int executionRetry = 1;
// Whether SSL is used; forwarded to connection.open(...), the config object and MetaManager.
// Off by default.
42 private boolean enableSSL =
false;
// Optional user name; null until set.
// NOTE(review): presumably forwarded to the scan result builders together with the
// password -- the corresponding lines are not visible in this listing.
45 private String user =
null;
// Optional password; null until set. Passed to the scan result builders via withPassword(...).
46 private String password =
null;
// Optional storage address translation map; null until setStorageAddressMapping(...) is
// called. Passed to the vertex scan result builder via withStorageAddressMapping(...).
48 private Map<String, String> storageAddressMapping =
null;
69 this.addresses = addresses;
81 this.addresses = addresses;
82 this.timeout = timeout;
89 public StorageClient(List<HostAddress> addresses,
int timeout,
int connectionRetry,
90 int executionRetry,
boolean enableSSL,
SSLParam sslParam) {
91 this(addresses, timeout);
92 this.connectionRetry = connectionRetry;
93 this.executionRetry = executionRetry;
94 this.enableSSL = enableSSL;
95 this.sslParam = sslParam;
96 if (enableSSL && sslParam ==
null) {
97 throw new IllegalArgumentException(
"SSL is enabled, but SSLParam is nul.");
107 connection.open(addresses.get(0), timeout, enableSSL, sslParam);
109 config.setEnableSSL(enableSSL);
110 config.setSslParam(sslParam);
112 metaManager =
new MetaManager(addresses, timeout, connectionRetry, executionRetry,
113 enableSSL, sslParam);
124 this.password = password;
140 this.storageAddressMapping = storageAddressMapping;
141 if (this.metaManager !=
null) {
159 List<String> returnCols) {
160 return scanVertex(spaceName, tagName, returnCols, DEFAULT_LIMIT);
176 List<String> returnCols) {
177 return scanVertex(spaceName, part, tagName, returnCols, DEFAULT_LIMIT);
190 return scanVertex(spaceName, tagName, DEFAULT_LIMIT);
204 return scanVertex(spaceName, part, tagName, DEFAULT_LIMIT);
220 List<String> returnCols,
222 return scanVertex(spaceName, tagName, returnCols, limit, DEFAULT_START_TIME,
241 List<String> returnCols,
243 return scanVertex(spaceName, part, tagName, returnCols, limit, DEFAULT_START_TIME,
260 return scanVertex(spaceName, tagName, limit, DEFAULT_START_TIME, DEFAULT_END_TIME);
279 limit, DEFAULT_START_TIME, DEFAULT_END_TIME);
299 List<String> returnCols,
303 return scanVertex(spaceName, tagName, returnCols, limit, startTime, endTime,
304 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
326 List<String> returnCols,
330 return scanVertex(spaceName, part, tagName, returnCols, limit, startTime, endTime,
331 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
353 return scanVertex(spaceName, tagName, limit, startTime, endTime,
354 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
384 DEFAULT_ALLOW_PART_SUCCESS,
385 DEFAULT_ALLOW_READ_FOLLOWER);
412 List<String> returnCols,
416 boolean allowPartSuccess,
417 boolean allowReadFromFollower) {
419 if (parts.isEmpty()) {
420 throw new IllegalArgumentException(
"No valid part in space " + spaceName);
432 allowReadFromFollower);
461 List<String> returnCols,
465 boolean allowPartSuccess,
466 boolean allowReadFromFollower) {
477 allowReadFromFollower);
506 boolean allowPartSuccess,
507 boolean allowReadFromFollower) {
509 if (parts.isEmpty()) {
510 throw new IllegalArgumentException(
"No valid part in space " + spaceName);
522 allowReadFromFollower);
551 boolean allowPartSuccess,
552 boolean allowReadFromFollower) {
563 allowReadFromFollower);
569 List<String> returnCols,
574 boolean allowPartSuccess,
575 boolean allowReadFromFollower) {
576 if (spaceName ==
null || spaceName.trim().isEmpty()) {
577 throw new IllegalArgumentException(
"space name is empty.");
579 if (tagName ==
null || tagName.trim().isEmpty()) {
580 throw new IllegalArgumentException(
"tag name is empty");
582 if (noColumns && returnCols ==
null) {
583 throw new IllegalArgumentException(
"returnCols is null");
586 Set<PartScanInfo> partScanInfoSet =
new HashSet<>();
587 for (
int part : parts) {
588 HostAddr leader = metaManager.
getLeader(spaceName, part);
589 partScanInfoSet.add(
new PartScanInfo(part,
new HostAddress(leader.getHost(),
592 List<HostAddress> addrs =
new ArrayList<>();
593 for (HostAddr addr : metaManager.
listHosts()) {
594 addrs.add(
new HostAddress(addr.getHost(), addr.getPort()));
597 long tag = metaManager.
getTag(spaceName, tagName).getTag_id();
598 List<byte[]> props =
new ArrayList<>();
599 props.add(
"_vid".getBytes());
601 if (returnCols.size() == 0) {
602 Schema schema = metaManager.
getTag(spaceName, tagName).getSchema();
603 for (ColumnDef columnDef : schema.getColumns()) {
604 props.add(columnDef.getName());
607 for (String prop : returnCols) {
608 props.add(prop.getBytes());
612 VertexProp vertexCols =
new VertexProp((
int) tag, props);
613 List<VertexProp> vertexProps = Arrays.asList(vertexCols);
614 ScanVertexRequest request =
new ScanVertexRequest();
616 .setSpace_id(getSpaceId(spaceName))
617 .setReturn_columns(vertexProps)
619 .setStart_time(startTime)
620 .setEnd_time(endTime)
621 .setEnable_read_from_follower(allowReadFromFollower);
623 return doScanVertex(spaceName, tagName, partScanInfoSet, request, addrs, allowPartSuccess);
638 private ScanVertexResultIterator doScanVertex(String spaceName,
640 Set<PartScanInfo> partScanInfoSet,
641 ScanVertexRequest request,
642 List<HostAddress> addrs,
643 boolean allowPartSuccess) {
644 if (addrs ==
null || addrs.isEmpty()) {
645 throw new IllegalArgumentException(
"storage hosts is empty.");
648 return new ScanVertexResultIterator.ScanVertexResultBuilder()
649 .withMetaClient(metaManager)
651 .withPartScanInfo(partScanInfoSet)
652 .withRequest(request)
653 .withAddresses(addrs)
654 .withSpaceName(spaceName)
655 .withTagName(tagName)
656 .withPartSuccess(allowPartSuccess)
658 .withPassword(password)
659 .withStorageAddressMapping(storageAddressMapping)
675 List<String> returnCols) {
677 return scanEdge(spaceName, edgeName, returnCols, DEFAULT_LIMIT);
692 List<String> returnCols) {
694 return scanEdge(spaceName, part, edgeName, returnCols, DEFAULT_LIMIT);
707 return scanEdge(spaceName, edgeName, DEFAULT_LIMIT);
721 return scanEdge(spaceName, part, edgeName, DEFAULT_LIMIT);
737 List<String> returnCols,
int limit) {
738 return scanEdge(spaceName, edgeName, returnCols, limit, DEFAULT_START_TIME,
755 List<String> returnCols,
int limit) {
756 return scanEdge(spaceName, part, edgeName, returnCols, limit, DEFAULT_START_TIME,
772 return scanEdge(spaceName, edgeName, limit, DEFAULT_START_TIME, DEFAULT_END_TIME);
788 return scanEdge(spaceName, part, edgeName, limit, DEFAULT_START_TIME, DEFAULT_END_TIME);
805 List<String> returnCols,
809 return scanEdge(spaceName, edgeName, returnCols, limit, startTime, endTime,
810 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
833 List<String> returnCols,
837 return scanEdge(spaceName, part, edgeName, returnCols, limit, startTime, endTime,
838 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
861 return scanEdge(spaceName, edgeName, limit, startTime, endTime,
862 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
887 return scanEdge(spaceName, part, edgeName, limit, startTime, endTime,
888 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
912 List<String> returnCols,
916 boolean allowPartSuccess,
917 boolean allowReadFromFollower) {
920 if (parts.isEmpty()) {
921 throw new IllegalArgumentException(
"No valid part in space " + spaceName);
924 return scanEdge(spaceName, parts, edgeName, returnCols,
false,
925 limit, startTime, endTime, allowPartSuccess, allowReadFromFollower);
950 List<String> returnCols,
954 boolean allowPartSuccess,
955 boolean allowReadFromFollower) {
956 return scanEdge(spaceName, Arrays.asList(part), edgeName, returnCols,
false,
957 limit, startTime, endTime, allowPartSuccess, allowReadFromFollower);
984 boolean allowPartSuccess,
985 boolean allowReadFromFollower) {
988 if (parts.isEmpty()) {
989 throw new IllegalArgumentException(
"No valid part in space " + spaceName);
991 return scanEdge(spaceName, parts, edgeName,
new ArrayList<>(),
true,
992 limit, startTime, endTime, allowPartSuccess, allowReadFromFollower);
1021 boolean allowPartSuccess,
1022 boolean allowReadFromFollower) {
1023 return scanEdge(spaceName, Arrays.asList(part), edgeName,
new ArrayList<>(),
true,
1024 limit, startTime, endTime, allowPartSuccess, allowReadFromFollower);
1029 List<Integer> parts,
1031 List<String> returnCols,
1036 boolean allowPartSuccess,
1037 boolean allowReadFromFollower) {
1038 if (spaceName ==
null || spaceName.trim().isEmpty()) {
1039 throw new IllegalArgumentException(
"space name is empty.");
1041 if (edgeName ==
null || edgeName.trim().isEmpty()) {
1042 throw new IllegalArgumentException(
"edge name is empty");
1044 if (noColumns && returnCols ==
null) {
1045 throw new IllegalArgumentException(
"returnCols is null");
1048 Set<PartScanInfo> partScanInfoSet =
new HashSet<>();
1049 for (
int part : parts) {
1050 HostAddr leader = metaManager.
getLeader(spaceName, part);
1051 partScanInfoSet.add(
new PartScanInfo(part,
new HostAddress(leader.getHost(),
1052 leader.getPort())));
1054 List<HostAddress> addrs =
new ArrayList<>();
1055 for (HostAddr addr : metaManager.
listHosts()) {
1056 addrs.add(
new HostAddress(addr.getHost(), addr.getPort()));
1058 List<byte[]> props =
new ArrayList<>();
1059 props.add(
"_src".getBytes());
1060 props.add(
"_dst".getBytes());
1061 props.add(
"_rank".getBytes());
1063 if (returnCols.size() == 0) {
1064 Schema schema = metaManager.
getEdge(spaceName, edgeName).getSchema();
1065 for (ColumnDef columnDef : schema.getColumns()) {
1066 props.add(columnDef.name);
1069 for (String prop : returnCols) {
1070 props.add(prop.getBytes());
1075 long edgeId = getEdgeId(spaceName, edgeName);
1076 EdgeProp edgeCols =
new EdgeProp((
int) edgeId, props);
1077 List<EdgeProp> edgeProps = Arrays.asList(edgeCols);
1079 ScanEdgeRequest request =
new ScanEdgeRequest();
1081 .setSpace_id(getSpaceId(spaceName))
1082 .setReturn_columns(edgeProps)
1084 .setStart_time(startTime)
1085 .setEnd_time(endTime)
1086 .setEnable_read_from_follower(allowReadFromFollower);
1088 return doScanEdge(spaceName, edgeName, partScanInfoSet, request, addrs, allowPartSuccess);
1103 private ScanEdgeResultIterator doScanEdge(String spaceName,
1105 Set<PartScanInfo> partScanInfoSet,
1106 ScanEdgeRequest request,
1107 List<HostAddress> addrs,
1108 boolean allowPartSuccess) {
1109 if (addrs ==
null || addrs.isEmpty()) {
1110 throw new IllegalArgumentException(
"storage hosts is empty.");
1113 return new ScanEdgeResultIterator.ScanEdgeResultBuilder()
1114 .withMetaClient(metaManager)
1116 .withPartScanInfo(partScanInfoSet)
1117 .withRequest(request)
1118 .withAddresses(addrs)
1119 .withSpaceName(spaceName)
1120 .withEdgeName(edgeName)
1121 .withPartSuccess(allowPartSuccess)
1123 .withPassword(password)
1135 if (connection !=
null) {
1138 if (metaManager !=
null) {
1139 metaManager.
close();
1150 return this.connection;
1160 private int getSpaceId(String spaceName) {
1171 private long getEdgeId(String spaceName, String edgeName) {
1172 return metaManager.
getEdge(spaceName, edgeName).getEdge_type();
// Defaults substituted by the scanVertex/scanEdge overloads that omit the matching argument.

// Limit used when the caller does not pass one.
1175 private static final int DEFAULT_LIMIT = 1000;
// Start time used when the caller does not pass a time range; ends up in setStart_time(...).
1176 private static final long DEFAULT_START_TIME = 0;
// End time used when the caller does not pass a time range; ends up in setEnd_time(...).
1177 private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
// Partial success is not allowed unless the caller asks for it explicitly.
1178 private static final boolean DEFAULT_ALLOW_PART_SUCCESS =
false;
// Reading from followers is not enabled unless the caller asks for it explicitly.
1179 private static final boolean DEFAULT_ALLOW_READ_FOLLOWER =
false;
ScanVertexResultIterator scanVertex(String spaceName, String tagName, List< String > returnCols)
scan vertex of all parts with specific return cols, if returnCols is an empty list,...
StorageClient(List< HostAddress > addresses, int timeout)
Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with multi servers...
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, List< String > returnCols)
scan edge of specific part with return cols.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, List< String > returnCols, int limit)
scan vertex of specific part with specific return cols and limit.
ScanVertexResultIterator scanVertex(String spaceName, String tagName)
scan vertex of all parts with no return cols.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, int limit, long startTime, long endTime)
scan edge of specific part with no return cols and limit, start time, end time config.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, List< String > returnCols, int limit)
scan edge of specific part with return cols and limit config.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, List< String > returnCols)
scan vertex of specific part with specific return cols, if returnCols is an empty list,...
GraphStorageConnection getConnection()
return client's connection session
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, List< String > returnCols, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan edge of all parts with return cols and limit, start time, end time, if allow partial success,...
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, List< String > returnCols, int limit, long startTime, long endTime)
scan vertex of specific part with specific returnCols, limit, startTime and endTime.
void close()
release storage client
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName)
scan edge of all parts with no return cols.
ScanVertexResultIterator scanVertex(String spaceName, String tagName, List< String > returnCols, int limit, long startTime, long endTime)
scan vertex of all parts with specific returnCols, limit, startTime and endTime.
boolean connect()
Connect to Nebula Storage server.
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, List< String > returnCols)
scan edge of all parts with return cols.
StorageClient(List< HostAddress > addresses)
Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with multi servers...
StorageClient(String ip, int port)
Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with one server ho...
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName)
scan edge of specific part with no return cols.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName)
scan vertex of specific part with no return cols.
ScanVertexResultIterator scanVertex(String spaceName, String tagName, int limit)
scan vertex of all parts with no return cols and limit.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, int limit)
scan edge of specific part with no return cols and limit config.
ScanVertexResultIterator scanVertex(String spaceName, String tagName, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan vertex of all parts with no return cols, limit, startTime, endTime, whether allow partial succes...
ScanVertexResultIterator scanVertex(String spaceName, String tagName, List< String > returnCols, int limit)
scan vertex of all parts with specific return cols and limit.
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, int limit, long startTime, long endTime)
scan edge of all parts with no return cols and limit, start time, end time config.
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, List< String > returnCols, int limit, long startTime, long endTime)
scan edge of all parts with return cols and limit, start time, end time config.
ScanVertexResultIterator scanVertex(String spaceName, String tagName, int limit, long startTime, long endTime)
scan vertex of all parts with no returnCols, limit, startTime and endTime.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, List< String > returnCols, int limit, long startTime, long endTime)
scan edge of specific part with return cols and limit, start time, end time config.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, int limit, long startTime, long endTime)
scan vertex of specific part with no returnCols, limit, startTime and endTime.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, List< String > returnCols, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan edge of specific part with return cols and limit, start time, end time, if allow partial success...
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, List< String > returnCols, int limit)
scan edge of all parts with return cols and limit config.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, int limit)
scan vertex of specific part with no return cols and limit.
void setStorageAddressMapping(Map< String, String > storageAddressMapping)
The storage address translation relationship is set to convert the storage address that cannot be obt...
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan edge of specific part with no return cols and limit, start time, end time, if allow partial succ...
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan vertex of specific part with no return cols, limit, startTime, endTime, whether allow partial su...
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan edge of all parts with no return cols and limit, start time, end time, if allow partial success,...
ScanVertexResultIterator scanVertex(String spaceName, String tagName, List< String > returnCols, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan vertex of all parts with specific return cols, limit, startTime, endTime, whether allow partial ...
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, List< String > returnCols, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan vertex of specific part with specific return cols, limit, startTime, endTime, whether allow part...
StorageClient(List< HostAddress > addresses, int timeout, int connectionRetry, int executionRetry, boolean enableSSL, SSLParam sslParam)
Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with multi servers...
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, int limit)
scan edge of all parts with no return cols and limit config.
ScanVertexResult's iterator.