NebulaGraph Java Client  release-3.8
All Classes Functions Variables
SessionPool.java
1 /* Copyright (c) 2022 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.graph;
7 
8 import com.alibaba.fastjson.JSON;
9 import com.vesoft.nebula.ErrorCode;
10 import com.vesoft.nebula.Value;
11 import com.vesoft.nebula.client.graph.data.HostAddress;
12 import com.vesoft.nebula.client.graph.data.ResultSet;
13 import com.vesoft.nebula.client.graph.exception.AuthFailedException;
14 import com.vesoft.nebula.client.graph.exception.BindSpaceFailedException;
15 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
16 import com.vesoft.nebula.client.graph.exception.IOErrorException;
17 import com.vesoft.nebula.client.graph.net.AuthResult;
18 import com.vesoft.nebula.client.graph.net.Session;
19 import com.vesoft.nebula.client.graph.net.SessionState;
20 import com.vesoft.nebula.client.graph.net.SyncConnection;
21 import java.io.Serializable;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 
35 public class SessionPool implements Serializable {
36 
37  private static final long serialVersionUID = 6051248334277617891L;
38 
39  private final Logger log = LoggerFactory.getLogger(this.getClass());
40 
41  private final ScheduledExecutorService healthCheckSchedule =
42  Executors.newScheduledThreadPool(1);
43  private final ScheduledExecutorService sessionQueueMaintainSchedule =
44  Executors.newScheduledThreadPool(1);
45 
46  public CopyOnWriteArrayList<NebulaSession> sessionList = new CopyOnWriteArrayList<>();
47  public AtomicInteger idleSessionSize = new AtomicInteger(0);
48  public AtomicBoolean hasInit = new AtomicBoolean(false);
49  public AtomicBoolean isClosed = new AtomicBoolean(false);
50 
51  private final AtomicInteger pos = new AtomicInteger(0);
52 
53  private final SessionPoolConfig sessionPoolConfig;
54  private final int minSessionSize;
55  private final int maxSessionSize;
56  private final int cleanTime;
57  private final int healthCheckTime;
58  private final int retryTimes;
59  private final int intervalTime;
60  private final boolean reconnect;
61  private final String spaceName;
62  private final String useSpace;
63 
64 
/**
 * Creates a pool from the given configuration and initializes it immediately
 * by calling {@link #init()}.
 *
 * @param poolConfig source of the pool sizes, timings, retry settings and space name
 */
public SessionPool(SessionPoolConfig poolConfig) {
    // keep the whole config: connection settings are read from it later on
    this.sessionPoolConfig = poolConfig;

    // pool sizing
    this.minSessionSize = poolConfig.getMinSessionSize();
    this.maxSessionSize = poolConfig.getMaxSessionSize();

    // maintenance and retry settings
    this.cleanTime = poolConfig.getCleanTime();
    this.retryTimes = poolConfig.getRetryTimes();
    this.intervalTime = poolConfig.getIntervalTime();
    this.reconnect = poolConfig.isReconnect();
    this.healthCheckTime = poolConfig.getHealthCheckTime();

    // the space every pooled session is bound to
    this.spaceName = poolConfig.getSpaceName();
    this.useSpace = "USE `" + this.spaceName + "`;";

    init();
}
78 
79 
83  private synchronized NebulaSession getSession() throws ClientServerIncompatibleException,
85  int retry = sessionPoolConfig.getRetryConnectTimes();
86  while (retry-- >= 0) {
87  // if there are idle sessions, get session from queue
88  if (idleSessionSize.get() > 0) {
89  for (NebulaSession nebulaSession : sessionList) {
90  if (nebulaSession.isIdleAndSetUsed()) {
91  idleSessionSize.decrementAndGet();
92  return nebulaSession;
93  }
94  }
95  }
96  // if session size is less than max size, get session from pool
97  if (sessionList.size() < maxSessionSize) {
98  return createSessionObject(SessionState.USED);
99  }
100  // there's no available session, wait for SessionPoolConfig.getWaitTime and re-get
101  try {
102  Thread.sleep(sessionPoolConfig.getWaitTime());
103  } catch (InterruptedException e) {
104  log.error("getSession error when wait for idle sessions, ", e);
105  throw new RuntimeException(e);
106  }
107  }
108 
109  // if session size is equal to max size and no idle session here, throw exception
110  throw new RuntimeException("no extra session available");
111  }
112 
113 
118  @Deprecated
119  public boolean init() {
120  if (hasInit.get()) {
121  return true;
122  }
123 
124  while (sessionList.size() < minSessionSize) {
125  try {
126  createSessionObject(SessionState.IDLE);
127  idleSessionSize.incrementAndGet();
128  } catch (Exception e) {
129  log.error("SessionPool init failed. ");
130  throw new RuntimeException("create session failed.", e);
131  }
132  }
133  healthCheckSchedule.scheduleAtFixedRate(this::checkSession, 0, healthCheckTime,
134  TimeUnit.SECONDS);
135  sessionQueueMaintainSchedule.scheduleAtFixedRate(this::updateSessionQueue, 0, cleanTime,
136  TimeUnit.SECONDS);
137  hasInit.compareAndSet(false, true);
138  return true;
139  }
140 
141 
149  public ResultSet execute(String stmt) throws IOErrorException,
151  stmtCheck(stmt);
152  checkSessionPool();
153  NebulaSession nebulaSession = null;
154  ResultSet resultSet = null;
155  int tryTimes = 0;
156  while (tryTimes++ <= retryTimes) {
157  try {
158  nebulaSession = getSession();
159  resultSet = nebulaSession.execute(stmt);
160  if (resultSet.isSucceeded()
161  || resultSet.getErrorCode() == ErrorCode.E_SEMANTIC_ERROR.getValue()
162  || resultSet.getErrorCode() == ErrorCode.E_SYNTAX_ERROR.getValue()) {
163  releaseSession(nebulaSession);
164  return resultSet;
165  }
166  log.warn(String.format("execute error, code: %d, message: %s, retry: %d",
167  resultSet.getErrorCode(), resultSet.getErrorMessage(), tryTimes));
168  nebulaSession.release();
169  sessionList.remove(nebulaSession);
170  try {
171  Thread.sleep(intervalTime);
172  } catch (InterruptedException interruptedException) {
173  // ignore
174  }
176  // will never get here.
178  throw e;
179  } catch (IOErrorException e) {
180  if (nebulaSession != null) {
181  nebulaSession.release();
182  sessionList.remove(nebulaSession);
183  }
184  if (tryTimes < retryTimes) {
185  log.warn(String.format("execute failed for IOErrorException, message: %s, "
186  + "retry: %d", e.getMessage(), tryTimes));
187  try {
188  Thread.sleep(intervalTime);
189  } catch (InterruptedException interruptedException) {
190  // ignore
191  }
192  } else {
193  throw e;
194  }
195  }
196  }
197  if (nebulaSession != null) {
198  nebulaSession.release();
199  sessionList.remove(nebulaSession);
200  }
201  return resultSet;
202  }
203 
204 
212  public ResultSet execute(String stmt, Map<String, Object> parameterMap)
215  stmtCheck(stmt);
216  checkSessionPool();
217  NebulaSession nebulaSession = getSession();
218  ResultSet resultSet;
219  try {
220  resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
221 
222  // re-execute for session error
223  if (isSessionError(resultSet)) {
224  sessionList.remove(nebulaSession);
225  nebulaSession = getSession();
226  resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
227  }
228  } catch (IOErrorException e) {
229  useSpace(nebulaSession, null);
230  throw e;
231  }
232 
233  useSpace(nebulaSession, resultSet);
234  return resultSet;
235  }
236 
237  public ResultSet executeWithTimeout(String stmt,
238  long timeoutMs)
240  return executeWithParameterTimeout(stmt,
241  (Map<String, Object>) Collections.EMPTY_MAP,
242  timeoutMs);
243  }
244 
245  public ResultSet executeWithParameterTimeout(String stmt,
246  Map<String, Object> parameterMap,
247  long timeoutMs)
248  throws IOErrorException, AuthFailedException, BindSpaceFailedException {
249  if (timeoutMs <= 0) {
250  throw new IllegalArgumentException("timeout should be a positive number");
251  }
252  stmtCheck(stmt);
253  checkSessionPool();
254  NebulaSession nebulaSession = null;
255  ResultSet resultSet = null;
256  int tryTimes = 0;
257  while (tryTimes++ <= retryTimes) {
258  try {
259  nebulaSession = getSession();
260  resultSet = nebulaSession.executeWithParameterTimeout(stmt,
261  parameterMap,
262  timeoutMs);
263  if (resultSet.isSucceeded()
264  || resultSet.getErrorCode() == ErrorCode.E_SEMANTIC_ERROR.getValue()
265  || resultSet.getErrorCode() == ErrorCode.E_SYNTAX_ERROR.getValue()) {
266  releaseSession(nebulaSession);
267  return resultSet;
268  }
269  log.warn(String.format("execute error, code: %d, message: %s, retry: %d",
270  resultSet.getErrorCode(),
271  resultSet.getErrorMessage(),
272  tryTimes));
273  nebulaSession.release();
274  sessionList.remove(nebulaSession);
275  try {
276  Thread.sleep(intervalTime);
277  } catch (InterruptedException interruptedException) {
278  // ignore
279  }
280  } catch (ClientServerIncompatibleException e) {
281  // will never get here.
282  } catch (AuthFailedException | BindSpaceFailedException e) {
283  throw e;
284  } catch (IOErrorException e) {
285  if (nebulaSession != null) {
286  nebulaSession.release();
287  sessionList.remove(nebulaSession);
288  }
289  if (tryTimes < retryTimes) {
290  log.warn(String.format("execute failed for IOErrorException, message: %s, "
291  + "retry: %d", e.getMessage(), tryTimes));
292  try {
293  Thread.sleep(intervalTime);
294  } catch (InterruptedException interruptedException) {
295  // ignore
296  }
297  } else {
298  throw e;
299  }
300  }
301  }
302  if (nebulaSession != null) {
303  nebulaSession.release();
304  sessionList.remove(nebulaSession);
305  }
306  return resultSet;
307  }
308 
309  public String executeJson(String stmt)
310  throws ClientServerIncompatibleException, AuthFailedException,
311  IOErrorException, BindSpaceFailedException {
312  return executeJsonWithParameter(stmt, (Map<String, Object>) Collections.EMPTY_MAP);
313  }
314 
315  public String executeJsonWithParameter(String stmt,
316  Map<String, Object> parameterMap)
317  throws ClientServerIncompatibleException, AuthFailedException,
318  IOErrorException, BindSpaceFailedException {
319  stmtCheck(stmt);
320  checkSessionPool();
321  NebulaSession nebulaSession = getSession();
322  String result;
323  try {
324  result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
325 
326  // re-execute for session error
327  if (isSessionErrorForJson(result)) {
328  sessionList.remove(nebulaSession);
329  nebulaSession = getSession();
330  result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
331  }
332  } catch (IOErrorException e) {
333  if (e.getType() == IOErrorException.E_CONNECT_BROKEN) {
334  sessionList.remove(nebulaSession);
335  nebulaSession = getSession();
336  result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
337  return result;
338  }
339  useSpace(nebulaSession, null);
340  throw e;
341  }
342 
343  useSpaceForJson(nebulaSession, result);
344  return result;
345  }
346 
347 
351  public void close() {
352  if (isClosed.get()) {
353  return;
354  }
355 
356  if (isClosed.compareAndSet(false, true)) {
357  for (NebulaSession nebulaSession : sessionList) {
358  nebulaSession.release();
359  }
360  sessionList.clear();
361  if (!healthCheckSchedule.isShutdown()) {
362  healthCheckSchedule.shutdown();
363  }
364  if (!sessionQueueMaintainSchedule.isShutdown()) {
365  sessionQueueMaintainSchedule.shutdown();
366  }
367  }
368  }
369 
370 
374  public boolean isActive() {
375  return hasInit.get();
376  }
377 
381  public boolean isClosed() {
382  return isClosed.get();
383  }
384 
388  public int getSessionNums() {
389  return sessionList.size();
390  }
391 
395  public int getIdleSessionNums() {
396  return idleSessionSize.get();
397  }
398 
399 
403  private void releaseSession(NebulaSession nebulaSession) {
404  nebulaSession.isUsedAndSetIdle();
405  idleSessionSize.incrementAndGet();
406  }
407 
408 
412  private void checkSession() {
413  for (NebulaSession nebulaSession : sessionList) {
414  if (nebulaSession.isIdleAndSetUsed()) {
415  try {
416  idleSessionSize.decrementAndGet();
417  nebulaSession.execute("YIELD 1");
418  nebulaSession.isUsedAndSetIdle();
419  idleSessionSize.incrementAndGet();
420  } catch (IOErrorException e) {
421  log.error("session ping error, {}, remove current session.", e.getMessage());
422  nebulaSession.release();
423  sessionList.remove(nebulaSession);
424  }
425  }
426  }
427  }
428 
432  private void updateSessionQueue() {
433  // remove the idle sessions
434  if (idleSessionSize.get() > minSessionSize) {
435  synchronized (this) {
436  for (NebulaSession nebulaSession : sessionList) {
437  if (nebulaSession.isIdle()) {
438  nebulaSession.release();
439  sessionList.remove(nebulaSession);
440  if (idleSessionSize.decrementAndGet() <= minSessionSize) {
441  break;
442  }
443  }
444  }
445  }
446  }
447  }
448 
455  private NebulaSession createSessionObject(SessionState state)
456  throws ClientServerIncompatibleException, AuthFailedException,
457  IOErrorException, BindSpaceFailedException {
458  SyncConnection connection = new SyncConnection();
459  int tryConnect = sessionPoolConfig.getGraphAddressList().size();
460  // reconnect with all available address
461  while (tryConnect-- > 0) {
462  try {
463  if (sessionPoolConfig.isEnableSsl()) {
464  connection.open(getAddress(), sessionPoolConfig.getTimeout(),
465  sessionPoolConfig.getSslParam(),
466  sessionPoolConfig.isUseHttp2(),
467  sessionPoolConfig.getCustomHeaders());
468  } else {
469  connection.open(getAddress(), sessionPoolConfig.getTimeout(),
470  sessionPoolConfig.isUseHttp2(),
471  sessionPoolConfig.getCustomHeaders());
472  }
473  break;
474  } catch (Exception e) {
475  if (tryConnect == 0 || !reconnect) {
476  throw e;
477  } else {
478  log.warn("connect failed, " + e.getMessage());
479  }
480  }
481  }
482 
483  AuthResult authResult;
484  try {
485  authResult = connection.authenticate(sessionPoolConfig.getUsername(),
486  sessionPoolConfig.getPassword());
487  } catch (AuthFailedException e) {
488  log.error(e.getMessage());
489  if (e.getMessage().toLowerCase().contains("user not exist")
490  || e.getMessage().toLowerCase().contains("invalid password")) {
491  // close the session pool
492  close();
493  } else {
494  // just close the connection
495  connection.close();
496  }
497  throw e;
498  }
499 
500  NebulaSession nebulaSession = new NebulaSession(connection, authResult.getSessionId(),
501  authResult.getTimezoneOffset(), state);
502  ResultSet result = null;
503  try {
504  result = nebulaSession.execute(useSpace);
505  } catch (IOErrorException e) {
506  log.error("binding space failed,", e);
507  nebulaSession.release();
508  throw new BindSpaceFailedException("binding space failed:" + e.getMessage());
509  }
510  if (!result.isSucceeded()) {
511  nebulaSession.release();
512  throw new BindSpaceFailedException(result.getErrorMessage());
513  }
514  sessionList.add(nebulaSession);
515  return nebulaSession;
516  }
517 
518 
519  public HostAddress getAddress() {
520  List<HostAddress> addresses = sessionPoolConfig.getGraphAddressList();
521  int newPos = (pos.getAndIncrement()) % addresses.size();
522  return addresses.get(newPos);
523  }
524 
531  private void useSpace(NebulaSession nebulaSession, ResultSet resultSet)
532  throws IOErrorException {
533  if (resultSet == null) {
534  nebulaSession.release();
535  sessionList.remove(nebulaSession);
536  return;
537  }
538  // space has been drop, close the SessionPool
539  if (resultSet.getSpaceName().trim().isEmpty()) {
540  log.warn("space {} has been drop, close the SessionPool.", spaceName);
541  close();
542  return;
543  }
544  // re-bind the configured spaceName, if bind failed, then remove this session.
545  if (!spaceName.equals(resultSet.getSpaceName())) {
546  ResultSet switchSpaceResult = nebulaSession.execute(useSpace);
547  if (!switchSpaceResult.isSucceeded()) {
548  log.warn("Bind Space failed, {}", switchSpaceResult.getErrorMessage());
549  nebulaSession.release();
550  sessionList.remove(nebulaSession);
551  return;
552  }
553  }
554  releaseSession(nebulaSession);
555  }
556 
563  private void useSpaceForJson(NebulaSession nebulaSession, String result)
564  throws IOErrorException {
565  String responseSpaceName =
566  (String) JSON.parseObject(result).getJSONArray("results")
567  .getJSONObject(0).get("spaceName");
568  if (!spaceName.equals(responseSpaceName)) {
569  nebulaSession.execute(useSpace);
570  }
571  releaseSession(nebulaSession);
572  }
573 
574 
575  private boolean isSessionError(ResultSet resultSet) {
576  return resultSet != null
577  && (resultSet.getErrorCode() == ErrorCode.E_SESSION_INVALID.getValue()
578  || resultSet.getErrorCode() == ErrorCode.E_SESSION_NOT_FOUND.getValue()
579  || resultSet.getErrorCode() == ErrorCode.E_SESSION_TIMEOUT.getValue());
580  }
581 
582  private boolean isSessionErrorForJson(String result) {
583  if (result == null) {
584  return true;
585  }
586  int code = JSON.parseObject(result).getJSONArray("errors")
587  .getJSONObject(0).getIntValue("code");
588  return code == ErrorCode.E_SESSION_INVALID.getValue()
589  || code == ErrorCode.E_SESSION_NOT_FOUND.getValue()
590  || code == ErrorCode.E_SESSION_TIMEOUT.getValue();
591  }
592 
593 
594  private void checkSessionPool() {
595  if (!hasInit.get()) {
596  throw new RuntimeException("The SessionPool has not been initialized, "
597  + "please call init() first.");
598  }
599  if (isClosed.get()) {
600  throw new RuntimeException("The SessionPool has been closed.");
601  }
602  }
603 
604 
605  private void stmtCheck(String stmt) {
606  if (stmt == null || stmt.trim().isEmpty()) {
607  throw new IllegalArgumentException("statement is null.");
608  }
609 
610  if (stmt.trim().toLowerCase().startsWith("use") && stmt.trim().split(" ").length == 2) {
611  throw new IllegalArgumentException("`USE SPACE` alone is forbidden.");
612  }
613  }
614 }
boolean isClosed()
if the SessionPool is closed
void close()
close the session pool
ResultSet execute(String stmt, Map< String, Object > parameterMap)
Execute the nGql sentence with parameter.
boolean isActive()
if the SessionPool has been initialized
ResultSet execute(String stmt)
Execute the nGql sentence.
int getIdleSessionNums()
get the number of idle Session
boolean init()
Initializes the SessionPool. This call has been moved into the SessionPool constructor, so there is no need to call it manually.
int getSessionNums()
get the number of all Session
int getErrorCode()
get errorCode of execute result
Definition: ResultSet.java:160
String getErrorMessage()
get the error message of the execute result
Definition: ResultSet.java:180
boolean isSucceeded()
the execute result is succeeded
Definition: ResultSet.java:144