NebulaGraph Java Client  release-3.8
Session.java
1 /* Copyright (c) 2020 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.graph.net;
7 
8 import static com.vesoft.nebula.util.ReflectUtil.isCurrentTypeOrParentType;
9 
10 import com.vesoft.nebula.DataSet;
11 import com.vesoft.nebula.DateTime;
12 import com.vesoft.nebula.Duration;
13 import com.vesoft.nebula.Edge;
14 import com.vesoft.nebula.Geography;
15 import com.vesoft.nebula.NList;
16 import com.vesoft.nebula.NMap;
17 import com.vesoft.nebula.NSet;
18 import com.vesoft.nebula.NullType;
19 import com.vesoft.nebula.Path;
20 import com.vesoft.nebula.Time;
21 import com.vesoft.nebula.Value;
22 import com.vesoft.nebula.Vertex;
23 import com.vesoft.nebula.client.graph.data.HostAddress;
24 import com.vesoft.nebula.client.graph.data.ResultSet;
25 import com.vesoft.nebula.client.graph.exception.IOErrorException;
26 import com.vesoft.nebula.graph.ExecutionResponse;
27 import com.vesoft.nebula.util.ReflectUtil;
28 import java.io.Serializable;
29 import java.lang.reflect.Field;
30 import java.util.ArrayList;
31 import java.util.Calendar;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.HashMap;
35 import java.util.LinkedHashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 
53 public class Session implements Serializable, AutoCloseable {
54 
55  private static final long serialVersionUID = -8855886967097862376L;
56 
57  private final long sessionID;
58  private final int timezoneOffset;
59  private SyncConnection connection;
60  private final NebulaPool pool;
61  private final Boolean retryConnect;
62  private final AtomicBoolean connectionIsBroken = new AtomicBoolean(false);
63  private final Logger log = LoggerFactory.getLogger(getClass());
64 
73  public Session(SyncConnection connection,
74  AuthResult authResult,
75  NebulaPool connPool,
76  Boolean retryConnect) {
77  this.connection = connection;
78  this.sessionID = authResult.getSessionId();
79  this.timezoneOffset = authResult.getTimezoneOffset();
80  this.pool = connPool;
81  this.retryConnect = retryConnect;
82  }
83 
91  public synchronized ResultSet execute(String stmt) throws
93  return executeWithParameter(stmt,
94  (Map<String, Object>) Collections.EMPTY_MAP);
95  }
96 
105  public synchronized ResultSet executeWithParameter(
106  String stmt,
107  Map<String, Object> parameterMap)
108  throws IOErrorException {
109  if (connection == null) {
110  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN,
111  "The session was released, couldn't use again.");
112  }
113  Map<byte[], Value> map = new HashMap<>();
114  parameterMap.forEach((key, value) -> map.put(key.getBytes(), value2Nvalue(value)));
115 
116  if (connectionIsBroken.get() && retryConnect) {
117  if (retryConnect()) {
118  ExecutionResponse resp =
119  connection.executeWithParameter(sessionID, stmt, map);
120  return new ResultSet(resp, timezoneOffset);
121  } else {
122  throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
123  "All servers are broken.");
124  }
125  }
126 
127  try {
128  ExecutionResponse resp = connection.executeWithParameter(sessionID, stmt, map);
129  return new ResultSet(resp, timezoneOffset);
130  } catch (IOErrorException ie) {
131  if (ie.getType() == IOErrorException.E_CONNECT_BROKEN) {
132  connectionIsBroken.set(true);
133  pool.updateServerStatus();
134 
135  if (retryConnect) {
136  if (retryConnect()) {
137  connectionIsBroken.set(false);
138  ExecutionResponse resp =
139  connection.executeWithParameter(sessionID, stmt, map);
140  return new ResultSet(resp, timezoneOffset);
141  } else {
142  connectionIsBroken.set(true);
143  throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
144  "All servers are broken.");
145  }
146  }
147  }
148  throw ie;
149  }
150  }
151 
211  public synchronized String executeJson(String stmt) throws
213  return executeJsonWithParameter(stmt,
214  (Map<String, Object>) Collections.EMPTY_MAP);
215  }
216 
276  public synchronized String executeJsonWithParameter(String stmt,
277  Map<String, Object> parameterMap) throws
279  if (connection == null) {
280  throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN,
281  "The session was released, couldn't use again.");
282  }
283 
284  Map<byte[], Value> map = new HashMap<>();
285  parameterMap.entrySet().stream()
286  .forEach(x -> map.put(x.getKey().getBytes(), value2Nvalue(x.getValue())));
287  if (connectionIsBroken.get() && retryConnect) {
288  if (retryConnect()) {
289  return connection.executeJsonWithParameter(sessionID, stmt, map);
290  } else {
291  throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
292  "All servers are broken.");
293  }
294  }
295 
296  try {
297  return connection.executeJsonWithParameter(sessionID, stmt, map);
298  } catch (IOErrorException ie) {
299  if (ie.getType() == IOErrorException.E_CONNECT_BROKEN) {
300  connectionIsBroken.set(true);
301  pool.updateServerStatus();
302 
303  if (retryConnect) {
304  if (retryConnect()) {
305  connectionIsBroken.set(false);
306  return connection.executeJsonWithParameter(sessionID, stmt, map);
307  } else {
308  connectionIsBroken.set(true);
309  throw new IOErrorException(IOErrorException.E_ALL_BROKEN,
310  "All servers are broken.");
311  }
312  }
313  }
314  throw ie;
315  }
316  }
317 
323  public synchronized boolean ping() {
324  if (connection == null) {
325  return false;
326  }
327  return connection.ping();
328  }
329 
333  public synchronized boolean pingSession() {
334  if (connection == null) {
335  return false;
336  }
337  return connection.ping(sessionID);
338  }
339 
346  public synchronized void release() {
347  if (connection == null) {
348  return;
349  }
350  try {
351  connection.signout(sessionID);
352  pool.returnConnection(connection);
353  } catch (Exception e) {
354  log.warn("Release session or return object to pool failed:" + e.getMessage());
355  }
356  connection = null;
357  }
358 
364  public synchronized HostAddress getGraphHost() {
365  if (connection == null) {
366  return null;
367  }
368  return connection.getServerAddress();
369  }
370 
374  public long getSessionID() {
375  return sessionID;
376  }
377 
384  private boolean retryConnect() {
385  try {
386  pool.setInvalidateConnection(connection);
387  SyncConnection newConn = pool.getConnection();
388  if (newConn == null) {
389  log.error("Get connection object failed.");
390  return false;
391  }
392  connection = newConn;
393  return true;
394  } catch (Exception e) {
395  log.error("Reconnected failed: " + e);
396  return false;
397  }
398  }
399 
406  private static NList list2Nlist(List<Object> list) throws UnsupportedOperationException {
407  NList nlist = new NList(new ArrayList<Value>());
408  for (Object item : list) {
409  nlist.values.add(value2Nvalue(item));
410  }
411  return nlist;
412  }
413 
420  private static NMap map2Nmap(Map<String, Object> map) throws UnsupportedOperationException {
421  NMap nmap = new NMap(new HashMap<byte[], Value>());
422  for (Map.Entry<String, Object> entry : map.entrySet()) {
423  nmap.kvs.put(entry.getKey().getBytes(), value2Nvalue(entry.getValue()));
424  }
425  return nmap;
426  }
427 
434  public static Value value2Nvalue(Object value) throws UnsupportedOperationException {
435  try {
436  if (value == null) {
437  Value nullValue = new Value();
438  nullValue.setNVal(NullType.__NULL__);
439  return nullValue;
440  }
441  Class<?> type = value.getClass();
442  Setter<Object> setter = LEAF_TYPE_AND_SETTER.get(type);
443  if (setter != null) {
444  return setter.set(value);
445  }
446  for (Class<?> parentType : COMPLEX_TYPE_AND_SETTER.keySet()) {
447  if (isCurrentTypeOrParentType(type, parentType)) {
448  return COMPLEX_TYPE_AND_SETTER.get(parentType).set(value);
449  }
450  }
451  } catch (Exception e) {
452  throw new UnsupportedOperationException(e);
453  }
454  throw new UnsupportedOperationException(
455  "Only support convert boolean/float/int/string/map/list/date/pojo to nebula.Value but was"
456  + value.getClass().getTypeName());
457  }
458 
459  @Override
460  public synchronized void close() {
461  release();
462  }
463 
467  public static Map<Class<?>, Setter> LEAF_TYPE_AND_SETTER =
468  new HashMap<Class<?>, Setter>() {{
469  put(Value.class, (Setter<Value>) (param) -> param);
470  put(Boolean.class, (Setter<Boolean>) Value::bVal);
471  put(Integer.class, (Setter<Integer>) Value::iVal);
472  put(Short.class, (Setter<Short>) Value::iVal);
473  put(Byte.class, (Setter<Short>) Value::iVal);
474  put(Long.class, (Setter<Long>) Value::iVal);
475  put(Float.class, (Setter<Float>) Value::fVal);
476  put(Double.class, (Setter<Double>) Value::fVal);
477  put(byte[].class, (Setter<byte[]>) Value::sVal);
478  put(Byte[].class, (Setter<byte[]>) Value::sVal);
479  put(String.class, (Setter<String>) (param) -> Value.sVal(param.getBytes()));
480  put(com.vesoft.nebula.Date.class, (Setter<com.vesoft.nebula.Date>) Value::dVal);
481  put(Time.class, (Setter<Time>) Value::tVal);
482  put(DateTime.class, (Setter<DateTime>) Value::dtVal);
483  put(Vertex.class, (Setter<Vertex>) Value::vVal);
484  put(Edge.class, (Setter<Edge>) Value::eVal);
485  put(Path.class, (Setter<Path>) Value::pVal);
486  put(NList.class, (Setter<NList>) Value::lVal);
487  put(NMap.class, (Setter<NMap>) Value::mVal);
488  put(NSet.class, (Setter<NSet>) Value::uVal);
489  put(DataSet.class, (Setter<DataSet>) Value::gVal);
490  put(Geography.class, (Setter<Geography>) Value::ggVal);
491  put(Duration.class, (Setter<Duration>) Value::duVal);
492  }};
493 
498  public static Map<Class<?>, Setter> COMPLEX_TYPE_AND_SETTER =
499  new LinkedHashMap<Class<?>, Setter>() {{
500  put(Collection.class, (Setter<Collection>) (collection) -> {
501  Value value = new Value();
502  List<Object> list = new ArrayList<>();
503  collection.forEach(el -> list.add(value2Nvalue(el)));
504  value.setLVal(list2Nlist(list));
505  return value;
506  });
507 
508  put(Map.class, (Setter<Map<String, Object>>) (map) -> {
509  Value value = new Value();
510  Map<String, Object> valueMap = new HashMap<>();
511  map.forEach((k, v) -> {
512  valueMap.put(k, value2Nvalue(v));
513  });
514  value.setMVal(map2Nmap(valueMap));
515  return value;
516  });
517 
518  put(java.util.Date.class, (Setter<java.util.Date>) (date) -> {
519  Calendar calendar = new Calendar.Builder().setInstant(date).build();
520  return Value.dtVal(new DateTime(
521  new Short(String.valueOf(calendar.get(Calendar.YEAR))),
522  new Byte(String.valueOf(calendar.get(Calendar.MONTH))),
523  new Byte(String.valueOf(calendar.get(Calendar.DATE))),
524  new Byte(String.valueOf(calendar.get(Calendar.HOUR))),
525  new Byte(String.valueOf(calendar.get(Calendar.MINUTE))),
526  new Byte(String.valueOf(calendar.get(Calendar.SECOND))),
527  new Short(String.valueOf(calendar.get(Calendar.MILLISECOND)))
528  ));
529  });
530 
531  put(Object.class, (Setter<Object>) (obj) -> {
532  Value value = new Value();
533  Map<String, Object> pojoFields = new HashMap<>();
534  Class<?> paramType = obj.getClass();
535  Field[] declaredFields = paramType.getDeclaredFields();
536  for (Field declaredField : declaredFields) {
537  pojoFields.put(declaredField.getName(),
538  value2Nvalue(ReflectUtil.getValue(obj, declaredField)));
539  }
540  value.setMVal(map2Nmap(pojoFields));
541  return value;
542  });
543  }};
544 
545  interface Setter<T> {
546 
547  Value set(T param);
548  }
549 }
void updateServerStatus()
Updates the services' status when the connection is broken. It is called by Session and NebulaPool.
void setInvalidateConnection(SyncConnection connection)
Marks the connection as invalid so that the object pool destroys it.
void returnConnection(SyncConnection connection)
Return the connection to object pool.
The Session is an object that operates with nebula-graph.
Definition: Session.java:53
static Map< Class<?>, Setter > COMPLEX_TYPE_AND_SETTER
some value setter for java type (complex java type include collections or date) that need convert to ...
Definition: Session.java:498
synchronized HostAddress getGraphHost()
Gets the service address of the current connection.
Definition: Session.java:364
synchronized String executeJsonWithParameter(String stmt, Map< String, Object > parameterMap)
Execute the nGql sentence.
Definition: Session.java:276
synchronized boolean pingSession()
check current session is ok
Definition: Session.java:333
synchronized String executeJson(String stmt)
Execute the nGql sentence.
Definition: Session.java:211
static Map< Class<?>, Setter > LEAF_TYPE_AND_SETTER
some value setter for java type (basic or nebula special type) that need convert to NValue
Definition: Session.java:467
synchronized ResultSet execute(String stmt)
Execute the nGql sentence.
Definition: Session.java:91
synchronized void release()
Notifies the server that the session is no longer needed and returns the connection to the pool,...
Definition: Session.java:346
synchronized boolean ping()
Check current connection is ok.
Definition: Session.java:323
Session(SyncConnection connection, AuthResult authResult, NebulaPool connPool, Boolean retryConnect)
Constructor.
Definition: Session.java:73
static Value value2Nvalue(Object value)
convert java value type to nebula thrift value type
Definition: Session.java:434
synchronized ResultSet executeWithParameter(String stmt, Map< String, Object > parameterMap)
Execute the nGql sentence.
Definition: Session.java:105