6 package com.vesoft.nebula.client.storage;
8 import com.vesoft.nebula.HostAddr;
9 import com.vesoft.nebula.client.graph.data.HostAddress;
10 import com.vesoft.nebula.client.graph.data.SSLParam;
11 import com.vesoft.nebula.client.meta.MetaManager;
12 import com.vesoft.nebula.client.storage.scan.PartScanInfo;
13 import com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator;
14 import com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator;
15 import com.vesoft.nebula.meta.ColumnDef;
16 import com.vesoft.nebula.meta.Schema;
17 import com.vesoft.nebula.storage.EdgeProp;
18 import com.vesoft.nebula.storage.ScanEdgeRequest;
19 import com.vesoft.nebula.storage.ScanVertexRequest;
20 import com.vesoft.nebula.storage.VertexProp;
21 import java.io.Serializable;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.HashSet;
25 import java.util.List;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 private static final Logger LOGGER = LoggerFactory.getLogger(
StorageClient.class);
36 private final List<HostAddress> addresses;
37 private int timeout = 10000;
38 private int connectionRetry = 3;
39 private int executionRetry = 1;
41 private boolean enableSSL =
false;
63 this.addresses = addresses;
75 this.addresses = addresses;
76 this.timeout = timeout;
83 public StorageClient(List<HostAddress> addresses,
int timeout,
int connectionRetry,
84 int executionRetry,
boolean enableSSL,
SSLParam sslParam) {
85 this(addresses, timeout);
86 this.connectionRetry = connectionRetry;
87 this.executionRetry = executionRetry;
88 this.enableSSL = enableSSL;
89 this.sslParam = sslParam;
90 if (enableSSL && sslParam ==
null) {
91 throw new IllegalArgumentException(
"SSL is enabled, but SSLParam is nul.");
101 connection.open(addresses.get(0), timeout, enableSSL, sslParam);
103 config.setEnableSSL(enableSSL);
104 config.setSslParam(sslParam);
106 metaManager =
new MetaManager(addresses, timeout, connectionRetry, executionRetry,
107 enableSSL, sslParam);
124 List<String> returnCols) {
125 return scanVertex(spaceName, tagName, returnCols, DEFAULT_LIMIT);
141 List<String> returnCols) {
142 return scanVertex(spaceName, part, tagName, returnCols, DEFAULT_LIMIT);
155 return scanVertex(spaceName, tagName, DEFAULT_LIMIT);
169 return scanVertex(spaceName, part, tagName, DEFAULT_LIMIT);
185 List<String> returnCols,
187 return scanVertex(spaceName, tagName, returnCols, limit, DEFAULT_START_TIME,
206 List<String> returnCols,
208 return scanVertex(spaceName, part, tagName, returnCols, limit, DEFAULT_START_TIME,
225 return scanVertex(spaceName, tagName, limit, DEFAULT_START_TIME, DEFAULT_END_TIME);
244 limit, DEFAULT_START_TIME, DEFAULT_END_TIME);
264 List<String> returnCols,
268 return scanVertex(spaceName, tagName, returnCols, limit, startTime, endTime,
269 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
291 List<String> returnCols,
295 return scanVertex(spaceName, part, tagName, returnCols, limit, startTime, endTime,
296 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
318 return scanVertex(spaceName, tagName, limit, startTime, endTime,
319 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
349 DEFAULT_ALLOW_PART_SUCCESS,
350 DEFAULT_ALLOW_READ_FOLLOWER);
377 List<String> returnCols,
381 boolean allowPartSuccess,
382 boolean allowReadFromFollower) {
384 if (parts.isEmpty()) {
385 throw new IllegalArgumentException(
"No valid part in space " + spaceName);
397 allowReadFromFollower);
426 List<String> returnCols,
430 boolean allowPartSuccess,
431 boolean allowReadFromFollower) {
442 allowReadFromFollower);
471 boolean allowPartSuccess,
472 boolean allowReadFromFollower) {
474 if (parts.isEmpty()) {
475 throw new IllegalArgumentException(
"No valid part in space " + spaceName);
487 allowReadFromFollower);
516 boolean allowPartSuccess,
517 boolean allowReadFromFollower) {
528 allowReadFromFollower);
534 List<String> returnCols,
539 boolean allowPartSuccess,
540 boolean allowReadFromFollower) {
541 if (spaceName ==
null || spaceName.trim().isEmpty()) {
542 throw new IllegalArgumentException(
"space name is empty.");
544 if (tagName ==
null || tagName.trim().isEmpty()) {
545 throw new IllegalArgumentException(
"tag name is empty");
547 if (noColumns && returnCols ==
null) {
548 throw new IllegalArgumentException(
"returnCols is null");
551 Set<PartScanInfo> partScanInfoSet =
new HashSet<>();
552 for (
int part : parts) {
553 HostAddr leader = metaManager.
getLeader(spaceName, part);
554 partScanInfoSet.add(
new PartScanInfo(part,
new HostAddress(leader.getHost(),
557 List<HostAddress> addrs =
new ArrayList<>();
558 for (HostAddr addr : metaManager.
listHosts()) {
559 addrs.add(
new HostAddress(addr.getHost(), addr.getPort()));
562 long tag = metaManager.
getTag(spaceName, tagName).getTag_id();
563 List<byte[]> props =
new ArrayList<>();
564 props.add(
"_vid".getBytes());
566 if (returnCols.size() == 0) {
567 Schema schema = metaManager.
getTag(spaceName, tagName).getSchema();
568 for (ColumnDef columnDef : schema.getColumns()) {
569 props.add(columnDef.getName());
572 for (String prop : returnCols) {
573 props.add(prop.getBytes());
577 VertexProp vertexCols =
new VertexProp((
int) tag, props);
578 List<VertexProp> vertexProps = Arrays.asList(vertexCols);
579 ScanVertexRequest request =
new ScanVertexRequest();
581 .setSpace_id(getSpaceId(spaceName))
582 .setReturn_columns(vertexProps)
584 .setStart_time(startTime)
585 .setEnd_time(endTime)
586 .setEnable_read_from_follower(allowReadFromFollower);
588 return doScanVertex(spaceName, tagName, partScanInfoSet, request, addrs, allowPartSuccess);
603 private ScanVertexResultIterator doScanVertex(String spaceName,
605 Set<PartScanInfo> partScanInfoSet,
606 ScanVertexRequest request,
607 List<HostAddress> addrs,
608 boolean allowPartSuccess) {
609 if (addrs ==
null || addrs.isEmpty()) {
610 throw new IllegalArgumentException(
"storage hosts is empty.");
613 return new ScanVertexResultIterator.ScanVertexResultBuilder()
614 .withMetaClient(metaManager)
616 .withPartScanInfo(partScanInfoSet)
617 .withRequest(request)
618 .withAddresses(addrs)
619 .withSpaceName(spaceName)
620 .withTagName(tagName)
621 .withPartSuccess(allowPartSuccess)
637 List<String> returnCols) {
639 return scanEdge(spaceName, edgeName, returnCols, DEFAULT_LIMIT);
654 List<String> returnCols) {
656 return scanEdge(spaceName, part, edgeName, returnCols, DEFAULT_LIMIT);
669 return scanEdge(spaceName, edgeName, DEFAULT_LIMIT);
683 return scanEdge(spaceName, part, edgeName, DEFAULT_LIMIT);
699 List<String> returnCols,
int limit) {
700 return scanEdge(spaceName, edgeName, returnCols, limit, DEFAULT_START_TIME,
717 List<String> returnCols,
int limit) {
718 return scanEdge(spaceName, part, edgeName, returnCols, limit, DEFAULT_START_TIME,
734 return scanEdge(spaceName, edgeName, limit, DEFAULT_START_TIME, DEFAULT_END_TIME);
750 return scanEdge(spaceName, part, edgeName, limit, DEFAULT_START_TIME, DEFAULT_END_TIME);
767 List<String> returnCols,
771 return scanEdge(spaceName, edgeName, returnCols, limit, startTime, endTime,
772 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
795 List<String> returnCols,
799 return scanEdge(spaceName, part, edgeName, returnCols, limit, startTime, endTime,
800 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
823 return scanEdge(spaceName, edgeName, limit, startTime, endTime,
824 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
849 return scanEdge(spaceName, part, edgeName, limit, startTime, endTime,
850 DEFAULT_ALLOW_PART_SUCCESS, DEFAULT_ALLOW_READ_FOLLOWER);
874 List<String> returnCols,
878 boolean allowPartSuccess,
879 boolean allowReadFromFollower) {
882 if (parts.isEmpty()) {
883 throw new IllegalArgumentException(
"No valid part in space " + spaceName);
886 return scanEdge(spaceName, parts, edgeName, returnCols,
false,
887 limit, startTime, endTime, allowPartSuccess, allowReadFromFollower);
912 List<String> returnCols,
916 boolean allowPartSuccess,
917 boolean allowReadFromFollower) {
918 return scanEdge(spaceName, Arrays.asList(part), edgeName, returnCols,
false,
919 limit, startTime, endTime, allowPartSuccess, allowReadFromFollower);
946 boolean allowPartSuccess,
947 boolean allowReadFromFollower) {
950 if (parts.isEmpty()) {
951 throw new IllegalArgumentException(
"No valid part in space " + spaceName);
953 return scanEdge(spaceName, parts, edgeName,
new ArrayList<>(),
true,
954 limit, startTime, endTime, allowPartSuccess, allowReadFromFollower);
983 boolean allowPartSuccess,
984 boolean allowReadFromFollower) {
985 return scanEdge(spaceName, Arrays.asList(part), edgeName,
new ArrayList<>(),
true,
986 limit, startTime, endTime, allowPartSuccess, allowReadFromFollower);
993 List<String> returnCols,
998 boolean allowPartSuccess,
999 boolean allowReadFromFollower) {
1000 if (spaceName ==
null || spaceName.trim().isEmpty()) {
1001 throw new IllegalArgumentException(
"space name is empty.");
1003 if (edgeName ==
null || edgeName.trim().isEmpty()) {
1004 throw new IllegalArgumentException(
"edge name is empty");
1006 if (noColumns && returnCols ==
null) {
1007 throw new IllegalArgumentException(
"returnCols is null");
1010 Set<PartScanInfo> partScanInfoSet =
new HashSet<>();
1011 for (
int part : parts) {
1012 HostAddr leader = metaManager.
getLeader(spaceName, part);
1013 partScanInfoSet.add(
new PartScanInfo(part,
new HostAddress(leader.getHost(),
1014 leader.getPort())));
1016 List<HostAddress> addrs =
new ArrayList<>();
1017 for (HostAddr addr : metaManager.
listHosts()) {
1018 addrs.add(
new HostAddress(addr.getHost(), addr.getPort()));
1020 List<byte[]> props =
new ArrayList<>();
1021 props.add(
"_src".getBytes());
1022 props.add(
"_dst".getBytes());
1023 props.add(
"_rank".getBytes());
1025 if (returnCols.size() == 0) {
1026 Schema schema = metaManager.
getEdge(spaceName, edgeName).getSchema();
1027 for (ColumnDef columnDef : schema.getColumns()) {
1028 props.add(columnDef.name);
1031 for (String prop : returnCols) {
1032 props.add(prop.getBytes());
1037 long edgeId = getEdgeId(spaceName, edgeName);
1038 EdgeProp edgeCols =
new EdgeProp((
int) edgeId, props);
1039 List<EdgeProp> edgeProps = Arrays.asList(edgeCols);
1041 ScanEdgeRequest request =
new ScanEdgeRequest();
1043 .setSpace_id(getSpaceId(spaceName))
1044 .setReturn_columns(edgeProps)
1046 .setStart_time(startTime)
1047 .setEnd_time(endTime)
1048 .setEnable_read_from_follower(allowReadFromFollower);
1050 return doScanEdge(spaceName, edgeName, partScanInfoSet, request, addrs, allowPartSuccess);
1065 private ScanEdgeResultIterator doScanEdge(String spaceName,
1067 Set<PartScanInfo> partScanInfoSet,
1068 ScanEdgeRequest request,
1069 List<HostAddress> addrs,
1070 boolean allowPartSuccess) {
1071 if (addrs ==
null || addrs.isEmpty()) {
1072 throw new IllegalArgumentException(
"storage hosts is empty.");
1075 return new ScanEdgeResultIterator.ScanEdgeResultBuilder()
1076 .withMetaClient(metaManager)
1078 .withPartScanInfo(partScanInfoSet)
1079 .withRequest(request)
1080 .withAddresses(addrs)
1081 .withSpaceName(spaceName)
1082 .withEdgeName(edgeName)
1083 .withPartSuccess(allowPartSuccess)
1095 if (connection !=
null) {
1098 if (metaManager !=
null) {
1099 metaManager.
close();
1110 return this.connection;
1120 private int getSpaceId(String spaceName) {
1131 private long getEdgeId(String spaceName, String edgeName) {
1132 return metaManager.
getEdge(spaceName, edgeName).getEdge_type();
1135 private static final int DEFAULT_LIMIT = 1000;
1136 private static final long DEFAULT_START_TIME = 0;
1137 private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
1138 private static final boolean DEFAULT_ALLOW_PART_SUCCESS =
false;
1139 private static final boolean DEFAULT_ALLOW_READ_FOLLOWER =
true;
ScanVertexResultIterator scanVertex(String spaceName, String tagName, List< String > returnCols)
scan vertex of all parts with specific return cols, if returnCols is an empty list,...
StorageClient(List< HostAddress > addresses, int timeout)
Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with multi servers...
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, List< String > returnCols)
scan edge of specific part with return cols.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, List< String > returnCols, int limit)
scan vertex of specific part with specific return cols and limit.
ScanVertexResultIterator scanVertex(String spaceName, String tagName)
scan vertex of all parts with no return cols.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, int limit, long startTime, long endTime)
scan edge of specific part with no return cols and limit, start time, end time config.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, List< String > returnCols, int limit)
scan edge of specific part with return cols.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, List< String > returnCols)
scan vertex of specific part with specific return cols, if returnCols is an empty list,...
GraphStorageConnection getConnection()
return client's connection session
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, List< String > returnCols, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan edge of all parts with return cols and limit, start time, end time, if allow partial success,...
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, List< String > returnCols, int limit, long startTime, long endTime)
scan vertex of specific part with specific returnCols, limit, startTime and endTime.
void close()
release storage client
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName)
scan edge of all parts with no return cols.
ScanVertexResultIterator scanVertex(String spaceName, String tagName, List< String > returnCols, int limit, long startTime, long endTime)
scan vertex of all parts with specific returnCols, limit, startTime and endTime.
boolean connect()
Connect to Nebula Storage server.
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, List< String > returnCols)
scan edge of all parts with return cols.
StorageClient(List< HostAddress > addresses)
Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with multi servers...
StorageClient(String ip, int port)
Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with one server ho...
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName)
scan edge of specific part with no return cols.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName)
scan vertex of specific part with no return cols.
ScanVertexResultIterator scanVertex(String spaceName, String tagName, int limit)
scan vertex of all parts with no return cols and limit.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, int limit)
scan edge of specific part with no return cols and limit config.
ScanVertexResultIterator scanVertex(String spaceName, String tagName, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan vertex of all parts with no return cols, limit, startTime, endTime, whether allow partial succes...
ScanVertexResultIterator scanVertex(String spaceName, String tagName, List< String > returnCols, int limit)
scan vertex of all parts with specific return cols and limit.
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, int limit, long startTime, long endTime)
scan edge of all parts with no return cols and limit, start time, end time config.
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, List< String > returnCols, int limit, long startTime, long endTime)
scan edge of all parts with return cols and limit, start time, end time config.
ScanVertexResultIterator scanVertex(String spaceName, String tagName, int limit, long startTime, long endTime)
scan vertex of all parts with no returnCols, limit, startTime and endTime.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, List< String > returnCols, int limit, long startTime, long endTime)
scan edge of specific part with return cols and limit, start time, end time config.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, int limit, long startTime, long endTime)
scan vertex of specific part with no returnCols, limit, startTime and endTime.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, List< String > returnCols, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan edge of specific part with return cols and limit, start time, end time, if allow partial success...
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, List< String > returnCols, int limit)
scan edge of all parts with return cols and limit config.
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, int limit)
scan vertex of specific part with no return cols and limit.
ScanEdgeResultIterator scanEdge(String spaceName, int part, String edgeName, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan edge of specific part with no return cols and limit, start time, end time, if allow partial succ...
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan vertex of specific part with no return cols, limit, startTime, endTime, whether allow partial su...
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan edge of all parts with no return cols and limit, start time, end time, if allow partial success,...
ScanVertexResultIterator scanVertex(String spaceName, String tagName, List< String > returnCols, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan vertex of all parts with specific return cols, limit, startTime, endTime, whether allow partial ...
ScanVertexResultIterator scanVertex(String spaceName, int part, String tagName, List< String > returnCols, int limit, long startTime, long endTime, boolean allowPartSuccess, boolean allowReadFromFollower)
scan vertex of specific part with specific return cols, limit, startTime, endTime, whether allow part...
StorageClient(List< HostAddress > addresses, int timeout, int connectionRetry, int executionRetry, boolean enableSSL, SSLParam sslParam)
Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with multi servers...
ScanEdgeResultIterator scanEdge(String spaceName, String edgeName, int limit)
scan edge of all parts with no return cols and limit config.
ScanVertexResult's iterator.