Skip to content

Commit 85d75b8

Browse files
committed
feat: add record data type and use grpc protocol for transmission
Signed-off-by: yehao <[email protected]>
1 parent 44cbdea commit 85d75b8

22 files changed

+1246
-22
lines changed

.pylintrc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
[MASTER]
2-
ignore=static,.git
1+
[MAIN]
2+
ignore=static,.git,proto,grpc_client.py
33
[MESSAGES CONTROL]
44
disable=
55
invalid-name,

opengemini_client/client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,15 @@ def write_batch_points(self, database: str, batch_points: BatchPoints):
5252
:return: return an error message
5353
"""
5454

55+
def write_by_grpc(self, database: str, batch_points: BatchPoints, rp: str = ''):
56+
"""
57+
batch points to assigned database
58+
:param database: name
59+
:param batch_points: BatchPoints object
60+
:param rp: retention policy
61+
:return: return an error message
62+
"""
63+
5564
@abstractmethod
5665
def create_database(self, database: str, rp: RpConfig = None):
5766
"""

opengemini_client/client_impl.py

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,40 @@
1717
import gzip
1818
import io
1919
import itertools
20+
import os.path
2021
from abc import ABC
2122
from http import HTTPStatus
2223
from typing import List
2324

25+
import grpc
2426
import requests
2527
from requests import HTTPError
2628

29+
from opengemini_client import grpc_client
2730
from opengemini_client.client import Client
2831
from opengemini_client.measurement import Measurement, MeasurementCondition
2932
from opengemini_client.models import Config, BatchPoints, Query, QueryResult, Series, SeriesResult, RpConfig, \
30-
ValuesResult, KeyValue
33+
ValuesResult, KeyValue, AuthConfig
3134
from opengemini_client.url_const import UrlConst
3235
from opengemini_client.models import AuthType, TlsConfig
3336

3437

38+
def check_auth_config(auth_config: AuthConfig):
39+
if auth_config is not None:
40+
if auth_config.auth_type == AuthType.PASSWORD:
41+
if len(auth_config.username) == 0:
42+
raise ValueError("invalid auth config due to empty username")
43+
if len(auth_config.password) == 0:
44+
raise ValueError("invalid auth config due to empty password")
45+
if auth_config.auth_type == AuthType.TOKEN and len(auth_config.token) == 0:
46+
raise ValueError("invalid auth config due to empty token")
47+
48+
3549
def check_config(config: Config):
3650
if len(config.address) == 0:
3751
raise ValueError("must have at least one address")
3852

39-
if config.auth_config is not None:
40-
if config.auth_config.auth_type == AuthType.PASSWORD:
41-
if len(config.auth_config.username) == 0:
42-
raise ValueError("invalid auth config due to empty username")
43-
if len(config.auth_config.password) == 0:
44-
raise ValueError("invalid auth config due to empty password")
45-
if config.auth_config.auth_type == AuthType.TOKEN and len(config.auth_config.token) == 0:
46-
raise ValueError("invalid auth config due to empty token")
53+
check_auth_config(config.auth_config)
4754

4855
if config.tls_enabled and config.tls_config is None:
4956
config.tls_config = TlsConfig()
@@ -60,6 +67,17 @@ def check_config(config: Config):
6067
if config.connection_timeout is None or config.connection_timeout <= datetime.timedelta(seconds=0):
6168
config.connection_timeout = datetime.timedelta(seconds=10)
6269

70+
if config.grpc_config is None:
71+
return config
72+
73+
if len(config.grpc_config.address) == 0:
74+
raise ValueError("grpc config must have at least one address")
75+
76+
check_auth_config(config.grpc_config.auth_config)
77+
78+
if config.grpc_config.tls_enable and config.grpc_config.tls_config is None:
79+
config.grpc_config.tls_config = TlsConfig()
80+
6381
return config
6482

6583

@@ -95,6 +113,9 @@ def __init__(self, config: Config):
95113
self.session.verify = config.tls_config.ca_file
96114
self.endpoints = [f"{protocol}{addr.host}:{addr.port}" for addr in config.address]
97115
self.endpoints_iter = itertools.cycle(self.endpoints)
116+
if self.config.grpc_config is not None:
117+
self.grpc_endpoints = [f"{addr.host}:{addr.port}" for addr in config.grpc_config.address]
118+
self.grpc_endpoints_iter = itertools.cycle(self.grpc_endpoints)
98119

99120
def close(self):
100121
self.session.close()
@@ -108,6 +129,31 @@ def __exit__(self, _exc_type, _exc_val, _exc_tb):
108129
def _get_server_url(self):
109130
return next(self.endpoints_iter)
110131

132+
def _get_grpc_server_url(self):
133+
return next(self.grpc_endpoints_iter)
134+
135+
def _get_grpc_channel(self):
136+
server_url = self._get_grpc_server_url()
137+
if self.config.grpc_config.tls_enable is False:
138+
return grpc.insecure_channel(server_url)
139+
140+
root_certificates = None
141+
private_key = None
142+
certificate_chain = None
143+
if os.path.exists(self.config.grpc_config.tls_config.ca_file):
144+
with open(self.config.grpc_config.tls_config.ca_file, 'rb') as fd:
145+
root_certificates = fd.read()
146+
if os.path.exists(self.config.grpc_config.tls_config.cert_file):
147+
with open(self.config.grpc_config.tls_config.cert_file, 'rb') as fd:
148+
certificate_chain = fd.read()
149+
if os.path.exists(self.config.grpc_config.tls_config.key_file):
150+
with open(self.config.grpc_config.tls_config.key_file, 'rb') as fd:
151+
private_key = fd.read()
152+
return grpc.secure_channel(
153+
target=server_url,
154+
credentials=grpc.ssl_channel_credentials(root_certificates, private_key, certificate_chain)
155+
)
156+
111157
def _update_headers(self, method, url_path, headers=None) -> dict:
112158
if headers is None:
113159
headers = {}
@@ -191,6 +237,25 @@ def write_batch_points(self, database: str, batch_points: BatchPoints):
191237
return
192238
raise HTTPError(f"write_batch_points error resp, code: {resp.status_code}, body: {resp.text}")
193239

240+
def write_by_grpc(self, database: str, batch_points: BatchPoints, rp: str = ''):
241+
username = ''
242+
password = ''
243+
if self.config.grpc_config.auth_config is not None:
244+
username = self.config.grpc_config.auth_config.username
245+
password = self.config.grpc_config.auth_config.password
246+
247+
# send grpc request
248+
channel = self._get_grpc_channel()
249+
grpc_client.write(
250+
channel=channel,
251+
database=database,
252+
batch_points=batch_points,
253+
rp=rp,
254+
username=username,
255+
password=password,
256+
timeout=self.config.timeout.seconds,
257+
)
258+
194259
def create_database(self, database: str, rp: RpConfig = None):
195260
if not database:
196261
raise ValueError("empty database name")
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2025 openGemini Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Copyright 2025 openGemini Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import struct
16+
from dataclasses import dataclass
17+
from typing import List
18+
import numpy
19+
from opengemini_client.codec.size import size_of_int64, size_of_uint16, size_of_uint32
20+
21+
22+
@dataclass
23+
class BinaryDecoder:
24+
buf: 'bytes' = b''
25+
offset: int = 0
26+
27+
def int64(self) -> int:
28+
u = self.buf[self.offset:self.offset + size_of_int64()]
29+
v = u[7] | u[6] << 8 | u[5] << 16 | u[4] << 24 | u[3] << 32 | u[2] << 40 | u[1] << 48 | u[0] << 56
30+
v = (numpy.int64(v) >> 1) ^ ((numpy.int64(v) << 63) >> 63)
31+
self.offset += size_of_int64()
32+
return int(v)
33+
34+
def uint16(self) -> int:
35+
u = self.buf[self.offset:self.offset + size_of_uint16()]
36+
v = u[1] | u[0] << 8
37+
self.offset += size_of_uint16()
38+
return int(v)
39+
40+
def uint32(self) -> int:
41+
u = self.buf[self.offset:self.offset + size_of_uint32()]
42+
v = u[3] | u[2] << 8 | u[1] << 16 | u[0] << 24
43+
self.offset += size_of_uint32()
44+
return int(v)
45+
46+
def string(self) -> str:
47+
length = self.uint16()
48+
v = self.buf[self.offset:self.offset + length].decode("utf-8")
49+
self.offset += length
50+
return v
51+
52+
def bytes(self) -> bytes:
53+
length = self.uint32()
54+
if length == 0:
55+
return b''
56+
v = self.buf[self.offset:self.offset + length]
57+
self.offset += length
58+
return v
59+
60+
def uint32_list(self) -> List[int]:
61+
length = self.uint32()
62+
if length == 0:
63+
return []
64+
length1 = length * size_of_uint32()
65+
v = struct.unpack('<' + 'I ' * length, self.buf[self.offset:self.offset + length1])
66+
self.offset += length1
67+
return [int(i) for i in v]
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Copyright 2025 openGemini Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import struct
16+
from typing import List
17+
import numpy
18+
19+
20+
def append_int64(b: bytes, v: int) -> bytes:
21+
v = (numpy.int64(v) << 1) ^ (v >> 63)
22+
v = numpy.uint64(v)
23+
u = [numpy.uint8(v >> 56), numpy.uint8(v >> 48), numpy.uint8(v >> 40), numpy.uint8(v >> 32), numpy.uint8(v >> 24),
24+
numpy.uint8(v >> 16), numpy.uint8(v >> 8), numpy.uint8(v)]
25+
return b + bytes(u)
26+
27+
28+
def append_uint16(b: bytes, v: int) -> bytes:
29+
v = numpy.uint16(v)
30+
u = [numpy.uint8(v >> 8), numpy.uint8(v)]
31+
return b + bytes(u)
32+
33+
34+
def append_uint32(b: bytes, v: int) -> bytes:
35+
v = numpy.uint32(v)
36+
u = [numpy.uint8(v >> 24), numpy.uint8(v >> 16), numpy.uint8(v >> 8), numpy.uint8(v)]
37+
return b + bytes(u)
38+
39+
40+
def append_string(b: bytes, v: str) -> bytes:
41+
b = append_uint16(b, len(v))
42+
return b + v.encode("utf-8")
43+
44+
45+
def append_bytes(b: bytes, v: bytes) -> bytes:
46+
b = append_uint32(b, len(v))
47+
return b + v
48+
49+
50+
def append_uint32_list(b: bytes, v: List[int]) -> bytes:
51+
b = append_uint32(b, len(v))
52+
if len(v) == 0:
53+
return b
54+
byte_data = struct.pack('<' + 'I ' * len(v), *v)
55+
return b + byte_data

opengemini_client/codec/size.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# Copyright 2025 openGemini Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import List
16+
17+
_SizeOfInt16 = 2
18+
_SizeOfInt32 = 4
19+
_SizeOfInt64 = 8
20+
21+
_SizeOfUint8 = 1
22+
_SizeOfUint16 = 2
23+
_SizeOfUint32 = 4
24+
_SizeOfUint64 = 8
25+
26+
_SizeOfFloat32 = 4
27+
_SizeOfFloat64 = 8
28+
29+
_SizeOfBool = 1
30+
31+
32+
def size_of_int16():
33+
return _SizeOfInt16
34+
35+
36+
def size_of_int32():
37+
return _SizeOfInt32
38+
39+
40+
def size_of_int64():
41+
return _SizeOfInt64
42+
43+
44+
def size_of_uint8():
45+
return _SizeOfUint8
46+
47+
48+
def size_of_uint16():
49+
return _SizeOfUint16
50+
51+
52+
def size_of_uint32():
53+
return _SizeOfUint32
54+
55+
56+
def size_of_uint64():
57+
return _SizeOfUint64
58+
59+
60+
def size_of_float32():
61+
return _SizeOfFloat32
62+
63+
64+
def size_of_float64():
65+
return _SizeOfFloat64
66+
67+
68+
def size_of_bool():
69+
return _SizeOfBool
70+
71+
72+
def size_of_string(s: str) -> int:
73+
return len(s) + size_of_uint16()
74+
75+
76+
def size_of_bytes(b: bytes) -> int:
77+
return len(b) + size_of_uint32()
78+
79+
80+
def size_of_uint32_list(v: List[int]) -> int:
81+
return len(v) * size_of_uint32() + size_of_uint32()

0 commit comments

Comments
 (0)