Skip to content

Commit 6adc268

Browse files
feat: support env
1 parent 87aee82 commit 6adc268

File tree

8 files changed

+90
-16
lines changed

8 files changed

+90
-16
lines changed

influxdb_client_3/__init__.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,9 @@ def from_env(**kwargs: Any) -> 'InfluxDBClient3':
9696
write_options = WriteOptions(write_type=WriteType.synchronous)
9797

9898
if os.getenv(INFLUX_GZIP_THRESHOLD) is not None:
99-
write_options.gzip_threshold = int(os.getenv(INFLUX_GZIP_THRESHOLD))
99+
gzip_threshold = int(os.getenv(INFLUX_GZIP_THRESHOLD))
100+
write_options.enable_gzip = True
101+
write_options.gzip_threshold = gzip_threshold
100102

101103
if os.getenv(INFLUX_PRECISION) is not None:
102104
write_options.write_precision = os.getenv(INFLUX_PRECISION)
@@ -105,7 +107,6 @@ def from_env(**kwargs: Any) -> 'InfluxDBClient3':
105107

106108
if os.getenv(INFLUX_AUTH_SCHEME) is not None:
107109
kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME)
108-
109110
org = os.getenv(INFLUX_ORG, "default")
110111

111112
return InfluxDBClient3(
@@ -210,17 +211,18 @@ def __init__(
210211

211212
write_type = DefaultWriteOptions['write_type']
212213
write_precision = DefaultWriteOptions['write_precision']
213-
gzip_threshold = DefaultWriteOptions['gzip_threshold']
214+
gzip_threshold = None
214215
if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
215216
write_opts = write_client_options['write_options']
216-
write_type = getattr(write_opts, 'write_type')
217-
write_precision = getattr(write_opts, 'write_precision')
217+
write_type = getattr(write_opts, 'write_type', write_type)
218+
write_precision = getattr(write_opts, 'write_precision', write_precision)
218219
gzip_threshold = getattr(write_opts, 'gzip_threshold')
219220

220221
write_options = WriteOptions(
221222
write_type=write_type,
222223
write_precision=write_precision,
223224
gzip_threshold=gzip_threshold,
225+
enable_gzip=kwargs.get('enable_gzip', False)
224226
)
225227

226228
self._write_client_options = {
@@ -244,6 +246,8 @@ def __init__(
244246
url=f"{scheme}://{hostname}:{port}",
245247
token=self._token,
246248
org=self._org,
249+
enable_gzip=write_options.enable_gzip,
250+
gzip_threshold=write_options.gzip_threshold,
247251
**kwargs)
248252

249253
self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)

influxdb_client_3/write_client/_sync/api_client.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ def __call_api(
120120
config = self.configuration
121121
self._signin(resource_path=resource_path)
122122

123+
gzip_threshold = config.gzip_threshold
124+
enable_gzip = config.enable_gzip
125+
self.should_compress = self.check_should_compress(body, gzip_threshold, enable_gzip)
126+
123127
# header parameters
124128
header_params = header_params or {}
125129
config.update_request_header_params(resource_path, header_params)
@@ -192,6 +196,12 @@ def __call_api(
192196
return (return_data, response_data.status,
193197
response_data.getheaders())
194198

199+
def check_should_compress(self, body: bytearray, gzip_threshold: int, enable_gzip: bool) -> bool:
200+
body_size = len(body)
201+
if enable_gzip is True or (enable_gzip is not False and (gzip_threshold and body_size >= gzip_threshold)):
202+
return True
203+
return False
204+
195205
def sanitize_for_serialization(self, obj):
196206
"""Build a JSON POST object.
197207

influxdb_client_3/write_client/client/_base.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434

3535
# noinspection PyMethodMayBeStatic
3636
class _BaseClient(object):
37-
def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, org: str = None,
37+
def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, gzip_threshold=None, org: str = None,
3838
default_tags: dict = None, http_client_logger: str = None, **kwargs) -> None:
3939
self.url = url
4040
self.org = org
@@ -47,6 +47,7 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or
4747
else:
4848
self.conf.host = self.url
4949
self.conf.enable_gzip = enable_gzip
50+
self.conf.gzip_threshold = gzip_threshold
5051
self.conf.verify_ssl = kwargs.get('verify_ssl', True)
5152
self.conf.ssl_ca_cert = kwargs.get('ssl_ca_cert', None)
5253
self.conf.cert_file = kwargs.get('cert_file', None)
@@ -271,12 +272,14 @@ class _Configuration(Configuration):
271272
def __init__(self):
272273
Configuration.__init__(self)
273274
self.enable_gzip = False
275+
self.gzip_threshold = None
276+
self.should_compress = False
274277
self.username = None
275278
self.password = None
276279

277280
def update_request_header_params(self, path: str, params: dict):
278281
super().update_request_header_params(path, params)
279-
if self.enable_gzip:
282+
if self.should_compress:
280283
# GZIP Request
281284
if path == '/api/v2/write':
282285
params["Content-Encoding"] = "gzip"
@@ -292,7 +295,7 @@ def update_request_header_params(self, path: str, params: dict):
292295

293296
def update_request_body(self, path: str, body):
294297
_body = super().update_request_body(path, body)
295-
if self.enable_gzip:
298+
if self.should_compress:
296299
# GZIP Request
297300
if path == '/api/v2/write':
298301
import gzip

influxdb_client_3/write_client/client/influxdb_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
class InfluxDBClient(_BaseClient):
1414
"""InfluxDBClient is client for InfluxDB v2."""
1515

16-
def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gzip=False, org: str = None,
16+
def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gzip=False, gzip_threshold=None, org: str = None,
1717
default_tags: dict = None, **kwargs) -> None:
1818
"""
1919
Initialize defaults.
@@ -50,7 +50,7 @@ def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gz
5050
:key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x
5151
:key list[str] profilers: list of enabled Flux profilers
5252
"""
53-
super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip, org=org,
53+
super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip, gzip_threshold=gzip_threshold, org=org,
5454
default_tags=default_tags, http_client_logger="urllib3", **kwargs)
5555

5656
from influxdb_client_3.write_client._sync.api_client import ApiClient

influxdb_client_3/write_client/client/write_api.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,12 @@ class WriteType(Enum):
3838
asynchronous = 2
3939
synchronous = 3
4040

41+
DEFAULT_GZIP_THRESHOLD = 1000
4142

43+
#todo: convert to enum
4244
DefaultWriteOptions = {
4345
'write_type': WriteType.synchronous,
44-
'write_precision': WritePrecision.NS,
45-
'gzip_threshold': 1000
46+
'write_precision': WritePrecision.NS
4647
}
4748

4849

@@ -59,7 +60,8 @@ def __init__(self, write_type: WriteType = WriteType.batching,
5960
exponential_base=2,
6061
max_close_wait=300_000,
6162
write_precision=DEFAULT_WRITE_PRECISION,
62-
gzip_threshold=1000,
63+
gzip_threshold=None,
64+
enable_gzip=False,
6365
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
6466
"""
6567
Create write api configuration.
@@ -92,6 +94,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
9294
self.max_close_wait = max_close_wait
9395
self.write_precision = write_precision
9496
self.gzip_threshold = gzip_threshold
97+
self.enable_gzip = enable_gzip
9598

9699
def to_retry_strategy(self, **kwargs):
97100
"""

influxdb_client_3/write_client/configuration.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class Configuration(object, metaclass=TypeWithDefault):
4949
Ref: https://openapi-generator.tech
5050
Do not edit the class manually.
5151
"""
52+
#todo: remove wrong document
5253

5354
def __init__(self):
5455
"""Initialize configuration."""
@@ -118,6 +119,10 @@ def __init__(self):
118119
# Safe chars for path_param
119120
self.safe_chars_for_path_param = ''
120121

122+
# Compression settings
123+
self.enable_gzip = False
124+
self.gzip_threshold = None
125+
121126
@property
122127
def logger_file(self):
123128
"""Logger file.

tests/test_api_client.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,48 @@ def test_api_error_headers(self):
139139
self.assertEqual(headers['Trace-Sampled'], 'false')
140140
self.assertEqual(headers['X-Influxdb-Request-Id'], requestid)
141141
self.assertEqual(headers['X-Influxdb-Build'], 'Mock')
142+
143+
def test_check_should_compress_true(self):
144+
conf = Configuration()
145+
client = ApiClient(conf)
146+
147+
body = bytearray("12345678901234567890".encode("utf-8")) # len = 20
148+
tests = [
149+
{
150+
'gzip_threshold': 10,
151+
'enable_gzip': True,
152+
'expected': True
153+
},
154+
{
155+
'gzip_threshold': 30,
156+
'enable_gzip': True,
157+
'expected': True
158+
},
159+
{
160+
'gzip_threshold': None,
161+
'enable_gzip': True,
162+
'expected': True
163+
},
164+
{
165+
'gzip_threshold': 30,
166+
'enable_gzip': None,
167+
'expected': False
168+
},
169+
{
170+
'gzip_threshold': 30,
171+
'enable_gzip': False,
172+
'expected': False
173+
},
174+
{
175+
'gzip_threshold': 10,
176+
'enable_gzip': None,
177+
'expected': True
178+
},
179+
]
180+
181+
for test in tests:
182+
gzip_threshold = test['gzip_threshold']
183+
enable_gzip = test['enable_gzip']
184+
expected = test['expected']
185+
result = client.check_should_compress(body, gzip_threshold, enable_gzip)
186+
self.assertEqual(result, expected)

tests/test_influxdb_client_3.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import unittest
22
from unittest.mock import patch
33

4-
from influxdb_client_3 import InfluxDBClient3, from_env, WritePrecision, WriteType
4+
from influxdb_client_3 import InfluxDBClient3, from_env, WritePrecision, DefaultWriteOptions
55
from tests.util import asyncio_run
66
from tests.util.mocks import ConstantFlightServer, ConstantData
77

@@ -75,13 +75,17 @@ async def test_query_async(self):
7575
assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list
7676

7777
def test_default_client(self):
78-
expected_precision = WritePrecision.NS
79-
expected_write_type = WriteType.synchronous
78+
expected_precision = DefaultWriteOptions['write_precision']
79+
expected_write_type = DefaultWriteOptions['write_type']
80+
expected_gzip_threshold = None
81+
expected_gzip_enabled = False
8082

8183
def verify_client_write_options(c):
8284
write_options = c._write_client_options.get('write_options')
8385
self.assertEqual(write_options.write_precision, expected_precision)
8486
self.assertEqual(write_options.write_type, expected_write_type)
87+
self.assertEqual(write_options.gzip_threshold, expected_gzip_threshold)
88+
self.assertEqual(write_options.enable_gzip, expected_gzip_enabled)
8589

8690
self.assertEqual(c._write_api._write_options.write_precision, expected_precision)
8791
self.assertEqual(c._write_api._write_options.write_type, expected_write_type)

0 commit comments

Comments
 (0)