Skip to content

Commit cf910c0

Browse files
feat: support-basic-env
1 parent 099b9f6 commit cf910c0

File tree

5 files changed

+203
-9
lines changed

5 files changed

+203
-9
lines changed

influxdb_client_3/__init__.py

Lines changed: 107 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,27 @@
1+
import importlib.util
2+
import os
13
import urllib.parse
4+
from typing import Any
5+
26
import pyarrow as pa
3-
import importlib.util
47

58
from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
69
from influxdb_client_3.read_file import UploadFile
710
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
811
from influxdb_client_3.write_client.client.exceptions import InfluxDBError
912
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
10-
PointSettings
13+
PointSettings, WriteType, DefaultWriteOptions
1114
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
1215

1316
polars = importlib.util.find_spec("polars") is not None
1417

18+
INFLUX_HOST = "INFLUX_HOST"
19+
INFLUX_TOKEN = "INFLUX_TOKEN"
20+
INFLUX_DATABASE = "INFLUX_DATABASE"
21+
INFLUX_ORG = "INFLUX_ORG"
22+
INFLUX_PRECISION = "INFLUX_PRECISION"
23+
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
24+
1525

1626
def write_client_options(**kwargs):
1727
"""
@@ -83,6 +93,27 @@ def _merge_options(defaults, exclude_keys=None, custom=None):
8393
return _deep_merge(defaults, {key: value for key, value in custom.items() if key not in exclude_keys})
8494

8595

96+
def _parse_precision(precision):
97+
"""
98+
Parses the precision value and ensures it is valid.
99+
100+
This function checks that the given `precision` is one of the allowed
101+
values defined in `WritePrecision`. If the precision is invalid, it
102+
raises a `ValueError`. The function returns the valid precision value
103+
if it passes validation.
104+
105+
:param precision: The precision value to be validated.
106+
Must be one of WritePrecision.NS, WritePrecision.MS,
107+
WritePrecision.S, or WritePrecision.US.
108+
:return: The valid precision value.
109+
:rtype: WritePrecision
110+
:raises ValueError: If the provided precision is not valid.
111+
"""
112+
if precision not in [WritePrecision.NS, WritePrecision.MS, WritePrecision.S, WritePrecision.US]:
113+
raise ValueError(f"Invalid precision value: {precision}")
114+
return precision
115+
116+
86117
class InfluxDBClient3:
87118
def __init__(
88119
self,
@@ -136,8 +167,23 @@ def __init__(
136167
self._org = org if org is not None else "default"
137168
self._database = database
138169
self._token = token
139-
self._write_client_options = write_client_options if write_client_options is not None \
140-
else default_client_options(write_options=SYNCHRONOUS)
170+
171+
write_type = DefaultWriteOptions.write_type.value
172+
write_precision = DefaultWriteOptions.write_precision.value
173+
if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
174+
write_opts = write_client_options['write_options']
175+
write_type = getattr(write_opts, 'write_type', write_type)
176+
write_precision = getattr(write_opts, 'write_precision', write_precision)
177+
178+
write_options = WriteOptions(
179+
write_type=write_type,
180+
write_precision=write_precision
181+
)
182+
183+
self._write_client_options = {
184+
"write_options": write_options,
185+
**(write_client_options or {})
186+
}
141187

142188
# Parse the host input
143189
parsed_url = urllib.parse.urlparse(host)
@@ -178,6 +224,63 @@ def __init__(
178224
flight_client_options=flight_client_options,
179225
proxy=kwargs.get("proxy", None), options=q_opts_builder.build())
180226

227+
@classmethod
228+
def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
229+
230+
"""
231+
Creates an instance of InfluxDBClient3 configured by specific environment
232+
variables. This method automatically loads configuration settings,
233+
such as connection details, security parameters, and performance
234+
options, from environment variables and initializes the client
235+
accordingly.
236+
237+
:param cls:
238+
The class used to create the client instance.
239+
:param kwargs:
240+
Additional optional parameters that can be passed to customize the
241+
configuration or override specific settings derived from the
242+
environment variables.
243+
244+
:raises ValueError:
245+
If any required environment variables are missing or have empty
246+
values.
247+
248+
:return:
249+
An initialized instance of the `InfluxDBClient3` class with all the
250+
configuration settings applied.
251+
:rtype:
252+
InfluxDBClient3
253+
"""
254+
255+
required_vars = {
256+
INFLUX_HOST: os.getenv(INFLUX_HOST),
257+
INFLUX_TOKEN: os.getenv(INFLUX_TOKEN),
258+
INFLUX_DATABASE: os.getenv(INFLUX_DATABASE)
259+
}
260+
missing_vars = [var for var, value in required_vars.items() if value is None or value == ""]
261+
if missing_vars:
262+
raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
263+
264+
write_options = WriteOptions(write_type=WriteType.synchronous)
265+
266+
precision = os.getenv(INFLUX_PRECISION)
267+
if precision is not None:
268+
write_options.write_precision = _parse_precision(precision)
269+
write_client_option = {'write_options': write_options}
270+
271+
if os.getenv(INFLUX_AUTH_SCHEME) is not None:
272+
kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME)
273+
274+
org = os.getenv(INFLUX_ORG, "default")
275+
return InfluxDBClient3(
276+
host=required_vars[INFLUX_HOST],
277+
token=required_vars[INFLUX_TOKEN],
278+
database=required_vars[INFLUX_DATABASE],
279+
write_client_options=write_client_option,
280+
org=org,
281+
**kwargs
282+
)
283+
181284
def write(self, record=None, database=None, **kwargs):
182285
"""
183286
Write data to InfluxDB.

influxdb_client_3/write_client/client/write_api.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ class WriteType(Enum):
3939
synchronous = 3
4040

4141

42+
class DefaultWriteOptions(Enum):
43+
write_type = WriteType.synchronous
44+
write_precision = WritePrecision.NS
45+
46+
4247
class WriteOptions(object):
4348
"""Write configuration."""
4449

@@ -51,6 +56,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
5156
max_retry_time=180_000,
5257
exponential_base=2,
5358
max_close_wait=300_000,
59+
write_precision=DEFAULT_WRITE_PRECISION,
5460
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
5561
"""
5662
Create write api configuration.
@@ -66,7 +72,8 @@ def __init__(self, write_type: WriteType = WriteType.batching,
6672
:param max_retry_delay: the maximum delay between each retry attempt in milliseconds
6773
:param max_retry_time: total timeout for all retry attempts in milliseconds, if 0 retry is disabled
6874
:param exponential_base: base for the exponential retry delay
69-
:parama max_close_wait: the maximum time to wait for writes to be flushed if close() is called
75+
:param max_close_wait: the maximum time to wait for writes to be flushed if close() is called
76+
:param write_precision: the time precision for the data written to InfluxDB.
7077
:param write_scheduler:
7178
"""
7279
self.write_type = write_type
@@ -80,6 +87,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
8087
self.exponential_base = exponential_base
8188
self.write_scheduler = write_scheduler
8289
self.max_close_wait = max_close_wait
90+
self.write_precision = write_precision
8391

8492
def to_retry_strategy(self, **kwargs):
8593
"""
@@ -290,7 +298,7 @@ def write(self, bucket: str, org: str = None,
290298
str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'],
291299
Observable, NamedTuple, Iterable['NamedTuple'], 'dataclass', Iterable['dataclass']
292300
] = None,
293-
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any:
301+
write_precision: WritePrecision = None, **kwargs) -> Any:
294302
"""
295303
Write time-series data into InfluxDB.
296304
@@ -361,6 +369,9 @@ def write(self, bucket: str, org: str = None,
361369

362370
self._append_default_tags(record)
363371

372+
if write_precision is None:
373+
write_precision = self._write_options.write_precision
374+
364375
if self._write_options.write_type is WriteType.batching:
365376
return self._write_batching(bucket, org, record,
366377
write_precision, **kwargs)
@@ -443,8 +454,11 @@ def __del__(self):
443454
pass
444455

445456
def _write_batching(self, bucket, org, data,
446-
precision=DEFAULT_WRITE_PRECISION,
457+
precision=None,
447458
**kwargs):
459+
if precision is None:
460+
precision = self._write_options.write_precision
461+
448462
if isinstance(data, bytes):
449463
_key = _BatchItemKey(bucket, org, precision)
450464
self._subject.on_next(_BatchItem(key=_key, data=data))
@@ -454,7 +468,8 @@ def _write_batching(self, bucket, org, data,
454468
precision, **kwargs)
455469

456470
elif isinstance(data, Point):
457-
self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs)
471+
write_precision = data.write_precision if data.write_precision is not None else precision
472+
self._write_batching(bucket, org, data.to_line_protocol(), write_precision, **kwargs)
458473

459474
elif isinstance(data, dict):
460475
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision, **kwargs),

tests/test_influxdb_client_3.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import unittest
22
from unittest.mock import patch
33

4-
from influxdb_client_3 import InfluxDBClient3
4+
from influxdb_client_3 import InfluxDBClient3, WritePrecision, DefaultWriteOptions
55
from tests.util import asyncio_run
66
from tests.util.mocks import ConstantFlightServer, ConstantData
77

@@ -74,6 +74,59 @@ async def test_query_async(self):
7474
assert {'data': 'sql_query', 'reference': query, 'value': -1.0} in result_list
7575
assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list
7676

77+
def test_default_client(self):
78+
expected_precision = DefaultWriteOptions.write_precision.value
79+
expected_write_type = DefaultWriteOptions.write_type.value
80+
81+
def verify_client_write_options(c):
82+
write_options = c._write_client_options.get('write_options')
83+
self.assertEqual(write_options.write_precision, expected_precision)
84+
self.assertEqual(write_options.write_type, expected_write_type)
85+
86+
self.assertEqual(c._write_api._write_options.write_precision, expected_precision)
87+
self.assertEqual(c._write_api._write_options.write_type, expected_write_type)
88+
89+
env_client = InfluxDBClient3.from_env()
90+
verify_client_write_options(env_client)
91+
92+
default_client = InfluxDBClient3()
93+
verify_client_write_options(default_client)
94+
95+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
96+
'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org',
97+
'INFLUX_PRECISION': WritePrecision.MS})
98+
def test_from_env_all_env_vars_set(self):
99+
client = InfluxDBClient3.from_env()
100+
self.assertIsInstance(client, InfluxDBClient3)
101+
self.assertEqual(client._client.url, "https://localhost:443")
102+
self.assertEqual(client._database, "test_db")
103+
self.assertEqual(client._org, "test_org")
104+
self.assertEqual(client._token, "test_token")
105+
write_options = client._write_client_options.get("write_options")
106+
self.assertEqual(write_options.write_precision, WritePrecision.MS)
107+
client._write_api._point_settings = {}
108+
109+
@patch.dict('os.environ', {'INFLUX_HOST': "", 'INFLUX_TOKEN': "",
110+
'INFLUX_DATABASE': "", 'INFLUX_ORG': ""})
111+
def test_from_env_missing_variables(self):
112+
with self.assertRaises(ValueError) as context:
113+
InfluxDBClient3.from_env()
114+
self.assertIn("Missing required environment variables", str(context.exception))
115+
116+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
117+
'INFLUX_DATABASE': 'test_db', 'INFLUX_PRECISION': WritePrecision.MS})
118+
def test_parse_valid_write_precision(self):
119+
client = InfluxDBClient3.from_env()
120+
self.assertIsInstance(client, InfluxDBClient3)
121+
self.assertEqual(client._write_client_options.get('write_options').write_precision, WritePrecision.MS)
122+
123+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
124+
'INFLUX_DATABASE': 'test_db', 'INFLUX_PRECISION': 'invalid_value'})
125+
def test_parse_invalid_write_precision(self):
126+
with self.assertRaises(ValueError) as context:
127+
InfluxDBClient3.from_env()
128+
self.assertIn("Invalid precision value: invalid_value", str(context.exception))
129+
77130

78131
if __name__ == '__main__':
79132
unittest.main()

tests/test_influxdb_client_3_integration.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import string
55
import time
66
import unittest
7+
from unittest.mock import patch
78

89
import pyarrow
910
import pytest
@@ -274,3 +275,23 @@ async def test_verify_query_async(self):
274275
result_list = result.to_pylist()
275276
for item in data:
276277
assert lp_to_py_object(item) in result_list, f"original lp data \"{item}\" should be in result list"
278+
279+
def test_from_env(self):
280+
with InfluxDBClient3.from_env() as client:
281+
id_test = time.time_ns()
282+
client.write(f"integration_test_python,type=used value=123.0,id_test={id_test}i")
283+
284+
sql = 'SELECT * FROM integration_test_python where type=$type and id_test=$id_test'
285+
data = client.query(sql, mode="pandas", query_parameters={'type': 'used', 'id_test': id_test})
286+
287+
self.assertIsNotNone(data)
288+
self.assertEqual(1, len(data))
289+
self.assertEqual(id_test, data['id_test'][0])
290+
self.assertEqual(123.0, data['value'][0])
291+
292+
@patch.dict('os.environ', {'INFLUX_AUTH_SCHEME': 'invalid_schema'})
293+
def test_from_env_invalid_auth_schema(self):
294+
with InfluxDBClient3.from_env() as client:
295+
with self.assertRaises(InfluxDBError) as err:
296+
client.write("integration_test_python,type=used value=123.0")
297+
self.assertEqual('unauthorized access', err.exception.message)

tests/test_polars.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ def setUp(self):
4343
database="my_db",
4444
token="my_token"
4545
)
46+
self.client._write_api._point_settings = PointSettings()
4647

4748
def test_write_polars(self):
4849
import polars as pl
@@ -77,6 +78,7 @@ def test_write_polars_batching(self):
7778
write_options=WriteOptions(batch_size=2)
7879
)
7980
)
81+
self.client._write_api._point_settings = PointSettings()
8082
self.client._write_api._write_options = WriteOptions(batch_size=2)
8183
self.client._write_api._write_service = Mock(spec=WriteService)
8284

0 commit comments

Comments
 (0)