6 package com.vesoft.nebula.client.graph.net;
8 import static com.vesoft.nebula.util.ReflectUtil.isCurrentTypeOrParentType;
10 import com.vesoft.nebula.DataSet;
11 import com.vesoft.nebula.DateTime;
12 import com.vesoft.nebula.Duration;
13 import com.vesoft.nebula.Edge;
14 import com.vesoft.nebula.Geography;
15 import com.vesoft.nebula.NList;
16 import com.vesoft.nebula.NMap;
17 import com.vesoft.nebula.NSet;
18 import com.vesoft.nebula.NullType;
19 import com.vesoft.nebula.Path;
20 import com.vesoft.nebula.Time;
21 import com.vesoft.nebula.Value;
22 import com.vesoft.nebula.Vertex;
23 import com.vesoft.nebula.client.graph.data.HostAddress;
24 import com.vesoft.nebula.client.graph.data.ResultSet;
25 import com.vesoft.nebula.client.graph.exception.IOErrorException;
26 import com.vesoft.nebula.graph.ExecutionResponse;
27 import com.vesoft.nebula.util.ReflectUtil;
28 import java.io.Serializable;
29 import java.lang.reflect.Field;
30 import java.util.ArrayList;
31 import java.util.Calendar;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.HashMap;
35 import java.util.LinkedHashMap;
36 import java.util.List;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
53 public class Session implements Serializable, AutoCloseable {
55 private static final long serialVersionUID = -8855886967097862376L;
57 private final long sessionID;
58 private final int timezoneOffset;
61 private final Boolean retryConnect;
62 private final AtomicBoolean connectionIsBroken =
new AtomicBoolean(
false);
63 private final Logger log = LoggerFactory.getLogger(getClass());
76 Boolean retryConnect) {
77 this.connection = connection;
78 this.sessionID = authResult.getSessionId();
79 this.timezoneOffset = authResult.getTimezoneOffset();
81 this.retryConnect = retryConnect;
94 (Map<String, Object>) Collections.EMPTY_MAP);
107 Map<String, Object> parameterMap)
109 if (connection ==
null) {
111 "The session was released, couldn't use again.");
113 Map<byte[], Value> map =
new HashMap<>();
114 parameterMap.forEach((key, value) -> map.put(key.getBytes(),
value2Nvalue(value)));
116 if (connectionIsBroken.get() && retryConnect) {
117 if (retryConnect()) {
118 ExecutionResponse resp =
119 connection.executeWithParameter(sessionID, stmt, map);
120 return new ResultSet(resp, timezoneOffset);
123 "All servers are broken.");
128 ExecutionResponse resp = connection.executeWithParameter(sessionID, stmt, map);
129 return new ResultSet(resp, timezoneOffset);
132 connectionIsBroken.set(
true);
136 if (retryConnect()) {
137 connectionIsBroken.set(
false);
138 ExecutionResponse resp =
139 connection.executeWithParameter(sessionID, stmt, map);
140 return new ResultSet(resp, timezoneOffset);
142 connectionIsBroken.set(
true);
144 "All servers are broken.");
154 return executeWithParameterTimeout(stmt,
155 (Map<String, Object>) Collections.EMPTY_MAP,
159 public ResultSet executeWithParameterTimeout(String stmt,
160 Map<String, Object> parameterMap,
162 if (connection ==
null) {
164 "The session was released, couldn't use again.");
166 if (timeoutMs <= 0) {
167 throw new IllegalArgumentException(
"timeout should be a positive number");
169 Map<byte[], Value> map =
new HashMap<>();
170 parameterMap.forEach((key, value) -> map.put(key.getBytes(),
value2Nvalue(value)));
172 if (connectionIsBroken.get() && retryConnect) {
173 if (retryConnect()) {
174 ExecutionResponse resp =
175 connection.executeWithParameterTimeout(sessionID, stmt, map, timeoutMs);
176 return new ResultSet(resp, timezoneOffset);
178 throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
179 "All servers are broken.");
184 ExecutionResponse resp = connection.executeWithParameterTimeout(sessionID,
188 return new ResultSet(resp, timezoneOffset);
189 }
catch (IOErrorException ie) {
190 if (ie.getType() == IOErrorException.E_CONNECT_BROKEN) {
191 connectionIsBroken.set(
true);
195 if (retryConnect()) {
196 connectionIsBroken.set(
false);
197 ExecutionResponse resp =
198 connection.executeWithParameterTimeout(sessionID,
202 return new ResultSet(resp, timezoneOffset);
204 connectionIsBroken.set(
true);
205 throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
206 "All servers are broken.");
276 (Map<String, Object>) Collections.EMPTY_MAP);
339 Map<String, Object> parameterMap)
throws
341 if (connection ==
null) {
343 "The session was released, couldn't use again.");
346 Map<byte[], Value> map =
new HashMap<>();
347 parameterMap.entrySet().stream()
348 .forEach(x -> map.put(x.getKey().getBytes(),
value2Nvalue(x.getValue())));
349 if (connectionIsBroken.get() && retryConnect) {
350 if (retryConnect()) {
351 return connection.executeJsonWithParameter(sessionID, stmt, map);
354 "All servers are broken.");
359 return connection.executeJsonWithParameter(sessionID, stmt, map);
362 connectionIsBroken.set(
true);
366 if (retryConnect()) {
367 connectionIsBroken.set(
false);
368 return connection.executeJsonWithParameter(sessionID, stmt, map);
370 connectionIsBroken.set(
true);
372 "All servers are broken.");
385 public synchronized boolean ping() {
386 if (connection ==
null) {
389 return connection.ping();
396 if (connection ==
null) {
399 return connection.ping(sessionID);
409 if (connection ==
null) {
413 connection.signout(sessionID);
415 }
catch (Exception e) {
416 log.warn(
"Release session or return object to pool failed:" + e.getMessage());
427 if (connection ==
null) {
430 return connection.getServerAddress();
446 private boolean retryConnect() {
450 if (newConn ==
null) {
451 log.error(
"Get connection object failed.");
454 connection = newConn;
456 }
catch (Exception e) {
457 log.error(
"Reconnected failed: " + e);
468 private static NList list2Nlist(List<Object> list)
throws UnsupportedOperationException {
469 NList nlist =
new NList(
new ArrayList<Value>());
470 for (Object item : list) {
482 private static NMap map2Nmap(Map<String, Object> map)
throws UnsupportedOperationException {
483 NMap nmap =
new NMap(
new HashMap<
byte[], Value>());
484 for (Map.Entry<String, Object> entry : map.entrySet()) {
485 nmap.kvs.put(entry.getKey().getBytes(),
value2Nvalue(entry.getValue()));
496 public static Value
value2Nvalue(Object value)
throws UnsupportedOperationException {
499 Value nullValue =
new Value();
500 nullValue.setNVal(NullType.__NULL__);
503 Class<?> type = value.getClass();
505 if (setter !=
null) {
506 return setter.set(value);
509 if (isCurrentTypeOrParentType(type, parentType)) {
513 }
catch (Exception e) {
514 throw new UnsupportedOperationException(e);
516 throw new UnsupportedOperationException(
517 "Only support convert boolean/float/int/string/map/list/date/pojo to nebula.Value but was"
518 + value.getClass().getTypeName());
522 public synchronized void close() {
530 new HashMap<Class<?>, Setter>() {{
531 put(Value.class, (Setter<Value>) (param) -> param);
532 put(Boolean.class, (Setter<Boolean>) Value::bVal);
533 put(Integer.class, (Setter<Integer>) Value::iVal);
534 put(Short.class, (Setter<Short>) Value::iVal);
535 put(Byte.class, (Setter<Short>) Value::iVal);
536 put(Long.class, (Setter<Long>) Value::iVal);
537 put(Float.class, (Setter<Float>) Value::fVal);
538 put(Double.class, (Setter<Double>) Value::fVal);
539 put(
byte[].
class, (Setter<
byte[]>) Value::sVal);
540 put(Byte[].
class, (Setter<
byte[]>) Value::sVal);
541 put(String.class, (Setter<String>) (param) -> Value.sVal(param.getBytes()));
542 put(com.vesoft.nebula.Date.class, (Setter<com.vesoft.nebula.Date>) Value::dVal);
543 put(Time.class, (Setter<Time>) Value::tVal);
544 put(DateTime.class, (Setter<DateTime>) Value::dtVal);
545 put(Vertex.class, (Setter<Vertex>) Value::vVal);
546 put(Edge.class, (Setter<Edge>) Value::eVal);
547 put(Path.class, (Setter<Path>) Value::pVal);
548 put(NList.class, (Setter<NList>) Value::lVal);
549 put(NMap.class, (Setter<NMap>) Value::mVal);
550 put(NSet.class, (Setter<NSet>) Value::uVal);
551 put(DataSet.class, (Setter<DataSet>) Value::gVal);
552 put(Geography.class, (Setter<Geography>) Value::ggVal);
553 put(Duration.class, (Setter<Duration>) Value::duVal);
561 new LinkedHashMap<Class<?>, Setter>() {{
562 put(Collection.class, (Setter<Collection>) (collection) -> {
563 Value value = new Value();
564 List<Object> list = new ArrayList<>();
565 collection.forEach(el -> list.add(value2Nvalue(el)));
566 value.setLVal(list2Nlist(list));
570 put(Map.class, (Setter<Map<String, Object>>) (map) -> {
571 Value value = new Value();
572 Map<String, Object> valueMap = new HashMap<>();
573 map.forEach((k, v) -> {
574 valueMap.put(k, value2Nvalue(v));
576 value.setMVal(map2Nmap(valueMap));
580 put(java.util.Date.class, (Setter<java.util.Date>) (date) -> {
581 Calendar calendar = new Calendar.Builder().setInstant(date).build();
582 return Value.dtVal(new DateTime(
583 new Short(String.valueOf(calendar.get(Calendar.YEAR))),
584 new Byte(String.valueOf(calendar.get(Calendar.MONTH))),
585 new Byte(String.valueOf(calendar.get(Calendar.DATE))),
586 new Byte(String.valueOf(calendar.get(Calendar.HOUR))),
587 new Byte(String.valueOf(calendar.get(Calendar.MINUTE))),
588 new Byte(String.valueOf(calendar.get(Calendar.SECOND))),
589 new Short(String.valueOf(calendar.get(Calendar.MILLISECOND)))
593 put(Object.class, (Setter<Object>) (obj) -> {
594 Value value = new Value();
595 Map<String, Object> pojoFields = new HashMap<>();
596 Class<?> paramType = obj.getClass();
597 Field[] declaredFields = paramType.getDeclaredFields();
598 for (Field declaredField : declaredFields) {
599 pojoFields.put(declaredField.getName(),
600 value2Nvalue(ReflectUtil.getValue(obj, declaredField)));
602 value.setMVal(map2Nmap(pojoFields));
607 interface Setter<T> {
void updateServerStatus()
Update the services' status when the connection is broken, it is called by Session and NebulaPool.
void setInvalidateConnection(SyncConnection connection)
Set the connection is invalidate, and the object pool will destroy it.
void returnConnection(SyncConnection connection)
Return the connection to object pool.
The Session is an object that operates with nebula-graph.
static Map< Class<?>, Setter > COMPLEX_TYPE_AND_SETTER
some value setter for java type (complex java type include collections or date) that need convert to ...
synchronized HostAddress getGraphHost()
Gets the service address of the current connection.
synchronized String executeJsonWithParameter(String stmt, Map< String, Object > parameterMap)
Execute the nGql sentence.
synchronized boolean pingSession()
check current session is ok
synchronized String executeJson(String stmt)
Execute the nGql sentence.
static Map< Class<?>, Setter > LEAF_TYPE_AND_SETTER
some value setter for java type (basic or nebula special type) that need convert to NValue
synchronized ResultSet execute(String stmt)
Execute the nGql sentence.
synchronized void release()
Notifies the server that the session is no longer needed and returns the connection to the pool,...
long getSessionID()
get SessionID
synchronized boolean ping()
Check current connection is ok.
Session(SyncConnection connection, AuthResult authResult, NebulaPool connPool, Boolean retryConnect)
Constructor.
static Value value2Nvalue(Object value)
convert java value type to nebula thrift value type
synchronized ResultSet executeWithParameter(String stmt, Map< String, Object > parameterMap)
Execute the nGql sentence.