Skip to content

Commit 55e096b

Browse files
committed
feat: add measuremnet read-write interface, add epoch to query interface
Signed-off-by: yehao <[email protected]>
1 parent e45f652 commit 55e096b

File tree

6 files changed

+400
-14
lines changed

6 files changed

+400
-14
lines changed

opengemini_client/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,13 @@
3232
SeriesResult,
3333
ValuesResult
3434
)
35+
36+
from .measurement import (
37+
FieldType,
38+
ShardType,
39+
IndexType,
40+
EngineType,
41+
ComparisonOperator,
42+
Measurement,
43+
MeasurementCondition
44+
)

opengemini_client/client.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from typing import List
2020

2121
from opengemini_client.models import BatchPoints, QueryResult, Query, RpConfig, ValuesResult
22+
from opengemini_client.measurement import Measurement, MeasurementCondition
2223

2324

2425
class Client(ABC):
@@ -85,6 +86,18 @@ def show_retention_policies(self, dbname):
8586
def drop_retention_policy(self, dbname, retention_policy: str):
8687
pass
8788

89+
@abstractmethod
90+
def create_measurement(self, measurement: Measurement):
91+
pass
92+
93+
@abstractmethod
94+
def show_measurements(self, condition: MeasurementCondition) -> List[str]:
95+
pass
96+
97+
@abstractmethod
98+
def drop_measurement(self, database, retention_policy, measurement: str):
99+
pass
100+
88101
@abstractmethod
89102
def show_tag_keys(self, database, command: str) -> List[ValuesResult]:
90103
"""

opengemini_client/client_impl.py

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from requests import HTTPError
2626

2727
from opengemini_client.client import Client
28+
from opengemini_client.measurement import Measurement, MeasurementCondition
2829
from opengemini_client.models import Config, BatchPoints, Query, QueryResult, Series, SeriesResult, RpConfig, \
2930
ValuesResult, KeyValue
3031
from opengemini_client.url_const import UrlConst
@@ -97,10 +98,10 @@ def __enter__(self):
9798
def __exit__(self, _exc_type, _exc_val, _exc_tb):
9899
self.session.close()
99100

100-
def get_server_url(self):
101+
def _get_server_url(self):
101102
return next(self.endpoints_iter)
102103

103-
def update_headers(self, method, url_path, headers=None) -> dict:
104+
def _update_headers(self, method, url_path, headers=None) -> dict:
104105
if headers is None:
105106
headers = {}
106107

@@ -121,10 +122,10 @@ def update_headers(self, method, url_path, headers=None) -> dict:
121122

122123
return headers
123124

124-
def request(self, method, server_url, url_path, headers=None, body=None, params=None) -> requests.Response:
125+
def _request(self, method, server_url, url_path, headers=None, body=None, params=None) -> requests.Response:
125126
if params is None:
126127
params = {}
127-
headers = self.update_headers(method, url_path, headers)
128+
headers = self._update_headers(method, url_path, headers)
128129
full_url = server_url + url_path
129130
if self.config.gzip_enabled and body is not None:
130131
compressed = io.BytesIO()
@@ -139,36 +140,37 @@ def request(self, method, server_url, url_path, headers=None, body=None, params=
139140
raise HTTPError(f"request error resp, code: {resp.status_code}, body: {resp.text}")
140141
return resp
141142

142-
def exec_http_request_by_index(self, idx, method, url_path, headers=None, body=None) -> requests.Response:
143+
def _exec_http_request_by_index(self, idx, method, url_path, headers=None, body=None) -> requests.Response:
143144
if idx >= len(self.endpoints) or idx < 0:
144145
raise ValueError("openGeminiDB client error. Index out of range")
145-
return self.request(method, self.endpoints[idx], url_path, headers, body)
146+
return self._request(method, self.endpoints[idx], url_path, headers, body)
146147

147148
def ping(self, idx: int):
148-
resp = self.exec_http_request_by_index(idx, 'GET', UrlConst.PING)
149+
resp = self._exec_http_request_by_index(idx, 'GET', UrlConst.PING)
149150
if resp.status_code != HTTPStatus.NO_CONTENT:
150151
raise HTTPError(f"ping error resp, code: {resp.status_code}, body: {resp.text}")
151152

152153
def query(self, query: Query) -> QueryResult:
153-
server_url = self.get_server_url()
154-
params = {'db': query.database, 'q': query.command, 'rp': query.retention_policy}
154+
server_url = self._get_server_url()
155+
params = {'db': query.database, 'q': query.command, 'rp': query.retention_policy,
156+
'epoch': query.precision.epoch()}
155157

156-
resp = self.request(method='GET', server_url=server_url, url_path=UrlConst.QUERY, params=params)
158+
resp = self._request(method='GET', server_url=server_url, url_path=UrlConst.QUERY, params=params)
157159
if resp.status_code == HTTPStatus.OK:
158160
return resolve_query_body(resp)
159161
raise HTTPError(f"query error resp, code: {resp.status_code}, body: {resp.text}")
160162

161163
def _query_post(self, query: Query) -> QueryResult:
162-
server_url = self.get_server_url()
164+
server_url = self._get_server_url()
163165
params = {'db': query.database, 'q': query.command, 'rp': query.retention_policy}
164166

165-
resp = self.request(method='POST', server_url=server_url, url_path=UrlConst.QUERY, params=params)
167+
resp = self._request(method='POST', server_url=server_url, url_path=UrlConst.QUERY, params=params)
166168
if resp.status_code == HTTPStatus.OK:
167169
return resolve_query_body(resp)
168170
raise HTTPError(f"query_post error resp, code: {resp.status_code}, body: {resp.text}")
169171

170172
def write_batch_points(self, database: str, batch_points: BatchPoints):
171-
server_url = self.get_server_url()
173+
server_url = self._get_server_url()
172174
params = {'db': database}
173175
with io.StringIO() as writer:
174176
for point in batch_points.points:
@@ -177,7 +179,7 @@ def write_batch_points(self, database: str, batch_points: BatchPoints):
177179
writer.write(point.to_string())
178180
writer.write('\n')
179181
body = writer.getvalue().encode()
180-
resp = self.request(method="POST", server_url=server_url, url_path=UrlConst.WRITE, params=params, body=body)
182+
resp = self._request(method="POST", server_url=server_url, url_path=UrlConst.WRITE, params=params, body=body)
181183
if resp.status_code == HTTPStatus.NO_CONTENT:
182184
return
183185
raise HTTPError(f"write_batch_points error resp, code: {resp.status_code}, body: {resp.text}")
@@ -279,6 +281,37 @@ def _show_with_result_key_value(self, database, command: str) -> List[ValuesResu
279281
values_results.append(values_result)
280282
return values_results
281283

284+
def create_measurement(self, measurement: Measurement):
285+
if measurement is None:
286+
raise ValueError("empty measurement")
287+
measurement.check()
288+
command = measurement.to_string()
289+
return self._query_post(Query(database=measurement.database, command=command, retention_policy=''))
290+
291+
def show_measurements(self, condition: MeasurementCondition) -> List[str]:
292+
if condition is None:
293+
raise ValueError("empty measurement condition")
294+
condition.check()
295+
command = condition.to_string()
296+
result = self.query(Query(database=condition.database, command=command, retention_policy=''))
297+
if result.error is not None:
298+
raise HTTPError(f"show_measurements error result, error: {result.error}")
299+
measurements = []
300+
if len(result.results) != 0 and len(result.results[0].series) == 0:
301+
return measurements
302+
for v in result.results[0].series[0].values:
303+
if isinstance(v[0], str):
304+
measurements.append(str(v[0]))
305+
return measurements
306+
307+
def drop_measurement(self, database, retention_policy, measurement: str):
308+
if not database:
309+
raise ValueError("empty database name")
310+
if not measurement:
311+
raise ValueError("empty measurement name")
312+
command = f"DROP MEASUREMENT {measurement}"
313+
return self._query_post(Query(database=database, command=command, retention_policy=retention_policy))
314+
282315
def show_tag_keys(self, database, command: str) -> List[ValuesResult]:
283316
return self._show_with_result_any(database, command)
284317

opengemini_client/measurement.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
# Copyright 2024 openGemini Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import io
16+
from dataclasses import dataclass, field
17+
from enum import Enum
18+
from typing import List, Dict
19+
20+
ErrEmptyDatabaseName = "empty database name"
21+
ErrEmptyMeasurement = "empty measurement"
22+
ErrEmptyTagOrField = "empty tag or field"
23+
ErrEmptyIndexList = "empty index list"
24+
25+
26+
class FieldType(Enum):
27+
FieldTypeInt64 = "INT64"
28+
FieldTypeFloat64 = "FLOAT64"
29+
FieldTypeString = "STRING"
30+
FieldTypeBool = "BOOL"
31+
32+
33+
class ShardType(Enum):
34+
ShardTypeHash = "HASH"
35+
ShardTypeRange = "RANGE"
36+
37+
38+
class IndexType(Enum):
39+
IndexTypeText = "text"
40+
41+
42+
class EngineType(Enum):
43+
EngineTypeColumnstore = "columnstore"
44+
45+
46+
class ComparisonOperator(Enum):
47+
Equals = "="
48+
NotEquals = "<>"
49+
GreaterThan = ">"
50+
LessThan = "<"
51+
GreaterThanOrEquals = ">="
52+
LessThanOrEquals = "<="
53+
Match = "=~"
54+
NotMatch = "!~"
55+
56+
57+
@dataclass
58+
class Measurement:
59+
database: str
60+
measurement: str
61+
retention_policy: str
62+
# specify tag list to create measurement
63+
tags: List[str] = field(default_factory=list)
64+
# specify field map to create measurement
65+
fields: Dict[str, FieldType] = field(default_factory=dict)
66+
# specify shard type to create measurement, support ShardTypeHash and ShardTypeRange two ways to
67+
# break up data, required when use high series cardinality storage engine(HSCE)
68+
shard_type: ShardType = None
69+
# specify shard keys(tag as partition key) to create measurement, required when use
70+
# high series cardinality storage engine(HSCE)
71+
shard_keys: List[str] = field(default_factory=list)
72+
# FullTextIndex required when want measurement support full-text index
73+
index_type: IndexType = None
74+
# required when specify which Field fields to create a full-text index on,
75+
# these fields must be 'string' data type
76+
index_list: List[str] = field(default_factory=list)
77+
# required when want measurement support HSCE, set EngineTypeColumnStore
78+
engine_type: EngineType = None
79+
# required when use HSCE, such as the primary key is `location` and `direction`, which means that the
80+
# storage engine will create indexes on these two fields
81+
primary_keys: List[str] = field(default_factory=list)
82+
# required when use HSCE, specify the data sorting method inside the storage engine, time means sorting
83+
# by time, and can also be changed to rtt or direction, or even other fields in the table
84+
sort_keys: List[str] = field(default_factory=list)
85+
86+
def check(self):
87+
if len(self.database) == 0:
88+
raise ValueError(ErrEmptyDatabaseName)
89+
if len(self.measurement) == 0:
90+
raise ValueError(ErrEmptyMeasurement)
91+
if len(self.tags) == 0 and len(self.fields) == 0:
92+
raise ValueError(ErrEmptyTagOrField)
93+
if self.index_type is not None and len(self.index_list) == 0:
94+
raise ValueError(ErrEmptyIndexList)
95+
96+
def _write_tags_fields(self, writer: io.StringIO):
97+
writer.write(f"CREATE MEASUREMENT {self.measurement} (")
98+
if len(self.tags) != 0:
99+
tags = []
100+
for tag in self.tags:
101+
tags.append(f"{tag} TAG")
102+
writer.write(",".join(tags))
103+
if len(self.tags) != 0 and len(self.fields) != 0:
104+
writer.write(",")
105+
if len(self.fields) != 0:
106+
fields = []
107+
for key, value in self.fields.items():
108+
fields.append(f"{key} {value.value} FIELD")
109+
writer.write(",".join(fields))
110+
writer.write(")")
111+
112+
def _write_index(self, writer: io.StringIO):
113+
writer.write(" WITH ")
114+
writer.write(f" INDEXTYPE {self.index_type.value}")
115+
writer.write(" INDEXLIST " + ",".join(self.index_list))
116+
117+
def _writer_other(self, writer: io.StringIO):
118+
with_identifier = False
119+
if self.engine_type is not None:
120+
with_identifier = True
121+
writer.write(" WITH ")
122+
writer.write(f" ENGINETYPE = {self.engine_type.value}")
123+
if len(self.shard_keys) != 0:
124+
if with_identifier is False:
125+
with_identifier = True
126+
writer.write(" WITH ")
127+
writer.write(" SHARDKEY " + ",".join(self.shard_keys))
128+
if self.shard_type is not None:
129+
if with_identifier is False:
130+
with_identifier = True
131+
writer.write(" WITH ")
132+
writer.write(f" TYPE {self.shard_type.value}")
133+
if len(self.primary_keys) != 0:
134+
if with_identifier is False:
135+
with_identifier = True
136+
writer.write(" WITH ")
137+
writer.write(" PRIMARYKEY " + ",".join(self.primary_keys))
138+
if len(self.sort_keys) != 0:
139+
if with_identifier is False:
140+
writer.write(" WITH ")
141+
writer.write(" SORTKEY " + ",".join(self.sort_keys))
142+
143+
def to_string(self) -> str:
144+
writer = io.StringIO()
145+
self._write_tags_fields(writer)
146+
147+
if self.index_type is not None:
148+
self._write_index(writer)
149+
return writer.getvalue()
150+
151+
self._writer_other(writer)
152+
return writer.getvalue()
153+
154+
155+
@dataclass
156+
class MeasurementCondition:
157+
database: str
158+
Operator: ComparisonOperator = None
159+
Value: str = ''
160+
161+
def check(self):
162+
if len(self.database) == 0:
163+
raise ValueError(ErrEmptyDatabaseName)
164+
165+
def to_string(self) -> str:
166+
writer = io.StringIO()
167+
writer.write("SHOW MEASUREMENTS")
168+
if self.Operator is not None:
169+
writer.write(f" WITH MEASUREMENT {self.Operator.value} {self.Value}")
170+
return writer.getvalue()

0 commit comments

Comments
 (0)