6 package com.vesoft.nebula.client.graph;
8 import com.alibaba.fastjson.JSON;
9 import com.vesoft.nebula.ErrorCode;
10 import com.vesoft.nebula.client.graph.data.HostAddress;
11 import com.vesoft.nebula.client.graph.data.ResultSet;
12 import com.vesoft.nebula.client.graph.exception.AuthFailedException;
13 import com.vesoft.nebula.client.graph.exception.BindSpaceFailedException;
14 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
15 import com.vesoft.nebula.client.graph.exception.IOErrorException;
16 import com.vesoft.nebula.client.graph.net.AuthResult;
17 import com.vesoft.nebula.client.graph.net.SessionState;
18 import com.vesoft.nebula.client.graph.net.SyncConnection;
19 import java.io.Serializable;
20 import java.util.List;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
33 private static final long serialVersionUID = 6051248334277617891L;
35 private final Logger log = LoggerFactory.getLogger(this.getClass());
37 private final ScheduledExecutorService healthCheckSchedule =
38 Executors.newScheduledThreadPool(1);
39 private final ScheduledExecutorService sessionQueueMaintainSchedule =
40 Executors.newScheduledThreadPool(1);
42 public CopyOnWriteArrayList<NebulaSession> sessionList =
new CopyOnWriteArrayList<>();
43 public AtomicInteger idleSessionSize =
new AtomicInteger(0);
44 public AtomicBoolean hasInit =
new AtomicBoolean(
false);
45 public AtomicBoolean isClosed =
new AtomicBoolean(
false);
47 private final AtomicInteger pos =
new AtomicInteger(0);
50 private final int minSessionSize;
51 private final int maxSessionSize;
52 private final int cleanTime;
53 private final int healthCheckTime;
54 private final int retryTimes;
55 private final int intervalTime;
56 private final boolean reconnect;
57 private final String spaceName;
58 private final String useSpace;
62 this.sessionPoolConfig = poolConfig;
63 this.minSessionSize = poolConfig.getMinSessionSize();
64 this.maxSessionSize = poolConfig.getMaxSessionSize();
65 this.cleanTime = poolConfig.getCleanTime();
66 this.retryTimes = poolConfig.getRetryTimes();
67 this.intervalTime = poolConfig.getIntervalTime();
68 this.reconnect = poolConfig.isReconnect();
69 this.healthCheckTime = poolConfig.getHealthCheckTime();
70 this.spaceName = poolConfig.getSpaceName();
71 useSpace =
"USE `" + spaceName +
"`;";
81 int retry = sessionPoolConfig.getRetryConnectTimes();
82 while (retry-- >= 0) {
84 if (idleSessionSize.get() > 0) {
86 if (nebulaSession.isIdleAndSetUsed()) {
87 idleSessionSize.decrementAndGet();
93 if (sessionList.size() < maxSessionSize) {
98 Thread.sleep(sessionPoolConfig.getWaitTime());
99 }
catch (InterruptedException e) {
100 log.error(
"getSession error when wait for idle sessions, ", e);
101 throw new RuntimeException(e);
106 throw new RuntimeException(
"no extra session available");
120 while (sessionList.size() < minSessionSize) {
123 idleSessionSize.incrementAndGet();
124 }
catch (Exception e) {
125 log.error(
"SessionPool init failed. ");
126 throw new RuntimeException(
"create session failed.", e);
129 healthCheckSchedule.scheduleAtFixedRate(this::checkSession, 0, healthCheckTime,
131 sessionQueueMaintainSchedule.scheduleAtFixedRate(this::updateSessionQueue, 0, cleanTime,
133 hasInit.compareAndSet(
false,
true);
152 while (tryTimes++ <= retryTimes) {
154 nebulaSession = getSession();
155 resultSet = nebulaSession.execute(stmt);
157 || resultSet.
getErrorCode() == ErrorCode.E_SEMANTIC_ERROR.getValue()
158 || resultSet.
getErrorCode() == ErrorCode.E_SYNTAX_ERROR.getValue()) {
159 releaseSession(nebulaSession);
162 log.warn(String.format(
"execute error, code: %d, message: %s, retry: %d",
164 nebulaSession.release();
165 sessionList.remove(nebulaSession);
167 Thread.sleep(intervalTime);
168 }
catch (InterruptedException interruptedException) {
176 if (nebulaSession !=
null) {
177 nebulaSession.release();
178 sessionList.remove(nebulaSession);
180 if (tryTimes < retryTimes) {
181 log.warn(String.format(
"execute failed for IOErrorException, message: %s, "
182 +
"retry: %d", e.getMessage(), tryTimes));
184 Thread.sleep(intervalTime);
185 }
catch (InterruptedException interruptedException) {
193 if (nebulaSession !=
null) {
194 nebulaSession.release();
195 sessionList.remove(nebulaSession);
217 resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
220 if (isSessionError(resultSet)) {
221 sessionList.remove(nebulaSession);
222 nebulaSession = getSession();
223 resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
226 useSpace(nebulaSession,
null);
230 useSpace(nebulaSession, resultSet);
239 if (isClosed.get()) {
243 if (isClosed.compareAndSet(
false,
true)) {
245 nebulaSession.release();
248 if (!healthCheckSchedule.isShutdown()) {
249 healthCheckSchedule.shutdown();
251 if (!sessionQueueMaintainSchedule.isShutdown()) {
252 sessionQueueMaintainSchedule.shutdown();
262 return hasInit.get();
269 return isClosed.get();
276 return sessionList.size();
283 return idleSessionSize.get();
291 nebulaSession.isUsedAndSetIdle();
292 idleSessionSize.incrementAndGet();
299 private void checkSession() {
300 for (NebulaSession nebulaSession : sessionList) {
301 if (nebulaSession.isIdleAndSetUsed()) {
303 idleSessionSize.decrementAndGet();
304 nebulaSession.execute(
"YIELD 1");
305 nebulaSession.isUsedAndSetIdle();
306 idleSessionSize.incrementAndGet();
307 }
catch (IOErrorException e) {
308 log.error(
"session ping error, {}, remove current session.", e.getMessage());
309 nebulaSession.release();
310 sessionList.remove(nebulaSession);
319 private void updateSessionQueue() {
321 if (idleSessionSize.get() > minSessionSize) {
322 synchronized (
this) {
323 for (NebulaSession nebulaSession : sessionList) {
324 if (nebulaSession.isIdle()) {
325 nebulaSession.release();
326 sessionList.remove(nebulaSession);
327 if (idleSessionSize.decrementAndGet() <= minSessionSize) {
342 private NebulaSession createSessionObject(SessionState state)
343 throws ClientServerIncompatibleException, AuthFailedException,
344 IOErrorException, BindSpaceFailedException {
345 SyncConnection connection =
new SyncConnection();
346 int tryConnect = sessionPoolConfig.getGraphAddressList().size();
348 while (tryConnect-- > 0) {
350 if (sessionPoolConfig.isEnableSsl()) {
351 connection.open(getAddress(), sessionPoolConfig.getTimeout(),
352 sessionPoolConfig.getSslParam(),
353 sessionPoolConfig.isUseHttp2(),
354 sessionPoolConfig.getCustomHeaders());
356 connection.open(getAddress(), sessionPoolConfig.getTimeout(),
357 sessionPoolConfig.isUseHttp2(),
358 sessionPoolConfig.getCustomHeaders());
361 }
catch (Exception e) {
362 if (tryConnect == 0 || !reconnect) {
365 log.warn(
"connect failed, " + e.getMessage());
370 AuthResult authResult;
372 authResult = connection.authenticate(sessionPoolConfig.getUsername(),
373 sessionPoolConfig.getPassword());
374 }
catch (AuthFailedException e) {
375 log.error(e.getMessage());
380 NebulaSession nebulaSession =
new NebulaSession(connection, authResult.getSessionId(),
381 authResult.getTimezoneOffset(), state);
382 ResultSet result = nebulaSession.execute(useSpace);
383 if (!result.isSucceeded()) {
384 nebulaSession.release();
385 throw new BindSpaceFailedException(result.getErrorMessage());
387 sessionList.add(nebulaSession);
388 return nebulaSession;
392 public HostAddress getAddress() {
393 List<HostAddress> addresses = sessionPoolConfig.getGraphAddressList();
394 int newPos = (pos.getAndIncrement()) % addresses.size();
395 return addresses.get(newPos);
404 private void useSpace(NebulaSession nebulaSession, ResultSet resultSet)
405 throws IOErrorException {
406 if (resultSet ==
null) {
407 nebulaSession.release();
408 sessionList.remove(nebulaSession);
412 if (resultSet.getSpaceName().trim().isEmpty()) {
413 log.warn(
"space {} has been drop, close the SessionPool.", spaceName);
418 if (!spaceName.equals(resultSet.getSpaceName())) {
419 ResultSet switchSpaceResult = nebulaSession.execute(useSpace);
420 if (!switchSpaceResult.isSucceeded()) {
421 log.warn(
"Bind Space failed, {}", switchSpaceResult.getErrorMessage());
422 nebulaSession.release();
423 sessionList.remove(nebulaSession);
427 releaseSession(nebulaSession);
436 private void useSpaceForJson(NebulaSession nebulaSession, String result)
437 throws IOErrorException {
438 String responseSpaceName =
439 (String) JSON.parseObject(result).getJSONArray(
"results")
440 .getJSONObject(0).get(
"spaceName");
441 if (!spaceName.equals(responseSpaceName)) {
442 nebulaSession.execute(useSpace);
444 releaseSession(nebulaSession);
448 private boolean isSessionError(ResultSet resultSet) {
449 return resultSet !=
null
450 && (resultSet.getErrorCode() == ErrorCode.E_SESSION_INVALID.getValue()
451 || resultSet.getErrorCode() == ErrorCode.E_SESSION_NOT_FOUND.getValue()
452 || resultSet.getErrorCode() == ErrorCode.E_SESSION_TIMEOUT.getValue());
456 private void checkSessionPool() {
457 if (!hasInit.get()) {
458 throw new RuntimeException(
"The SessionPool has not been initialized, "
459 +
"please call init() first.");
461 if (isClosed.get()) {
462 throw new RuntimeException(
"The SessionPool has been closed.");
467 private void stmtCheck(String stmt) {
468 if (stmt ==
null || stmt.trim().isEmpty()) {
469 throw new IllegalArgumentException(
"statement is null.");
472 if (stmt.trim().toLowerCase().startsWith(
"use") && stmt.trim().split(
" ").length == 2) {
473 throw new IllegalArgumentException(
"`USE SPACE` alone is forbidden.");
boolean isClosed()
if the SessionPool is closed
void close()
close the session pool
ResultSet execute(String stmt, Map<String, Object> parameterMap)
Execute the nGQL statement with the given parameters.
boolean isActive()
if the SessionPool has been initialized
ResultSet execute(String stmt)
Execute the nGql sentence.
int getIdleSessionNums()
get the number of idle Session
boolean init()
Initialize the SessionPool. This logic has been moved into the SessionPool constructor, so there is no need to call it manually.
int getSessionNums()
get the number of all Session
int getErrorCode()
get errorCode of execute result
String getErrorMessage()
get the error message of the execute result
boolean isSucceeded()
the execute result is succeeded