6 package com.vesoft.nebula.client.graph;
8 import com.alibaba.fastjson.JSON;
9 import com.vesoft.nebula.ErrorCode;
10 import com.vesoft.nebula.client.graph.data.HostAddress;
11 import com.vesoft.nebula.client.graph.data.ResultSet;
12 import com.vesoft.nebula.client.graph.exception.AuthFailedException;
13 import com.vesoft.nebula.client.graph.exception.BindSpaceFailedException;
14 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
15 import com.vesoft.nebula.client.graph.exception.IOErrorException;
16 import com.vesoft.nebula.client.graph.net.AuthResult;
17 import com.vesoft.nebula.client.graph.net.SessionState;
18 import com.vesoft.nebula.client.graph.net.SyncConnection;
19 import java.io.Serializable;
20 import java.util.Collections;
21 import java.util.List;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
34 private static final long serialVersionUID = 6051248334277617891L;
36 private final Logger log = LoggerFactory.getLogger(this.getClass());
38 private final ScheduledExecutorService healthCheckSchedule =
39 Executors.newScheduledThreadPool(1);
40 private final ScheduledExecutorService sessionQueueMaintainSchedule =
41 Executors.newScheduledThreadPool(1);
43 public CopyOnWriteArrayList<NebulaSession> sessionList =
new CopyOnWriteArrayList<>();
44 public AtomicInteger idleSessionSize =
new AtomicInteger(0);
45 public AtomicBoolean hasInit =
new AtomicBoolean(
false);
46 public AtomicBoolean isClosed =
new AtomicBoolean(
false);
48 private final AtomicInteger pos =
new AtomicInteger(0);
51 private final int minSessionSize;
52 private final int maxSessionSize;
53 private final int cleanTime;
54 private final int healthCheckTime;
55 private final int retryTimes;
56 private final int intervalTime;
57 private final boolean reconnect;
58 private final String spaceName;
59 private final String useSpace;
63 this.sessionPoolConfig = poolConfig;
64 this.minSessionSize = poolConfig.getMinSessionSize();
65 this.maxSessionSize = poolConfig.getMaxSessionSize();
66 this.cleanTime = poolConfig.getCleanTime();
67 this.retryTimes = poolConfig.getRetryTimes();
68 this.intervalTime = poolConfig.getIntervalTime();
69 this.reconnect = poolConfig.isReconnect();
70 this.healthCheckTime = poolConfig.getHealthCheckTime();
71 this.spaceName = poolConfig.getSpaceName();
72 useSpace =
"USE `" + spaceName +
"`;";
82 int retry = sessionPoolConfig.getRetryConnectTimes();
83 while (retry-- >= 0) {
85 if (idleSessionSize.get() > 0) {
87 if (nebulaSession.isIdleAndSetUsed()) {
88 idleSessionSize.decrementAndGet();
94 if (sessionList.size() < maxSessionSize) {
99 Thread.sleep(sessionPoolConfig.getWaitTime());
100 }
catch (InterruptedException e) {
101 log.error(
"getSession error when wait for idle sessions, ", e);
102 throw new RuntimeException(e);
107 throw new RuntimeException(
"no extra session available");
121 while (sessionList.size() < minSessionSize) {
124 idleSessionSize.incrementAndGet();
125 }
catch (Exception e) {
126 log.error(
"SessionPool init failed. ");
127 throw new RuntimeException(
"create session failed.", e);
130 healthCheckSchedule.scheduleAtFixedRate(this::checkSession, 0, healthCheckTime,
132 sessionQueueMaintainSchedule.scheduleAtFixedRate(this::updateSessionQueue, 0, cleanTime,
134 hasInit.compareAndSet(
false,
true);
153 while (tryTimes++ <= retryTimes) {
155 nebulaSession = getSession();
156 resultSet = nebulaSession.execute(stmt);
158 || resultSet.
getErrorCode() == ErrorCode.E_SEMANTIC_ERROR.getValue()
159 || resultSet.
getErrorCode() == ErrorCode.E_SYNTAX_ERROR.getValue()) {
160 releaseSession(nebulaSession);
163 log.warn(String.format(
"execute error, code: %d, message: %s, retry: %d",
165 nebulaSession.release();
166 sessionList.remove(nebulaSession);
168 Thread.sleep(intervalTime);
169 }
catch (InterruptedException interruptedException) {
177 if (nebulaSession !=
null) {
178 nebulaSession.release();
179 sessionList.remove(nebulaSession);
181 if (tryTimes < retryTimes) {
182 log.warn(String.format(
"execute failed for IOErrorException, message: %s, "
183 +
"retry: %d", e.getMessage(), tryTimes));
185 Thread.sleep(intervalTime);
186 }
catch (InterruptedException interruptedException) {
194 if (nebulaSession !=
null) {
195 nebulaSession.release();
196 sessionList.remove(nebulaSession);
218 resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
221 if (isSessionError(resultSet)) {
222 sessionList.remove(nebulaSession);
223 nebulaSession = getSession();
224 resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
227 useSpace(nebulaSession,
null);
231 useSpace(nebulaSession, resultSet);
235 public String executeJson(String stmt)
238 return executeJsonWithParameter(stmt, (Map<String, Object>) Collections.EMPTY_MAP);
241 public String executeJsonWithParameter(String stmt,
242 Map<String, Object> parameterMap)
243 throws ClientServerIncompatibleException, AuthFailedException,
244 IOErrorException, BindSpaceFailedException {
247 NebulaSession nebulaSession = getSession();
250 result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
253 if (isSessionErrorForJson(result)) {
254 sessionList.remove(nebulaSession);
255 nebulaSession = getSession();
256 result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
258 }
catch (IOErrorException e) {
259 if (e.getType() == IOErrorException.E_CONNECT_BROKEN) {
260 sessionList.remove(nebulaSession);
261 nebulaSession = getSession();
262 result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
265 useSpace(nebulaSession,
null);
269 useSpaceForJson(nebulaSession, result);
278 if (isClosed.get()) {
282 if (isClosed.compareAndSet(
false,
true)) {
284 nebulaSession.release();
287 if (!healthCheckSchedule.isShutdown()) {
288 healthCheckSchedule.shutdown();
290 if (!sessionQueueMaintainSchedule.isShutdown()) {
291 sessionQueueMaintainSchedule.shutdown();
301 return hasInit.get();
308 return isClosed.get();
315 return sessionList.size();
322 return idleSessionSize.get();
330 nebulaSession.isUsedAndSetIdle();
331 idleSessionSize.incrementAndGet();
338 private void checkSession() {
339 for (NebulaSession nebulaSession : sessionList) {
340 if (nebulaSession.isIdleAndSetUsed()) {
342 idleSessionSize.decrementAndGet();
343 nebulaSession.execute(
"YIELD 1");
344 nebulaSession.isUsedAndSetIdle();
345 idleSessionSize.incrementAndGet();
346 }
catch (IOErrorException e) {
347 log.error(
"session ping error, {}, remove current session.", e.getMessage());
348 nebulaSession.release();
349 sessionList.remove(nebulaSession);
358 private void updateSessionQueue() {
360 if (idleSessionSize.get() > minSessionSize) {
361 synchronized (
this) {
362 for (NebulaSession nebulaSession : sessionList) {
363 if (nebulaSession.isIdle()) {
364 nebulaSession.release();
365 sessionList.remove(nebulaSession);
366 if (idleSessionSize.decrementAndGet() <= minSessionSize) {
381 private NebulaSession createSessionObject(SessionState state)
382 throws ClientServerIncompatibleException, AuthFailedException,
383 IOErrorException, BindSpaceFailedException {
384 SyncConnection connection =
new SyncConnection();
385 int tryConnect = sessionPoolConfig.getGraphAddressList().size();
387 while (tryConnect-- > 0) {
389 if (sessionPoolConfig.isEnableSsl()) {
390 connection.open(getAddress(), sessionPoolConfig.getTimeout(),
391 sessionPoolConfig.getSslParam(),
392 sessionPoolConfig.isUseHttp2(),
393 sessionPoolConfig.getCustomHeaders());
395 connection.open(getAddress(), sessionPoolConfig.getTimeout(),
396 sessionPoolConfig.isUseHttp2(),
397 sessionPoolConfig.getCustomHeaders());
400 }
catch (Exception e) {
401 if (tryConnect == 0 || !reconnect) {
404 log.warn(
"connect failed, " + e.getMessage());
409 AuthResult authResult;
411 authResult = connection.authenticate(sessionPoolConfig.getUsername(),
412 sessionPoolConfig.getPassword());
413 }
catch (AuthFailedException e) {
414 log.error(e.getMessage());
415 if (e.getMessage().toLowerCase().contains(
"user not exist")
416 || e.getMessage().toLowerCase().contains(
"invalid password")) {
426 NebulaSession nebulaSession =
new NebulaSession(connection, authResult.getSessionId(),
427 authResult.getTimezoneOffset(), state);
428 ResultSet result =
null;
430 result = nebulaSession.execute(useSpace);
431 }
catch (IOErrorException e) {
432 log.error(
"binding space failed,", e);
433 nebulaSession.release();
434 throw new BindSpaceFailedException(
"binding space failed:" + e.getMessage());
436 if (!result.isSucceeded()) {
437 nebulaSession.release();
438 throw new BindSpaceFailedException(result.getErrorMessage());
440 sessionList.add(nebulaSession);
441 return nebulaSession;
445 public HostAddress getAddress() {
446 List<HostAddress> addresses = sessionPoolConfig.getGraphAddressList();
447 int newPos = (pos.getAndIncrement()) % addresses.size();
448 return addresses.get(newPos);
457 private void useSpace(NebulaSession nebulaSession, ResultSet resultSet)
458 throws IOErrorException {
459 if (resultSet ==
null) {
460 nebulaSession.release();
461 sessionList.remove(nebulaSession);
465 if (resultSet.getSpaceName().trim().isEmpty()) {
466 log.warn(
"space {} has been drop, close the SessionPool.", spaceName);
471 if (!spaceName.equals(resultSet.getSpaceName())) {
472 ResultSet switchSpaceResult = nebulaSession.execute(useSpace);
473 if (!switchSpaceResult.isSucceeded()) {
474 log.warn(
"Bind Space failed, {}", switchSpaceResult.getErrorMessage());
475 nebulaSession.release();
476 sessionList.remove(nebulaSession);
480 releaseSession(nebulaSession);
489 private void useSpaceForJson(NebulaSession nebulaSession, String result)
490 throws IOErrorException {
491 String responseSpaceName =
492 (String) JSON.parseObject(result).getJSONArray(
"results")
493 .getJSONObject(0).get(
"spaceName");
494 if (!spaceName.equals(responseSpaceName)) {
495 nebulaSession.execute(useSpace);
497 releaseSession(nebulaSession);
501 private boolean isSessionError(ResultSet resultSet) {
502 return resultSet !=
null
503 && (resultSet.getErrorCode() == ErrorCode.E_SESSION_INVALID.getValue()
504 || resultSet.getErrorCode() == ErrorCode.E_SESSION_NOT_FOUND.getValue()
505 || resultSet.getErrorCode() == ErrorCode.E_SESSION_TIMEOUT.getValue());
508 private boolean isSessionErrorForJson(String result) {
509 if (result ==
null) {
512 int code = JSON.parseObject(result).getJSONArray(
"errors")
513 .getJSONObject(0).getIntValue(
"code");
514 return code == ErrorCode.E_SESSION_INVALID.getValue()
515 || code == ErrorCode.E_SESSION_NOT_FOUND.getValue()
516 || code == ErrorCode.E_SESSION_TIMEOUT.getValue();
520 private void checkSessionPool() {
521 if (!hasInit.get()) {
522 throw new RuntimeException(
"The SessionPool has not been initialized, "
523 +
"please call init() first.");
525 if (isClosed.get()) {
526 throw new RuntimeException(
"The SessionPool has been closed.");
531 private void stmtCheck(String stmt) {
532 if (stmt ==
null || stmt.trim().isEmpty()) {
533 throw new IllegalArgumentException(
"statement is null.");
536 if (stmt.trim().toLowerCase().startsWith(
"use") && stmt.trim().split(
" ").length == 2) {
537 throw new IllegalArgumentException(
"`USE SPACE` alone is forbidden.");
boolean isClosed()
if the SessionPool is closed
void close()
close the session pool
ResultSet execute(String stmt, Map< String, Object > parameterMap)
Execute the nGql sentence with parameter.
boolean isActive()
if the SessionPool has been initialized
ResultSet execute(String stmt)
Execute the nGql sentence.
int getIdleSessionNums()
get the number of idle Session
boolean init()
init the SessionPool this function is moved into SessionPool's constructor, no need to call it manual...
int getSessionNums()
get the number of all Session
int getErrorCode()
get errorCode of execute result
String getErrorMessage()
get the error message of the execute result
boolean isSucceeded()
the execute result is succeeded