Skip to content

Commit 7b2b906

Browse files
committed
chore: clean up flake
1 parent ff136e1 commit 7b2b906

File tree

6 files changed

+73
-45
lines changed

6 files changed

+73
-45
lines changed

influxdb_client_3/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,6 @@ def write(self, record=None, database=None, **kwargs):
350350
:type database: str
351351
:param kwargs: Additional arguments to pass to the write API.
352352
"""
353-
print(f"DEBUG InfluxDBClient3.write {record}")
354353
if database is None:
355354
database = self._database
356355

influxdb_client_3/query/query_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class QueryApiOptions(object):
1919
proxy (str): URL to a proxy server
2020
flight_client_options (dict): base set of flight client options passed to internal pyarrow.flight.FlightClient
2121
"""
22-
_DEFAULT_TIMEOUT = 300.0
22+
_DEFAULT_TIMEOUT = 300.0
2323
tls_root_certs: bytes = None
2424
tls_verify: bool = None
2525
proxy: str = None

influxdb_client_3/write_client/_sync/api_client.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -357,9 +357,7 @@ def call_api(self, resource_path, method,
357357
If parameter async_req is False or missing,
358358
then the method will return the response directly.
359359
"""
360-
print("DEBUG ApiClient.call_api()")
361360
if not async_req:
362-
print(" DEBUG synchronous call")
363361
return self.__call_api(resource_path, method,
364362
path_params, query_params, header_params,
365363
body, post_params, files,
@@ -368,7 +366,6 @@ def call_api(self, resource_path, method,
368366
_preload_content, _request_timeout, urlopen_kw)
369367
else:
370368
# TODO possible refactor - async handler inside package `_sync`?
371-
print(" DEBUG asynchronous call")
372369
thread = self.pool.apply_async(self.__call_api, (resource_path,
373370
method, path_params, query_params,
374371
header_params, body,

influxdb_client_3/write_client/client/write_api.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from collections import defaultdict
88
from datetime import timedelta
99
from enum import Enum
10-
from multiprocessing.pool import ApplyResult
1110
from random import random
1211
from time import sleep
1312
from typing import Union, Any, Iterable, NamedTuple
@@ -274,8 +273,6 @@ def __init__(self,
274273
# Define Subject that listen incoming data and produces writes into InfluxDB
275274
self._subject = Subject()
276275

277-
print(f"DEBUG batching write with subject {self._subject}")
278-
279276
self._disposable = self._subject.pipe(
280277
# Split incoming data to windows by batch_size or flush_interval
281278
ops.window_with_time_or_count(count=write_options.batch_size,
@@ -381,7 +378,6 @@ def write(self, bucket: str, org: str = None,
381378
382379
""" # noqa: E501
383380
org = get_org_query_param(org=org, client=self._influxdb_client)
384-
print("DEBUG WriteApi.write()")
385381

386382
self._append_default_tags(record)
387383

@@ -400,21 +396,15 @@ def write(self, bucket: str, org: str = None,
400396
_async_req = True if self._write_options.write_type == WriteType.asynchronous else False
401397

402398
def write_payload(payload):
403-
print("DEBUG WriteApi.write_payload()")
404399
final_string = b'\n'.join(payload[1])
405400
result = self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync)
406401
# return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync)
407-
print(f" DEBUG write_payload() result {result}")
408402
return result
409403

410404
results = list(map(write_payload, payloads.items()))
411-
print(f" DEBUG WriteApi.write() results {results}")
412405
if not _async_req:
413-
print(f" --->DEBUG not async_request")
414406
return None
415407
elif len(results) == 1:
416-
# TODO if results contains error or exception ensure handled
417-
print(f" --->DEBUG async_request() results {results[0]}")
418408
return results[0]
419409
return results
420410

@@ -482,13 +472,11 @@ def __del__(self):
482472
def _write_batching(self, bucket, org, data,
483473
precision=None,
484474
**kwargs):
485-
print("DEBUG _write_batching()")
486475
if precision is None:
487476
precision = self._write_options.write_precision
488477

489478
if isinstance(data, bytes):
490479
_key = _BatchItemKey(bucket, org, precision)
491-
print(f" DEBUG _write_batching() data bytes {_key}")
492480
self._subject.on_next(_BatchItem(key=_key, data=data))
493481

494482
elif isinstance(data, str):
@@ -539,9 +527,6 @@ def _write_batching(self, bucket, org, data,
539527
return None
540528

541529
def _http(self, batch_item: _BatchItem):
542-
543-
print("DEBUG _http()")
544-
545530
logger.debug("Write time series data into InfluxDB: %s", batch_item)
546531

547532
if self._retry_callback:
@@ -562,8 +547,6 @@ def _retry_callback_delegate(exception):
562547
return _BatchResponse(data=batch_item)
563548

564549
def _post_write(self, _async_req, bucket, org, body, precision, no_sync, **kwargs):
565-
print("DEBUG write_api._post_write()")
566-
567550
return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision,
568551
no_sync=no_sync,
569552
async_req=_async_req,
@@ -572,8 +555,6 @@ def _post_write(self, _async_req, bucket, org, body, precision, no_sync, **kwarg
572555

573556
def _to_response(self, data: _BatchItem, delay: timedelta):
574557

575-
print("DEBUG _to_response()")
576-
577558
return rx.of(data).pipe(
578559
ops.subscribe_on(self._write_options.write_scheduler),
579560
# use delay if its specified

influxdb_client_3/write_client/service/write_service.py

Lines changed: 72 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,50 +42,105 @@ def post_write(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403
4242
returns the request thread.
4343
""" # noqa: E501
4444
kwargs['_return_http_data_only'] = True
45-
print("DEBUG WriteService.post_write()")
4645
if kwargs.get('async_req'):
47-
print(" DEBUG making asynchronous request ")
4846
thread = self.post_write_with_http_info(org, bucket, body, **kwargs) # noqa: E501
49-
print(f" DEBUG thread: {thread} ")
5047
# return self.post_write_with_http_info(org, bucket, body, **kwargs) # noqa: E501
5148
return thread
5249
else:
53-
print(" DEBUG making synchronous request ")
5450
(data) = self.post_write_with_http_info(org, bucket, body, **kwargs) # noqa: E501
5551
return data
5652

53+
# TODO review this documentation - is it still up-to-date?
5754
def post_write_with_http_info(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403
5855
"""Write data.
5956
60-
Writes data to a bucket. Use this endpoint to send data in [line protocol](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/) format to InfluxDB. #### InfluxDB Cloud - Does the following when you send a write request: 1. Validates the request and queues the write. 2. If queued, responds with _success_ (HTTP `2xx` status code); _error_ otherwise. 3. Handles the delete asynchronously and reaches eventual consistency. To ensure that InfluxDB Cloud handles writes and deletes in the order you request them, wait for a success response (HTTP `2xx` status code) before you send the next request. Because writes and deletes are asynchronous, your change might not yet be readable when you receive the response. #### InfluxDB OSS - Validates the request and handles the write synchronously. - If all points were written successfully, responds with HTTP `2xx` status code; otherwise, returns the first line that failed. #### Required permissions - `write-buckets` or `write-bucket BUCKET_ID`. *`BUCKET_ID`* is the ID of the destination bucket. #### Rate limits (with InfluxDB Cloud) `write` rate limits apply. For more information, see [limits and adjustable quotas](https://docs.influxdata.com/influxdb/cloud/account-management/limits/). #### Related guides - [Write data with the InfluxDB API](https://docs.influxdata.com/influxdb/latest/write-data/developer-tools/api) - [Optimize writes to InfluxDB](https://docs.influxdata.com/influxdb/latest/write-data/best-practices/optimize-writes/) - [Troubleshoot issues writing data](https://docs.influxdata.com/influxdb/latest/write-data/troubleshoot/)
57+
Writes data to a bucket.
58+
Use this endpoint to send data in
59+
[line protocol](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/) format
60+
to InfluxDB.
61+
#### InfluxDB Cloud
62+
- Does the following when you send a write request:
63+
1. Validates the request and queues the write.
64+
2. If queued, responds with _success_ (HTTP `2xx` status code); _error_ otherwise.
65+
3. Handles the delete asynchronously and reaches eventual consistency.
66+
To ensure that InfluxDB Cloud handles writes and deletes in the order you request them, wait for a success
67+
response (HTTP `2xx` status code) before you send the next request. Because writes and deletes are
68+
asynchronous, your change might not yet be readable when you receive the response.
69+
#### InfluxDB OSS
70+
- Validates the request and handles the write synchronously.
71+
- If all points were written successfully, responds with HTTP `2xx` status code;
72+
otherwise, returns the first line that failed. #### Required permissions
73+
- `write-buckets` or `write-bucket BUCKET_ID`. *`BUCKET_ID`* is the ID of the destination bucket.
74+
#### Rate limits (with InfluxDB Cloud) `write` rate limits apply.
75+
For more information, see
76+
[limits and adjustable quotas](https://docs.influxdata.com/influxdb/cloud/account-management/limits/).
77+
#### Related guides
78+
- [Write data with the InfluxDB API]
79+
(https://docs.influxdata.com/influxdb/latest/write-data/developer-tools/api)
80+
- [Optimize writes to InfluxDB]
81+
(https://docs.influxdata.com/influxdb/latest/write-data/best-practices/optimize-writes/)
82+
- [Troubleshoot issues writing data](https://docs.influxdata.com/influxdb/latest/write-data/troubleshoot/)
6183
This method makes a synchronous HTTP request by default. To make an
6284
asynchronous HTTP request, please pass async_req=True
6385
>>> thread = api.post_write_with_http_info(org, bucket, body, async_req=True)
6486
>>> result = thread.get()
6587
6688
:param async_req bool
67-
:param str org: An organization name or ID. #### InfluxDB Cloud - Doesn't use the `org` parameter or `orgID` parameter. - Writes data to the bucket in the organization associated with the authorization (API token). #### InfluxDB OSS - Requires either the `org` parameter or the `orgID` parameter. - If you pass both `orgID` and `org`, they must both be valid. - Writes data to the bucket in the specified organization. (required)
68-
:param str bucket: A bucket name or ID. InfluxDB writes all points in the batch to the specified bucket. (required)
69-
:param str body: In the request body, provide data in [line protocol format](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/). To send compressed data, do the following: 1. Use [GZIP](https://www.gzip.org/) to compress the line protocol data. 2. In your request, send the compressed data and the `Content-Encoding: gzip` header. #### Related guides - [Best practices for optimizing writes](https://docs.influxdata.com/influxdb/latest/write-data/best-practices/optimize-writes/) (required)
89+
:param str org: An organization name or ID.
90+
#### InfluxDB Cloud - Doesn't use the `org` parameter or `orgID` parameter.
91+
- Writes data to the bucket in the organization associated with the authorization (API token).
92+
#### InfluxDB OSS - Requires either the `org` parameter or the `orgID` parameter.
93+
- If you pass both `orgID` and `org`, they must both be valid.
94+
- Writes data to the bucket in the specified organization. (required)
95+
:param str bucket: A bucket name or ID. InfluxDB writes all points in the batch to the
96+
specified bucket. (required)
97+
:param str body: In the request body, provide data in
98+
[line protocol format](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/).
99+
To send compressed data, do the following:
100+
1. Use [GZIP](https://www.gzip.org/) to compress the line protocol data.
101+
2. In your request, send the compressed data and the `Content-Encoding: gzip` header.
102+
#### Related guides
103+
- [Best practices for optimizing writes]
104+
(https://docs.influxdata.com/influxdb/latest/write-data/best-practices/optimize-writes/) (required)
70105
:param str zap_trace_span: OpenTracing span context
71-
:param str content_encoding: The compression applied to the line protocol in the request payload. To send a GZIP payload, pass `Content-Encoding: gzip` header.
72-
:param str content_type: The format of the data in the request body. To send a line protocol payload, pass `Content-Type: text/plain; charset=utf-8`.
73-
:param int content_length: The size of the entity-body, in bytes, sent to InfluxDB. If the length is greater than the `max body` configuration option, the server responds with status code `413`.
74-
:param str accept: The content type that the client can understand. Writes only return a response body if they fail--for example, due to a formatting problem or quota limit. #### InfluxDB Cloud - Returns only `application/json` for format and limit errors. - Returns only `text/html` for some quota limit errors. #### InfluxDB OSS - Returns only `application/json` for format and limit errors. #### Related guides - [Troubleshoot issues writing data](https://docs.influxdata.com/influxdb/latest/write-data/troubleshoot/)
75-
:param str org_id: An organization ID. #### InfluxDB Cloud - Doesn't use the `org` parameter or `orgID` parameter. - Writes data to the bucket in the organization associated with the authorization (API token). #### InfluxDB OSS - Requires either the `org` parameter or the `orgID` parameter. - If you pass both `orgID` and `org`, they must both be valid. - Writes data to the bucket in the specified organization.
106+
:param str content_encoding: The compression applied to the line protocol in the request payload.
107+
To send a GZIP payload, pass `Content-Encoding: gzip` header.
108+
:param str content_type: The format of the data in the request body. To send a line protocol payload,
109+
pass `Content-Type: text/plain; charset=utf-8`.
110+
:param int content_length: The size of the entity-body, in bytes, sent to InfluxDB. If the length is greater
111+
than the `max body` configuration option, the server responds with status code `413`.
112+
:param str accept: The content type that the client can understand. Writes only return a response body
113+
if they fail--for example, due to a formatting problem or quota limit.
114+
#### InfluxDB Cloud
115+
- Returns only `application/json` for format and limit errors.
116+
- Returns only `text/html` for some quota limit errors.
117+
#### InfluxDB OSS
118+
- Returns only `application/json` for format and limit errors.
119+
#### Related guides
120+
- [Troubleshoot issues writing data](https://docs.influxdata.com/influxdb/latest/write-data/troubleshoot/)
121+
:param str org_id: An organization ID.
122+
#### InfluxDB Cloud
123+
- Doesn't use the `org` parameter or `orgID` parameter.
124+
- Writes data to the bucket in the organization associated with the authorization (API token).
125+
#### InfluxDB OSS
126+
- Requires either the `org` parameter or the `orgID` parameter.
127+
- If you pass both `orgID` and `org`, they must both be valid.
128+
- Writes data to the bucket in the specified organization.
76129
:param WritePrecision precision: The precision for unix timestamps in the line protocol batch.
77-
:param bool no_sync: Instructs the server whether to wait with the response until WAL persistence completes. True value means faster write but without the confirmation that the data was persisted. Note: This option is supported by InfluxDB 3 Core and Enterprise servers only. For other InfluxDB 3 server types (InfluxDB Clustered, InfluxDB Clould Serverless/Dedicated) the write operation will fail with an error.
130+
:param bool no_sync: Instructs the server whether to wait with the response until WAL persistence completes.
131+
True value means faster write but without the confirmation that the data was persisted.
132+
Note: This option is supported by InfluxDB 3 Core and Enterprise servers only.
133+
For other InfluxDB 3 server types (InfluxDB Clustered, InfluxDB Clould Serverless/Dedicated) the write
134+
operation will fail with an error.
78135
:return: None
79136
If the method is called asynchronously,
80137
returns the request thread.
81-
"""
138+
""" # noqa: E501
82139
print("WriteService.post_write_with_http_info()")
83140
# noqa: E501
84141
local_var_params, path, path_params, query_params, header_params, body_params = \
85142
self._post_write_prepare(org, bucket, body, **kwargs) # noqa: E501
86143

87-
print(f"DEBUG local_var_params: {local_var_params}")
88-
89144
try:
90145
return self.api_client.call_api(
91146
path, 'POST',
@@ -156,8 +211,6 @@ async def post_write_async(self, org, bucket, body, **kwargs): # noqa: E501,D40
156211
def _post_write_prepare(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403
157212
local_var_params = dict(locals())
158213

159-
print(f"DEBUG local_var_params: {local_var_params}")
160-
161214
all_params = ['org', 'bucket', 'body', 'zap_trace_span', 'content_encoding', 'content_type', 'content_length', 'accept', 'org_id', 'precision', 'no_sync'] # noqa: E501
162215
self._check_operation_params('post_write', all_params, local_var_params)
163216
# verify the required parameter 'org' is set

tests/test_influxdb_client_3.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,6 @@ def test_write_options(self):
9595
assert client._write_client_options["write_options"].max_retry_delay == 0
9696
assert client._write_client_options["write_options"].write_type == WriteType.synchronous
9797
assert client._write_client_options["write_options"].flush_interval == 500
98-
print(f"DEBUG client._client._base._Configuration {client._client.conf.__dict__}")
99-
print(f"DEBUG client._client._base._Configuration.timeout {client._client.conf.timeout}")
10098

10199

102100
def test_default_write_options(self):

0 commit comments

Comments
 (0)