6 package com.vesoft.nebula.client.graph;
8 import com.alibaba.fastjson.JSON;
9 import com.vesoft.nebula.ErrorCode;
10 import com.vesoft.nebula.Value;
11 import com.vesoft.nebula.client.graph.data.HostAddress;
12 import com.vesoft.nebula.client.graph.data.ResultSet;
13 import com.vesoft.nebula.client.graph.exception.AuthFailedException;
14 import com.vesoft.nebula.client.graph.exception.BindSpaceFailedException;
15 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
16 import com.vesoft.nebula.client.graph.exception.IOErrorException;
17 import com.vesoft.nebula.client.graph.net.AuthResult;
18 import com.vesoft.nebula.client.graph.net.Session;
19 import com.vesoft.nebula.client.graph.net.SessionState;
20 import com.vesoft.nebula.client.graph.net.SyncConnection;
21 import java.io.Serializable;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
// Serialization version identifier of this class.
37 private static final long serialVersionUID = 6051248334277617891L;
// Logger named after the runtime class of this instance.
39 private final Logger log = LoggerFactory.getLogger(this.getClass());
// Scheduler on which checkSession is run at a fixed rate (period: healthCheckTime).
41 private final ScheduledExecutorService healthCheckSchedule =
42 Executors.newScheduledThreadPool(1);
// Scheduler on which updateSessionQueue is run at a fixed rate (period: cleanTime).
43 private final ScheduledExecutorService sessionQueueMaintainSchedule =
44 Executors.newScheduledThreadPool(1);
// All sessions currently owned by the pool, whether idle or in use.
// NOTE(review): this and the three fields below are public and mutable; callers can
// bypass the pool's bookkeeping - confirm whether that exposure is intentional.
46 public CopyOnWriteArrayList<NebulaSession> sessionList =
new CopyOnWriteArrayList<>();
// Counter of idle sessions: decremented when a session is claimed, incremented when it
// is handed back.
47 public AtomicInteger idleSessionSize =
new AtomicInteger(0);
// Set to true once initialization has completed; checked before statements are executed.
48 public AtomicBoolean hasInit =
new AtomicBoolean(
false);
// Flipped to true by the close path; checked before statements are executed.
49 public AtomicBoolean isClosed =
new AtomicBoolean(
false);
// Monotonic counter used by getAddress to rotate through the configured graph addresses.
51 private final AtomicInteger pos =
new AtomicInteger(0);
// The values below are copied from the pool configuration when the pool is constructed.
54 private final int minSessionSize;
55 private final int maxSessionSize;
56 private final int cleanTime;
57 private final int healthCheckTime;
58 private final int retryTimes;
59 private final int intervalTime;
60 private final boolean reconnect;
61 private final String spaceName;
// Pre-built "USE `<spaceName>`;" statement used to bind a session to the pool's space.
62 private final String useSpace;
66 this.sessionPoolConfig = poolConfig;
67 this.minSessionSize = poolConfig.getMinSessionSize();
68 this.maxSessionSize = poolConfig.getMaxSessionSize();
69 this.cleanTime = poolConfig.getCleanTime();
70 this.retryTimes = poolConfig.getRetryTimes();
71 this.intervalTime = poolConfig.getIntervalTime();
72 this.reconnect = poolConfig.isReconnect();
73 this.healthCheckTime = poolConfig.getHealthCheckTime();
74 this.spaceName = poolConfig.getSpaceName();
75 useSpace =
"USE `" + spaceName +
"`;";
85 int retry = sessionPoolConfig.getRetryConnectTimes();
86 while (retry-- >= 0) {
88 if (idleSessionSize.get() > 0) {
90 if (nebulaSession.isIdleAndSetUsed()) {
91 idleSessionSize.decrementAndGet();
97 if (sessionList.size() < maxSessionSize) {
102 Thread.sleep(sessionPoolConfig.getWaitTime());
103 }
catch (InterruptedException e) {
104 log.error(
"getSession error when wait for idle sessions, ", e);
105 throw new RuntimeException(e);
110 throw new RuntimeException(
"no extra session available");
124 while (sessionList.size() < minSessionSize) {
127 idleSessionSize.incrementAndGet();
128 }
catch (Exception e) {
129 log.error(
"SessionPool init failed. ");
130 throw new RuntimeException(
"create session failed.", e);
133 healthCheckSchedule.scheduleAtFixedRate(this::checkSession, 0, healthCheckTime,
135 sessionQueueMaintainSchedule.scheduleAtFixedRate(this::updateSessionQueue, 0, cleanTime,
137 hasInit.compareAndSet(
false,
true);
156 while (tryTimes++ <= retryTimes) {
158 nebulaSession = getSession();
159 resultSet = nebulaSession.execute(stmt);
161 || resultSet.
getErrorCode() == ErrorCode.E_SEMANTIC_ERROR.getValue()
162 || resultSet.
getErrorCode() == ErrorCode.E_SYNTAX_ERROR.getValue()) {
163 releaseSession(nebulaSession);
166 log.warn(String.format(
"execute error, code: %d, message: %s, retry: %d",
168 nebulaSession.release();
169 sessionList.remove(nebulaSession);
171 Thread.sleep(intervalTime);
172 }
catch (InterruptedException interruptedException) {
180 if (nebulaSession !=
null) {
181 nebulaSession.release();
182 sessionList.remove(nebulaSession);
184 if (tryTimes < retryTimes) {
185 log.warn(String.format(
"execute failed for IOErrorException, message: %s, "
186 +
"retry: %d", e.getMessage(), tryTimes));
188 Thread.sleep(intervalTime);
189 }
catch (InterruptedException interruptedException) {
197 if (nebulaSession !=
null) {
198 nebulaSession.release();
199 sessionList.remove(nebulaSession);
220 resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
223 if (isSessionError(resultSet)) {
224 sessionList.remove(nebulaSession);
225 nebulaSession = getSession();
226 resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
229 useSpace(nebulaSession,
null);
233 useSpace(nebulaSession, resultSet);
// Convenience overload: forwards the statement to executeWithParameterTimeout with an
// empty parameter map.
237 public ResultSet executeWithTimeout(String stmt,
240 return executeWithParameterTimeout(stmt,
// NOTE(review): the raw Collections.EMPTY_MAP constant forces an unchecked cast;
// Collections.<String, Object>emptyMap() would express the same value type-safely.
241 (Map<String, Object>) Collections.EMPTY_MAP,
// Executes a parameterized statement with a caller-supplied timeout, retrying on failure.
// Throws IllegalArgumentException when timeoutMs is not positive.
245 public ResultSet executeWithParameterTimeout(String stmt,
246 Map<String, Object> parameterMap,
248 throws IOErrorException, AuthFailedException, BindSpaceFailedException {
// Reject non-positive timeouts up front.
249 if (timeoutMs <= 0) {
250 throw new IllegalArgumentException(
"timeout should be a positive number");
254 NebulaSession nebulaSession =
null;
255 ResultSet resultSet =
null;
// Retry loop bounded by retryTimes; tryTimes is incremented on every pass.
// NOTE(review): the initial value of tryTimes is not visible here - confirm the
// intended number of attempts given the `<=` comparison.
257 while (tryTimes++ <= retryTimes) {
// Claim a session from the pool for this attempt.
259 nebulaSession = getSession();
260 resultSet = nebulaSession.executeWithParameterTimeout(stmt,
// Success, semantic errors and syntax errors are not retried: the session is handed
// back to the pool.
263 if (resultSet.isSucceeded()
264 || resultSet.getErrorCode() == ErrorCode.E_SEMANTIC_ERROR.getValue()
265 || resultSet.getErrorCode() == ErrorCode.E_SYNTAX_ERROR.getValue()) {
266 releaseSession(nebulaSession);
// Any other error code: log it, discard the session, then pause before the next pass.
269 log.warn(String.format(
"execute error, code: %d, message: %s, retry: %d",
270 resultSet.getErrorCode(),
271 resultSet.getErrorMessage(),
273 nebulaSession.release();
274 sessionList.remove(nebulaSession);
276 Thread.sleep(intervalTime);
277 }
catch (InterruptedException interruptedException) {
280 }
catch (ClientServerIncompatibleException e) {
282 }
catch (AuthFailedException | BindSpaceFailedException e) {
284 }
catch (IOErrorException e) {
// I/O failure: the session (if one was obtained) is discarded from the pool.
285 if (nebulaSession !=
null) {
286 nebulaSession.release();
287 sessionList.remove(nebulaSession);
// Log and pause only while this comparison against retryTimes still holds.
289 if (tryTimes < retryTimes) {
290 log.warn(String.format(
"execute failed for IOErrorException, message: %s, "
291 +
"retry: %d", e.getMessage(), tryTimes));
293 Thread.sleep(intervalTime);
294 }
catch (InterruptedException interruptedException) {
// Final cleanup: discard the last session used, if any.
302 if (nebulaSession !=
null) {
303 nebulaSession.release();
304 sessionList.remove(nebulaSession);
309 public String executeJson(String stmt)
310 throws ClientServerIncompatibleException, AuthFailedException,
311 IOErrorException, BindSpaceFailedException {
312 return executeJsonWithParameter(stmt, (Map<String, Object>) Collections.EMPTY_MAP);
// Executes a parameterized statement and returns the JSON-formatted response, retrying
// once on a fresh session when the first session turns out to be invalid or its
// connection is broken.
315 public String executeJsonWithParameter(String stmt,
316 Map<String, Object> parameterMap)
317 throws ClientServerIncompatibleException, AuthFailedException,
318 IOErrorException, BindSpaceFailedException {
// Claim a session from the pool.
321 NebulaSession nebulaSession = getSession();
324 result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
// Session-level error reported in the response: drop the session from the pool and
// re-run the statement once on a newly obtained session.
// NOTE(review): no release() call on the dropped session is visible here - confirm
// that its connection is closed somewhere.
327 if (isSessionErrorForJson(result)) {
328 sessionList.remove(nebulaSession);
329 nebulaSession = getSession();
330 result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
332 }
catch (IOErrorException e) {
// Broken connection: drop the session and re-run once on a newly obtained session.
// NOTE(review): verify that the replacement session is handed back to the pool on
// this path.
333 if (e.getType() == IOErrorException.E_CONNECT_BROKEN) {
334 sessionList.remove(nebulaSession);
335 nebulaSession = getSession();
336 result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
// Passing null makes useSpace release the session and remove it from the pool.
339 useSpace(nebulaSession,
null);
// Normal path: re-bind the space if the response reports a different one, then hand
// the session back.
343 useSpaceForJson(nebulaSession, result);
352 if (isClosed.get()) {
356 if (isClosed.compareAndSet(
false,
true)) {
358 nebulaSession.release();
361 if (!healthCheckSchedule.isShutdown()) {
362 healthCheckSchedule.shutdown();
364 if (!sessionQueueMaintainSchedule.isShutdown()) {
365 sessionQueueMaintainSchedule.shutdown();
375 return hasInit.get();
382 return isClosed.get();
389 return sessionList.size();
396 return idleSessionSize.get();
404 nebulaSession.isUsedAndSetIdle();
405 idleSessionSize.incrementAndGet();
412 private void checkSession() {
413 for (NebulaSession nebulaSession : sessionList) {
414 if (nebulaSession.isIdleAndSetUsed()) {
416 idleSessionSize.decrementAndGet();
417 nebulaSession.execute(
"YIELD 1");
418 nebulaSession.isUsedAndSetIdle();
419 idleSessionSize.incrementAndGet();
420 }
catch (IOErrorException e) {
421 log.error(
"session ping error, {}, remove current session.", e.getMessage());
422 nebulaSession.release();
423 sessionList.remove(nebulaSession);
// Periodic maintenance task: trims idle sessions while more than minSessionSize of them
// are idle.
432 private void updateSessionQueue() {
// Nothing to do unless the idle count exceeds the configured minimum.
434 if (idleSessionSize.get() > minSessionSize) {
435 synchronized (
this) {
// sessionList is a CopyOnWriteArrayList, so removing elements while iterating is safe.
436 for (NebulaSession nebulaSession : sessionList) {
// NOTE(review): isIdle() only observes the state; confirm that another thread cannot
// claim the session between this check and the release below.
437 if (nebulaSession.isIdle()) {
438 nebulaSession.release();
439 sessionList.remove(nebulaSession);
// Checks whether the idle count is back down to the minimum; the statement guarded
// by this condition is not visible here.
440 if (idleSessionSize.decrementAndGet() <= minSessionSize) {
// Creates a new session: opens a connection, authenticates, binds the pool's space, and
// registers the session in sessionList. The given state is passed to the NebulaSession
// constructor.
455 private NebulaSession createSessionObject(SessionState state)
456 throws ClientServerIncompatibleException, AuthFailedException,
457 IOErrorException, BindSpaceFailedException {
458 SyncConnection connection =
new SyncConnection();
// At most one connection attempt per configured graph address.
459 int tryConnect = sessionPoolConfig.getGraphAddressList().size();
461 while (tryConnect-- > 0) {
// Open with SSL parameters when SSL is enabled, otherwise without them; each attempt
// asks getAddress() for the next address.
463 if (sessionPoolConfig.isEnableSsl()) {
464 connection.open(getAddress(), sessionPoolConfig.getTimeout(),
465 sessionPoolConfig.getSslParam(),
466 sessionPoolConfig.isUseHttp2(),
467 sessionPoolConfig.getCustomHeaders());
469 connection.open(getAddress(), sessionPoolConfig.getTimeout(),
470 sessionPoolConfig.isUseHttp2(),
471 sessionPoolConfig.getCustomHeaders());
474 }
catch (Exception e) {
// Special handling when this was the last attempt or reconnecting is disabled
// (the guarded statement is not visible here); otherwise the failure is only logged.
475 if (tryConnect == 0 || !reconnect) {
478 log.warn(
"connect failed, " + e.getMessage());
// Authenticate with the configured credentials.
483 AuthResult authResult;
485 authResult = connection.authenticate(sessionPoolConfig.getUsername(),
486 sessionPoolConfig.getPassword());
487 }
catch (AuthFailedException e) {
488 log.error(e.getMessage());
// Credential problems are recognised by matching the server's message text.
// NOTE(review): e.getMessage() is dereferenced without a null check - confirm it is
// always non-null for AuthFailedException.
489 if (e.getMessage().toLowerCase().contains(
"user not exist")
490 || e.getMessage().toLowerCase().contains(
"invalid password")) {
// Wrap the authenticated connection in a session object.
500 NebulaSession nebulaSession =
new NebulaSession(connection, authResult.getSessionId(),
501 authResult.getTimezoneOffset(), state);
502 ResultSet result =
null;
// Bind the new session to the pool's space.
504 result = nebulaSession.execute(useSpace);
505 }
catch (IOErrorException e) {
// I/O failure while binding: release the session and report it as a bind failure.
506 log.error(
"binding space failed,", e);
507 nebulaSession.release();
508 throw new BindSpaceFailedException(
"binding space failed:" + e.getMessage());
// The server rejected the USE statement: release the session and report the error.
510 if (!result.isSucceeded()) {
511 nebulaSession.release();
512 throw new BindSpaceFailedException(result.getErrorMessage());
// Only a fully bound session is added to the pool.
514 sessionList.add(nebulaSession);
515 return nebulaSession;
519 public HostAddress getAddress() {
520 List<HostAddress> addresses = sessionPoolConfig.getGraphAddressList();
521 int newPos = (pos.getAndIncrement()) % addresses.size();
522 return addresses.get(newPos);
// Post-execution bookkeeping for a session: discards it when there is no result,
// re-binds the pool's space when the result reports a different one, and otherwise hands
// the session back to the pool.
531 private void useSpace(NebulaSession nebulaSession, ResultSet resultSet)
532 throws IOErrorException {
// No result available: release the session and remove it from the pool.
533 if (resultSet ==
null) {
534 nebulaSession.release();
535 sessionList.remove(nebulaSession);
// A blank space name in the result is logged as the space having been dropped.
539 if (resultSet.getSpaceName().trim().isEmpty()) {
540 log.warn(
"space {} has been drop, close the SessionPool.", spaceName);
// The session now points at a different space: switch it back with the USE statement.
545 if (!spaceName.equals(resultSet.getSpaceName())) {
546 ResultSet switchSpaceResult = nebulaSession.execute(useSpace);
// Switching back failed: log it and discard the session.
547 if (!switchSpaceResult.isSucceeded()) {
548 log.warn(
"Bind Space failed, {}", switchSpaceResult.getErrorMessage());
549 nebulaSession.release();
550 sessionList.remove(nebulaSession);
// Hand the session back to the pool.
554 releaseSession(nebulaSession);
563 private void useSpaceForJson(NebulaSession nebulaSession, String result)
564 throws IOErrorException {
565 String responseSpaceName =
566 (String) JSON.parseObject(result).getJSONArray(
"results")
567 .getJSONObject(0).get(
"spaceName");
568 if (!spaceName.equals(responseSpaceName)) {
569 nebulaSession.execute(useSpace);
571 releaseSession(nebulaSession);
575 private boolean isSessionError(ResultSet resultSet) {
576 return resultSet !=
null
577 && (resultSet.getErrorCode() == ErrorCode.E_SESSION_INVALID.getValue()
578 || resultSet.getErrorCode() == ErrorCode.E_SESSION_NOT_FOUND.getValue()
579 || resultSet.getErrorCode() == ErrorCode.E_SESSION_TIMEOUT.getValue());
// JSON counterpart of isSessionError: reads the error code from the response text and
// reports whether it is one of the session-level codes (invalid, not found, timed out).
582 private boolean isSessionErrorForJson(String result) {
// A null response gets its own branch; the statement inside it is not visible here.
583 if (result ==
null) {
// Reads "code" from the first entry of the "errors" array.
// NOTE(review): assumes the response always has a non-empty "errors" array - confirm.
586 int code = JSON.parseObject(result).getJSONArray(
"errors")
587 .getJSONObject(0).getIntValue(
"code");
588 return code == ErrorCode.E_SESSION_INVALID.getValue()
589 || code == ErrorCode.E_SESSION_NOT_FOUND.getValue()
590 || code == ErrorCode.E_SESSION_TIMEOUT.getValue();
594 private void checkSessionPool() {
595 if (!hasInit.get()) {
596 throw new RuntimeException(
"The SessionPool has not been initialized, "
597 +
"please call init() first.");
599 if (isClosed.get()) {
600 throw new RuntimeException(
"The SessionPool has been closed.");
605 private void stmtCheck(String stmt) {
606 if (stmt ==
null || stmt.trim().isEmpty()) {
607 throw new IllegalArgumentException(
"statement is null.");
610 if (stmt.trim().toLowerCase().startsWith(
"use") && stmt.trim().split(
" ").length == 2) {
611 throw new IllegalArgumentException(
"`USE SPACE` alone is forbidden.");
boolean isClosed()
if the SessionPool is closed
void close()
close the session pool
ResultSet execute(String stmt, Map< String, Object > parameterMap)
Execute the nGql sentence with parameter.
boolean isActive()
if the SessionPool has been initialized
ResultSet execute(String stmt)
Execute the nGql sentence.
int getIdleSessionNums()
get the number of idle Session
boolean init()
Initializes the SessionPool. This function has been moved into SessionPool's constructor, so there is no need to call it manually.
int getSessionNums()
get the number of all Session
int getErrorCode()
get errorCode of execute result
String getErrorMessage()
get the error message of the execute result
boolean isSucceeded()
the execute result is succeeded