NebulaGraph Java Client  release-3.6
All Classes Functions Variables
SessionPool.java
1 /* Copyright (c) 2022 vesoft inc. All rights reserved.
2  *
3  * This source code is licensed under Apache 2.0 License.
4  */
5 
6 package com.vesoft.nebula.client.graph;
7 
8 import com.alibaba.fastjson.JSON;
9 import com.vesoft.nebula.ErrorCode;
10 import com.vesoft.nebula.client.graph.data.HostAddress;
11 import com.vesoft.nebula.client.graph.data.ResultSet;
12 import com.vesoft.nebula.client.graph.exception.AuthFailedException;
13 import com.vesoft.nebula.client.graph.exception.BindSpaceFailedException;
14 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
15 import com.vesoft.nebula.client.graph.exception.IOErrorException;
16 import com.vesoft.nebula.client.graph.net.AuthResult;
17 import com.vesoft.nebula.client.graph.net.SessionState;
18 import com.vesoft.nebula.client.graph.net.SyncConnection;
19 import java.io.Serializable;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 
31 public class SessionPool implements Serializable {
32 
33  private static final long serialVersionUID = 6051248334277617891L;
34 
35  private final Logger log = LoggerFactory.getLogger(this.getClass());
36 
37  private final ScheduledExecutorService healthCheckSchedule =
38  Executors.newScheduledThreadPool(1);
39  private final ScheduledExecutorService sessionQueueMaintainSchedule =
40  Executors.newScheduledThreadPool(1);
41 
42  public CopyOnWriteArrayList<NebulaSession> sessionList = new CopyOnWriteArrayList<>();
43  public AtomicInteger idleSessionSize = new AtomicInteger(0);
44  public AtomicBoolean hasInit = new AtomicBoolean(false);
45  public AtomicBoolean isClosed = new AtomicBoolean(false);
46 
47  private final AtomicInteger pos = new AtomicInteger(0);
48 
49  private final SessionPoolConfig sessionPoolConfig;
50  private final int minSessionSize;
51  private final int maxSessionSize;
52  private final int cleanTime;
53  private final int healthCheckTime;
54  private final int retryTimes;
55  private final int intervalTime;
56  private final boolean reconnect;
57  private final String spaceName;
58  private final String useSpace;
59 
60 
/**
 * Creates a pool from the given configuration and initializes it right away.
 *
 * <p>The sizing, timing and retry settings are copied out of {@code poolConfig}, the
 * {@code USE} statement for the configured space is prepared, and then {@link #init()}
 * is called.
 *
 * @param poolConfig configuration the pool reads all of its settings from
 */
public SessionPool(SessionPoolConfig poolConfig) {
    this.sessionPoolConfig = poolConfig;

    // pool sizing
    this.minSessionSize = poolConfig.getMinSessionSize();
    this.maxSessionSize = poolConfig.getMaxSessionSize();

    // background task periods
    this.cleanTime = poolConfig.getCleanTime();
    this.healthCheckTime = poolConfig.getHealthCheckTime();

    // retry behaviour
    this.retryTimes = poolConfig.getRetryTimes();
    this.intervalTime = poolConfig.getIntervalTime();
    this.reconnect = poolConfig.isReconnect();

    // target space and the statement used to bind a session to it
    this.spaceName = poolConfig.getSpaceName();
    this.useSpace = String.format("USE `%s`;", this.spaceName);

    // must stay last: init() reads the fields assigned above
    init();
}
74 
75 
79  private synchronized NebulaSession getSession() throws ClientServerIncompatibleException,
81  int retry = sessionPoolConfig.getRetryConnectTimes();
82  while (retry-- >= 0) {
83  // if there are idle sessions, get session from queue
84  if (idleSessionSize.get() > 0) {
85  for (NebulaSession nebulaSession : sessionList) {
86  if (nebulaSession.isIdleAndSetUsed()) {
87  idleSessionSize.decrementAndGet();
88  return nebulaSession;
89  }
90  }
91  }
92  // if session size is less than max size, get session from pool
93  if (sessionList.size() < maxSessionSize) {
94  return createSessionObject(SessionState.USED);
95  }
96  // there's no available session, wait for SessionPoolConfig.getWaitTime and re-get
97  try {
98  Thread.sleep(sessionPoolConfig.getWaitTime());
99  } catch (InterruptedException e) {
100  log.error("getSession error when wait for idle sessions, ", e);
101  throw new RuntimeException(e);
102  }
103  }
104 
105  // if session size is equal to max size and no idle session here, throw exception
106  throw new RuntimeException("no extra session available");
107  }
108 
109 
114  @Deprecated
115  public boolean init() {
116  if (hasInit.get()) {
117  return true;
118  }
119 
120  while (sessionList.size() < minSessionSize) {
121  try {
122  createSessionObject(SessionState.IDLE);
123  idleSessionSize.incrementAndGet();
124  } catch (Exception e) {
125  log.error("SessionPool init failed. ");
126  throw new RuntimeException("create session failed.", e);
127  }
128  }
129  healthCheckSchedule.scheduleAtFixedRate(this::checkSession, 0, healthCheckTime,
130  TimeUnit.SECONDS);
131  sessionQueueMaintainSchedule.scheduleAtFixedRate(this::updateSessionQueue, 0, cleanTime,
132  TimeUnit.SECONDS);
133  hasInit.compareAndSet(false, true);
134  return true;
135  }
136 
137 
145  public ResultSet execute(String stmt) throws IOErrorException,
147  stmtCheck(stmt);
148  checkSessionPool();
149  NebulaSession nebulaSession = null;
150  ResultSet resultSet = null;
151  int tryTimes = 0;
152  while (tryTimes++ <= retryTimes) {
153  try {
154  nebulaSession = getSession();
155  resultSet = nebulaSession.execute(stmt);
156  if (resultSet.isSucceeded()
157  || resultSet.getErrorCode() == ErrorCode.E_SEMANTIC_ERROR.getValue()
158  || resultSet.getErrorCode() == ErrorCode.E_SYNTAX_ERROR.getValue()) {
159  releaseSession(nebulaSession);
160  return resultSet;
161  }
162  log.warn(String.format("execute error, code: %d, message: %s, retry: %d",
163  resultSet.getErrorCode(), resultSet.getErrorMessage(), tryTimes));
164  nebulaSession.release();
165  sessionList.remove(nebulaSession);
166  try {
167  Thread.sleep(intervalTime);
168  } catch (InterruptedException interruptedException) {
169  // ignore
170  }
172  // will never get here.
174  throw e;
175  } catch (IOErrorException e) {
176  if (nebulaSession != null) {
177  nebulaSession.release();
178  sessionList.remove(nebulaSession);
179  }
180  if (tryTimes < retryTimes) {
181  log.warn(String.format("execute failed for IOErrorException, message: %s, "
182  + "retry: %d", e.getMessage(), tryTimes));
183  try {
184  Thread.sleep(intervalTime);
185  } catch (InterruptedException interruptedException) {
186  // ignore
187  }
188  } else {
189  throw e;
190  }
191  }
192  }
193  if (nebulaSession != null) {
194  nebulaSession.release();
195  sessionList.remove(nebulaSession);
196  }
197  return resultSet;
198  }
199 
200 
208  @Deprecated
209  public ResultSet execute(String stmt, Map<String, Object> parameterMap)
212  stmtCheck(stmt);
213  checkSessionPool();
214  NebulaSession nebulaSession = getSession();
215  ResultSet resultSet;
216  try {
217  resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
218 
219  // re-execute for session error
220  if (isSessionError(resultSet)) {
221  sessionList.remove(nebulaSession);
222  nebulaSession = getSession();
223  resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
224  }
225  } catch (IOErrorException e) {
226  useSpace(nebulaSession, null);
227  throw e;
228  }
229 
230  useSpace(nebulaSession, resultSet);
231  return resultSet;
232  }
233 
234 
238  public void close() {
239  if (isClosed.get()) {
240  return;
241  }
242 
243  if (isClosed.compareAndSet(false, true)) {
244  for (NebulaSession nebulaSession : sessionList) {
245  nebulaSession.release();
246  }
247  sessionList.clear();
248  if (!healthCheckSchedule.isShutdown()) {
249  healthCheckSchedule.shutdown();
250  }
251  if (!sessionQueueMaintainSchedule.isShutdown()) {
252  sessionQueueMaintainSchedule.shutdown();
253  }
254  }
255  }
256 
257 
261  public boolean isActive() {
262  return hasInit.get();
263  }
264 
268  public boolean isClosed() {
269  return isClosed.get();
270  }
271 
275  public int getSessionNums() {
276  return sessionList.size();
277  }
278 
282  public int getIdleSessionNums() {
283  return idleSessionSize.get();
284  }
285 
286 
290  private void releaseSession(NebulaSession nebulaSession) {
291  nebulaSession.isUsedAndSetIdle();
292  idleSessionSize.incrementAndGet();
293  }
294 
295 
299  private void checkSession() {
300  for (NebulaSession nebulaSession : sessionList) {
301  if (nebulaSession.isIdleAndSetUsed()) {
302  try {
303  idleSessionSize.decrementAndGet();
304  nebulaSession.execute("YIELD 1");
305  nebulaSession.isUsedAndSetIdle();
306  idleSessionSize.incrementAndGet();
307  } catch (IOErrorException e) {
308  log.error("session ping error, {}, remove current session.", e.getMessage());
309  nebulaSession.release();
310  sessionList.remove(nebulaSession);
311  }
312  }
313  }
314  }
315 
319  private void updateSessionQueue() {
320  // remove the idle sessions
321  if (idleSessionSize.get() > minSessionSize) {
322  synchronized (this) {
323  for (NebulaSession nebulaSession : sessionList) {
324  if (nebulaSession.isIdle()) {
325  nebulaSession.release();
326  sessionList.remove(nebulaSession);
327  if (idleSessionSize.decrementAndGet() <= minSessionSize) {
328  break;
329  }
330  }
331  }
332  }
333  }
334  }
335 
342  private NebulaSession createSessionObject(SessionState state)
343  throws ClientServerIncompatibleException, AuthFailedException,
344  IOErrorException, BindSpaceFailedException {
345  SyncConnection connection = new SyncConnection();
346  int tryConnect = sessionPoolConfig.getGraphAddressList().size();
347  // reconnect with all available address
348  while (tryConnect-- > 0) {
349  try {
350  if (sessionPoolConfig.isEnableSsl()) {
351  connection.open(getAddress(), sessionPoolConfig.getTimeout(),
352  sessionPoolConfig.getSslParam(),
353  sessionPoolConfig.isUseHttp2(),
354  sessionPoolConfig.getCustomHeaders());
355  } else {
356  connection.open(getAddress(), sessionPoolConfig.getTimeout(),
357  sessionPoolConfig.isUseHttp2(),
358  sessionPoolConfig.getCustomHeaders());
359  }
360  break;
361  } catch (Exception e) {
362  if (tryConnect == 0 || !reconnect) {
363  throw e;
364  } else {
365  log.warn("connect failed, " + e.getMessage());
366  }
367  }
368  }
369 
370  AuthResult authResult;
371  try {
372  authResult = connection.authenticate(sessionPoolConfig.getUsername(),
373  sessionPoolConfig.getPassword());
374  } catch (AuthFailedException e) {
375  log.error(e.getMessage());
376  close();
377  throw e;
378  }
379 
380  NebulaSession nebulaSession = new NebulaSession(connection, authResult.getSessionId(),
381  authResult.getTimezoneOffset(), state);
382  ResultSet result = nebulaSession.execute(useSpace);
383  if (!result.isSucceeded()) {
384  nebulaSession.release();
385  throw new BindSpaceFailedException(result.getErrorMessage());
386  }
387  sessionList.add(nebulaSession);
388  return nebulaSession;
389  }
390 
391 
392  public HostAddress getAddress() {
393  List<HostAddress> addresses = sessionPoolConfig.getGraphAddressList();
394  int newPos = (pos.getAndIncrement()) % addresses.size();
395  return addresses.get(newPos);
396  }
397 
404  private void useSpace(NebulaSession nebulaSession, ResultSet resultSet)
405  throws IOErrorException {
406  if (resultSet == null) {
407  nebulaSession.release();
408  sessionList.remove(nebulaSession);
409  return;
410  }
411  // space has been drop, close the SessionPool
412  if (resultSet.getSpaceName().trim().isEmpty()) {
413  log.warn("space {} has been drop, close the SessionPool.", spaceName);
414  close();
415  return;
416  }
417  // re-bind the configured spaceName, if bind failed, then remove this session.
418  if (!spaceName.equals(resultSet.getSpaceName())) {
419  ResultSet switchSpaceResult = nebulaSession.execute(useSpace);
420  if (!switchSpaceResult.isSucceeded()) {
421  log.warn("Bind Space failed, {}", switchSpaceResult.getErrorMessage());
422  nebulaSession.release();
423  sessionList.remove(nebulaSession);
424  return;
425  }
426  }
427  releaseSession(nebulaSession);
428  }
429 
436  private void useSpaceForJson(NebulaSession nebulaSession, String result)
437  throws IOErrorException {
438  String responseSpaceName =
439  (String) JSON.parseObject(result).getJSONArray("results")
440  .getJSONObject(0).get("spaceName");
441  if (!spaceName.equals(responseSpaceName)) {
442  nebulaSession.execute(useSpace);
443  }
444  releaseSession(nebulaSession);
445  }
446 
447 
448  private boolean isSessionError(ResultSet resultSet) {
449  return resultSet != null
450  && (resultSet.getErrorCode() == ErrorCode.E_SESSION_INVALID.getValue()
451  || resultSet.getErrorCode() == ErrorCode.E_SESSION_NOT_FOUND.getValue()
452  || resultSet.getErrorCode() == ErrorCode.E_SESSION_TIMEOUT.getValue());
453  }
454 
455 
456  private void checkSessionPool() {
457  if (!hasInit.get()) {
458  throw new RuntimeException("The SessionPool has not been initialized, "
459  + "please call init() first.");
460  }
461  if (isClosed.get()) {
462  throw new RuntimeException("The SessionPool has been closed.");
463  }
464  }
465 
466 
467  private void stmtCheck(String stmt) {
468  if (stmt == null || stmt.trim().isEmpty()) {
469  throw new IllegalArgumentException("statement is null.");
470  }
471 
472  if (stmt.trim().toLowerCase().startsWith("use") && stmt.trim().split(" ").length == 2) {
473  throw new IllegalArgumentException("`USE SPACE` alone is forbidden.");
474  }
475  }
476 }
boolean isClosed()
if the SessionPool is closed
void close()
close the session pool
ResultSet execute(String stmt, Map< String, Object > parameterMap)
Execute the nGql sentence with parameter.
boolean isActive()
if the SessionPool has been initialized
ResultSet execute(String stmt)
Execute the nGql sentence.
int getIdleSessionNums()
get the number of idle Session
boolean init()
Initializes the SessionPool. This function has been moved into SessionPool's constructor, so there is no need to call it manually.
int getSessionNums()
get the number of all Session
int getErrorCode()
get errorCode of execute result
Definition: ResultSet.java:160
String getErrorMessage()
get the error message of the execute result
Definition: ResultSet.java:180
boolean isSucceeded()
the execute result is succeeded
Definition: ResultSet.java:144