NebulaGraph Java Client  release-3.8
All Classes Functions Variables
Session.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.graph.net;
7 
8 import static com.vesoft.nebula.util.ReflectUtil.isCurrentTypeOrParentType;
9 
10 import com.vesoft.nebula.DataSet;
11 import com.vesoft.nebula.DateTime;
12 import com.vesoft.nebula.Duration;
13 import com.vesoft.nebula.Edge;
14 import com.vesoft.nebula.Geography;
15 import com.vesoft.nebula.NList;
16 import com.vesoft.nebula.NMap;
17 import com.vesoft.nebula.NSet;
18 import com.vesoft.nebula.NullType;
19 import com.vesoft.nebula.Path;
20 import com.vesoft.nebula.Time;
21 import com.vesoft.nebula.Value;
22 import com.vesoft.nebula.Vertex;
23 import com.vesoft.nebula.client.graph.data.HostAddress;
24 import com.vesoft.nebula.client.graph.data.ResultSet;
25 import com.vesoft.nebula.client.graph.exception.IOErrorException;
26 import com.vesoft.nebula.graph.ExecutionResponse;
27 import com.vesoft.nebula.util.ReflectUtil;
28 import java.io.Serializable;
29 import java.lang.reflect.Field;
30 import java.util.ArrayList;
31 import java.util.Calendar;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.HashMap;
35 import java.util.LinkedHashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 
53 public class Session implements Serializable, AutoCloseable {
54 
55  private static final long serialVersionUID = -8855886967097862376L;
56 
57  private final long sessionID;
58  private final int timezoneOffset;
59  private SyncConnection connection;
60  private final NebulaPool pool;
61  private final Boolean retryConnect;
62  private final AtomicBoolean connectionIsBroken = new AtomicBoolean(false);
63  private final Logger log = LoggerFactory.getLogger(getClass());
64 
73  public Session(SyncConnection connection,
74  AuthResult authResult,
75  NebulaPool connPool,
76  Boolean retryConnect) {
77  this.connection = connection;
78  this.sessionID = authResult.getSessionId();
79  this.timezoneOffset = authResult.getTimezoneOffset();
80  this.pool = connPool;
81  this.retryConnect = retryConnect;
82  }
83 
91  public synchronized ResultSet execute(String stmt) throws
93  return executeWithParameter(stmt,
94  (Map<String, Object>) Collections.EMPTY_MAP);
95  }
96 
105  public synchronized ResultSet executeWithParameter(
106  String stmt,
107  Map<String, Object> parameterMap)
108  throws IOErrorException {
109  if (connection == null) {
110  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN,
111  "The session was released, couldn't use again.");
112  }
113  Map<byte[], Value> map = new HashMap<>();
114  parameterMap.forEach((key, value) -> map.put(key.getBytes(), value2Nvalue(value)));
115 
116  if (connectionIsBroken.get() && retryConnect) {
117  if (retryConnect()) {
118  ExecutionResponse resp =
119  connection.executeWithParameter(sessionID, stmt, map);
120  return new ResultSet(resp, timezoneOffset);
121  } else {
122  throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
123  "All servers are broken.");
124  }
125  }
126 
127  try {
128  ExecutionResponse resp = connection.executeWithParameter(sessionID, stmt, map);
129  return new ResultSet(resp, timezoneOffset);
130  } catch (IOErrorException ie) {
131  if (ie.getType() == IOErrorException.E_CONNECT_BROKEN) {
132  connectionIsBroken.set(true);
133  pool.updateServerStatus();
134 
135  if (retryConnect) {
136  if (retryConnect()) {
137  connectionIsBroken.set(false);
138  ExecutionResponse resp =
139  connection.executeWithParameter(sessionID, stmt, map);
140  return new ResultSet(resp, timezoneOffset);
141  } else {
142  connectionIsBroken.set(true);
143  throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
144  "All servers are broken.");
145  }
146  }
147  }
148  throw ie;
149  }
150  }
151 
152 
153  public ResultSet executeWithTimeout(String stmt, long timeoutMs) throws IOErrorException {
154  return executeWithParameterTimeout(stmt,
155  (Map<String, Object>) Collections.EMPTY_MAP,
156  timeoutMs);
157  }
158 
159  public ResultSet executeWithParameterTimeout(String stmt,
160  Map<String, Object> parameterMap,
161  long timeoutMs) throws IOErrorException {
162  if (connection == null) {
163  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN,
164  "The session was released, couldn't use again.");
165  }
166  if (timeoutMs <= 0) {
167  throw new IllegalArgumentException("timeout should be a positive number");
168  }
169  Map<byte[], Value> map = new HashMap<>();
170  parameterMap.forEach((key, value) -> map.put(key.getBytes(), value2Nvalue(value)));
171 
172  if (connectionIsBroken.get() && retryConnect) {
173  if (retryConnect()) {
174  ExecutionResponse resp =
175  connection.executeWithParameterTimeout(sessionID, stmt, map, timeoutMs);
176  return new ResultSet(resp, timezoneOffset);
177  } else {
178  throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
179  "All servers are broken.");
180  }
181  }
182 
183  try {
184  ExecutionResponse resp = connection.executeWithParameterTimeout(sessionID,
185  stmt,
186  map,
187  timeoutMs);
188  return new ResultSet(resp, timezoneOffset);
189  } catch (IOErrorException ie) {
190  if (ie.getType() == IOErrorException.E_CONNECT_BROKEN) {
191  connectionIsBroken.set(true);
192  pool.updateServerStatus();
193 
194  if (retryConnect) {
195  if (retryConnect()) {
196  connectionIsBroken.set(false);
197  ExecutionResponse resp =
198  connection.executeWithParameterTimeout(sessionID,
199  stmt,
200  map,
201  timeoutMs);
202  return new ResultSet(resp, timezoneOffset);
203  } else {
204  connectionIsBroken.set(true);
205  throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
206  "All servers are broken.");
207  }
208  }
209  }
210  throw ie;
211  }
212  }
213 
273  public synchronized String executeJson(String stmt) throws
275  return executeJsonWithParameter(stmt,
276  (Map<String, Object>) Collections.EMPTY_MAP);
277  }
278 
338  public synchronized String executeJsonWithParameter(String stmt,
339  Map<String, Object> parameterMap) throws
341  if (connection == null) {
342  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN,
343  "The session was released, couldn't use again.");
344  }
345 
346  Map<byte[], Value> map = new HashMap<>();
347  parameterMap.entrySet().stream()
348  .forEach(x -> map.put(x.getKey().getBytes(), value2Nvalue(x.getValue())));
349  if (connectionIsBroken.get() && retryConnect) {
350  if (retryConnect()) {
351  return connection.executeJsonWithParameter(sessionID, stmt, map);
352  } else {
353  throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
354  "All servers are broken.");
355  }
356  }
357 
358  try {
359  return connection.executeJsonWithParameter(sessionID, stmt, map);
360  } catch (IOErrorException ie) {
361  if (ie.getType() == IOErrorException.E_CONNECT_BROKEN) {
362  connectionIsBroken.set(true);
363  pool.updateServerStatus();
364 
365  if (retryConnect) {
366  if (retryConnect()) {
367  connectionIsBroken.set(false);
368  return connection.executeJsonWithParameter(sessionID, stmt, map);
369  } else {
370  connectionIsBroken.set(true);
371  throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
372  "All servers are broken.");
373  }
374  }
375  }
376  throw ie;
377  }
378  }
379 
385  public synchronized boolean ping() {
386  if (connection == null) {
387  return false;
388  }
389  return connection.ping();
390  }
391 
395  public synchronized boolean pingSession() {
396  if (connection == null) {
397  return false;
398  }
399  return connection.ping(sessionID);
400  }
401 
408  public synchronized void release() {
409  if (connection == null) {
410  return;
411  }
412  try {
413  connection.signout(sessionID);
414  pool.returnConnection(connection);
415  } catch (Exception e) {
416  log.warn("Release session or return object to pool failed:" + e.getMessage());
417  }
418  connection = null;
419  }
420 
426  public synchronized HostAddress getGraphHost() {
427  if (connection == null) {
428  return null;
429  }
430  return connection.getServerAddress();
431  }
432 
436  public long getSessionID() {
437  return sessionID;
438  }
439 
446  private boolean retryConnect() {
447  try {
448  pool.setInvalidateConnection(connection);
449  SyncConnection newConn = pool.getConnection();
450  if (newConn == null) {
451  log.error("Get connection object failed.");
452  return false;
453  }
454  connection = newConn;
455  return true;
456  } catch (Exception e) {
457  log.error("Reconnected failed: " + e);
458  return false;
459  }
460  }
461 
468  private static NList list2Nlist(List<Object> list) throws UnsupportedOperationException {
469  NList nlist = new NList(new ArrayList<Value>());
470  for (Object item : list) {
471  nlist.values.add(value2Nvalue(item));
472  }
473  return nlist;
474  }
475 
482  private static NMap map2Nmap(Map<String, Object> map) throws UnsupportedOperationException {
483  NMap nmap = new NMap(new HashMap<byte[], Value>());
484  for (Map.Entry<String, Object> entry : map.entrySet()) {
485  nmap.kvs.put(entry.getKey().getBytes(), value2Nvalue(entry.getValue()));
486  }
487  return nmap;
488  }
489 
496  public static Value value2Nvalue(Object value) throws UnsupportedOperationException {
497  try {
498  if (value == null) {
499  Value nullValue = new Value();
500  nullValue.setNVal(NullType.__NULL__);
501  return nullValue;
502  }
503  Class<?> type = value.getClass();
504  Setter<Object> setter = LEAF_TYPE_AND_SETTER.get(type);
505  if (setter != null) {
506  return setter.set(value);
507  }
508  for (Class<?> parentType : COMPLEX_TYPE_AND_SETTER.keySet()) {
509  if (isCurrentTypeOrParentType(type, parentType)) {
510  return COMPLEX_TYPE_AND_SETTER.get(parentType).set(value);
511  }
512  }
513  } catch (Exception e) {
514  throw new UnsupportedOperationException(e);
515  }
516  throw new UnsupportedOperationException(
517  "Only support convert boolean/float/int/string/map/list/date/pojo to nebula.Value but was"
518  + value.getClass().getTypeName());
519  }
520 
521  @Override
522  public synchronized void close() {
523  release();
524  }
525 
529  public static Map<Class<?>, Setter> LEAF_TYPE_AND_SETTER =
530  new HashMap<Class<?>, Setter>() {{
531  put(Value.class, (Setter<Value>) (param) -> param);
532  put(Boolean.class, (Setter<Boolean>) Value::bVal);
533  put(Integer.class, (Setter<Integer>) Value::iVal);
534  put(Short.class, (Setter<Short>) Value::iVal);
535  put(Byte.class, (Setter<Short>) Value::iVal);
536  put(Long.class, (Setter<Long>) Value::iVal);
537  put(Float.class, (Setter<Float>) Value::fVal);
538  put(Double.class, (Setter<Double>) Value::fVal);
539  put(byte[].class, (Setter<byte[]>) Value::sVal);
540  put(Byte[].class, (Setter<byte[]>) Value::sVal);
541  put(String.class, (Setter<String>) (param) -> Value.sVal(param.getBytes()));
542  put(com.vesoft.nebula.Date.class, (Setter<com.vesoft.nebula.Date>) Value::dVal);
543  put(Time.class, (Setter<Time>) Value::tVal);
544  put(DateTime.class, (Setter<DateTime>) Value::dtVal);
545  put(Vertex.class, (Setter<Vertex>) Value::vVal);
546  put(Edge.class, (Setter<Edge>) Value::eVal);
547  put(Path.class, (Setter<Path>) Value::pVal);
548  put(NList.class, (Setter<NList>) Value::lVal);
549  put(NMap.class, (Setter<NMap>) Value::mVal);
550  put(NSet.class, (Setter<NSet>) Value::uVal);
551  put(DataSet.class, (Setter<DataSet>) Value::gVal);
552  put(Geography.class, (Setter<Geography>) Value::ggVal);
553  put(Duration.class, (Setter<Duration>) Value::duVal);
554  }};
555 
560  public static Map<Class<?>, Setter> COMPLEX_TYPE_AND_SETTER =
561  new LinkedHashMap<Class<?>, Setter>() {{
562  put(Collection.class, (Setter<Collection>) (collection) -> {
563  Value value = new Value();
564  List<Object> list = new ArrayList<>();
565  collection.forEach(el -> list.add(value2Nvalue(el)));
566  value.setLVal(list2Nlist(list));
567  return value;
568  });
569 
570  put(Map.class, (Setter<Map<String, Object>>) (map) -> {
571  Value value = new Value();
572  Map<String, Object> valueMap = new HashMap<>();
573  map.forEach((k, v) -> {
574  valueMap.put(k, value2Nvalue(v));
575  });
576  value.setMVal(map2Nmap(valueMap));
577  return value;
578  });
579 
580  put(java.util.Date.class, (Setter<java.util.Date>) (date) -> {
581  Calendar calendar = new Calendar.Builder().setInstant(date).build();
582  return Value.dtVal(new DateTime(
583  new Short(String.valueOf(calendar.get(Calendar.YEAR))),
584  new Byte(String.valueOf(calendar.get(Calendar.MONTH))),
585  new Byte(String.valueOf(calendar.get(Calendar.DATE))),
586  new Byte(String.valueOf(calendar.get(Calendar.HOUR))),
587  new Byte(String.valueOf(calendar.get(Calendar.MINUTE))),
588  new Byte(String.valueOf(calendar.get(Calendar.SECOND))),
589  new Short(String.valueOf(calendar.get(Calendar.MILLISECOND)))
590  ));
591  });
592 
593  put(Object.class, (Setter<Object>) (obj) -> {
594  Value value = new Value();
595  Map<String, Object> pojoFields = new HashMap<>();
596  Class<?> paramType = obj.getClass();
597  Field[] declaredFields = paramType.getDeclaredFields();
598  for (Field declaredField : declaredFields) {
599  pojoFields.put(declaredField.getName(),
600  value2Nvalue(ReflectUtil.getValue(obj, declaredField)));
601  }
602  value.setMVal(map2Nmap(pojoFields));
603  return value;
604  });
605  }};
606 
607  interface Setter<T> {
608 
609  Value set(T param);
610  }
611 }
void updateServerStatus()
Updates the services' status when the connection is broken; it is called by Session and NebulaPool.
void setInvalidateConnection(SyncConnection connection)
Marks the connection as invalid so that the object pool will destroy it.
void returnConnection(SyncConnection connection)
Return the connection to object pool.
The Session is an object that operates with nebula-graph.
Definition: Session.java:53
static Map< Class<?>, Setter > COMPLEX_TYPE_AND_SETTER
Value setters for Java types (complex Java types, including collections and dates) that need to be converted to ...
Definition: Session.java:560
synchronized HostAddress getGraphHost()
Gets the service address of the current connection.
Definition: Session.java:426
synchronized String executeJsonWithParameter(String stmt, Map< String, Object > parameterMap)
Execute the nGql sentence.
Definition: Session.java:338
synchronized boolean pingSession()
Checks whether the current session is OK.
Definition: Session.java:395
synchronized String executeJson(String stmt)
Execute the nGql sentence.
Definition: Session.java:273
static Map< Class<?>, Setter > LEAF_TYPE_AND_SETTER
Value setters for Java types (basic or Nebula-specific types) that need to be converted to NValue.
Definition: Session.java:529
synchronized ResultSet execute(String stmt)
Execute the nGql sentence.
Definition: Session.java:91
synchronized void release()
Notifies the server that the session is no longer needed and returns the connection to the pool,...
Definition: Session.java:408
synchronized boolean ping()
Check current connection is ok.
Definition: Session.java:385
Session(SyncConnection connection, AuthResult authResult, NebulaPool connPool, Boolean retryConnect)
Constructor.
Definition: Session.java:73
static Value value2Nvalue(Object value)
Converts a Java value to the corresponding Nebula Thrift value.
Definition: Session.java:496
synchronized ResultSet executeWithParameter(String stmt, Map< String, Object > parameterMap)
Execute the nGql sentence.
Definition: Session.java:105