11# coding: utf-8
22import logging
33import os
4- import re
4+ from collections import defaultdict
55from datetime import timedelta
66from enum import Enum
7- from functools import reduce
8- from itertools import chain
97from random import random
108from time import sleep
11- from typing import Union , List
9+ from typing import Union , List , Any
1210
1311import rx
1412from rx import operators as ops , Observable
@@ -186,13 +184,13 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()
186184 def write (self , bucket : str , org : str = None ,
187185 record : Union [
188186 str , List ['str' ], Point , List ['Point' ], dict , List ['dict' ], bytes , List ['bytes' ], Observable ] = None ,
189- write_precision : WritePrecision = DEFAULT_WRITE_PRECISION , ** kwargs ) -> None :
187+ write_precision : WritePrecision = DEFAULT_WRITE_PRECISION , ** kwargs ) -> Any :
190188 """
191189 Writes time-series data into influxdb.
192190
193191 :param str org: specifies the destination organization for writes; take either the ID or Name interchangeably; if both orgID and org are specified, org takes precedence. (required)
194192 :param str bucket: specifies the destination bucket for writes (required)
195- :param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol
193+ :param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point has precedes and is use for write.
196194 :param record: Points, line protocol, Pandas DataFrame, RxPY Observable to write
197195 :param data_frame_measurement_name: name of measurement for writing Pandas DataFrame
198196 :param data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
@@ -217,11 +215,21 @@ def write(self, bucket: str, org: str = None,
217215 return self ._write_batching (bucket , org , record ,
218216 write_precision , ** kwargs )
219217
220- final_string = self ._serialize (record , write_precision , ** kwargs )
218+ payloads = defaultdict (list )
219+ self ._serialize (record , write_precision , payloads , ** kwargs )
221220
222221 _async_req = True if self ._write_options .write_type == WriteType .asynchronous else False
223222
224- return self ._post_write (_async_req , bucket , org , final_string , write_precision )
223+ def write_payload (payload ):
224+ final_string = b'\n ' .join (payload [1 ])
225+ return self ._post_write (_async_req , bucket , org , final_string , payload [0 ])
226+
227+ results = list (map (write_payload , payloads .items ()))
228+ if not _async_req :
229+ return None
230+ elif len (results ) == 1 :
231+ return results [0 ]
232+ return results
225233
226234 def flush (self ):
227235 # TODO
@@ -241,44 +249,39 @@ def __del__(self):
241249 self ._disposable = None
242250 pass
243251
244- def _serialize (self , record , write_precision , ** kwargs ) -> bytes :
245- _result = b''
252+ def _serialize (self , record , write_precision , payload , ** kwargs ):
246253 if isinstance (record , bytes ):
247- _result = record
254+ payload [ write_precision ]. append ( record )
248255
249256 elif isinstance (record , str ):
250- _result = record .encode ("utf-8" )
257+ self . _serialize ( record .encode ("utf-8" ), write_precision , payload , ** kwargs )
251258
252259 elif isinstance (record , Point ):
253- _result = self ._serialize (record .to_line_protocol (), write_precision , ** kwargs )
260+ self ._serialize (record .to_line_protocol (), record . write_precision , payload , ** kwargs )
254261
255262 elif isinstance (record , dict ):
256- _result = self ._serialize (Point .from_dict (record , write_precision = write_precision ),
257- write_precision , ** kwargs )
263+ self ._serialize (Point .from_dict (record , write_precision = write_precision ), write_precision , payload , ** kwargs )
258264 elif 'DataFrame' in type (record ).__name__ :
259265 _data = self ._data_frame_to_list_of_points (record , precision = write_precision , ** kwargs )
260- _result = self ._serialize (_data , write_precision , ** kwargs )
266+ self ._serialize (_data , write_precision , payload , ** kwargs )
261267
262268 elif isinstance (record , list ):
263- _result = b'\n ' .join ([self ._serialize (item , write_precision ,
264- ** kwargs ) for item in record ])
265-
266- return _result
269+ for item in record :
270+ self ._serialize (item , write_precision , payload , ** kwargs )
267271
268272 def _write_batching (self , bucket , org , data ,
269273 precision = DEFAULT_WRITE_PRECISION ,
270274 ** kwargs ):
271- _key = _BatchItemKey (bucket , org , precision )
272275 if isinstance (data , bytes ):
276+ _key = _BatchItemKey (bucket , org , precision )
273277 self ._subject .on_next (_BatchItem (key = _key , data = data ))
274278
275279 elif isinstance (data , str ):
276280 self ._write_batching (bucket , org , data .encode ("utf-8" ),
277281 precision , ** kwargs )
278282
279283 elif isinstance (data , Point ):
280- self ._write_batching (bucket , org , data .to_line_protocol (),
281- precision , ** kwargs )
284+ self ._write_batching (bucket , org , data .to_line_protocol (), data .write_precision , ** kwargs )
282285
283286 elif isinstance (data , dict ):
284287 self ._write_batching (bucket , org , Point .from_dict (data , write_precision = precision ),
0 commit comments