Skip to content

Commit 2deea4e

Browse files
authored
feat: disable grpc response compression (#179)
1 parent fac1793 commit 2deea4e

File tree

8 files changed

+707
-5
lines changed

8 files changed

+707
-5
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
- `write_dataframe()`: New method for writing pandas and polars DataFrames with explicit parameters (`measurement`, `timestamp_column`, `tags`, `timestamp_timezone`).
99
- `query_dataframe()`: New method for querying data directly to a pandas or polars DataFrame via the `frame_type` parameter.
1010
- Updated README with clear examples for DataFrame operations.
11+
1. [#179](https://github.com/InfluxCommunity/influxdb3-python/pull/179): Add option to disable gRPC response
12+
compression for Flight queries:
13+
- `disable_grpc_compression` parameter in `InfluxDBClient3` constructor
14+
- `INFLUX_DISABLE_GRPC_COMPRESSION` environment variable support in `from_env()`
1115

1216
### Bug Fixes
1317

README.md

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,32 @@ print(table.to_pandas().to_markdown())
204204
```
205205

206206
### gRPC compression
207-
The Python client supports gRPC response compression.
208-
If the server chooses to compress query responses (e.g., with gzip), the client
209-
will automatically decompress them — no extra configuration is required.
207+
208+
#### Request compression
209+
210+
Request compression is not supported by InfluxDB 3 — the client sends uncompressed requests.
211+
212+
#### Response compression
213+
214+
Response compression is enabled by default. The client sends the `grpc-accept-encoding: identity, deflate, gzip`
215+
header, and the server returns gzip-compressed responses (if supported). The client automatically
216+
decompresses them — no configuration required.
217+
218+
To **disable response compression**:
219+
220+
```python
221+
# Via constructor parameter
222+
client = InfluxDBClient3(
223+
host="your-host",
224+
token="your-token",
225+
database="your-database",
226+
disable_grpc_compression=True
227+
)
228+
229+
# Or via environment variable
230+
# INFLUX_DISABLE_GRPC_COMPRESSION=true
231+
client = InfluxDBClient3.from_env()
232+
```
210233

211234
## Windows Users
212235
Currently, Windows users require an extra installation when querying via Flight natively. This is due to the fact gRPC cannot locate Windows root certificates. To work around this please follow these steps:

influxdb_client_3/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
INFLUX_WRITE_NO_SYNC = "INFLUX_WRITE_NO_SYNC"
3232
INFLUX_WRITE_TIMEOUT = "INFLUX_WRITE_TIMEOUT"
3333
INFLUX_QUERY_TIMEOUT = "INFLUX_QUERY_TIMEOUT"
34+
INFLUX_DISABLE_GRPC_COMPRESSION = "INFLUX_DISABLE_GRPC_COMPRESSION"
3435

3536

3637
def write_client_options(**kwargs):
@@ -190,6 +191,7 @@ def __init__(
190191
flight_client_options=None,
191192
write_port_overwrite=None,
192193
query_port_overwrite=None,
194+
disable_grpc_compression=False,
193195
**kwargs):
194196
"""
195197
Initialize an InfluxDB client.
@@ -206,6 +208,8 @@ def __init__(
206208
:type write_client_options: dict[str, any]
207209
:param flight_client_options: dictionary for providing additional arguments for the FlightClient.
208210
:type flight_client_options: dict[str, any]
211+
:param disable_grpc_compression: Disable gRPC compression for Flight query responses. Default is False.
212+
:type disable_grpc_compression: bool
209213
:key auth_scheme: token authentication scheme. Set to "Bearer" for Edge.
210214
:key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.
211215
:key str ssl_ca_cert: Set this to customize the certificate file to verify the peer.
@@ -291,6 +295,8 @@ def __init__(
291295
connection_string = f"grpc+tcp://{hostname}:{port}"
292296

293297
q_opts_builder = QueryApiOptionsBuilder()
298+
if disable_grpc_compression:
299+
q_opts_builder.disable_grpc_compression(True)
294300
if kw_keys.__contains__('ssl_ca_cert'):
295301
q_opts_builder.root_certs(kwargs.get('ssl_ca_cert', None))
296302
if kw_keys.__contains__('verify_ssl'):
@@ -361,13 +367,20 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
361367
if os.getenv(INFLUX_AUTH_SCHEME) is not None:
362368
kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME)
363369

370+
disable_grpc_compression = os.getenv(INFLUX_DISABLE_GRPC_COMPRESSION)
371+
if disable_grpc_compression is not None:
372+
disable_grpc_compression = disable_grpc_compression.strip().lower() in ['true', '1', 't', 'y', 'yes']
373+
else:
374+
disable_grpc_compression = False
375+
364376
org = os.getenv(INFLUX_ORG, "default")
365377
return InfluxDBClient3(
366378
host=required_vars[INFLUX_HOST],
367379
token=required_vars[INFLUX_TOKEN],
368380
database=required_vars[INFLUX_DATABASE],
369381
write_client_options=write_client_option,
370382
org=org,
383+
disable_grpc_compression=disable_grpc_compression,
371384
**kwargs
372385
)
373386

influxdb_client_3/query/query_api.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,22 @@ class QueryApiOptions(object):
1919
proxy (str): URL to a proxy server
2020
flight_client_options (dict): base set of flight client options passed to internal pyarrow.flight.FlightClient
2121
timeout(float): timeout in seconds to wait for a response
22+
disable_grpc_compression (bool): disable gRPC compression for query responses
2223
"""
2324
_DEFAULT_TIMEOUT = 300.0
2425
tls_root_certs: bytes = None
2526
tls_verify: bool = None
2627
proxy: str = None
2728
flight_client_options: dict = None
2829
timeout: float = None
30+
disable_grpc_compression: bool = False
2931

3032
def __init__(self, root_certs_path: str,
3133
verify: bool,
3234
proxy: str,
3335
flight_client_options: dict,
34-
timeout: float = _DEFAULT_TIMEOUT):
36+
timeout: float = _DEFAULT_TIMEOUT,
37+
disable_grpc_compression: bool = False):
3538
"""
3639
Initialize a set of QueryApiOptions
3740
@@ -41,13 +44,15 @@ def __init__(self, root_certs_path: str,
4144
:param flight_client_options: set of flight_client_options
4245
to be passed to internal pyarrow.flight.FlightClient.
4346
:param timeout: timeout in seconds to wait for a response.
47+
:param disable_grpc_compression: disable gRPC compression for query responses.
4448
"""
4549
if root_certs_path:
4650
self.tls_root_certs = self._read_certs(root_certs_path)
4751
self.tls_verify = verify
4852
self.proxy = proxy
4953
self.flight_client_options = flight_client_options
5054
self.timeout = timeout
55+
self.disable_grpc_compression = disable_grpc_compression
5156

5257
def _read_certs(self, path: str) -> bytes:
5358
with open(path, "rb") as certs_file:
@@ -75,6 +80,7 @@ class QueryApiOptionsBuilder(object):
7580
_proxy: str = None
7681
_flight_client_options: dict = None
7782
_timeout: float = None
83+
_disable_grpc_compression: bool = False
7884

7985
def root_certs(self, path: str):
8086
self._root_certs_path = path
@@ -96,6 +102,11 @@ def timeout(self, timeout: float):
96102
self._timeout = timeout
97103
return self
98104

105+
def disable_grpc_compression(self, disable: bool):
106+
"""Disable gRPC compression for query responses."""
107+
self._disable_grpc_compression = disable
108+
return self
109+
99110
def build(self) -> QueryApiOptions:
100111
"""Build a QueryApiOptions object with previously set values"""
101112
return QueryApiOptions(
@@ -104,6 +115,7 @@ def build(self) -> QueryApiOptions:
104115
proxy=self._proxy,
105116
flight_client_options=self._flight_client_options,
106117
timeout=self._timeout,
118+
disable_grpc_compression=self._disable_grpc_compression,
107119
)
108120

109121

@@ -162,6 +174,13 @@ def __init__(self,
162174
self._flight_client_options["disable_server_verification"] = not options.tls_verify
163175
if options.timeout is not None:
164176
self._default_timeout = options.timeout
177+
if options.disable_grpc_compression:
178+
# Disable gRPC response compression by only enabling identity algorithm
179+
# Bitset: bit 0 = identity, bit 1 = deflate, bit 2 = gzip
180+
# Setting to 1 (0b001) enables only identity (no compression)
181+
self._flight_client_options["generic_options"].append(
182+
("grpc.compression_enabled_algorithms_bitset", 1)
183+
)
165184
if self._proxy:
166185
self._flight_client_options["generic_options"].append(("grpc.http_proxy", self._proxy))
167186
self._flight_client = FlightClient(connection_string, **self._flight_client_options)

setup.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,13 @@ def get_version():
5151
'pandas': ['pandas'],
5252
'polars': ['polars'],
5353
'dataframe': ['pandas', 'polars'],
54-
'test': ['pytest', 'pytest-cov', 'pytest-httpserver']
54+
'test': [
55+
'pytest',
56+
'pytest-cov',
57+
'pytest-httpserver',
58+
'h2>=4.0.0,<5.0.0',
59+
'cryptography>=3.4.0',
60+
]
5561
},
5662
install_requires=requires,
5763
python_requires='>=3.8',

tests/test_influxdb_client_3.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,74 @@ def test_parse_invalid_write_timeout_range(self):
302302
with self.assertRaisesRegex(ValueError, ".*Must be non-negative.*"):
303303
InfluxDBClient3.from_env()
304304

305+
def assertGrpcCompressionDisabled(self, client, disabled):
306+
"""Assert whether gRPC compression is disabled for the client."""
307+
self.assertIsInstance(client, InfluxDBClient3)
308+
generic_options = dict(client._query_api._flight_client_options['generic_options'])
309+
if disabled:
310+
self.assertEqual(generic_options.get('grpc.compression_enabled_algorithms_bitset'), 1)
311+
else:
312+
self.assertIsNone(generic_options.get('grpc.compression_enabled_algorithms_bitset'))
313+
314+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
315+
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': 'true'})
316+
def test_from_env_disable_grpc_compression_true(self):
317+
client = InfluxDBClient3.from_env()
318+
self.assertGrpcCompressionDisabled(client, True)
319+
320+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
321+
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': 'TrUe'})
322+
def test_from_env_disable_grpc_compression_true_mixed_case(self):
323+
client = InfluxDBClient3.from_env()
324+
self.assertGrpcCompressionDisabled(client, True)
325+
326+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
327+
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': '1'})
328+
def test_from_env_disable_grpc_compression_one(self):
329+
client = InfluxDBClient3.from_env()
330+
self.assertGrpcCompressionDisabled(client, True)
331+
332+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
333+
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': 'false'})
334+
def test_from_env_disable_grpc_compression_false(self):
335+
client = InfluxDBClient3.from_env()
336+
self.assertGrpcCompressionDisabled(client, False)
337+
338+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
339+
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': 'anything-else'})
340+
def test_from_env_disable_grpc_compression_anything_else_is_false(self):
341+
client = InfluxDBClient3.from_env()
342+
self.assertGrpcCompressionDisabled(client, False)
343+
344+
def test_disable_grpc_compression_parameter_true(self):
345+
client = InfluxDBClient3(
346+
host="localhost",
347+
org="my_org",
348+
database="my_db",
349+
token="my_token",
350+
disable_grpc_compression=True
351+
)
352+
self.assertGrpcCompressionDisabled(client, True)
353+
354+
def test_disable_grpc_compression_parameter_false(self):
355+
client = InfluxDBClient3(
356+
host="localhost",
357+
org="my_org",
358+
database="my_db",
359+
token="my_token",
360+
disable_grpc_compression=False
361+
)
362+
self.assertGrpcCompressionDisabled(client, False)
363+
364+
def test_disable_grpc_compression_default_is_false(self):
365+
client = InfluxDBClient3(
366+
host="localhost",
367+
org="my_org",
368+
database="my_db",
369+
token="my_token",
370+
)
371+
self.assertGrpcCompressionDisabled(client, False)
372+
305373
def test_query_with_arrow_error(self):
306374
f = ErrorFlightServer()
307375
with InfluxDBClient3(f"http://localhost:{f.port}", "my_org", "my_db", "my_token") as c:

0 commit comments

Comments
 (0)