Skip to content

Commit f3a75b8

Browse files
committed
feat: fast no-sync write support
1 parent f8a629b commit f3a75b8

File tree

10 files changed

+286
-42
lines changed

10 files changed

+286
-42
lines changed

influxdb_client_3/__init__.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
from pyarrow import ArrowException
88

99
from influxdb_client_3.exceptions import InfluxDB3ClientQueryError
10+
from influxdb_client_3.exceptions import InfluxDBError
1011
from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
1112
from influxdb_client_3.read_file import UploadFile
1213
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
13-
from influxdb_client_3.exceptions import InfluxDBError
1414
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
1515
PointSettings, DefaultWriteOptions, WriteType
1616
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
@@ -24,6 +24,7 @@
2424
INFLUX_PRECISION = "INFLUX_PRECISION"
2525
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
2626
INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD"
27+
INFLUX_WRITE_NO_SYNC = "INFLUX_WRITE_NO_SYNC"
2728

2829

2930
def write_client_options(**kwargs):
@@ -112,9 +113,15 @@ def _parse_precision(precision):
112113
:rtype: WritePrecision
113114
:raises ValueError: If the provided precision is not valid.
114115
"""
115-
if precision not in [WritePrecision.NS, WritePrecision.MS, WritePrecision.S, WritePrecision.US]:
116-
raise ValueError(f"Invalid precision value: {precision}")
117-
return precision
116+
if precision == WritePrecision.NS or precision == "nanosecond":
117+
return WritePrecision.NS
118+
if precision == WritePrecision.US or precision == "microsecond":
119+
return WritePrecision.US
120+
if precision == WritePrecision.MS or precision == "millisecond":
121+
return WritePrecision.MS
122+
if precision == WritePrecision.S or precision == "second":
123+
return WritePrecision.S
124+
raise ValueError(f"Invalid precision value: {precision}")
118125

119126

120127
def _parse_gzip_threshold(threshold):
@@ -141,6 +148,26 @@ def _parse_gzip_threshold(threshold):
141148
return threshold
142149

143150

151+
def _parse_write_no_sync(write_no_sync):
152+
"""
153+
Parses and validates the provided write no sync value.
154+
155+
This function ensures that the given value is a valid boolean,
156+
and it raises an appropriate error if the value is not valid.
157+
158+
:param write_no_sync: The input value to be parsed and validated.
159+
:type write_no_sync: Any
160+
:return: The validated write no sync value as an boolean.
161+
:rtype: bool
162+
:raises ValueError: If the provided value is not a boolean.
163+
"""
164+
try:
165+
write_no_sync = bool(write_no_sync)
166+
except (TypeError, ValueError):
167+
raise ValueError(f"Invalid write no sync value: {write_no_sync}. Must be boolean.")
168+
return write_no_sync
169+
170+
144171
class InfluxDBClient3:
145172
def __init__(
146173
self,
@@ -286,6 +313,10 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
286313
kwargs['gzip_threshold'] = _parse_gzip_threshold(gzip_threshold)
287314
kwargs['enable_gzip'] = True
288315

316+
write_no_sync = os.getenv(INFLUX_WRITE_NO_SYNC)
317+
if write_no_sync is not None:
318+
write_options.no_sync = _parse_write_no_sync(write_no_sync)
319+
289320
precision = os.getenv(INFLUX_PRECISION)
290321
if precision is not None:
291322
write_options.write_precision = _parse_precision(precision)

influxdb_client_3/write_client/client/write/point.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
EPOCH = datetime.fromtimestamp(0, tz=timezone.utc)
1414

1515
DEFAULT_WRITE_PRECISION = WritePrecision.NS
16+
DEFAULT_WRITE_NO_SYNC = False
1617

1718
_ESCAPE_MEASUREMENT = str.maketrans({
1819
',': r'\,',

influxdb_client_3/write_client/client/write_api.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from influxdb_client_3.write_client.client._base import _BaseWriteApi, _HAS_DATACLASS
2020
from influxdb_client_3.write_client.client.util.helpers import get_org_query_param
2121
from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer
22-
from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
22+
from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, DEFAULT_WRITE_NO_SYNC
2323
from influxdb_client_3.write_client.client.write.retry import WritesRetry
2424
from influxdb_client_3.write_client.domain import WritePrecision
2525
from influxdb_client_3.write_client.rest import _UTF_8_encoding
@@ -41,7 +41,8 @@ class WriteType(Enum):
4141

4242
class DefaultWriteOptions(Enum):
4343
write_type = WriteType.synchronous
44-
write_precision = WritePrecision.NS
44+
write_precision = DEFAULT_WRITE_PRECISION
45+
no_sync = DEFAULT_WRITE_NO_SYNC
4546

4647

4748
class WriteOptions(object):
@@ -57,6 +58,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
5758
exponential_base=2,
5859
max_close_wait=300_000,
5960
write_precision=DEFAULT_WRITE_PRECISION,
61+
no_sync=DEFAULT_WRITE_NO_SYNC,
6062
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
6163
"""
6264
Create write api configuration.
@@ -72,7 +74,9 @@ def __init__(self, write_type: WriteType = WriteType.batching,
7274
:param max_retry_delay: the maximum delay between each retry attempt in milliseconds
7375
:param max_retry_time: total timeout for all retry attempts in milliseconds, if 0 retry is disabled
7476
:param exponential_base: base for the exponential retry delay
75-
:parama max_close_wait: the maximum time to wait for writes to be flushed if close() is called
77+
:param max_close_wait: the maximum time to wait for writes to be flushed if close() is called
78+
:param write_precision: precision to use when writing points to InfluxDB
79+
:param no_sync: skip waiting for WAL persistence on write
7680
:param write_scheduler:
7781
"""
7882
self.write_type = write_type
@@ -87,6 +91,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
8791
self.write_scheduler = write_scheduler
8892
self.max_close_wait = max_close_wait
8993
self.write_precision = write_precision
94+
self.no_sync = no_sync
9095

9196
def to_retry_strategy(self, **kwargs):
9297
"""
@@ -375,14 +380,16 @@ def write(self, bucket: str, org: str = None,
375380
return self._write_batching(bucket, org, record,
376381
write_precision, **kwargs)
377382

383+
no_sync = self._write_options.no_sync
384+
378385
payloads = defaultdict(list)
379386
self._serialize(record, write_precision, payloads, **kwargs)
380387

381388
_async_req = True if self._write_options.write_type == WriteType.asynchronous else False
382389

383390
def write_payload(payload):
384391
final_string = b'\n'.join(payload[1])
385-
return self._post_write(_async_req, bucket, org, final_string, payload[0])
392+
return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync)
386393

387394
results = list(map(write_payload, payloads.items()))
388395
if not _async_req:
@@ -519,18 +526,21 @@ def _retry_callback_delegate(exception):
519526
else:
520527
_retry_callback_delegate = None
521528

529+
no_sync = self._write_options.no_sync
530+
522531
retry = self._write_options.to_retry_strategy(retry_callback=_retry_callback_delegate)
523532

524533
self._post_write(False, batch_item.key.bucket, batch_item.key.org, batch_item.data,
525-
batch_item.key.precision, urlopen_kw={'retries': retry})
534+
batch_item.key.precision, no_sync, urlopen_kw={'retries': retry})
526535

527536
logger.debug("Write request finished %s", batch_item)
528537

529538
return _BatchResponse(data=batch_item)
530539

531-
def _post_write(self, _async_req, bucket, org, body, precision, **kwargs):
540+
def _post_write(self, _async_req, bucket, org, body, precision, no_sync, **kwargs):
532541

533542
return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision,
543+
no_sync=no_sync,
534544
async_req=_async_req,
535545
content_type="text/plain; charset=utf-8",
536546
**kwargs)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from influxdb_client_3.write_client.domain import WritePrecision
2+
3+
4+
class WritePrecisionConverter(object):
5+
6+
@staticmethod
7+
def to_v2_api_string(precision):
8+
"""
9+
Converts WritePrecision to its string representation for V2 API.
10+
"""
11+
if precision in [WritePrecision.NS, WritePrecision.US, WritePrecision.MS, WritePrecision.S]:
12+
return precision
13+
else:
14+
raise ValueError("Unsupported precision '%s'" % precision)
15+
16+
@staticmethod
17+
def to_v3_api_string(precision):
18+
"""
19+
Converts WritePrecision to its string representation for V3 API.
20+
"""
21+
if precision == WritePrecision.NS:
22+
return "nanosecond"
23+
elif precision == WritePrecision.US:
24+
return "microsecond"
25+
elif precision == WritePrecision.MS:
26+
return "millisecond"
27+
elif precision == WritePrecision.S:
28+
return "second"
29+
else:
30+
raise ValueError("Unsupported precision '%s'" % precision)

0 commit comments

Comments
 (0)