1 package com.vesoft.nebula.client.graph.net;
3 import com.vesoft.nebula.client.graph.data.HostAddress;
4 import com.vesoft.nebula.client.graph.data.SSLParam;
5 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
6 import com.vesoft.nebula.client.graph.exception.IOErrorException;
7 import java.util.ArrayList;
8 import java.util.HashMap;
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.ScheduledExecutorService;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
21 private static final int S_OK = 0;
22 private static final int S_BAD = 1;
23 private final List<HostAddress> addresses =
new ArrayList<>();
24 private final Map<HostAddress, Integer> serversStatus =
new ConcurrentHashMap<>();
25 private final double minClusterHealthRate;
26 private final int timeout;
27 private final AtomicInteger pos =
new AtomicInteger(0);
28 private final int delayTime = 60;
29 private final ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1);
31 private boolean enabledSsl =
false;
33 private boolean useHttp2 =
false;
35 private Map<String, String> customHeaders;
38 double minClusterHealthRate) {
39 this(addresses, timeout, minClusterHealthRate,
false,
new HashMap<>());
43 double minClusterHealthRate,
boolean useHttp2,
44 Map<String, String> headers) {
45 this.timeout = timeout;
47 this.addresses.add(addr);
48 this.serversStatus.put(addr, S_BAD);
50 this.minClusterHealthRate = minClusterHealthRate;
51 this.useHttp2 = useHttp2;
52 this.customHeaders = headers;
53 schedule.scheduleAtFixedRate(this::scheduleTask, 0, delayTime, TimeUnit.SECONDS);
57 double minClusterHealthRate) {
58 this(addresses, timeout, sslParam, minClusterHealthRate,
false,
new HashMap<>());
62 double minClusterHealthRate,
boolean useHttp2,
63 Map<String, String> headers) {
64 this(addresses, timeout, minClusterHealthRate, useHttp2, headers);
65 this.sslParam = sslParam;
66 this.enabledSsl =
true;
70 if (!schedule.isShutdown()) {
71 schedule.shutdownNow();
80 while (++tryCount <= addresses.size()) {
81 newPos = (pos.getAndIncrement()) % addresses.size();
83 if (serversStatus.get(addr) == S_OK) {
90 public void updateServersStatus() {
92 if (ping(hostAddress)) {
93 serversStatus.put(hostAddress, S_OK);
95 serversStatus.put(hostAddress, S_BAD);
104 connection.open(addr, this.timeout, sslParam, useHttp2, customHeaders);
106 connection.open(addr, this.timeout, useHttp2, customHeaders);
108 boolean pong = connection.ping();
114 LOGGER.error(
"version verify failed, ", e);
119 public boolean isServersOK() {
120 this.updateServersStatus();
121 double numServersWithOkStatus = 0;
123 if (serversStatus.get(hostAddress) == S_OK) {
124 numServersWithOkStatus++;
129 double okServersRate = numServersWithOkStatus / addresses.size();
130 return okServersRate >= minClusterHealthRate;
133 private void scheduleTask() {
134 updateServersStatus();