Skip to content

Commit 7b9e802

Browse files
authored
feat: fast no-sync write support (#142)
1 parent f8a629b commit 7b9e802

File tree

13 files changed

+369
-44
lines changed

13 files changed

+369
-44
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,17 @@
22

33
## 0.14.0 [unreleased]
44

5+
### Features
6+
7+
1. [#142](https://github.com/InfluxCommunity/influxdb3-python/pull/142): Support fast writes without waiting for WAL
8+
persistence:
9+
- New write option (`WriteOptions.no_sync`) added: `True` value means faster write but without the confirmation that
10+
the data was persisted. Default value: `False`.
11+
- **Supported by self-managed InfluxDB 3 Core and Enterprise servers only!**
12+
- Also configurable via environment variable (`INFLUX_WRITE_NO_SYNC`).
13+
- Long precision string values added from v3 HTTP API: `"nanosecond"`, `"microsecond"`, `"millisecond"`,
14+
`"second"` ( in addition to the existing `"ns"`, `"us"`, `"ms"`, `"s"`).
15+
516
## 0.13.0 [2025-05-20]
617

718
### Features

influxdb_client_3/__init__.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
from pyarrow import ArrowException
88

99
from influxdb_client_3.exceptions import InfluxDB3ClientQueryError
10+
from influxdb_client_3.exceptions import InfluxDBError
1011
from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
1112
from influxdb_client_3.read_file import UploadFile
1213
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
13-
from influxdb_client_3.exceptions import InfluxDBError
1414
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
1515
PointSettings, DefaultWriteOptions, WriteType
1616
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
@@ -24,6 +24,7 @@
2424
INFLUX_PRECISION = "INFLUX_PRECISION"
2525
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
2626
INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD"
27+
INFLUX_WRITE_NO_SYNC = "INFLUX_WRITE_NO_SYNC"
2728

2829

2930
def write_client_options(**kwargs):
@@ -112,9 +113,15 @@ def _parse_precision(precision):
112113
:rtype: WritePrecision
113114
:raises ValueError: If the provided precision is not valid.
114115
"""
115-
if precision not in [WritePrecision.NS, WritePrecision.MS, WritePrecision.S, WritePrecision.US]:
116-
raise ValueError(f"Invalid precision value: {precision}")
117-
return precision
116+
if precision == WritePrecision.NS or precision == "nanosecond":
117+
return WritePrecision.NS
118+
if precision == WritePrecision.US or precision == "microsecond":
119+
return WritePrecision.US
120+
if precision == WritePrecision.MS or precision == "millisecond":
121+
return WritePrecision.MS
122+
if precision == WritePrecision.S or precision == "second":
123+
return WritePrecision.S
124+
raise ValueError(f"Invalid precision value: {precision}")
118125

119126

120127
def _parse_gzip_threshold(threshold):
@@ -141,6 +148,21 @@ def _parse_gzip_threshold(threshold):
141148
return threshold
142149

143150

151+
def _parse_write_no_sync(write_no_sync):
152+
"""
153+
Parses and validates the provided write no sync value.
154+
155+
This function ensures that the given value is a valid boolean,
156+
and it raises an appropriate error if the value is not valid.
157+
158+
:param write_no_sync: The input value to be parsed and validated.
159+
:type write_no_sync: Any
160+
:return: The validated write no sync value as an boolean.
161+
:rtype: bool
162+
"""
163+
return write_no_sync.strip().lower() in ['true', '1', 't', 'y', 'yes']
164+
165+
144166
class InfluxDBClient3:
145167
def __init__(
146168
self,
@@ -197,14 +219,17 @@ def __init__(
197219

198220
write_type = DefaultWriteOptions.write_type.value
199221
write_precision = DefaultWriteOptions.write_precision.value
222+
write_no_sync = DefaultWriteOptions.no_sync.value
200223
if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
201224
write_opts = write_client_options['write_options']
202225
write_type = getattr(write_opts, 'write_type', write_type)
203226
write_precision = getattr(write_opts, 'write_precision', write_precision)
227+
write_no_sync = getattr(write_opts, 'no_sync', write_no_sync)
204228

205229
write_options = WriteOptions(
206230
write_type=write_type,
207231
write_precision=write_precision,
232+
no_sync=write_no_sync,
208233
)
209234

210235
self._write_client_options = {
@@ -286,6 +311,10 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
286311
kwargs['gzip_threshold'] = _parse_gzip_threshold(gzip_threshold)
287312
kwargs['enable_gzip'] = True
288313

314+
write_no_sync = os.getenv(INFLUX_WRITE_NO_SYNC)
315+
if write_no_sync is not None:
316+
write_options.no_sync = _parse_write_no_sync(write_no_sync)
317+
289318
precision = os.getenv(INFLUX_PRECISION)
290319
if precision is not None:
291320
write_options.write_precision = _parse_precision(precision)

influxdb_client_3/write_client/client/_base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ def update_request_header_params(self, path: str, params: dict, should_gzip: boo
282282
super().update_request_header_params(path, params, should_gzip)
283283
if should_gzip:
284284
# GZIP Request
285-
if path == '/api/v2/write':
285+
if path == '/api/v2/write' or path == '/api/v3/write_lp':
286286
params["Content-Encoding"] = "gzip"
287287
params["Accept-Encoding"] = "identity"
288288
pass
@@ -298,7 +298,7 @@ def update_request_body(self, path: str, body, should_gzip: bool = False):
298298
_body = super().update_request_body(path, body, should_gzip)
299299
if should_gzip:
300300
# GZIP Request
301-
if path == '/api/v2/write':
301+
if path == '/api/v2/write' or path == '/api/v3/write_lp':
302302
import gzip
303303
if isinstance(_body, bytes):
304304
return gzip.compress(data=_body)

influxdb_client_3/write_client/client/write_api.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
from influxdb_client_3.write_client.domain import WritePrecision
2525
from influxdb_client_3.write_client.rest import _UTF_8_encoding
2626

27+
DEFAULT_WRITE_NO_SYNC = False
28+
2729
logger = logging.getLogger('influxdb_client_3.write_client.client.write_api')
2830

2931
if _HAS_DATACLASS:
@@ -41,7 +43,8 @@ class WriteType(Enum):
4143

4244
class DefaultWriteOptions(Enum):
4345
write_type = WriteType.synchronous
44-
write_precision = WritePrecision.NS
46+
write_precision = DEFAULT_WRITE_PRECISION
47+
no_sync = DEFAULT_WRITE_NO_SYNC
4548

4649

4750
class WriteOptions(object):
@@ -57,6 +60,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
5760
exponential_base=2,
5861
max_close_wait=300_000,
5962
write_precision=DEFAULT_WRITE_PRECISION,
63+
no_sync=DEFAULT_WRITE_NO_SYNC,
6064
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
6165
"""
6266
Create write api configuration.
@@ -72,7 +76,9 @@ def __init__(self, write_type: WriteType = WriteType.batching,
7276
:param max_retry_delay: the maximum delay between each retry attempt in milliseconds
7377
:param max_retry_time: total timeout for all retry attempts in milliseconds, if 0 retry is disabled
7478
:param exponential_base: base for the exponential retry delay
75-
:parama max_close_wait: the maximum time to wait for writes to be flushed if close() is called
79+
:param max_close_wait: the maximum time to wait for writes to be flushed if close() is called
80+
:param write_precision: precision to use when writing points to InfluxDB
81+
:param no_sync: skip waiting for WAL persistence on write
7682
:param write_scheduler:
7783
"""
7884
self.write_type = write_type
@@ -87,6 +93,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
8793
self.write_scheduler = write_scheduler
8894
self.max_close_wait = max_close_wait
8995
self.write_precision = write_precision
96+
self.no_sync = no_sync
9097

9198
def to_retry_strategy(self, **kwargs):
9299
"""
@@ -202,7 +209,7 @@ def _body_reduce(batch_items):
202209

203210
class WriteApi(_BaseWriteApi):
204211
"""
205-
Implementation for '/api/v2/write' endpoint.
212+
Implementation for '/api/v2/write' and '/api/v3/write_lp' endpoint.
206213
207214
Example:
208215
.. code-block:: python
@@ -375,14 +382,16 @@ def write(self, bucket: str, org: str = None,
375382
return self._write_batching(bucket, org, record,
376383
write_precision, **kwargs)
377384

385+
no_sync = self._write_options.no_sync
386+
378387
payloads = defaultdict(list)
379388
self._serialize(record, write_precision, payloads, **kwargs)
380389

381390
_async_req = True if self._write_options.write_type == WriteType.asynchronous else False
382391

383392
def write_payload(payload):
384393
final_string = b'\n'.join(payload[1])
385-
return self._post_write(_async_req, bucket, org, final_string, payload[0])
394+
return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync)
386395

387396
results = list(map(write_payload, payloads.items()))
388397
if not _async_req:
@@ -519,18 +528,21 @@ def _retry_callback_delegate(exception):
519528
else:
520529
_retry_callback_delegate = None
521530

531+
no_sync = self._write_options.no_sync
532+
522533
retry = self._write_options.to_retry_strategy(retry_callback=_retry_callback_delegate)
523534

524535
self._post_write(False, batch_item.key.bucket, batch_item.key.org, batch_item.data,
525-
batch_item.key.precision, urlopen_kw={'retries': retry})
536+
batch_item.key.precision, no_sync, urlopen_kw={'retries': retry})
526537

527538
logger.debug("Write request finished %s", batch_item)
528539

529540
return _BatchResponse(data=batch_item)
530541

531-
def _post_write(self, _async_req, bucket, org, body, precision, **kwargs):
542+
def _post_write(self, _async_req, bucket, org, body, precision, no_sync, **kwargs):
532543

533544
return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision,
545+
no_sync=no_sync,
534546
async_req=_async_req,
535547
content_type="text/plain; charset=utf-8",
536548
**kwargs)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from influxdb_client_3.write_client.domain import WritePrecision
2+
3+
4+
class WritePrecisionConverter(object):
5+
6+
@staticmethod
7+
def to_v2_api_string(precision):
8+
"""
9+
Converts WritePrecision to its string representation for V2 API.
10+
"""
11+
if precision in [WritePrecision.NS, WritePrecision.US, WritePrecision.MS, WritePrecision.S]:
12+
return precision
13+
else:
14+
raise ValueError("Unsupported precision '%s'" % precision)
15+
16+
@staticmethod
17+
def to_v3_api_string(precision):
18+
"""
19+
Converts WritePrecision to its string representation for V3 API.
20+
"""
21+
if precision == WritePrecision.NS:
22+
return "nanosecond"
23+
elif precision == WritePrecision.US:
24+
return "microsecond"
25+
elif precision == WritePrecision.MS:
26+
return "millisecond"
27+
elif precision == WritePrecision.S:
28+
return "second"
29+
else:
30+
raise ValueError("Unsupported precision '%s'" % precision)

influxdb_client_3/write_client/service/signin_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ async def post_signin_async(self, **kwargs): # noqa: E501,D401,D403
106106
urlopen_kw=kwargs.get('urlopen_kw', None))
107107

108108
def _post_signin_prepare(self, **kwargs): # noqa: E501,D401,D403
109-
local_var_params = locals()
109+
local_var_params = dict(locals())
110110

111111
all_params = ['zap_trace_span', 'authorization'] # noqa: E501
112112
self._check_operation_params('post_signin', all_params, local_var_params)

influxdb_client_3/write_client/service/signout_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ async def post_signout_async(self, **kwargs): # noqa: E501,D401,D403
103103
urlopen_kw=kwargs.get('urlopen_kw', None))
104104

105105
def _post_signout_prepare(self, **kwargs): # noqa: E501,D401,D403
106-
local_var_params = locals()
106+
local_var_params = dict(locals())
107107

108108
all_params = ['zap_trace_span'] # noqa: E501
109109
self._check_operation_params('post_signout', all_params, local_var_params)

0 commit comments

Comments
 (0)