6 package com.vesoft.nebula.client.storage;
8 import com.vesoft.nebula.HostAddr;
9 import com.vesoft.nebula.client.graph.data.HostAddress;
10 import com.vesoft.nebula.client.graph.data.SSLParam;
11 import com.vesoft.nebula.client.meta.MetaManager;
12 import com.vesoft.nebula.client.storage.scan.PartScanInfo;
13 import com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator;
14 import com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator;
15 import com.vesoft.nebula.meta.ColumnDef;
16 import com.vesoft.nebula.meta.Schema;
17 import com.vesoft.nebula.storage.EdgeProp;
18 import com.vesoft.nebula.storage.ScanEdgeRequest;
19 import com.vesoft.nebula.storage.ScanVertexRequest;
20 import com.vesoft.nebula.storage.VertexProp;
21 import java.io.Serializable;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.HashSet;
25 import java.util.List;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 private static final Logger LOGGER = LoggerFactory.getLogger(
StorageClient.class);
36 private final List<HostAddress> addresses;
37 private int timeout = 10000;
38 private int connectionRetry = 3;
39 private int executionRetry = 1;
41 private boolean enableSSL =
false;
44 private String user =
null;
45 private String password =
null;
66 this.addresses = addresses;
78 this.addresses = addresses;
79 this.timeout = timeout;
86 public StorageClient(List<HostAddress> addresses,
int timeout,
int connectionRetry,
87 int executionRetry,
boolean enableSSL,
SSLParam sslParam) {
88 this(addresses, timeout);
89 this.connectionRetry = connectionRetry;
90 this.executionRetry = executionRetry;
91 this.enableSSL = enableSSL;
92 this.sslParam = sslParam;
93 if (enableSSL && sslParam ==
null) {
94 throw new IllegalArgumentException(
"SSL is enabled, but SSLParam is nul.");
104 connection.open(addresses.get(0), timeout, enableSSL, sslParam);
106 config.setEnableSSL(enableSSL);
107 config.setSslParam(sslParam);
109 metaManager =
new MetaManager(addresses, timeout, connectionRetry, executionRetry,
110 enableSSL, sslParam);
120 this.password = password;
137 List<String> returnCols) {
138 return scanVertex(spaceName, tagName, returnCols, DEFAULT_LIMIT);
154 List<String> returnCols) {
155 return scanVertex(spaceName, part, tagName, returnCols, DEFAULT_LIMIT);
168 return scanVertex(spaceName, tagName, DEFAULT_LIMIT);
182 return scanVertex(spaceName, part, tagName, DEFAULT_LIMIT);
198 List<String> returnCols,
200 return scanVertex(spaceName, tagName, returnCols, limit, DEFAULT_START_TIME,
219 List<String> returnCols,
221 return scanVertex(spaceName, part, tagName, returnCols, limit, DEFAULT_START_TIME,
238 return scanVertex(spaceName, tagName, limit, DEFAULT_START_TIME, DEFAULT_END_TIME);
257 limit, DEFAULT_START_TIME, DEFAULT_END_TIME);
277 List<String> returnCols,
281 return scanVertex(spaceName, tagName, returnCols, limit, startTime, endTime,
282 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
304 List<String> returnCols,
308 return scanVertex(spaceName, part, tagName, returnCols, limit, startTime, endTime,
309 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
331 return scanVertex(spaceName, tagName, limit, startTime, endTime,
332 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
362 DEFAULT_ALLOW_PART_SUCCESS,
363 DEFAULT_ALLOW_READ_FOLLOWER);
390 List<String> returnCols,
394 boolean allowPartSuccess,
395 boolean allowReadFromFollower) {
397 if (parts.isEmpty()) {
398 throw new IllegalArgumentException(
"No valid part in space " + spaceName);
410 allowReadFromFollower);
439 List<String> returnCols,
443 boolean allowPartSuccess,
444 boolean allowReadFromFollower) {
455 allowReadFromFollower);
484 boolean allowPartSuccess,
485 boolean allowReadFromFollower) {
487 if (parts.isEmpty()) {
488 throw new IllegalArgumentException(
"No valid part in space " + spaceName);
500 allowReadFromFollower);
529 boolean allowPartSuccess,
530 boolean allowReadFromFollower) {
541 allowReadFromFollower);
547 List<String> returnCols,
552 boolean allowPartSuccess,
553 boolean allowReadFromFollower) {
554 if (spaceName ==
null || spaceName.trim().isEmpty()) {
555 throw new IllegalArgumentException(
"space name is empty.");
557 if (tagName ==
null || tagName.trim().isEmpty()) {
558 throw new IllegalArgumentException(
"tag name is empty");
560 if (noColumns && returnCols ==
null) {
561 throw new IllegalArgumentException(
"returnCols is null");
564 Set<PartScanInfo> partScanInfoSet =
new HashSet<>();
565 for (
int part : parts) {
566 HostAddr leader = metaManager.
getLeader(spaceName, part);
567 partScanInfoSet.add(
new PartScanInfo(part,
new HostAddress(leader.getHost(),
570 List<HostAddress> addrs =
new ArrayList<>();
571 for (HostAddr addr : metaManager.
listHosts()) {
572 addrs.add(
new HostAddress(addr.getHost(), addr.getPort()));
575 long tag = metaManager.
getTag(spaceName, tagName).getTag_id();
576 List<byte[]> props =
new ArrayList<>();
577 props.add(
"_vid".getBytes());
579 if (returnCols.size() == 0) {
580 Schema schema = metaManager.
getTag(spaceName, tagName).getSchema();
581 for (ColumnDef columnDef : schema.getColumns()) {
582 props.add(columnDef.getName());
585 for (String prop : returnCols) {
586 props.add(prop.getBytes());
590 VertexProp vertexCols =
new VertexProp((
int) tag, props);
591 List<VertexProp> vertexProps = Arrays.asList(vertexCols);
592 ScanVertexRequest request =
new ScanVertexRequest();
594 .setSpace_id(getSpaceId(spaceName))
595 .setReturn_columns(vertexProps)
597 .setStart_time(startTime)
598 .setEnd_time(endTime)
599 .setEnable_read_from_follower(allowReadFromFollower);
601 return doScanVertex(spaceName, tagName, partScanInfoSet, request, addrs, allowPartSuccess);
616 private ScanVertexResultIterator doScanVertex(String spaceName,
618 Set<PartScanInfo> partScanInfoSet,
619 ScanVertexRequest request,
620 List<HostAddress> addrs,
621 boolean allowPartSuccess) {
622 if (addrs ==
null || addrs.isEmpty()) {
623 throw new IllegalArgumentException(
"storage hosts is empty.");
626 return new ScanVertexResultIterator.ScanVertexResultBuilder()
627 .withMetaClient(metaManager)
629 .withPartScanInfo(partScanInfoSet)
630 .withRequest(request)
631 .withAddresses(addrs)
632 .withSpaceName(spaceName)
633 .withTagName(tagName)
634 .withPartSuccess(allowPartSuccess)
636 .withPassword(password)
652 List<String> returnCols) {
654 return scanEdge(spaceName, edgeName, returnCols, DEFAULT_LIMIT);
669 List<String> returnCols) {
671 return scanEdge(spaceName, part, edgeName, returnCols, DEFAULT_LIMIT);
684 return scanEdge(spaceName, edgeName, DEFAULT_LIMIT);
698 return scanEdge(spaceName, part, edgeName, DEFAULT_LIMIT);
714 List<String> returnCols,
int limit) {
715 return scanEdge(spaceName, edgeName, returnCols, limit, DEFAULT_START_TIME,
732 List<String> returnCols,
int limit) {
733 return scanEdge(spaceName, part, edgeName, returnCols, limit, DEFAULT_START_TIME,
749 return scanEdge(spaceName, edgeName, limit, DEFAULT_START_TIME, DEFAULT_END_TIME);
765 return scanEdge(spaceName, part, edgeName, limit, DEFAULT_START_TIME, DEFAULT_END_TIME);
782 List<String> returnCols,
786 return scanEdge(spaceName, edgeName, returnCols, limit, startTime, endTime,
787 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
810 List<String> returnCols,
814 return scanEdge(spaceName, part, edgeName, returnCols, limit, startTime, endTime,
815 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
838 return scanEdge(spaceName, edgeName, limit, startTime, endTime,
839 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
864 return scanEdge(spaceName, part, edgeName, limit, startTime, endTime,
865 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
889 List<String> returnCols,
893 boolean allowPartSuccess,
894 boolean allowReadFromFollower) {
897 if (parts.isEmpty()) {
898 throw new IllegalArgumentException(
"No valid part in space " + spaceName);
901 return scanEdge(spaceName, parts, edgeName, returnCols,
false,
902 limit, startTime, endTime, allowPartSuccess, allowReadFromFollower);
927 List<String> returnCols,
931 boolean allowPartSuccess,
932 boolean allowReadFromFollower) {
933 return scanEdge(spaceName, Arrays.asList(part), edgeName, returnCols,
false,
934 limit, startTime, endTime, allowPartSuccess, allowReadFromFollower);
961 boolean allowPartSuccess,
962 boolean allowReadFromFollower) {
965 if (parts.isEmpty()) {
966 throw new IllegalArgumentException(
"No valid part in space " + spaceName);
968 return scanEdge(spaceName, parts, edgeName,
new ArrayList<>(),
true,
969 limit, startTime, endTime, allowPartSuccess, allowReadFromFollower);
998 boolean allowPartSuccess,
999 boolean allowReadFromFollower) {
1000 return scanEdge(spaceName, Arrays.asList(part), edgeName,
new ArrayList<>(),
true,
1001 limit, startTime, endTime, allowPartSuccess, allowReadFromFollower);
1006 List<Integer> parts,
1008 List<String> returnCols,
1013 boolean allowPartSuccess,
1014 boolean allowReadFromFollower) {
1015 if (spaceName ==
null || spaceName.trim().isEmpty()) {
1016 throw new IllegalArgumentException(
"space name is empty.");
1018 if (edgeName ==
null || edgeName.trim().isEmpty()) {
1019 throw new IllegalArgumentException(
"edge name is empty");
1021 if (noColumns && returnCols ==
null) {
1022 throw new IllegalArgumentException(
"returnCols is null");
1025 Set<PartScanInfo> partScanInfoSet =
new HashSet<>();
1026 for (
int part : parts) {
1027 HostAddr leader = metaManager.
getLeader(spaceName, part);
1028 partScanInfoSet.add(
new PartScanInfo(part,
new HostAddress(leader.getHost(),
1029 leader.getPort())));
1031 List<HostAddress> addrs =
new ArrayList<>();
1032 for (HostAddr addr : metaManager.
listHosts()) {
1033 addrs.add(
new HostAddress(addr.getHost(), addr.getPort()));
1035 List<byte[]> props =
new ArrayList<>();
1036 props.add(
"_src".getBytes());
1037 props.add(
"_dst".getBytes());
1038 props.add(
"_rank".getBytes());
1040 if (returnCols.size() == 0) {
1041 Schema schema = metaManager.
getEdge(spaceName, edgeName).getSchema();
1042 for (ColumnDef columnDef : schema.getColumns()) {
1043 props.add(columnDef.name);
1046 for (String prop : returnCols) {
1047 props.add(prop.getBytes());
1052 long edgeId = getEdgeId(spaceName, edgeName);
1053 EdgeProp edgeCols =
new EdgeProp((
int) edgeId, props);
1054 List<EdgeProp> edgeProps = Arrays.asList(edgeCols);
1056 ScanEdgeRequest request =
new ScanEdgeRequest();
1058 .setSpace_id(getSpaceId(spaceName))
1059 .setReturn_columns(edgeProps)
1061 .setStart_time(startTime)
1062 .setEnd_time(endTime)
1063 .setEnable_read_from_follower(allowReadFromFollower);
1065 return doScanEdge(spaceName, edgeName, partScanInfoSet, request, addrs, allowPartSuccess);
1080 private ScanEdgeResultIterator doScanEdge(String spaceName,
1082 Set<PartScanInfo> partScanInfoSet,
1083 ScanEdgeRequest request,
1084 List<HostAddress> addrs,
1085 boolean allowPartSuccess) {
1086 if (addrs ==
null || addrs.isEmpty()) {
1087 throw new IllegalArgumentException(
"storage hosts is empty.");
1090 return new ScanEdgeResultIterator.ScanEdgeResultBuilder()
1091 .withMetaClient(metaManager)
1093 .withPartScanInfo(partScanInfoSet)
1094 .withRequest(request)
1095 .withAddresses(addrs)
1096 .withSpaceName(spaceName)
1097 .withEdgeName(edgeName)
1098 .withPartSuccess(allowPartSuccess)
1100 .withPassword(password)
1112 if (connection !=
null) {
1115 if (metaManager !=
null) {
1116 metaManager.
close();
1127 return this.connection;
1137 private int getSpaceId(String spaceName) {
1148 private long getEdgeId(String spaceName, String edgeName) {
1149 return metaManager.
getEdge(spaceName, edgeName).getEdge_type();
1152 private static final int DEFAULT_LIMIT = 1000;
1153 private static final long DEFAULT_START_TIME = 0;
1154 private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
1155 private static final boolean DEFAULT_ALLOW_PART_SUCCESS =
false;
1156 private static final boolean DEFAULT_ALLOW_READ_FOLLOWER =
true;
ScanVertexResultIterator scanVertex(String spaceName, String tagName, List< String > returnCols)
scan vertex of all parts with specific return cols, if returnCols is an empty list,...
StorageClient(List< HostAddress > addresses, int timeout)
Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with multi servers...
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, List< String > returnCols)
scan edge of specific part with return cols.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, List< String > returnCols, int limit)
scan vertex of specific part with specific return cols and limit.
ScanVertexResultIterator scanVertex(String spaceName, String tagName)
scan vertex of all parts with no return cols.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, int limit, long startTime, long endTime)
scan edge of specific part with no return cols and limit, start time, end time config.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, List< String > returnCols, int limit)
scan edge of specific part with return cols.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, List< String > returnCols)
scan vertex of specific part with specific return cols, if returnCols is an empty list,...
GraphStorageConnection getConnection()
return client's connection session
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, List< String > returnCols, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan edge of all parts with return cols and limit, start time, end time, if allow partial success,...
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, List< String > returnCols, int limit, long startTime, long endTime)
scan vertex of specific part with specific returnCols, limit, startTime and endTime.
void close()
release storage client
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName)
scan edge of all parts with no return cols.
ScanVertexResultIterator scanVertex(String spaceName, String tagName, List< String > returnCols, int limit, long startTime, long endTime)
scan vertex of all parts with specific returnCols, limit, startTime and endTime.
boolean connect()
Connect to Nebula Storage server.
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, List< String > returnCols)
scan edge of all parts with return cols.
StorageClient(List< HostAddress > addresses)
Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with multi servers...
StorageClient(String ip, int port)
Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with one server ho...
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName)
scan edge of specific part with no return cols.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName)
scan vertex of specific part with no return cols.
ScanVertexResultIterator scanVertex(String spaceName, String tagName, int limit)
scan vertex of all parts with no return cols and limit.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, int limit)
scan edge of specific part with no return cols and limit config.
ScanVertexResultIterator scanVertex(String spaceName, String tagName, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan vertex of all parts with no return cols, limit, startTime, endTime, whether allow partial succes...
ScanVertexResultIterator scanVertex(String spaceName, String tagName, List< String > returnCols, int limit)
scan vertex of all parts with specific return cols and limit.
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, int limit, long startTime, long endTime)
scan edge of all parts with no return cols and limit, start time, end time config.
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, List< String > returnCols, int limit, long startTime, long endTime)
scan edge of all parts with return cols and limit, start time, end time config.
ScanVertexResultIterator scanVertex(String spaceName, String tagName, int limit, long startTime, long endTime)
scan vertex of all parts with no returnCols, limit, startTime and endTime.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, List< String > returnCols, int limit, long startTime, long endTime)
scan edge of specific part with return cols and limit, start time, end time config.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, int limit, long startTime, long endTime)
scan vertex of specific part with no returnCols, limit, startTime and endTime.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, List< String > returnCols, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan edge of specific part with return cols and limit, start time, end time, if allow partial success...
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, List< String > returnCols, int limit)
scan edge of all parts with return cols and limit config.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, int limit)
scan vertex of specific part with no return cols and limit.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan edge of specific part with no return cols and limit, start time, end time, if allow partial succ...
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan vertex of specific part with no return cols, limit, startTime, endTime, whether allow partial su...
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan edge of all parts with no return cols and limit, start time, end time, if allow partial success,...
ScanVertexResultIterator scanVertex(String spaceName, String tagName, List< String > returnCols, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan vertex of all parts with specific return cols, limit, startTime, endTime, whether allow partial ...
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, List< String > returnCols, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan vertex of specific part with specific return cols, limit, startTime, endTime, whether allow part...
StorageClient(List< HostAddress > addresses, int timeout, int connectionRetry, int executionRetry, boolean enableSSL, SSLParam sslParam)
Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with multi servers...
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, int limit)
scan edge of all parts with no return cols and limit config.
ScanVertexResult's iterator.