Skip to content

Commit 87aee82

Browse files
feat: support env
1 parent b1b8f9c commit 87aee82

File tree

3 files changed

+53
-16
lines changed

3 files changed

+53
-16
lines changed

influxdb_client_3/__init__.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
1111
from influxdb_client_3.write_client.client.exceptions import InfluxDBError
1212
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
13-
PointSettings, WriteType
13+
PointSettings, WriteType, DefaultWriteOptions
1414
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
1515

1616
polars = importlib.util.find_spec("polars") is not None
@@ -57,6 +57,7 @@ def file_parser_options(**kwargs):
5757
INFLUX_ORG = "INFLUX_ORG"
5858
INFLUX_PRECISION = "INFLUX_PRECISION"
5959
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
60+
INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD"
6061

6162

6263
def from_env(**kwargs: Any) -> 'InfluxDBClient3':
@@ -92,16 +93,20 @@ def from_env(**kwargs: Any) -> 'InfluxDBClient3':
9293
if missing_vars:
9394
raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
9495

95-
org = os.getenv(INFLUX_ORG, "default")
96+
write_options = WriteOptions(write_type=WriteType.synchronous)
97+
98+
if os.getenv(INFLUX_GZIP_THRESHOLD) is not None:
99+
write_options.gzip_threshold = int(os.getenv(INFLUX_GZIP_THRESHOLD))
100+
101+
if os.getenv(INFLUX_PRECISION) is not None:
102+
write_options.write_precision = os.getenv(INFLUX_PRECISION)
103+
104+
write_client_option = {'write_options': write_options}
96105

97106
if os.getenv(INFLUX_AUTH_SCHEME) is not None:
98107
kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME)
99-
100-
write_client_option = None
101-
if os.getenv(INFLUX_PRECISION) is not None:
102-
write_client_option = default_client_options(
103-
write_options=WriteOptions(write_type=WriteType.synchronous, write_precision=os.getenv(INFLUX_PRECISION))
104-
)
108+
109+
org = os.getenv(INFLUX_ORG, "default")
105110

106111
return InfluxDBClient3(
107112
host=required_vars[INFLUX_HOST],
@@ -202,8 +207,26 @@ def __init__(
202207
self._org = org if org is not None else "default"
203208
self._database = database
204209
self._token = token
205-
self._write_client_options = write_client_options if write_client_options is not None \
206-
else default_client_options(write_options=WriteOptions(write_type=WriteType.synchronous, write_precision=WritePrecision.NS))
210+
211+
write_type = DefaultWriteOptions['write_type']
212+
write_precision = DefaultWriteOptions['write_precision']
213+
gzip_threshold = DefaultWriteOptions['gzip_threshold']
214+
if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
215+
write_opts = write_client_options['write_options']
216+
write_type = getattr(write_opts, 'write_type')
217+
write_precision = getattr(write_opts, 'write_precision')
218+
gzip_threshold = getattr(write_opts, 'gzip_threshold')
219+
220+
write_options = WriteOptions(
221+
write_type=write_type,
222+
write_precision=write_precision,
223+
gzip_threshold=gzip_threshold,
224+
)
225+
226+
self._write_client_options = {
227+
"write_options": write_options,
228+
**(write_client_options or {})
229+
}
207230

208231
# Parse the host input
209232
parsed_url = urllib.parse.urlparse(host)

influxdb_client_3/write_client/client/write_api.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ class WriteType(Enum):
3939
synchronous = 3
4040

4141

42+
DefaultWriteOptions = {
43+
'write_type': WriteType.synchronous,
44+
'write_precision': WritePrecision.NS,
45+
'gzip_threshold': 1000
46+
}
47+
48+
4249
class WriteOptions(object):
4350
"""Write configuration."""
4451

@@ -52,6 +59,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
5259
exponential_base=2,
5360
max_close_wait=300_000,
5461
write_precision=DEFAULT_WRITE_PRECISION,
62+
gzip_threshold=1000,
5563
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
5664
"""
5765
Create write api configuration.
@@ -83,6 +91,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
8391
self.write_scheduler = write_scheduler
8492
self.max_close_wait = max_close_wait
8593
self.write_precision = write_precision
94+
self.gzip_threshold = gzip_threshold
8695

8796
def to_retry_strategy(self, **kwargs):
8897
"""

tests/test_influxdb_client_3.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,13 @@ def test_default_client(self):
7878
expected_precision = WritePrecision.NS
7979
expected_write_type = WriteType.synchronous
8080

81-
def verify_client_write_options(client):
82-
write_options = client._write_client_options.get('write_options')
81+
def verify_client_write_options(c):
82+
write_options = c._write_client_options.get('write_options')
8383
self.assertEqual(write_options.write_precision, expected_precision)
8484
self.assertEqual(write_options.write_type, expected_write_type)
8585

86-
self.assertEqual(client._write_api._write_options.write_precision, expected_precision)
87-
self.assertEqual(client._write_api._write_options.write_type, expected_write_type)
86+
self.assertEqual(c._write_api._write_options.write_precision, expected_precision)
87+
self.assertEqual(c._write_api._write_options.write_type, expected_write_type)
8888

8989
env_client = from_env()
9090
verify_client_write_options(env_client)
@@ -94,15 +94,20 @@ def verify_client_write_options(client):
9494

9595

9696
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
97-
'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org', 'INFLUX_PRECISION': WritePrecision.MS})
97+
'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org', 'INFLUX_PRECISION': WritePrecision.MS,
98+
'INFLUX_GZIP_THRESHOLD': '2000'})
9899
def test_from_env_all_env_vars_set(self):
99100
client = from_env()
100101
self.assertIsInstance(client, InfluxDBClient3)
101102
self.assertEqual(client._client.url, "https://localhost:443")
102103
self.assertEqual(client._database, "test_db")
103104
self.assertEqual(client._org, "test_org")
104105
self.assertEqual(client._token, "test_token")
105-
self.assertEqual(client._write_client_options.get("write_options").write_precision, WritePrecision.MS)
106+
107+
write_options = client._write_client_options.get("write_options")
108+
self.assertEqual(write_options.write_precision, WritePrecision.MS)
109+
self.assertEqual(write_options.gzip_threshold, 2000)
110+
106111

107112
@patch.dict('os.environ', {'INFLUX_HOST': "", 'INFLUX_TOKEN': "",
108113
'INFLUX_DATABASE': "", 'INFLUX_ORG': ""})

0 commit comments

Comments
 (0)