NebulaGraph Java Client  release-3.8
RoundRobinLoadBalancer.java
1 package com.vesoft.nebula.client.graph.net;
2 
3 import com.vesoft.nebula.client.graph.data.HostAddress;
4 import com.vesoft.nebula.client.graph.data.SSLParam;
5 import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
6 import com.vesoft.nebula.client.graph.exception.IOErrorException;
7 import java.util.ArrayList;
8 import java.util.HashMap;
9 import java.util.List;
10 import java.util.Map;
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.ScheduledExecutorService;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18 
19 public class RoundRobinLoadBalancer implements LoadBalancer {
20  private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinLoadBalancer.class);
21  private static final int S_OK = 0;
22  private static final int S_BAD = 1;
23  private final List<HostAddress> addresses = new ArrayList<>();
24  private final Map<HostAddress, Integer> serversStatus = new ConcurrentHashMap<>();
25  private final double minClusterHealthRate;
26  private final int timeout;
27  private final AtomicInteger pos = new AtomicInteger(0);
28  private final int delayTime = 60; // Unit seconds
29  private final ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1);
30  private SSLParam sslParam;
31  private boolean enabledSsl = false;
32 
33  private boolean useHttp2 = false;
34 
35  private Map<String, String> customHeaders;
36 
37  public RoundRobinLoadBalancer(List<HostAddress> addresses, int timeout,
38  double minClusterHealthRate) {
39  this(addresses, timeout, minClusterHealthRate, false, new HashMap<>());
40  }
41 
42  public RoundRobinLoadBalancer(List<HostAddress> addresses, int timeout,
43  double minClusterHealthRate, boolean useHttp2,
44  Map<String, String> headers) {
45  this.timeout = timeout;
46  for (HostAddress addr : addresses) {
47  this.addresses.add(addr);
48  this.serversStatus.put(addr, S_BAD);
49  }
50  this.minClusterHealthRate = minClusterHealthRate;
51  this.useHttp2 = useHttp2;
52  this.customHeaders = headers;
53  schedule.scheduleAtFixedRate(this::scheduleTask, 0, delayTime, TimeUnit.SECONDS);
54  }
55 
56  public RoundRobinLoadBalancer(List<HostAddress> addresses, int timeout, SSLParam sslParam,
57  double minClusterHealthRate) {
58  this(addresses, timeout, sslParam, minClusterHealthRate, false, new HashMap<>());
59  }
60 
61  public RoundRobinLoadBalancer(List<HostAddress> addresses, int timeout, SSLParam sslParam,
62  double minClusterHealthRate, boolean useHttp2,
63  Map<String, String> headers) {
64  this(addresses, timeout, minClusterHealthRate, useHttp2, headers);
65  this.sslParam = sslParam;
66  this.enabledSsl = true;
67  }
68 
69  public void close() {
70  if (!schedule.isShutdown()) {
71  schedule.shutdownNow();
72  }
73  }
74 
75  @Override
76  public HostAddress getAddress() {
77  // TODO: update the server connection num into load balancer
78  int tryCount = 0;
79  int newPos;
80  while (++tryCount <= addresses.size()) {
81  newPos = (pos.getAndIncrement()) % addresses.size();
82  HostAddress addr = addresses.get(newPos);
83  if (serversStatus.get(addr) == S_OK) {
84  return addr;
85  }
86  }
87  return null;
88  }
89 
90  public void updateServersStatus() {
91  for (HostAddress hostAddress : addresses) {
92  if (ping(hostAddress)) {
93  serversStatus.put(hostAddress, S_OK);
94  } else {
95  serversStatus.put(hostAddress, S_BAD);
96  }
97  }
98  }
99 
100  public boolean ping(HostAddress addr) {
101  try {
102  Connection connection = new SyncConnection();
103  if (enabledSsl) {
104  connection.open(addr, this.timeout, sslParam, useHttp2, customHeaders);
105  } else {
106  connection.open(addr, this.timeout, useHttp2, customHeaders);
107  }
108  boolean pong = connection.ping();
109  connection.close();
110  return pong;
111  } catch (IOErrorException e) {
112  LOGGER.warn(String.format("ping server %s failed", addr.toString()), e);
113  return false;
115  LOGGER.error("version verify failed, ", e);
116  return false;
117  }
118  }
119 
120  public boolean isServersOK() {
121  this.updateServersStatus();
122  double numServersWithOkStatus = 0;
123  for (HostAddress hostAddress : addresses) {
124  if (serversStatus.get(hostAddress) == S_OK) {
125  numServersWithOkStatus++;
126  }
127  }
128 
129  // Check health rate.
130  double okServersRate = numServersWithOkStatus / addresses.size();
131  return okServersRate >= minClusterHealthRate;
132  }
133 
134  private void scheduleTask() {
135  updateServersStatus();
136  }
137 }