Skip to content

Commit ef85c7f

Browse files
feat: support env
1 parent 387f54c commit ef85c7f

File tree

3 files changed

+49
-17
lines changed

3 files changed

+49
-17
lines changed

influxdb_client_3/__init__.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
1111
from influxdb_client_3.write_client.client.exceptions import InfluxDBError
1212
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
13-
PointSettings
13+
PointSettings, WriteType
1414
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
1515

1616
polars = importlib.util.find_spec("polars") is not None
@@ -55,9 +55,11 @@ def file_parser_options(**kwargs):
5555
INFLUX_TOKEN = "INFLUX_TOKEN"
5656
INFLUX_DATABASE = "INFLUX_DATABASE"
5757
INFLUX_ORG = "INFLUX_ORG"
58+
INFLUX_PRECISION = "INFLUX_PRECISION"
5859

5960

6061
def from_env(**kwargs: Any) -> 'InfluxDBClient3':
62+
6163
"""
6264
Create an instance of `InfluxDBClient3` using environment variables for configuration.
6365
@@ -78,7 +80,7 @@ def from_env(**kwargs: Any) -> 'InfluxDBClient3':
7880
flight_client_options, SSL settings, etc.
7981
:return: An initialized `InfluxDBClient3` instance.
8082
:raises ValueError: If any required environment variables are not set.
81-
"""
83+
"""
8284
required_vars = {
8385
INFLUX_HOST: os.getenv(INFLUX_HOST),
8486
INFLUX_TOKEN: os.getenv(INFLUX_TOKEN),
@@ -91,10 +93,17 @@ def from_env(**kwargs: Any) -> 'InfluxDBClient3':
9193

9294
org = os.getenv(INFLUX_ORG, "default")
9395

96+
write_client_option = None
97+
if os.getenv(INFLUX_PRECISION) is not None:
98+
write_client_option = default_client_options(
99+
write_options=WriteOptions(write_type=WriteType.synchronous, write_precision=os.getenv(INFLUX_PRECISION))
100+
)
101+
94102
return InfluxDBClient3(
95103
host=required_vars[INFLUX_HOST],
96104
token=required_vars[INFLUX_TOKEN],
97105
database=required_vars[INFLUX_DATABASE],
106+
write_client_options=write_client_option,
98107
org=org,
99108
**kwargs
100109
)
@@ -190,7 +199,7 @@ def __init__(
190199
self._database = database
191200
self._token = token
192201
self._write_client_options = write_client_options if write_client_options is not None \
193-
else default_client_options(write_options=SYNCHRONOUS)
202+
else default_client_options(write_options=WriteOptions(write_type=WriteType.synchronous, write_precision=WritePrecision.NS))
194203

195204
# Parse the host input
196205
parsed_url = urllib.parse.urlparse(host)

influxdb_client_3/write_client/client/write_api.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
5151
max_retry_time=180_000,
5252
exponential_base=2,
5353
max_close_wait=300_000,
54+
write_precision=DEFAULT_WRITE_PRECISION,
5455
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
5556
"""
5657
Create write api configuration.
@@ -66,7 +67,8 @@ def __init__(self, write_type: WriteType = WriteType.batching,
6667
:param max_retry_delay: the maximum delay between each retry attempt in milliseconds
6768
:param max_retry_time: total timeout for all retry attempts in milliseconds, if 0 retry is disabled
6869
:param exponential_base: base for the exponential retry delay
69-
:parama max_close_wait: the maximum time to wait for writes to be flushed if close() is called
70+
:param max_close_wait: the maximum time to wait for writes to be flushed if close() is called
71+
:param write_precision: the time precision for the data written to InfluxDB.
7072
:param write_scheduler:
7173
"""
7274
self.write_type = write_type
@@ -80,6 +82,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
8082
self.exponential_base = exponential_base
8183
self.write_scheduler = write_scheduler
8284
self.max_close_wait = max_close_wait
85+
self.write_precision = write_precision
8386

8487
def to_retry_strategy(self, **kwargs):
8588
"""
@@ -290,7 +293,7 @@ def write(self, bucket: str, org: str = None,
290293
str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'],
291294
Observable, NamedTuple, Iterable['NamedTuple'], 'dataclass', Iterable['dataclass']
292295
] = None,
293-
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any:
296+
write_precision: WritePrecision = None, **kwargs) -> Any:
294297
"""
295298
Write time-series data into InfluxDB.
296299
@@ -360,7 +363,10 @@ def write(self, bucket: str, org: str = None,
360363
org = get_org_query_param(org=org, client=self._influxdb_client)
361364

362365
self._append_default_tags(record)
363-
366+
367+
if write_precision is None:
368+
write_precision = self._write_options.write_precision
369+
364370
if self._write_options.write_type is WriteType.batching:
365371
return self._write_batching(bucket, org, record,
366372
write_precision, **kwargs)
@@ -443,8 +449,11 @@ def __del__(self):
443449
pass
444450

445451
def _write_batching(self, bucket, org, data,
446-
precision=DEFAULT_WRITE_PRECISION,
452+
precision=None,
447453
**kwargs):
454+
if precision is None:
455+
precision = self._write_options.write_precision
456+
448457
if isinstance(data, bytes):
449458
_key = _BatchItemKey(bucket, org, precision)
450459
self._subject.on_next(_BatchItem(key=_key, data=data))
@@ -454,7 +463,8 @@ def _write_batching(self, bucket, org, data,
454463
precision, **kwargs)
455464

456465
elif isinstance(data, Point):
457-
self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs)
466+
write_precision = data.write_precision if data.write_precision is not None else precision
467+
self._write_batching(bucket, org, data.to_line_protocol(), write_precision, **kwargs)
458468

459469
elif isinstance(data, dict):
460470
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision, **kwargs),

tests/test_influxdb_client_3.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import unittest
22
from unittest.mock import patch
33

4-
from influxdb_client_3 import InfluxDBClient3, from_env, write_client_options
4+
from influxdb_client_3 import InfluxDBClient3, from_env, WritePrecision, WriteType
55
from tests.util import asyncio_run
66
from tests.util.mocks import ConstantFlightServer, ConstantData
77

@@ -74,22 +74,35 @@ async def test_query_async(self):
7474
assert {'data': 'sql_query', 'reference': query, 'value': -1.0} in result_list
7575
assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list
7676

77+
def test_default_client(self):
78+
expected_precision = WritePrecision.NS
79+
expected_write_type = WriteType.synchronous
80+
81+
def verify_client_write_options(client):
82+
write_options = client._write_client_options.get('write_options')
83+
self.assertEqual(write_options.write_precision, expected_precision)
84+
self.assertEqual(write_options.write_type, expected_write_type)
85+
86+
self.assertEqual(client._write_api._write_options.write_precision, expected_precision)
87+
self.assertEqual(client._write_api._write_options.write_type, expected_write_type)
88+
89+
env_client = from_env()
90+
verify_client_write_options(env_client)
91+
92+
default_client = InfluxDBClient3()
93+
verify_client_write_options(default_client)
94+
95+
7796
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
78-
'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org'})
97+
'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org', 'INFLUX_PRECISION': WritePrecision.MS})
7998
def test_from_env_all_env_vars_set(self):
8099
client = from_env()
81100
self.assertIsInstance(client, InfluxDBClient3)
82101
self.assertEqual(client._client.url, "https://localhost:443")
83102
self.assertEqual(client._database, "test_db")
84103
self.assertEqual(client._org, "test_org")
85104
self.assertEqual(client._token, "test_token")
86-
87-
def test_from_env_with_kargs(self):
88-
client = from_env(
89-
write_client_options=write_client_options(batch_size=10000),
90-
)
91-
self.assertIsInstance(client, InfluxDBClient3)
92-
self.assertEqual(client._write_client_options['batch_size'], 10000)
105+
self.assertEqual(client._write_client_options.get("write_options").write_precision, WritePrecision.MS)
93106

94107
@patch.dict('os.environ', {'INFLUX_HOST': "", 'INFLUX_TOKEN': "",
95108
'INFLUX_DATABASE': "", 'INFLUX_ORG': ""})

0 commit comments

Comments
 (0)