NebulaGraph Java Client  release-3.8
SessionPool.java
1 /* Copyright (c) 2022 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.graph;
7 
8 import com.alibaba.fastjson.JSON;
9 import com.vesoft.nebula.ErrorCode;
10 import com.vesoft.nebula.client.graph.data.HostAddress;
11 import com.vesoft.nebula.client.graph.data.ResultSet;
12 import com.vesoft.nebula.client.graph.exception.AuthFailedException;
13 import com.vesoft.nebula.client.graph.exception.BindSpaceFailedException;
14 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
15 import com.vesoft.nebula.client.graph.exception.IOErrorException;
16 import com.vesoft.nebula.client.graph.net.AuthResult;
17 import com.vesoft.nebula.client.graph.net.SessionState;
18 import com.vesoft.nebula.client.graph.net.SyncConnection;
19 import java.io.Serializable;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 
32 public class SessionPool implements Serializable {
33 
// Serialization version identifier for this Serializable class.
34  private static final long serialVersionUID = 6051248334277617891L;
35 
// Per-instance logger, named after the runtime class of this object.
36  private final Logger log = LoggerFactory.getLogger(this.getClass());
37 
// Single-thread scheduler that runs checkSession() every healthCheckTime seconds (see init()).
38  private final ScheduledExecutorService healthCheckSchedule =
39  Executors.newScheduledThreadPool(1);
// Single-thread scheduler that runs updateSessionQueue() every cleanTime seconds (see init()).
40  private final ScheduledExecutorService sessionQueueMaintainSchedule =
41  Executors.newScheduledThreadPool(1);
42 
// Every session created by createSessionObject() is added here; removed when released.
43  public CopyOnWriteArrayList<NebulaSession> sessionList = new CopyOnWriteArrayList<>();
// Counter of sessions currently idle; adjusted whenever a session is borrowed or handed back.
44  public AtomicInteger idleSessionSize = new AtomicInteger(0);
// Set to true at the end of init().
45  public AtomicBoolean hasInit = new AtomicBoolean(false);
// Set to true by close().
46  public AtomicBoolean isClosed = new AtomicBoolean(false);
47 
// Monotonic cursor used by getAddress() to rotate through the configured graph addresses.
48  private final AtomicInteger pos = new AtomicInteger(0);
49 
// The settings below are copied from the SessionPoolConfig once, in the constructor.
50  private final SessionPoolConfig sessionPoolConfig;
// Number of sessions created eagerly by init(); also the floor used by updateSessionQueue().
51  private final int minSessionSize;
// Upper bound on sessionList.size() checked by getSession() before creating a new session.
52  private final int maxSessionSize;
// Period, in seconds, of the idle-session cleanup task.
53  private final int cleanTime;
// Period, in seconds, of the health-check task.
54  private final int healthCheckTime;
// Retry budget used by execute(String).
55  private final int retryTimes;
// Pause between retries in execute(String); passed to Thread.sleep, i.e. milliseconds.
56  private final int intervalTime;
// When false, createSessionObject() rethrows the first connect failure instead of trying
// the next address.
57  private final boolean reconnect;
// Name of the graph space every pooled session is bound to.
58  private final String spaceName;
// Precomputed "USE `<spaceName>`;" statement used to (re)bind a session to spaceName.
59  private final String useSpace;
60 
61 
/**
 * Creates a session pool from the given configuration and initializes it right away.
 *
 * <p>All settings are read from {@code poolConfig} once and cached in final fields, the
 * statement used to bind sessions to the configured space is precomputed, and
 * {@link #init()} is invoked before the constructor returns.
 *
 * @param poolConfig the configuration to read the pool settings from
 */
public SessionPool(SessionPoolConfig poolConfig) {
    this.sessionPoolConfig = poolConfig;

    // Pool sizing.
    this.minSessionSize = poolConfig.getMinSessionSize();
    this.maxSessionSize = poolConfig.getMaxSessionSize();

    // Background maintenance periods.
    this.cleanTime = poolConfig.getCleanTime();
    this.healthCheckTime = poolConfig.getHealthCheckTime();

    // Retry and reconnect behaviour.
    this.retryTimes = poolConfig.getRetryTimes();
    this.intervalTime = poolConfig.getIntervalTime();
    this.reconnect = poolConfig.isReconnect();

    // Target space and the statement that binds a session to it.
    this.spaceName = poolConfig.getSpaceName();
    this.useSpace = String.format("USE `%s`;", this.spaceName);

    init();
}
75 
76 
80  private synchronized NebulaSession getSession() throws ClientServerIncompatibleException,
82  int retry = sessionPoolConfig.getRetryConnectTimes();
83  while (retry-- >= 0) {
84  // if there are idle sessions, get session from queue
85  if (idleSessionSize.get() > 0) {
86  for (NebulaSession nebulaSession : sessionList) {
87  if (nebulaSession.isIdleAndSetUsed()) {
88  idleSessionSize.decrementAndGet();
89  return nebulaSession;
90  }
91  }
92  }
93  // if session size is less than max size, get session from pool
94  if (sessionList.size() < maxSessionSize) {
95  return createSessionObject(SessionState.USED);
96  }
97  // there's no available session, wait for SessionPoolConfig.getWaitTime and re-get
98  try {
99  Thread.sleep(sessionPoolConfig.getWaitTime());
100  } catch (InterruptedException e) {
101  log.error("getSession error when wait for idle sessions, ", e);
102  throw new RuntimeException(e);
103  }
104  }
105 
106  // if session size is equal to max size and no idle session here, throw exception
107  throw new RuntimeException("no extra session available");
108  }
109 
110 
115  @Deprecated
116  public boolean init() {
117  if (hasInit.get()) {
118  return true;
119  }
120 
121  while (sessionList.size() < minSessionSize) {
122  try {
123  createSessionObject(SessionState.IDLE);
124  idleSessionSize.incrementAndGet();
125  } catch (Exception e) {
126  log.error("SessionPool init failed. ");
127  throw new RuntimeException("create session failed.", e);
128  }
129  }
130  healthCheckSchedule.scheduleAtFixedRate(this::checkSession, 0, healthCheckTime,
131  TimeUnit.SECONDS);
132  sessionQueueMaintainSchedule.scheduleAtFixedRate(this::updateSessionQueue, 0, cleanTime,
133  TimeUnit.SECONDS);
134  hasInit.compareAndSet(false, true);
135  return true;
136  }
137 
138 
/**
 * Executes a statement on a pooled session, retrying when the attempt fails.
 *
 * <p>The statement is validated with stmtCheck and the pool state with checkSessionPool
 * before the first attempt. The loop makes up to {@code retryTimes + 1} attempts. A result
 * that succeeded, or that failed with a semantic or syntax error, is returned at once and
 * its session is handed back to the pool. Any other error result causes the session to be
 * released and removed from the pool, followed by a pause of {@code intervalTime}
 * milliseconds.
 *
 * <p>NOTE(review): this listing appears to be missing lines - the remainder of the throws
 * clause after the first line, and the catch headers that belong to the
 * "will never get here" comment and the "throw e;" statement. Confirm against the
 * original file before relying on this block.
 *
 * @param stmt the statement to execute
 * @return the result of the attempt that ended the loop
 * @throws IOErrorException rethrown from an attempt once tryTimes is no longer below
 *                          retryTimes
 */
146  public ResultSet execute(String stmt) throws IOErrorException,
148  stmtCheck(stmt);
149  checkSessionPool();
150  NebulaSession nebulaSession = null;
151  ResultSet resultSet = null;
152  int tryTimes = 0;
// tryTimes is incremented as part of the test, so inside the body it is 1 on the first pass.
153  while (tryTimes++ <= retryTimes) {
154  try {
155  nebulaSession = getSession();
156  resultSet = nebulaSession.execute(stmt);
// Success, semantic errors and syntax errors are final answers: retrying cannot change them.
157  if (resultSet.isSucceeded()
158  || resultSet.getErrorCode() == ErrorCode.E_SEMANTIC_ERROR.getValue()
159  || resultSet.getErrorCode() == ErrorCode.E_SYNTAX_ERROR.getValue()) {
160  releaseSession(nebulaSession);
161  return resultSet;
162  }
163  log.warn(String.format("execute error, code: %d, message: %s, retry: %d",
164  resultSet.getErrorCode(), resultSet.getErrorMessage(), tryTimes));
// Any other error code: drop this session entirely rather than returning it to the pool.
165  nebulaSession.release();
166  sessionList.remove(nebulaSession);
167  try {
168  Thread.sleep(intervalTime);
169  } catch (InterruptedException interruptedException) {
170  // ignore
// NOTE(review): the interrupt flag is not restored here - confirm that is intended.
171  }
173  // will never get here.
175  throw e;
176  } catch (IOErrorException e) {
// nebulaSession may still refer to the session of a previous iteration if getSession() threw.
177  if (nebulaSession != null) {
178  nebulaSession.release();
179  sessionList.remove(nebulaSession);
180  }
// Uses "<" whereas the loop condition uses "<=", so an IO failure is rethrown one
// attempt earlier than an error result stops being retried.
181  if (tryTimes < retryTimes) {
182  log.warn(String.format("execute failed for IOErrorException, message: %s, "
183  + "retry: %d", e.getMessage(), tryTimes));
184  try {
185  Thread.sleep(intervalTime);
186  } catch (InterruptedException interruptedException) {
187  // ignore
188  }
189  } else {
190  throw e;
191  }
192  }
193  }
// NOTE(review): on the visible paths that reach this point the session was already
// released and removed inside the loop, so this releases it a second time - confirm
// that NebulaSession.release() is safe to call twice.
194  if (nebulaSession != null) {
195  nebulaSession.release();
196  sessionList.remove(nebulaSession);
197  }
198  return resultSet;
199  }
200 
201 
209  @Deprecated
210  public ResultSet execute(String stmt, Map<String, Object> parameterMap)
213  stmtCheck(stmt);
214  checkSessionPool();
215  NebulaSession nebulaSession = getSession();
216  ResultSet resultSet;
217  try {
218  resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
219 
220  // re-execute for session error
221  if (isSessionError(resultSet)) {
222  sessionList.remove(nebulaSession);
223  nebulaSession = getSession();
224  resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
225  }
226  } catch (IOErrorException e) {
227  useSpace(nebulaSession, null);
228  throw e;
229  }
230 
231  useSpace(nebulaSession, resultSet);
232  return resultSet;
233  }
234 
235  public String executeJson(String stmt)
238  return executeJsonWithParameter(stmt, (Map<String, Object>) Collections.EMPTY_MAP);
239  }
240 
241  public String executeJsonWithParameter(String stmt,
242  Map<String, Object> parameterMap)
243  throws ClientServerIncompatibleException, AuthFailedException,
244  IOErrorException, BindSpaceFailedException {
245  stmtCheck(stmt);
246  checkSessionPool();
247  NebulaSession nebulaSession = getSession();
248  String result;
249  try {
250  result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
251 
252  // re-execute for session error
253  if (isSessionErrorForJson(result)) {
254  sessionList.remove(nebulaSession);
255  nebulaSession = getSession();
256  result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
257  }
258  } catch (IOErrorException e) {
259  if (e.getType() == IOErrorException.E_CONNECT_BROKEN) {
260  sessionList.remove(nebulaSession);
261  nebulaSession = getSession();
262  result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
263  return result;
264  }
265  useSpace(nebulaSession, null);
266  throw e;
267  }
268 
269  useSpaceForJson(nebulaSession, result);
270  return result;
271  }
272 
273 
277  public void close() {
278  if (isClosed.get()) {
279  return;
280  }
281 
282  if (isClosed.compareAndSet(false, true)) {
283  for (NebulaSession nebulaSession : sessionList) {
284  nebulaSession.release();
285  }
286  sessionList.clear();
287  if (!healthCheckSchedule.isShutdown()) {
288  healthCheckSchedule.shutdown();
289  }
290  if (!sessionQueueMaintainSchedule.isShutdown()) {
291  sessionQueueMaintainSchedule.shutdown();
292  }
293  }
294  }
295 
296 
300  public boolean isActive() {
301  return hasInit.get();
302  }
303 
307  public boolean isClosed() {
308  return isClosed.get();
309  }
310 
314  public int getSessionNums() {
315  return sessionList.size();
316  }
317 
321  public int getIdleSessionNums() {
322  return idleSessionSize.get();
323  }
324 
325 
329  private void releaseSession(NebulaSession nebulaSession) {
330  nebulaSession.isUsedAndSetIdle();
331  idleSessionSize.incrementAndGet();
332  }
333 
334 
338  private void checkSession() {
339  for (NebulaSession nebulaSession : sessionList) {
340  if (nebulaSession.isIdleAndSetUsed()) {
341  try {
342  idleSessionSize.decrementAndGet();
343  nebulaSession.execute("YIELD 1");
344  nebulaSession.isUsedAndSetIdle();
345  idleSessionSize.incrementAndGet();
346  } catch (IOErrorException e) {
347  log.error("session ping error, {}, remove current session.", e.getMessage());
348  nebulaSession.release();
349  sessionList.remove(nebulaSession);
350  }
351  }
352  }
353  }
354 
358  private void updateSessionQueue() {
359  // remove the idle sessions
360  if (idleSessionSize.get() > minSessionSize) {
361  synchronized (this) {
362  for (NebulaSession nebulaSession : sessionList) {
363  if (nebulaSession.isIdle()) {
364  nebulaSession.release();
365  sessionList.remove(nebulaSession);
366  if (idleSessionSize.decrementAndGet() <= minSessionSize) {
367  break;
368  }
369  }
370  }
371  }
372  }
373  }
374 
381  private NebulaSession createSessionObject(SessionState state)
382  throws ClientServerIncompatibleException, AuthFailedException,
383  IOErrorException, BindSpaceFailedException {
384  SyncConnection connection = new SyncConnection();
385  int tryConnect = sessionPoolConfig.getGraphAddressList().size();
386  // reconnect with all available address
387  while (tryConnect-- > 0) {
388  try {
389  if (sessionPoolConfig.isEnableSsl()) {
390  connection.open(getAddress(), sessionPoolConfig.getTimeout(),
391  sessionPoolConfig.getSslParam(),
392  sessionPoolConfig.isUseHttp2(),
393  sessionPoolConfig.getCustomHeaders());
394  } else {
395  connection.open(getAddress(), sessionPoolConfig.getTimeout(),
396  sessionPoolConfig.isUseHttp2(),
397  sessionPoolConfig.getCustomHeaders());
398  }
399  break;
400  } catch (Exception e) {
401  if (tryConnect == 0 || !reconnect) {
402  throw e;
403  } else {
404  log.warn("connect failed, " + e.getMessage());
405  }
406  }
407  }
408 
409  AuthResult authResult;
410  try {
411  authResult = connection.authenticate(sessionPoolConfig.getUsername(),
412  sessionPoolConfig.getPassword());
413  } catch (AuthFailedException e) {
414  log.error(e.getMessage());
415  if (e.getMessage().toLowerCase().contains("user not exist")
416  || e.getMessage().toLowerCase().contains("invalid password")) {
417  // close the session pool
418  close();
419  } else {
420  // just close the connection
421  connection.close();
422  }
423  throw e;
424  }
425 
426  NebulaSession nebulaSession = new NebulaSession(connection, authResult.getSessionId(),
427  authResult.getTimezoneOffset(), state);
428  ResultSet result = null;
429  try {
430  result = nebulaSession.execute(useSpace);
431  } catch (IOErrorException e) {
432  log.error("binding space failed,", e);
433  nebulaSession.release();
434  throw new BindSpaceFailedException("binding space failed:" + e.getMessage());
435  }
436  if (!result.isSucceeded()) {
437  nebulaSession.release();
438  throw new BindSpaceFailedException(result.getErrorMessage());
439  }
440  sessionList.add(nebulaSession);
441  return nebulaSession;
442  }
443 
444 
445  public HostAddress getAddress() {
446  List<HostAddress> addresses = sessionPoolConfig.getGraphAddressList();
447  int newPos = (pos.getAndIncrement()) % addresses.size();
448  return addresses.get(newPos);
449  }
450 
457  private void useSpace(NebulaSession nebulaSession, ResultSet resultSet)
458  throws IOErrorException {
459  if (resultSet == null) {
460  nebulaSession.release();
461  sessionList.remove(nebulaSession);
462  return;
463  }
464  // space has been drop, close the SessionPool
465  if (resultSet.getSpaceName().trim().isEmpty()) {
466  log.warn("space {} has been drop, close the SessionPool.", spaceName);
467  close();
468  return;
469  }
470  // re-bind the configured spaceName, if bind failed, then remove this session.
471  if (!spaceName.equals(resultSet.getSpaceName())) {
472  ResultSet switchSpaceResult = nebulaSession.execute(useSpace);
473  if (!switchSpaceResult.isSucceeded()) {
474  log.warn("Bind Space failed, {}", switchSpaceResult.getErrorMessage());
475  nebulaSession.release();
476  sessionList.remove(nebulaSession);
477  return;
478  }
479  }
480  releaseSession(nebulaSession);
481  }
482 
489  private void useSpaceForJson(NebulaSession nebulaSession, String result)
490  throws IOErrorException {
491  String responseSpaceName =
492  (String) JSON.parseObject(result).getJSONArray("results")
493  .getJSONObject(0).get("spaceName");
494  if (!spaceName.equals(responseSpaceName)) {
495  nebulaSession.execute(useSpace);
496  }
497  releaseSession(nebulaSession);
498  }
499 
500 
501  private boolean isSessionError(ResultSet resultSet) {
502  return resultSet != null
503  && (resultSet.getErrorCode() == ErrorCode.E_SESSION_INVALID.getValue()
504  || resultSet.getErrorCode() == ErrorCode.E_SESSION_NOT_FOUND.getValue()
505  || resultSet.getErrorCode() == ErrorCode.E_SESSION_TIMEOUT.getValue());
506  }
507 
508  private boolean isSessionErrorForJson(String result) {
509  if (result == null) {
510  return true;
511  }
512  int code = JSON.parseObject(result).getJSONArray("errors")
513  .getJSONObject(0).getIntValue("code");
514  return code == ErrorCode.E_SESSION_INVALID.getValue()
515  || code == ErrorCode.E_SESSION_NOT_FOUND.getValue()
516  || code == ErrorCode.E_SESSION_TIMEOUT.getValue();
517  }
518 
519 
520  private void checkSessionPool() {
521  if (!hasInit.get()) {
522  throw new RuntimeException("The SessionPool has not been initialized, "
523  + "please call init() first.");
524  }
525  if (isClosed.get()) {
526  throw new RuntimeException("The SessionPool has been closed.");
527  }
528  }
529 
530 
531  private void stmtCheck(String stmt) {
532  if (stmt == null || stmt.trim().isEmpty()) {
533  throw new IllegalArgumentException("statement is null.");
534  }
535 
536  if (stmt.trim().toLowerCase().startsWith("use") && stmt.trim().split(" ").length == 2) {
537  throw new IllegalArgumentException("`USE SPACE` alone is forbidden.");
538  }
539  }
540 }
boolean isClosed()
if the SessionPool is closed
void close()
close the session pool
ResultSet execute(String stmt, Map< String, Object > parameterMap)
Execute the nGql sentence with parameter.
boolean isActive()
if the SessionPool has been initialized
ResultSet execute(String stmt)
Execute the nGql sentence.
int getIdleSessionNums()
Gets the number of idle sessions.
boolean init()
Initializes the SessionPool. This logic has been moved into the SessionPool constructor, so there is no need to call it manually.
int getSessionNums()
Gets the total number of sessions.
int getErrorCode()
get errorCode of execute result
Definition: ResultSet.java:160
String getErrorMessage()
get the error message of the execute result
Definition: ResultSet.java:180
boolean isSucceeded()
the execute result is succeeded
Definition: ResultSet.java:144