Skip to content

Commit 3a05af6

Browse files
committed
feat: (WIP) timeouts - adds tests for write API timeout.
1 parent 14bedd6 commit 3a05af6

File tree

5 files changed

+130
-51
lines changed

5 files changed

+130
-51
lines changed

influxdb_client_3/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ def write(self, record=None, database=None, **kwargs):
352352
database = self._database
353353

354354
try:
355-
self._write_api.write(bucket=database, record=record, **kwargs)
355+
return self._write_api.write(bucket=database, record=record, **kwargs)
356356
except InfluxDBError as e:
357357
raise e
358358

influxdb_client_3/write_client/client/write_api.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from collections import defaultdict
88
from datetime import timedelta
99
from enum import Enum
10+
from multiprocessing.pool import ApplyResult
1011
from random import random
1112
from time import sleep
1213
from typing import Union, Any, Iterable, NamedTuple
@@ -264,6 +265,7 @@ def __init__(self,
264265
"""
265266
super().__init__(influxdb_client=influxdb_client, point_settings=point_settings)
266267
self._write_options = write_options
268+
# TODO - callbacks seem to be used with batching type only - could they be used with sync or async?
267269
self._success_callback = kwargs.get('success_callback', None)
268270
self._error_callback = kwargs.get('error_callback', None)
269271
self._retry_callback = kwargs.get('retry_callback', None)
@@ -303,6 +305,7 @@ def __init__(self,
303305
You can use native asynchronous version of the client:
304306
- https://influxdb-client.readthedocs.io/en/stable/usage.html#how-to-use-asyncio
305307
"""
308+
# TODO above message has link to Influxdb2 API __NOT__ Influxdb3 API !!! - illustrates different API
306309
warnings.warn(message, DeprecationWarning)
307310

308311
def write(self, bucket: str, org: str = None,

tests/test_api_client.py

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
import json
2+
import time
23
import unittest
34
import uuid
5+
from concurrent.futures import thread
46
from unittest import mock
7+
8+
import pytest
59
from urllib3 import response
10+
from urllib3.exceptions import ConnectTimeoutError
611

712
from influxdb_client_3.write_client._sync.api_client import ApiClient
813
from influxdb_client_3.write_client.configuration import Configuration
@@ -36,9 +41,20 @@ def data(self):
3641

3742
return MockResponse(None, 200)
3843

39-
4044
class ApiClientTests(unittest.TestCase):
4145

46+
received_timeout_total = None
47+
def mock_urllib3_timeout_request(method,
48+
url,
49+
body,
50+
headers,
51+
**urlopen_kw):
52+
if urlopen_kw.get('timeout', None) is not None:
53+
ApiClientTests.received_timeout_total = urlopen_kw['timeout'].total
54+
raise ConnectTimeoutError()
55+
56+
return response.HTTPResponse(status=200, version=4, reason="OK", decode_content=False, request_url=url)
57+
4258
def test_default_headers(self):
4359
global _package
4460
conf = Configuration()
@@ -140,10 +156,33 @@ def test_api_error_headers(self):
140156
self.assertEqual(headers['X-Influxdb-Request-Id'], requestid)
141157
self.assertEqual(headers['X-Influxdb-Build'], 'Mock')
142158

143-
def test_request_timeout_from_config(self):
144-
# TODO
159+
@mock.patch("urllib3._request_methods.RequestMethods.request",
160+
side_effect=mock_urllib3_timeout_request)
161+
def test_request_config_timeout(self, mock_request):
162+
conf = Configuration()
163+
conf.host = "http://localhost:8181"
164+
conf.timeout = 300
165+
local_client = ApiClient(conf)
166+
service = WriteService(local_client)
167+
with pytest.raises(ConnectTimeoutError):
168+
service.post_write("TEST_ORG", "TEST_BUCKET", "data,foo=bar val=3.14",
169+
_preload_content=False)
170+
self.assertEqual(0.3, self.received_timeout_total)
171+
self.received_timeout_total = None
172+
173+
@mock.patch("urllib3._request_methods.RequestMethods.request",
174+
side_effect=mock_urllib3_timeout_request)
175+
def test_request_arg_timeout(self, mock_request):
145176
conf = Configuration()
177+
conf.host = "http://localhost:8181"
178+
conf.timeout = 300
146179
local_client = ApiClient(conf)
180+
service = WriteService(local_client)
181+
with pytest.raises(ConnectTimeoutError):
182+
service.post_write("TEST_ORG", "TEST_BUCKET", "data,foo=bar val=3.14",
183+
_request_timeout=100, _preload_content=False)
184+
self.assertEqual(0.1, self.received_timeout_total)
185+
self.received_timeout_total = None
147186

148187
def test_should_gzip(self):
149188
# Test when gzip is disabled

tests/test_influxdb_client_3_integration.py

Lines changed: 61 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import time
88
import unittest
99

10-
from urllib3.exceptions import MaxRetryError, ConnectTimeoutError
10+
from urllib3.exceptions import MaxRetryError, ConnectTimeoutError, TimeoutError as Url3TimeoutError
1111

1212
from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions, SYNCHRONOUS, flight_client_options, \
1313
WriteType
@@ -251,22 +251,14 @@ def test_get_server_version(self):
251251
version = self.client.get_server_version()
252252
assert version is not None
253253

254-
# TODO set sync test, also investigate behavior with batcher and retry
255-
# TODO do these need to be run with integration - won't mock suffice?
256254
def test_write_timeout_sync(self):
257255

258-
ErrorRecord = None
259-
def set_error_record(error):
260-
nonlocal ErrorRecord
261-
ErrorRecord = error
262-
263-
with pytest.raises(ConnectTimeoutError) as e:
256+
with pytest.raises(Url3TimeoutError):
264257
localClient = InfluxDBClient3(
265258
host=self.host,
266259
database=self.database,
267260
token=self.token,
268-
write_client_options=flight_client_options(
269-
error_callback=set_error_record,
261+
write_client_options=write_client_options(
270262
write_options=WriteOptions(
271263
max_retry_time=0,
272264
timeout=20,
@@ -277,83 +269,105 @@ def set_error_record(error):
277269

278270
localClient.write("test_write_timeout,location=harfa fVal=3.14,iVal=42i")
279271

280-
281-
@pytest.mark.skip(reason="placeholder - partially implemented")
282272
@asyncio_run
283273
async def test_write_timeout_async(self):
284-
# fco = flight_client_options(max_retries=10, timeout=30_000)
285-
# print(f"DEBUG fco: {fco}")
286-
# TODO ensure API can handle either callback or thrown exception
287-
# TODO asserts based on solution
288274

289-
ErrorRecord = None
290-
def set_error_record(error):
291-
nonlocal ErrorRecord
292-
ErrorRecord = error
275+
with pytest.raises(Url3TimeoutError):
276+
localClient = InfluxDBClient3(
277+
host=self.host,
278+
database=self.database,
279+
token=self.token,
280+
write_client_options=write_client_options(
281+
# error_callback=set_error_record,
282+
write_options=WriteOptions(
283+
max_retry_time=0, # disable retries
284+
timeout=20,
285+
write_type=WriteType.asynchronous
286+
)
287+
)
288+
)
289+
290+
applyResult = localClient.write("test_write_timeout,location=harfa fVal=3.14,iVal=42i")
291+
applyResult.get()
293292

293+
def test_write_timeout_batching(self):
294+
295+
ErrorResult = {"rt": None, "rd": None, "rx": None}
296+
297+
def set_error_result(rt, rd, rx):
298+
nonlocal ErrorResult
299+
ErrorResult = {"rt": rt, "rd": rd, "rx": rx}
294300

295301
localClient = InfluxDBClient3(
296302
host=self.host,
297303
database=self.database,
298304
token=self.token,
299-
write_client_options=flight_client_options(
300-
error_callback=set_error_record,
305+
write_client_options=write_client_options(
306+
error_callback=set_error_result,
301307
write_options=WriteOptions(
302-
max_retry_time=0,
308+
max_retry_time=0, # disable retries
303309
timeout=20,
304-
write_type=WriteType.asynchronous
310+
write_type=WriteType.batching,
311+
max_retries=1,
312+
batch_size=1,
305313
)
306314
)
307315
)
316+
lp = "test_write_timeout,location=harfa fVal=3.14,iVal=42i"
317+
localClient.write(lp)
308318

309-
print(f"DEBUG localClient._write_client_options: {localClient._write_client_options['write_options'].__dict__}")
310-
print(f"DEBUG localClient._client._base._Configuration {localClient._client.conf.timeout}")
311-
312-
applyResult = localClient.write("test_write_timeout,location=harfa fVal=3.14,iVal=42i")
313-
print(f"DEBUG applyResult: {applyResult}")
314-
result = applyResult.get()
315-
print(f"DEBUG result: {result}")
319+
# wait for batcher attempt last write retry
320+
time.sleep(0.1)
316321

322+
assert ErrorResult["rt"] == (self.database, 'default', 'ns')
323+
assert ErrorResult["rd"] is not None
324+
assert isinstance(ErrorResult["rd"], bytes)
325+
assert ErrorResult["rd"].decode('utf-8') == lp
326+
assert ErrorResult["rx"] is not None
327+
assert isinstance(ErrorResult["rx"], MaxRetryError)
328+
mre = ErrorResult["rx"]
329+
assert isinstance(mre.reason, Url3TimeoutError)
317330

318-
def test_write_timeout_batching(self):
331+
def test_write_timeout_retry(self):
319332

320333
ErrorResult = {"rt": None, "rd": None, "rx": None}
321-
322334
def set_error_result(rt, rd, rx):
323335
nonlocal ErrorResult
324336
ErrorResult = {"rt": rt, "rd": rd, "rx": rx}
325337

338+
retry_ct = 0
339+
def retry_cb(args, data, excp):
340+
nonlocal retry_ct
341+
retry_ct += 1
342+
326343
localClient = InfluxDBClient3(
327344
host=self.host,
328345
database=self.database,
329346
token=self.token,
330-
write_client_options=flight_client_options(
347+
write_client_options=write_client_options(
331348
error_callback=set_error_result,
349+
retry_callback=retry_cb,
332350
write_options=WriteOptions(
333-
max_retry_time=0,
351+
max_retry_time=10000,
352+
max_retry_delay=100,
353+
retry_interval=100,
334354
timeout=20,
335-
write_type=WriteType.batching,
336-
max_retries=1,
355+
max_retries=3,
337356
batch_size=1,
338357
)
339358
)
340359
)
360+
341361
lp = "test_write_timeout,location=harfa fVal=3.14,iVal=42i"
342362
localClient.write(lp)
363+
time.sleep(1) # await all retries
343364

344-
# wait for batcher attempt last write retry
345-
time.sleep(0.1)
346-
365+
assert retry_ct == 3
347366
assert ErrorResult["rt"] == (self.database, 'default', 'ns')
348367
assert ErrorResult["rd"] is not None
349368
assert isinstance(ErrorResult["rd"], bytes)
350369
assert ErrorResult["rd"].decode('utf-8') == lp
351370
assert ErrorResult["rx"] is not None
352371
assert isinstance(ErrorResult["rx"], MaxRetryError)
353372
mre = ErrorResult["rx"]
354-
assert isinstance(mre.reason, ConnectTimeoutError)
355-
356-
@pytest.mark.skip("place holder")
357-
def test_write_timeout_retry(self):
358-
# TODO
359-
print("DEBUG test_write_timeout_retry")
373+
assert isinstance(mre.reason, Url3TimeoutError)

tests/test_write_local_server.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import re
2+
import time
23
from http import HTTPStatus
34

45
import pytest
56
from pytest_httpserver import HTTPServer, RequestMatcher
7+
from urllib3.exceptions import TimeoutError as urllib3_TimeoutError
68

79
from influxdb_client_3 import InfluxDBClient3, WriteOptions, WritePrecision, write_client_options, WriteType
810
from influxdb_client_3.write_client.rest import ApiException
@@ -20,6 +22,10 @@ def assert_request_made(httpserver, matcher):
2022
httpserver.assert_request_made(matcher)
2123
httpserver.check_assertions()
2224

25+
@staticmethod
26+
def delay_response(httpserver, delay = 1.0):
27+
httpserver.expect_request(re.compile(".*")).respond_with_handler(lambda request: time.sleep(delay))
28+
2329
def test_write_default_params(self, httpserver: HTTPServer):
2430
self.set_response_status(httpserver, 200)
2531

@@ -127,3 +133,20 @@ def test_write_with_no_sync_true_and_gzip(self, httpserver: HTTPServer):
127133
method="POST", uri="/api/v3/write_lp",
128134
query_string={"org": "ORG", "db": "DB", "precision": "microsecond", "no_sync": "true"},
129135
headers={"Content-Encoding": "gzip"}, ))
136+
137+
def test_write_with_timeout(self, httpserver: HTTPServer):
138+
self.delay_response(httpserver, 0.5)
139+
140+
with pytest.raises(urllib3_TimeoutError):
141+
InfluxDBClient3(
142+
host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN",
143+
write_client_options=write_client_options(
144+
write_options=WriteOptions(
145+
write_type=WriteType.synchronous,
146+
write_precision=WritePrecision.US,
147+
timeout=30,
148+
no_sync=True
149+
)
150+
),
151+
enable_gzip=True
152+
).write(self.SAMPLE_RECORD)

0 commit comments

Comments
 (0)