@@ -41,6 +41,27 @@ def file_parser_options(**kwargs):
4141 return kwargs
4242
4343
44+ def _deep_merge (target , source ):
45+ """
46+ Performs a deep merge of dictionaries or lists,
47+ recursively merging the contents, handling nested structures, and concatenation of lists.
48+ """
49+ if isinstance (target , dict ) and isinstance (source , dict ):
50+ for key , value in source .items ():
51+ if key in target and isinstance (target [key ], (dict , list )) and isinstance (value , (dict , list )):
52+ # If both target and source values are dictionaries or lists, merge them recursively
53+ target [key ] = _deep_merge (target [key ], value )
54+ else :
55+ # Otherwise, replace the target value with the source value
56+ target [key ] = value
57+ elif isinstance (target , list ) and isinstance (source , list ):
58+ # If both target and source are lists, concatenate them
59+ target .extend (source )
60+ else :
61+ # For other types, simply replace the target with the source
62+ target = source
63+ return target
64+
4465class InfluxDBClient3 :
4566 def __init__ (
4667 self ,
@@ -100,7 +121,13 @@ def __init__(
100121 port = query_port_overwrite
101122 self ._flight_client = FlightClient (f"grpc+tls://{ hostname } :{ port } " , ** self ._flight_client_options )
102123
103-
124+ def _merge_options (self , defaults , custom = {}):
125+ """
126+ Merge default option arguments with custom (user-provided) arguments.
127+ """
128+ if len (custom ) == 0 :
129+ return defaults
130+ return _deep_merge (defaults , {key : value for key , value in custom .items ()})
104131
105132 def write (self , record = None , database = None ,** kwargs ):
106133 """
@@ -172,7 +199,8 @@ def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column
172199 data_frame_measurement_name = measurement_name ,
173200 data_frame_tag_columns = tag_columns ,
174201 data_frame_timestamp_column = timestamp_column , ** kwargs )
175-
202+
203+
176204 def query (self , query , language = "sql" , mode = "all" , database = None ,** kwargs ):
177205 """
178206 Query data from InfluxDB.
@@ -185,7 +213,7 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ):
185213 :type mode: str
186214 :param database: The database to query from. If not provided, uses the database provided during initialization.
187215 :type database: str
188- :param kwargs: Additional arguments for the query.
216+ :param kwargs: FlightClientCallOptions for the query.
189217 :return: The queried data.
190218 """
191219
@@ -194,10 +222,14 @@ def query(self, query, language="sql", mode="all", database=None,**kwargs ):
194222 database = self ._database
195223
196224 try :
197- headers = [(b"authorization" , f"Bearer { self ._token } " .encode ('utf-8' ))]
198-
199225 # Create an authorization header
200- _options = FlightCallOptions (headers = headers , ** kwargs )
226+ optargs = {
227+ "headers" : [(b"authorization" , f"Bearer { self ._token } " .encode ('utf-8' ))],
228+ "timeout" : 300
229+ }
230+ opts = self ._merge_options (optargs , kwargs )
231+ _options = FlightCallOptions (** opts )
232+
201233 ticket_data = {"database" : database , "sql_query" : query , "query_type" : language }
202234 ticket = Ticket (json .dumps (ticket_data ).encode ('utf-8' ))
203235 flight_reader = self ._flight_client .do_get (ticket , _options )
@@ -225,8 +257,7 @@ def __enter__(self):
225257
226258 def __exit__ (self , exc_type , exc_val , exc_tb ):
227259 self .close ()
228-
229-
260+
230261__all__ = [
231262 "InfluxDBClient3" ,
232263 "Point" ,
0 commit comments