11 from nebula3.fbthrift.transport
import (
18 from nebula3.fbthrift.transport.TTransport
import TTransportException
19 from nebula3.fbthrift.protocol
import THeaderProtocol, TBinaryProtocol
21 from nebula3.common.ttypes
import ErrorCode
22 from nebula3.graph
import GraphService
23 from nebula3.graph.ttypes
import VerifyClientVersionReq
24 from nebula3.logger
import logger
26 from nebula3.Exception
import (
29 ClientServerIncompatibleException,
42 self.
_port_port =
None
49 def open(self, ip, port, timeout, use_http2=False, http_headers=None):
50 """open the connection
52 :param ip: the server ip
53 :param port: the server port
54 :param timeout: the timeout for connect and execute
55 :param use_http2: use http2 or not
56 :param http_headers: http headers
59 self.
open_SSLopen_SSL(ip, port, timeout,
None, use_http2, http_headers)
62 self, ip, port, timeout, ssl_config=None, use_http2=False, http_headers=None
64 """open the SSL connection
66 :param ip: the server ip
67 :param port: the server port
68 :param timeout: the timeout for connect and execute
69 :ssl_config: configs for SSL
70 :param use_http2: use http2 or not
71 :param http_headers: http headers
75 self.
_port_port = port
81 if use_http2
is False:
85 self.
_connection_connection = GraphService.Client(protocol)
86 resp = self.
_connection_connection.verifyClientVersion(VerifyClientVersionReq())
87 if resp.error_code != ErrorCode.SUCCEEDED:
89 raise ClientServerIncompatibleException(resp.error_msg)
90 except Exception
as e:
95 def __get_protocol(self, timeout, ssl_config):
97 if ssl_config
is not None:
98 s = TSSLSocket.TSSLSocket(
101 ssl_config.unix_socket,
102 ssl_config.ssl_version,
103 ssl_config.cert_reqs,
105 ssl_config.verify_name,
108 ssl_config.allow_weak_ssl_versions,
111 s = TSocket.TSocket(self.
_ip_ip, self.
_port_port)
113 s.setTimeout(timeout)
115 buffered_transport = TTransport.TBufferedTransport(s)
116 header_transport = THeaderTransport.THeaderTransport(buffered_transport)
117 protocol = THeaderProtocol.THeaderProtocol(header_transport)
118 header_transport.open()
119 except Exception
as e:
123 def __get_protocal_http2(self, timeout, ssl_config, http_headers):
124 verify, certfile, keyfile, password =
None,
None,
None,
None
125 if ssl_config
is not None:
127 verify = ssl.create_default_context(cafile=ssl_config.ca_certs)
128 certfile = ssl_config.certfile
129 keyfile = ssl_config.keyfile
130 url =
"https://" + self.
_ip_ip +
":" + str(self.
_port_port)
132 url =
"http://" + self.
_ip_ip +
":" + str(self.
_port_port)
134 transport = THttp2Client.THttp2Client(
135 url, timeout, verify, certfile, keyfile, password, http_headers
138 protocol = TBinaryProtocol.TBinaryProtocol(transport)
139 except Exception
as e:
144 """reopen the connection
165 """authenticate to graphd
167 :param user_name: the user name
168 :param password: the password
173 if resp.error_code != ErrorCode.SUCCEEDED:
175 raise AuthFailedException(resp.error_msg)
177 resp.session_id, resp.time_zone_offset_seconds, resp.time_zone_name
179 except TTransportException
as te:
180 if te.message.find(
"timed out"):
182 if te.type == TTransportException.END_OF_FILE:
184 raise IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.message)
187 """execute interface with session_id and ngql
189 :param session_id: the session id get from result of authenticate interface
190 :param stmt: the ngql
191 :return: ExecutionResponse
196 """execute interface with session_id and ngql
197 :param session_id: the session id get from result of authenticate interface
198 :param stmt: the ngql
199 :param params: parameter map
200 :return: ExecutionResponse
203 resp = self.
_connection_connection.executeWithParameter(session_id, stmt, params)
205 except Exception
as te:
206 if isinstance(te, TTransportException):
207 if te.message.find(
"timed out") > 0:
209 raise IOErrorException(IOErrorException.E_TIMEOUT, te.message)
210 elif te.type == TTransportException.END_OF_FILE:
211 raise IOErrorException(
212 IOErrorException.E_CONNECT_BROKEN, te.message
214 elif te.type == TTransportException.NOT_OPEN:
215 raise IOErrorException(IOErrorException.E_NOT_OPEN, te.message)
217 raise IOErrorException(IOErrorException.E_UNKNOWN, te.message)
221 """execute_json interface with session_id and ngql
222 :param session_id: the session id get from result of authenticate interface
223 :param stmt: the ngql
224 :return: string json representing the execution result
229 """execute_json interface with session_id and ngql with parameter
230 :param session_id: the session id get from result of authenticate interface
231 :param stmt: the ngql
232 :param params: parameter map
233 :return: json bytes representing the execution result
236 resp = self.
_connection_connection.executeJsonWithParameter(session_id, stmt, params)
237 if not isinstance(resp, bytes):
238 raise TypeError(
"response is not bytes")
241 except Exception
as te:
242 if isinstance(te, TTransportException):
243 if te.message.find(
"timed out") > 0:
245 raise IOErrorException(IOErrorException.E_TIMEOUT, te.message)
246 elif te.type == TTransportException.END_OF_FILE:
247 raise IOErrorException(
248 IOErrorException.E_CONNECT_BROKEN, te.message
250 elif te.type == TTransportException.NOT_OPEN:
251 raise IOErrorException(IOErrorException.E_NOT_OPEN, te.message)
253 raise IOErrorException(IOErrorException.E_UNKNOWN, te.message)
257 """tells the graphd can release the session info
259 :param session_id:the session id
264 except TTransportException
as te:
265 if te.type == TTransportException.END_OF_FILE:
269 """close the connection
277 except Exception
as e:
279 "Close connection to {}:{} failed:{}".format(self.
_ip_ip, self.
_port_port, e)
283 """check the connection if ok
284 :return: True or False
293 """reset the idletime
300 """get idletime of connection
309 """get the address of the connected service
313 return (self.
_ip_ip, self.
_port_port)
def signout(self, session_id)
def open_SSL(self, ip, port, timeout, ssl_config=None, use_http2=False, http_headers=None)
def __get_protocol(self, timeout, ssl_config)
def execute(self, session_id, stmt)
def execute_parameter(self, session_id, stmt, params)
def open(self, ip, port, timeout, use_http2=False, http_headers=None)
def execute_json(self, session_id, stmt)
def execute_json_with_parameter(self, session_id, stmt, params)
def authenticate(self, user_name, password)
def __get_protocal_http2(self, timeout, ssl_config, http_headers)