|
1 | | -import importlib.util |
2 | | -import os |
3 | 1 | import urllib.parse |
4 | | -from typing import Any |
5 | | - |
6 | 2 | import pyarrow as pa |
| 3 | +import importlib.util |
7 | 4 |
|
8 | 5 | from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder |
9 | 6 | from influxdb_client_3.read_file import UploadFile |
10 | 7 | from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point |
11 | 8 | from influxdb_client_3.write_client.client.exceptions import InfluxDBError |
12 | 9 | from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \ |
13 | | - PointSettings, WriteType, DefaultWriteOptions |
| 10 | + PointSettings |
14 | 11 | from influxdb_client_3.write_client.domain.write_precision import WritePrecision |
15 | 12 |
|
16 | 13 | polars = importlib.util.find_spec("polars") is not None |
17 | 14 |
|
18 | | -INFLUX_HOST = "INFLUX_HOST" |
19 | | -INFLUX_TOKEN = "INFLUX_TOKEN" |
20 | | -INFLUX_DATABASE = "INFLUX_DATABASE" |
21 | | -INFLUX_ORG = "INFLUX_ORG" |
22 | | -INFLUX_PRECISION = "INFLUX_PRECISION" |
23 | | -INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME" |
24 | | - |
25 | 15 |
|
26 | 16 | def write_client_options(**kwargs): |
27 | 17 | """ |
@@ -93,27 +83,6 @@ def _merge_options(defaults, exclude_keys=None, custom=None): |
93 | 83 | return _deep_merge(defaults, {key: value for key, value in custom.items() if key not in exclude_keys}) |
94 | 84 |
|
95 | 85 |
|
96 | | -def _parse_precision(precision): |
97 | | - """ |
98 | | - Parses the precision value and ensures it is valid. |
99 | | -
|
100 | | - This function checks that the given `precision` is one of the allowed |
101 | | - values defined in `WritePrecision`. If the precision is invalid, it |
102 | | - raises a `ValueError`. The function returns the valid precision value |
103 | | - if it passes validation. |
104 | | -
|
105 | | - :param precision: The precision value to be validated. |
106 | | - Must be one of WritePrecision.NS, WritePrecision.MS, |
107 | | - WritePrecision.S, or WritePrecision.US. |
108 | | - :return: The valid precision value. |
109 | | - :rtype: WritePrecision |
110 | | - :raises ValueError: If the provided precision is not valid. |
111 | | - """ |
112 | | - if precision not in [WritePrecision.NS, WritePrecision.MS, WritePrecision.S, WritePrecision.US]: |
113 | | - raise ValueError(f"Invalid precision value: {precision}") |
114 | | - return precision |
115 | | - |
116 | | - |
117 | 86 | class InfluxDBClient3: |
118 | 87 | def __init__( |
119 | 88 | self, |
@@ -167,23 +136,8 @@ def __init__( |
167 | 136 | self._org = org if org is not None else "default" |
168 | 137 | self._database = database |
169 | 138 | self._token = token |
170 | | - |
171 | | - write_type = DefaultWriteOptions.write_type.value |
172 | | - write_precision = DefaultWriteOptions.write_precision.value |
173 | | - if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None: |
174 | | - write_opts = write_client_options['write_options'] |
175 | | - write_type = getattr(write_opts, 'write_type', write_type) |
176 | | - write_precision = getattr(write_opts, 'write_precision', write_precision) |
177 | | - |
178 | | - write_options = WriteOptions( |
179 | | - write_type=write_type, |
180 | | - write_precision=write_precision |
181 | | - ) |
182 | | - |
183 | | - self._write_client_options = { |
184 | | - "write_options": write_options, |
185 | | - **(write_client_options or {}) |
186 | | - } |
| 139 | + self._write_client_options = write_client_options if write_client_options is not None \ |
| 140 | + else default_client_options(write_options=SYNCHRONOUS) |
187 | 141 |
|
188 | 142 | # Parse the host input |
189 | 143 | parsed_url = urllib.parse.urlparse(host) |
@@ -224,63 +178,6 @@ def __init__( |
224 | 178 | flight_client_options=flight_client_options, |
225 | 179 | proxy=kwargs.get("proxy", None), options=q_opts_builder.build()) |
226 | 180 |
|
227 | | - @classmethod |
228 | | - def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3': |
229 | | - |
230 | | - """ |
231 | | - Creates an instance of InfluxDBClient3 configured by specific environment |
232 | | - variables. This method automatically loads configuration settings, |
233 | | - such as connection details, security parameters, and performance |
234 | | - options, from environment variables and initializes the client |
235 | | - accordingly. |
236 | | -
|
237 | | - :param cls: |
238 | | - The class used to create the client instance. |
239 | | - :param kwargs: |
240 | | - Additional optional parameters that can be passed to customize the |
241 | | - configuration or override specific settings derived from the |
242 | | - environment variables. |
243 | | -
|
244 | | - :raises ValueError: |
245 | | - If any required environment variables are missing or have empty |
246 | | - values. |
247 | | -
|
248 | | - :return: |
249 | | - An initialized instance of the `InfluxDBClient3` class with all the |
250 | | - configuration settings applied. |
251 | | - :rtype: |
252 | | - InfluxDBClient3 |
253 | | - """ |
254 | | - |
255 | | - required_vars = { |
256 | | - INFLUX_HOST: os.getenv(INFLUX_HOST), |
257 | | - INFLUX_TOKEN: os.getenv(INFLUX_TOKEN), |
258 | | - INFLUX_DATABASE: os.getenv(INFLUX_DATABASE) |
259 | | - } |
260 | | - missing_vars = [var for var, value in required_vars.items() if value is None or value == ""] |
261 | | - if missing_vars: |
262 | | - raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}") |
263 | | - |
264 | | - write_options = WriteOptions(write_type=WriteType.synchronous) |
265 | | - |
266 | | - precision = os.getenv(INFLUX_PRECISION) |
267 | | - if precision is not None: |
268 | | - write_options.write_precision = _parse_precision(precision) |
269 | | - write_client_option = {'write_options': write_options} |
270 | | - |
271 | | - if os.getenv(INFLUX_AUTH_SCHEME) is not None: |
272 | | - kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME) |
273 | | - |
274 | | - org = os.getenv(INFLUX_ORG, "default") |
275 | | - return InfluxDBClient3( |
276 | | - host=required_vars[INFLUX_HOST], |
277 | | - token=required_vars[INFLUX_TOKEN], |
278 | | - database=required_vars[INFLUX_DATABASE], |
279 | | - write_client_options=write_client_option, |
280 | | - org=org, |
281 | | - **kwargs |
282 | | - ) |
283 | | - |
284 | 181 | def write(self, record=None, database=None, **kwargs): |
285 | 182 | """ |
286 | 183 | Write data to InfluxDB. |
|
0 commit comments