@@ -48,7 +48,7 @@ def __init__(
4848 """
4949 self .state = TransportState ()
5050 self ._metadata = metadata if metadata is not None else {}
51- self ._compress_level = compress_level
51+ self ._compress_level = min ( 9 , max ( 0 , compress_level if compress_level is not None else 0 ))
5252 self ._json_serializer = json_serializer
5353 self ._max_flush_time = max_flush_time
5454 self ._max_buffer_size = max_buffer_size
@@ -59,9 +59,10 @@ def __init__(
5959
6060 def queue (self , event_type , data , flush = False ):
6161 with self ._queue_lock :
62- self .queued_data .write ((self ._json_serializer ({event_type : data }) + "\n " ).encode ("utf-8" ))
63- since_last_flush = timeit .default_timer () - self ._last_flush
64- queue_size = self .queued_data_size
62+ queued_data = self .queued_data
63+ queued_data .write ((self ._json_serializer ({event_type : data }) + "\n " ).encode ("utf-8" ))
64+ since_last_flush = timeit .default_timer () - self ._last_flush
65+ queue_size = 0 if queued_data .fileobj is None else queued_data .fileobj .tell ()
6566 if flush :
6667 logger .debug ("forced flush" )
6768 self .flush ()
@@ -84,21 +85,11 @@ def queue(self, event_type, data, flush=False):
8485 @property
8586 def queued_data (self ):
8687 if self ._queued_data is None :
87- if self ._compress_level :
88- self ._queued_data = gzip .GzipFile (fileobj = BytesIO (), mode = "w" , compresslevel = self ._compress_level )
89- else :
90- self ._queued_data = BytesIO ()
91- self ._queued_data .write ((self ._json_serializer ({"metadata" : self ._metadata }) + "\n " ).encode ("utf-8" ))
88+ self ._queued_data = gzip .GzipFile (fileobj = BytesIO (), mode = "w" , compresslevel = self ._compress_level )
89+ data = (self ._json_serializer ({"metadata" : self ._metadata }) + "\n " ).encode ("utf-8" )
90+ self ._queued_data .write (data )
9291 return self ._queued_data
9392
94- @property
95- def queued_data_size (self ):
96- f = self ._queued_data
97- if f :
98- # return size of the underlying BytesIO object if it is compressed
99- return f .fileobj .tell () if hasattr (f , "fileobj" ) else f .tell ()
100- return 0
101-
10293 def flush (self , sync = False , start_flush_timer = True ):
10394 """
10495 Flush the queue
@@ -112,11 +103,9 @@ def flush(self, sync=False, start_flush_timer=True):
112103 if queued_data and not self .state .should_try ():
113104 logger .error ("dropping flushed data due to transport failure back-off" )
114105 elif queued_data :
115- if self ._compress_level :
116- fileobj = queued_data .fileobj # get a reference to the fileobj before closing the gzip file
117- queued_data .close ()
118- else :
119- fileobj = queued_data
106+ fileobj = queued_data .fileobj # get a reference to the fileobj before closing the gzip file
107+ queued_data .close ()
108+
120109 # StringIO on Python 2 does not have getbuffer, so we need to fall back to getvalue
121110 data = fileobj .getbuffer () if hasattr (fileobj , "getbuffer" ) else fileobj .getvalue ()
122111 if hasattr (self , "send_async" ) and not sync :
0 commit comments