6 package com.vesoft.nebula.client.graph.net;
8 import static com.vesoft.nebula.util.ReflectUtil.isCurrentTypeOrParentType;
10 import com.vesoft.nebula.DataSet;
11 import com.vesoft.nebula.DateTime;
12 import com.vesoft.nebula.Duration;
13 import com.vesoft.nebula.Edge;
14 import com.vesoft.nebula.Geography;
15 import com.vesoft.nebula.NList;
16 import com.vesoft.nebula.NMap;
17 import com.vesoft.nebula.NSet;
18 import com.vesoft.nebula.NullType;
19 import com.vesoft.nebula.Path;
20 import com.vesoft.nebula.Time;
21 import com.vesoft.nebula.Value;
22 import com.vesoft.nebula.Vertex;
23 import com.vesoft.nebula.client.graph.data.HostAddress;
24 import com.vesoft.nebula.client.graph.data.ResultSet;
25 import com.vesoft.nebula.client.graph.exception.IOErrorException;
26 import com.vesoft.nebula.graph.ExecutionResponse;
27 import com.vesoft.nebula.util.ReflectUtil;
28 import java.io.Serializable;
29 import java.lang.reflect.Field;
30 import java.util.ArrayList;
31 import java.util.Calendar;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.HashMap;
35 import java.util.LinkedHashMap;
36 import java.util.List;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
53 public class Session implements Serializable, AutoCloseable {
55 private static final long serialVersionUID = -8855886967097862376L;
57 private final long sessionID;
58 private final int timezoneOffset;
61 private final Boolean retryConnect;
62 private final AtomicBoolean connectionIsBroken =
new AtomicBoolean(
false);
63 private final Logger log = LoggerFactory.getLogger(getClass());
76 Boolean retryConnect) {
77 this.connection = connection;
78 this.sessionID = authResult.getSessionId();
79 this.timezoneOffset = authResult.getTimezoneOffset();
81 this.retryConnect = retryConnect;
94 (Map<String, Object>) Collections.EMPTY_MAP);
107 Map<String, Object> parameterMap)
109 if (connection ==
null) {
111 "The session was released, couldn't use again.");
113 Map<byte[], Value> map =
new HashMap<>();
114 parameterMap.forEach((key, value) -> map.put(key.getBytes(),
value2Nvalue(value)));
116 if (connectionIsBroken.get() && retryConnect) {
117 if (retryConnect()) {
118 ExecutionResponse resp =
119 connection.executeWithParameter(sessionID, stmt, map);
120 return new ResultSet(resp, timezoneOffset);
123 "All servers are broken.");
128 ExecutionResponse resp = connection.executeWithParameter(sessionID, stmt, map);
129 return new ResultSet(resp, timezoneOffset);
132 connectionIsBroken.set(
true);
136 if (retryConnect()) {
137 connectionIsBroken.set(
false);
138 ExecutionResponse resp =
139 connection.executeWithParameter(sessionID, stmt, map);
140 return new ResultSet(resp, timezoneOffset);
142 connectionIsBroken.set(
true);
144 "All servers are broken.");
214 (Map<String, Object>) Collections.EMPTY_MAP);
277 Map<String, Object> parameterMap)
throws
279 if (connection ==
null) {
281 "The session was released, couldn't use again.");
284 Map<byte[], Value> map =
new HashMap<>();
285 parameterMap.entrySet().stream()
286 .forEach(x -> map.put(x.getKey().getBytes(),
value2Nvalue(x.getValue())));
287 if (connectionIsBroken.get() && retryConnect) {
288 if (retryConnect()) {
289 return connection.executeJsonWithParameter(sessionID, stmt, map);
292 "All servers are broken.");
297 return connection.executeJsonWithParameter(sessionID, stmt, map);
300 connectionIsBroken.set(
true);
304 if (retryConnect()) {
305 connectionIsBroken.set(
false);
306 return connection.executeJsonWithParameter(sessionID, stmt, map);
308 connectionIsBroken.set(
true);
310 "All servers are broken.");
323 public synchronized boolean ping() {
324 if (connection ==
null) {
327 return connection.ping();
334 if (connection ==
null) {
337 return connection.ping(sessionID);
347 if (connection ==
null) {
351 connection.signout(sessionID);
353 }
catch (Exception e) {
354 log.warn(
"Release session or return object to pool failed:" + e.getMessage());
365 if (connection ==
null) {
368 return connection.getServerAddress();
384 private boolean retryConnect() {
388 if (newConn ==
null) {
389 log.error(
"Get connection object failed.");
392 connection = newConn;
394 }
catch (Exception e) {
395 log.error(
"Reconnected failed: " + e);
406 private static NList list2Nlist(List<Object> list)
throws UnsupportedOperationException {
407 NList nlist =
new NList(
new ArrayList<Value>());
408 for (Object item : list) {
420 private static NMap map2Nmap(Map<String, Object> map)
throws UnsupportedOperationException {
421 NMap nmap =
new NMap(
new HashMap<
byte[], Value>());
422 for (Map.Entry<String, Object> entry : map.entrySet()) {
423 nmap.kvs.put(entry.getKey().getBytes(),
value2Nvalue(entry.getValue()));
434 public static Value
value2Nvalue(Object value)
throws UnsupportedOperationException {
437 Value nullValue =
new Value();
438 nullValue.setNVal(NullType.__NULL__);
441 Class<?> type = value.getClass();
443 if (setter !=
null) {
444 return setter.set(value);
447 if (isCurrentTypeOrParentType(type, parentType)) {
451 }
catch (Exception e) {
452 throw new UnsupportedOperationException(e);
454 throw new UnsupportedOperationException(
455 "Only support convert boolean/float/int/string/map/list/date/pojo to nebula.Value but was"
456 + value.getClass().getTypeName());
460 public synchronized void close() {
468 new HashMap<Class<?>, Setter>() {{
469 put(Value.class, (Setter<Value>) (param) -> param);
470 put(Boolean.class, (Setter<Boolean>) Value::bVal);
471 put(Integer.class, (Setter<Integer>) Value::iVal);
472 put(Short.class, (Setter<Short>) Value::iVal);
473 put(Byte.class, (Setter<Short>) Value::iVal);
474 put(Long.class, (Setter<Long>) Value::iVal);
475 put(Float.class, (Setter<Float>) Value::fVal);
476 put(Double.class, (Setter<Double>) Value::fVal);
477 put(
byte[].
class, (Setter<
byte[]>) Value::sVal);
478 put(Byte[].
class, (Setter<
byte[]>) Value::sVal);
479 put(String.class, (Setter<String>) (param) -> Value.sVal(param.getBytes()));
480 put(com.vesoft.nebula.Date.class, (Setter<com.vesoft.nebula.Date>) Value::dVal);
481 put(Time.class, (Setter<Time>) Value::tVal);
482 put(DateTime.class, (Setter<DateTime>) Value::dtVal);
483 put(Vertex.class, (Setter<Vertex>) Value::vVal);
484 put(Edge.class, (Setter<Edge>) Value::eVal);
485 put(Path.class, (Setter<Path>) Value::pVal);
486 put(NList.class, (Setter<NList>) Value::lVal);
487 put(NMap.class, (Setter<NMap>) Value::mVal);
488 put(NSet.class, (Setter<NSet>) Value::uVal);
489 put(DataSet.class, (Setter<DataSet>) Value::gVal);
490 put(Geography.class, (Setter<Geography>) Value::ggVal);
491 put(Duration.class, (Setter<Duration>) Value::duVal);
499 new LinkedHashMap<Class<?>, Setter>() {{
500 put(Collection.class, (Setter<Collection>) (collection) -> {
501 Value value = new Value();
502 List<Object> list = new ArrayList<>();
503 collection.forEach(el -> list.add(value2Nvalue(el)));
504 value.setLVal(list2Nlist(list));
508 put(Map.class, (Setter<Map<String, Object>>) (map) -> {
509 Value value = new Value();
510 Map<String, Object> valueMap = new HashMap<>();
511 map.forEach((k, v) -> {
512 valueMap.put(k, value2Nvalue(v));
514 value.setMVal(map2Nmap(valueMap));
518 put(java.util.Date.class, (Setter<java.util.Date>) (date) -> {
519 Calendar calendar = new Calendar.Builder().setInstant(date).build();
520 return Value.dtVal(new DateTime(
521 new Short(String.valueOf(calendar.get(Calendar.YEAR))),
522 new Byte(String.valueOf(calendar.get(Calendar.MONTH))),
523 new Byte(String.valueOf(calendar.get(Calendar.DATE))),
524 new Byte(String.valueOf(calendar.get(Calendar.HOUR))),
525 new Byte(String.valueOf(calendar.get(Calendar.MINUTE))),
526 new Byte(String.valueOf(calendar.get(Calendar.SECOND))),
527 new Short(String.valueOf(calendar.get(Calendar.MILLISECOND)))
531 put(Object.class, (Setter<Object>) (obj) -> {
532 Value value = new Value();
533 Map<String, Object> pojoFields = new HashMap<>();
534 Class<?> paramType = obj.getClass();
535 Field[] declaredFields = paramType.getDeclaredFields();
536 for (Field declaredField : declaredFields) {
537 pojoFields.put(declaredField.getName(),
538 value2Nvalue(ReflectUtil.getValue(obj, declaredField)));
540 value.setMVal(map2Nmap(pojoFields));
545 interface Setter<T> {
void updateServerStatus()
Update the services' status when the connection is broken, it is called by Session and NebulaPool.
void setInvalidateConnection(SyncConnection connection)
Set the connection is invalidate, and the object pool will destroy it.
void returnConnection(SyncConnection connection)
Return the connection to object pool.
The Session is an object that operates with nebula-graph.
static Map< Class<?>, Setter > COMPLEX_TYPE_AND_SETTER
some value setter for java type (complex java type include collections or date) that need convert to ...
synchronized HostAddress getGraphHost()
Gets the service address of the current connection.
synchronized String executeJsonWithParameter(String stmt, Map< String, Object > parameterMap)
Execute the nGql sentence.
synchronized boolean pingSession()
check current session is ok
synchronized String executeJson(String stmt)
Execute the nGql sentence.
static Map< Class<?>, Setter > LEAF_TYPE_AND_SETTER
some value setter for java type (basic or nebula special type) that need convert to NValue
synchronized ResultSet execute(String stmt)
Execute the nGql sentence.
synchronized void release()
Notifies the server that the session is no longer needed and returns the connection to the pool,...
long getSessionID()
get SessionID
synchronized boolean ping()
Check current connection is ok.
Session(SyncConnection connection, AuthResult authResult, NebulaPool connPool, Boolean retryConnect)
Constructor.
static Value value2Nvalue(Object value)
convert java value type to nebula thrift value type
synchronized ResultSet executeWithParameter(String stmt, Map< String, Object > parameterMap)
Execute the nGql sentence.