NebulaGraph Python Client  release-3.8
base.py
1 import datetime
2 from abc import abstractmethod
3 from typing import Dict, Any, Optional
4 from nebula3.data.ResultSet import ResultSet
5 from nebula3.common.ttypes import ErrorCode, Value, NList, Date, Time, DateTime
6 
7 
8 class ExecuteError(Exception):
9  def __init__(self, stmt: str, param: Any, code: ErrorCode, msg: str):
10  self.stmtstmt = stmt
11  self.paramparam = param
12  self.codecode = code
13  self.msgmsg = msg
14 
15  def __str__(self):
16  return (
17  f"ExecuteError. err_code: {self.code}, err_msg: {self.msg}.\n"
18  + f"Statement: \n{self.stmt}\n"
19  + f"Parameter: \n{self.param}"
20  )
21 
22 
24  @abstractmethod
25  def execute_parameter(
26  self, stmt: str, params: Optional[Dict[str, Any]]
27  ) -> ResultSet:
28  pass
29 
30  @abstractmethod
31  def execute_json_with_parameter(
32  self, stmt: str, params: Optional[Dict[str, Any]]
33  ) -> bytes:
34  pass
35 
36  def execute(self, stmt: str) -> ResultSet:
37  return self.execute_parameterexecute_parameter(stmt, None)
38 
39  def execute_json(self, stmt: str) -> bytes:
40  return self.execute_json_with_parameterexecute_json_with_parameter(stmt, None)
41 
43  self,
44  stmt: str,
45  params: Optional[Dict[str, Any]] = None,
46  ):
47  """**Recommended** Execute a statement with parameters in Python type instead of thrift type."""
48  if params is None:
49  result = self.execute_parameterexecute_parameter(stmt, None)
50  else:
51  result = self.execute_parameterexecute_parameter(stmt, _build_byte_param(params))
52 
53  if not result.is_succeeded():
54  raise ExecuteError(stmt, params, result.error_code(), result.error_msg())
55 
56  return result
57 
58 
59 def _build_byte_param(params: dict) -> dict:
60  byte_params = {}
61  for k, v in params.items():
62  if isinstance(v, Value):
63  byte_params[k] = v
64  elif str(type(v)).startswith("nebula3.common.ttypes"):
65  byte_params[k] = v
66  else:
67  byte_params[k] = _cast_value(v)
68  return byte_params
69 
70 
71 def _cast_value(value: Any) -> Value:
72  """
73  Cast the value to nebula Value type
74  ref: https://github.com/vesoft-inc/nebula/blob/master/src/common/datatypes/Value.cpp
75  :param value: the value to be casted
76  :return: the casted value
77  """
78  casted_value = Value()
79  if isinstance(value, bool):
80  casted_value.set_bVal(value)
81  elif isinstance(value, int):
82  casted_value.set_iVal(value)
83  elif isinstance(value, str):
84  casted_value.set_sVal(value)
85  elif isinstance(value, float):
86  casted_value.set_fVal(value)
87  elif isinstance(value, datetime.date):
88  date_value = Date(year=value.year, month=value.month, day=value.day)
89  casted_value.set_dVal(date_value)
90  elif isinstance(value, datetime.time):
91  time_value = Time(
92  hour=value.hour,
93  minute=value.minute,
94  sec=value.second,
95  microsec=value.microsecond,
96  )
97  casted_value.set_tVal(time_value)
98  elif isinstance(value, datetime.datetime):
99  datetime_value = DateTime(
100  year=value.year,
101  month=value.month,
102  day=value.day,
103  hour=value.hour,
104  minute=value.minute,
105  sec=value.second,
106  microsec=value.microsecond,
107  )
108  casted_value.set_dtVal(datetime_value)
109  # TODO: add support for GeoSpatial
110  elif isinstance(value, list):
111  byte_list = []
112  for item in value:
113  byte_list.append(_cast_value(item))
114  casted_value.set_lVal(NList(values=byte_list))
115  elif isinstance(value, dict):
116  # TODO: add support for NMap
117  raise TypeError("Unsupported type: dict")
118  else:
119  raise TypeError(f"Unsupported type: {type(value)}")
120  return casted_value
bytes execute_json_with_parameter(self, str stmt, Optional[Dict[str, Any]] params)
Definition: base.py:33
def execute_py(self, str stmt, Optional[Dict[str, Any]] params=None)
Definition: base.py:46
ResultSet execute_parameter(self, str stmt, Optional[Dict[str, Any]] params)
Definition: base.py:27