Skip to content

Commit 047fce5

Browse files
feat: support gzip_threshold
1 parent 007627d commit 047fce5

File tree

6 files changed

+95
-18
lines changed

6 files changed

+95
-18
lines changed

influxdb_client_3/__init__.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
INFLUX_ORG = "INFLUX_ORG"
2222
INFLUX_PRECISION = "INFLUX_PRECISION"
2323
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
24+
INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD"
2425

2526

2627
def write_client_options(**kwargs):
@@ -114,6 +115,30 @@ def _parse_precision(precision):
114115
return precision
115116

116117

118+
def _parse_gzip_threshold(threshold):
119+
"""
120+
Parses and validates the provided threshold value.
121+
122+
This function ensures that the given threshold is a valid integer value,
123+
and it raises an appropriate error if the threshold is not valid. It also
124+
enforces that the threshold value is non-negative.
125+
126+
:param threshold: The input threshold value to be parsed and validated.
127+
:type threshold: Any
128+
:return: The validated threshold value as an integer.
129+
:rtype: int
130+
:raises ValueError: If the provided threshold is not an integer or if it is
131+
negative.
132+
"""
133+
try:
134+
threshold = int(threshold)
135+
except (TypeError, ValueError):
136+
raise ValueError(f"Invalid threshold value: {threshold}. Must be integer.")
137+
if threshold < 0:
138+
raise ValueError(f"Invalid threshold value: {threshold}. Must be non-negative.")
139+
return threshold
140+
141+
117142
class InfluxDBClient3:
118143
def __init__(
119144
self,
@@ -254,6 +279,11 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
254279

255280
write_options = WriteOptions(write_type=WriteType.synchronous)
256281

282+
gzip_threshold = os.getenv(INFLUX_GZIP_THRESHOLD)
283+
if gzip_threshold is not None:
284+
kwargs['gzip_threshold'] = _parse_gzip_threshold(gzip_threshold)
285+
kwargs['enable_gzip'] = True
286+
257287
precision = os.getenv(INFLUX_PRECISION)
258288
if precision is not None:
259289
write_options.write_precision = _parse_precision(precision)

influxdb_client_3/write_client/_sync/api_client.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,25 @@ def set_default_header(self, header_name, header_value):
9292
"""Set HTTP header for this API client."""
9393
self.default_headers[header_name] = header_value
9494

95+
@staticmethod
96+
def should_gzip(payload: str, enable_gzip: bool = True, gzip_threshold: int = None) -> bool:
97+
"""
98+
Determine if the payload should be compressed with gzip.
99+
100+
Args:
101+
payload: The string content to potentially compress
102+
enable_gzip: Flag indicating if gzip compression is enabled
103+
gzip_threshold: Minimum size in bytes for compression to be applied
104+
105+
Returns:
106+
bool: True if the payload should be compressed, False otherwise
107+
"""
108+
if not enable_gzip:
109+
return False
110+
111+
payload_size = len(payload.encode('utf-8'))
112+
return gzip_threshold is not None and payload_size >= gzip_threshold
113+
95114
def __call_api(
96115
self, resource_path, method, path_params=None,
97116
query_params=None, header_params=None, body=None, post_params=None,
@@ -102,9 +121,16 @@ def __call_api(
102121
config = self.configuration
103122
self._signin(resource_path=resource_path)
104123

124+
# body
125+
should_gzip = False
126+
if body:
127+
should_gzip = self.should_gzip(config.enable_gzip, config.gzip_threshold, body)
128+
body = self.sanitize_for_serialization(body)
129+
body = config.update_request_body(resource_path, body, should_gzip)
130+
105131
# header parameters
106132
header_params = header_params or {}
107-
config.update_request_header_params(resource_path, header_params)
133+
config.update_request_header_params(resource_path, header_params, should_gzip)
108134
header_params.update(self.default_headers)
109135
if self.cookie:
110136
header_params['Cookie'] = self.cookie
@@ -141,11 +167,6 @@ def __call_api(
141167
# auth setting
142168
self.update_params_for_auth(header_params, query_params, auth_settings)
143169

144-
# body
145-
if body:
146-
body = self.sanitize_for_serialization(body)
147-
body = config.update_request_body(resource_path, body)
148-
149170
# request url
150171
url = self.configuration.host + resource_path
151172

influxdb_client_3/write_client/client/_base.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or
4949
else:
5050
self.conf.host = self.url
5151
self.conf.enable_gzip = enable_gzip
52+
self.conf.gzip_threshold = kwargs.get('gzip_threshold', None)
5253
self.conf.verify_ssl = kwargs.get('verify_ssl', True)
5354
self.conf.ssl_ca_cert = kwargs.get('ssl_ca_cert', None)
5455
self.conf.cert_file = kwargs.get('cert_file', None)
@@ -277,9 +278,9 @@ def __init__(self):
277278
self.username = None
278279
self.password = None
279280

280-
def update_request_header_params(self, path: str, params: dict):
281-
super().update_request_header_params(path, params)
282-
if self.enable_gzip:
281+
def update_request_header_params(self, path: str, params: dict, should_gzip: bool = False):
282+
super().update_request_header_params(path, params, should_gzip)
283+
if should_gzip:
283284
# GZIP Request
284285
if path == '/api/v2/write':
285286
params["Content-Encoding"] = "gzip"
@@ -293,9 +294,9 @@ def update_request_header_params(self, path: str, params: dict):
293294
pass
294295
pass
295296

296-
def update_request_body(self, path: str, body):
297-
_body = super().update_request_body(path, body)
298-
if self.enable_gzip:
297+
def update_request_body(self, path: str, body, should_gzip: bool = False):
298+
_body = super().update_request_body(path, body, should_gzip)
299+
if should_gzip:
299300
# GZIP Request
300301
if path == '/api/v2/write':
301302
import gzip

influxdb_client_3/write_client/client/influxdb_client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class InfluxDBClient(_BaseClient):
1616
"""InfluxDBClient is client for InfluxDB v2."""
1717

1818
def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gzip=False, org: str = None,
19-
default_tags: dict = None, **kwargs) -> None:
19+
default_tags: dict = None, gzip_threshold=None, **kwargs) -> None:
2020
"""
2121
Initialize defaults.
2222
@@ -52,7 +52,8 @@ def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gz
5252
:key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x
5353
:key list[str] profilers: list of enabled Flux profilers
5454
"""
55-
super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip, org=org,
55+
super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip,
56+
gzip_threshold=gzip_threshold, org=org,
5657
default_tags=default_tags, http_client_logger="urllib3", **kwargs)
5758

5859
from influxdb_client_3.write_client._sync.api_client import ApiClient

influxdb_client_3/write_client/configuration.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ def __init__(self):
9898
# Safe chars for path_param
9999
self.safe_chars_for_path_param = ''
100100

101+
# Compression settings
102+
self.enable_gzip = False
103+
self.gzip_threshold = None
104+
101105
@property
102106
def logger_file(self):
103107
"""Logger file.
@@ -245,19 +249,21 @@ def to_debug_report(self):
245249
"SDK Package Version: {client_version}".\
246250
format(env=sys.platform, pyversion=sys.version, client_version=VERSION)
247251

248-
def update_request_header_params(self, path: str, params: dict):
252+
def update_request_header_params(self, path: str, params: dict, should_gzip: bool = False):
249253
"""Update header params based on custom settings.
250254
251-
:param path: Resource path
255+
:param path: Resource path.
252256
:param params: Header parameters dict to be updated.
257+
:param should_gzip: Describes if request body should be gzip compressed.
253258
"""
254259
pass
255260

256-
def update_request_body(self, path: str, body):
261+
def update_request_body(self, path: str, body, should_gzip: bool = False):
257262
"""Update http body based on custom settings.
258263
259-
:param path: Resource path
264+
:param path: Resource path.
260265
:param body: Request body to be updated.
266+
:param should_gzip: Describes if request body should be gzip compressed.
261267
:return: Updated body
262268
"""
263269
return body

tests/test_api_client.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,21 @@ def test_api_error_headers(self):
139139
self.assertEqual(headers['Trace-Sampled'], 'false')
140140
self.assertEqual(headers['X-Influxdb-Request-Id'], requestid)
141141
self.assertEqual(headers['X-Influxdb-Build'], 'Mock')
142+
143+
def test_should_gzip(self):
144+
# Test when gzip is disabled
145+
assert not ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=1)
146+
147+
# Test when threshold is None
148+
assert not ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=None)
149+
150+
# Test payload smaller than threshold
151+
assert not ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=10000)
152+
153+
# Test payload larger than threshold
154+
large_payload = "x" * 10000
155+
assert ApiClient.should_gzip(large_payload, enable_gzip=True, gzip_threshold=1000)
156+
157+
# Test exact threshold match
158+
exact_payload = "x" * 1000
159+
assert ApiClient.should_gzip(exact_payload, enable_gzip=True, gzip_threshold=1000)

0 commit comments

Comments
 (0)