Skip to content

Commit adc3d0b

Browse files
committed
chore: minor refactoring. Sync timeout types in __init__. Extra tests.
1 parent 8229057 commit adc3d0b

File tree

6 files changed

+94
-26
lines changed

6 files changed

+94
-26
lines changed

influxdb_client_3/__init__.py

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
2626
INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD"
2727
INFLUX_WRITE_NO_SYNC = "INFLUX_WRITE_NO_SYNC"
28+
INFLUX_WRITE_TIMEOUT = "INFLUX_WRITE_TIMEOUT"
29+
INFLUX_QUERY_TIMEOUT = "INFLUX_QUERY_TIMEOUT"
2830

2931

3032
def write_client_options(**kwargs):
@@ -97,7 +99,7 @@ def _merge_options(defaults, exclude_keys=None, custom=None):
9799
return _deep_merge(defaults, {key: value for key, value in custom.items() if key not in exclude_keys})
98100

99101

100-
def _parse_precision(precision):
102+
def _parse_precision(precision: str) -> WritePrecision:
101103
"""
102104
Parses the precision value and ensures it is valid.
103105
@@ -124,7 +126,7 @@ def _parse_precision(precision):
124126
raise ValueError(f"Invalid precision value: {precision}")
125127

126128

127-
def _parse_gzip_threshold(threshold):
129+
def _parse_gzip_threshold(threshold: str) -> int:
128130
"""
129131
Parses and validates the provided threshold value.
130132
@@ -148,7 +150,7 @@ def _parse_gzip_threshold(threshold):
148150
return threshold
149151

150152

151-
def _parse_write_no_sync(write_no_sync):
153+
def _parse_write_no_sync(write_no_sync: str):
152154
"""
153155
Parses and validates the provided write no sync value.
154156
@@ -163,6 +165,16 @@ def _parse_write_no_sync(write_no_sync):
163165
return write_no_sync.strip().lower() in ['true', '1', 't', 'y', 'yes']
164166

165167

168+
def _parse_timeout(to: str) -> int:
169+
try:
170+
timeout = int(to)
171+
except (TypeError, ValueError):
172+
raise ValueError(f"Invalid timeout value: {to}. Must be a number.")
173+
if timeout < 0:
174+
raise ValueError(f"Invalid timeout value: {to}. Must be non-negative.")
175+
return timeout
176+
177+
166178
class InfluxDBClient3:
167179
def __init__(
168180
self,
@@ -211,24 +223,30 @@ def __init__(
211223
(defaults to false, don't set to true when talking to InfluxDB 2)
212224
:key str username: ``username`` to authenticate via username and password credentials to the InfluxDB 2.x
213225
:key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x
214-
:key str query_timeout: float value used to set the client query API timeout in seconds.
226+
:key str query_timeout: int value used to set the client query API timeout in milliseconds.
227+
:key str write_timeout: int value used to set the client write API timeout in milliseconds.
215228
:key list[str] profilers: list of enabled Flux profilers
216229
"""
217230
self._org = org if org is not None else "default"
218231
self._database = database
219232
self._token = token
233+
kw_keys = kwargs.keys()
220234

221235
write_type = DefaultWriteOptions.write_type.value
222236
write_precision = DefaultWriteOptions.write_precision.value
223237
write_no_sync = DefaultWriteOptions.no_sync.value
224238
write_timeout = DefaultWriteOptions.timeout.value
239+
225240
if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
226241
write_opts = write_client_options['write_options']
227242
write_type = getattr(write_opts, 'write_type', write_type)
228243
write_precision = getattr(write_opts, 'write_precision', write_precision)
229244
write_no_sync = getattr(write_opts, 'no_sync', write_no_sync)
230245
write_timeout = getattr(write_opts, 'timeout', write_timeout)
231246

247+
if kw_keys.__contains__('write_timeout'):
248+
write_timeout = kwargs.get('write_timeout')
249+
232250
write_options = WriteOptions(
233251
write_type=write_type,
234252
write_precision=write_precision,
@@ -269,15 +287,15 @@ def __init__(
269287
connection_string = f"grpc+tcp://{hostname}:{port}"
270288

271289
q_opts_builder = QueryApiOptionsBuilder()
272-
kw_keys = kwargs.keys()
273290
if kw_keys.__contains__('ssl_ca_cert'):
274291
q_opts_builder.root_certs(kwargs.get('ssl_ca_cert', None))
275292
if kw_keys.__contains__('verify_ssl'):
276293
q_opts_builder.tls_verify(kwargs.get('verify_ssl', True))
277294
if kw_keys.__contains__('proxy'):
278295
q_opts_builder.proxy(kwargs.get('proxy', None))
279296
if kw_keys.__contains__('query_timeout'):
280-
q_opts_builder.timeout(kwargs.get('query_timeout', None))
297+
query_timeout_float = float(kwargs.get('query_timeout'))
298+
q_opts_builder.timeout(query_timeout_float / 1000.0)
281299
self._query_api = _QueryApi(connection_string=connection_string, token=token,
282300
flight_client_options=flight_client_options,
283301
proxy=kwargs.get("proxy", None), options=q_opts_builder.build())
@@ -325,6 +343,15 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
325343
if precision is not None:
326344
write_options.write_precision = _parse_precision(precision)
327345

346+
write_timeout = os.getenv(INFLUX_WRITE_TIMEOUT)
347+
if write_timeout is not None:
348+
# N.B. write_options value has precedent over kwargs['write_timeout'] above
349+
write_options.timeout = _parse_timeout(write_timeout)
350+
351+
query_timeout = os.getenv(INFLUX_QUERY_TIMEOUT)
352+
if query_timeout is not None:
353+
kwargs['query_timeout'] = _parse_timeout(query_timeout)
354+
328355
write_client_option = {'write_options': write_options}
329356

330357
if os.getenv(INFLUX_AUTH_SCHEME) is not None:

influxdb_client_3/query/query_api.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class QueryApiOptions(object):
1818
tls_verify (bool): whether to verify SSL certificates or not
1919
proxy (str): URL to a proxy server
2020
flight_client_options (dict): base set of flight client options passed to internal pyarrow.flight.FlightClient
21+
timeout(float): timeout in seconds to wait for a response
2122
"""
2223
_DEFAULT_TIMEOUT = 300.0
2324
tls_root_certs: bytes = None
@@ -26,7 +27,11 @@ class QueryApiOptions(object):
2627
flight_client_options: dict = None
2728
timeout: float = None
2829

29-
def __init__(self, root_certs_path, verify, proxy, flight_client_options, timeout):
30+
def __init__(self, root_certs_path: str,
31+
verify: bool,
32+
proxy: str,
33+
flight_client_options: dict,
34+
timeout: float = _DEFAULT_TIMEOUT):
3035
"""
3136
Initialize a set of QueryApiOptions
3237
@@ -35,6 +40,7 @@ def __init__(self, root_certs_path, verify, proxy, flight_client_options, timeou
3540
:param proxy: URL of a proxy server, if required.
3641
:param flight_client_options: set of flight_client_options
3742
to be passed to internal pyarrow.flight.FlightClient.
43+
:param timeout: timeout in seconds to wait for a response.
3844
"""
3945
if root_certs_path:
4046
self.tls_root_certs = self._read_certs(root_certs_path)
@@ -43,7 +49,7 @@ def __init__(self, root_certs_path, verify, proxy, flight_client_options, timeou
4349
self.flight_client_options = flight_client_options
4450
self.timeout = timeout
4551

46-
def _read_certs(self, path):
52+
def _read_certs(self, path: str) -> bytes:
4753
with open(path, "rb") as certs_file:
4854
return certs_file.read()
4955

@@ -64,33 +70,33 @@ class QueryApiOptionsBuilder(object):
6470
6571
client = QueryApi(connection, token, None, None, options)
6672
"""
67-
_root_certs_path = None
68-
_tls_verify = True
69-
_proxy = None
70-
_flight_client_options = None
71-
_timeout = None
73+
_root_certs_path: str = None
74+
_tls_verify: bool = True
75+
_proxy: str = None
76+
_flight_client_options: dict = None
77+
_timeout: float = None
7278

73-
def root_certs(self, path):
79+
def root_certs(self, path: str):
7480
self._root_certs_path = path
7581
return self
7682

77-
def tls_verify(self, verify):
83+
def tls_verify(self, verify: bool):
7884
self._tls_verify = verify
7985
return self
8086

81-
def proxy(self, proxy):
87+
def proxy(self, proxy: str):
8288
self._proxy = proxy
8389
return self
8490

85-
def flight_client_options(self, flight_client_options):
91+
def flight_client_options(self, flight_client_options: dict):
8692
self._flight_client_options = flight_client_options
8793
return self
8894

89-
def timeout(self, timeout):
95+
def timeout(self, timeout: float):
9096
self._timeout = timeout
9197
return self
9298

93-
def build(self):
99+
def build(self) -> QueryApiOptions:
94100
"""Build a QueryApiOptions object with previously set values"""
95101
return QueryApiOptions(
96102
root_certs_path=self._root_certs_path,

influxdb_client_3/write_client/client/write_api.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
8282
:param max_close_wait: the maximum time to wait for writes to be flushed if close() is called
8383
:param write_precision: precision to use when writing points to InfluxDB
8484
:param no_sync: skip waiting for WAL persistence on write
85+
:param timeout: timeout to use when writing to the database in milliseconds. Default is 10_000
8586
:param write_scheduler:
8687
"""
8788
self.write_type = write_type
@@ -397,9 +398,7 @@ def write(self, bucket: str, org: str = None,
397398

398399
def write_payload(payload):
399400
final_string = b'\n'.join(payload[1])
400-
result = self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync)
401-
# return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync)
402-
return result
401+
return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync)
403402

404403
results = list(map(write_payload, payloads.items()))
405404
if not _async_req:

tests/test_influxdb_client_3.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,8 @@ def verify_client_write_options(c):
187187
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
188188
'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org',
189189
'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_AUTH_SCHEME': 'custom_scheme',
190-
'INFLUX_GZIP_THRESHOLD': '2000', 'INFLUX_WRITE_NO_SYNC': 'true'})
190+
'INFLUX_GZIP_THRESHOLD': '2000', 'INFLUX_WRITE_NO_SYNC': 'true',
191+
'INFLUX_WRITE_TIMEOUT': '1234', 'INFLUX_QUERY_TIMEOUT': '5678'})
191192
def test_from_env_all_env_vars_set(self):
192193
client = InfluxDBClient3.from_env()
193194
self.assertIsInstance(client, InfluxDBClient3)
@@ -201,6 +202,8 @@ def test_from_env_all_env_vars_set(self):
201202
write_options = client._write_client_options.get("write_options")
202203
self.assertEqual(write_options.write_precision, WritePrecision.MS)
203204
self.assertEqual(write_options.no_sync, True)
205+
self.assertEqual(1234, write_options.timeout)
206+
self.assertEqual(5.678, client._query_api._default_timeout)
204207

205208
client._write_api._point_settings = {}
206209

tests/test_influxdb_client_3_integration.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,22 @@ def test_get_server_version(self):
251251
version = self.client.get_server_version()
252252
assert version is not None
253253

254+
def test_write_timeout(self):
255+
with pytest.raises(Url3TimeoutError):
256+
InfluxDBClient3(
257+
host=self.host,
258+
database=self.database,
259+
token=self.token,
260+
write_timeout=30,
261+
write_client_options=write_client_options(
262+
write_options=WriteOptions(
263+
max_retry_time=0,
264+
timeout=20,
265+
write_type=WriteType.synchronous
266+
)
267+
)
268+
).write("test_write_timeout,location=harfa fVal=3.14,iVal=42i")
269+
254270
def test_write_timeout_sync(self):
255271

256272
with pytest.raises(Url3TimeoutError):
@@ -376,7 +392,7 @@ def test_query_timeout(self):
376392
host=self.host,
377393
token=self.token,
378394
database=self.database,
379-
query_timeout=0.001,
395+
query_timeout=1,
380396
)
381397

382398
with self.assertRaisesRegex(InfluxDB3ClientQueryError, ".*Deadline Exceeded.*"):
@@ -387,7 +403,7 @@ def test_query_timeout_per_call_override(self):
387403
host=self.host,
388404
token=self.token,
389405
database=self.database,
390-
query_timeout=3.0,
406+
query_timeout=3,
391407
)
392408

393409
with self.assertRaisesRegex(InfluxDB3ClientQueryError, ".*Deadline Exceeded.*"):

tests/test_write_local_server.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def test_write_with_no_sync_true_and_gzip(self, httpserver: HTTPServer):
134134
query_string={"org": "ORG", "db": "DB", "precision": "microsecond", "no_sync": "true"},
135135
headers={"Content-Encoding": "gzip"}, ))
136136

137-
def test_write_with_timeout(self, httpserver: HTTPServer):
137+
def test_write_with_timeout_in_write_options(self, httpserver: HTTPServer):
138138
self.delay_response(httpserver, 0.5)
139139

140140
with pytest.raises(urllib3_TimeoutError):
@@ -150,3 +150,20 @@ def test_write_with_timeout(self, httpserver: HTTPServer):
150150
),
151151
enable_gzip=True
152152
).write(self.SAMPLE_RECORD)
153+
154+
def test_write_with_write_timeout(self, httpserver: HTTPServer):
155+
self.delay_response(httpserver, 0.5)
156+
157+
with pytest.raises(urllib3_TimeoutError):
158+
InfluxDBClient3(
159+
host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN",
160+
write_timeout=30,
161+
write_client_options=write_client_options(
162+
write_options=WriteOptions(
163+
write_type=WriteType.synchronous,
164+
write_precision=WritePrecision.US,
165+
no_sync=True,
166+
)
167+
),
168+
enable_gzip=True
169+
).write(self.SAMPLE_RECORD)

0 commit comments

Comments
 (0)