@@ -88,7 +88,8 @@ def __init__(
8888 self ._max_flush_time = max_flush_time
8989 self ._max_buffer_size = max_buffer_size
9090 self ._queued_data = None
91- self ._event_queue = ChilledQueue (maxsize = 10000 , chill_until = queue_chill_count , max_chill_time = queue_chill_time )
91+ self ._event_queue = self ._init_event_queue (chill_until = queue_chill_count , max_chill_time = queue_chill_time )
92+ self ._is_chilled_queue = isinstance (self ._event_queue , ChilledQueue )
9293 self ._event_process_thread = threading .Thread (target = self ._process_queue , name = "eapm event processor thread" )
9394 self ._event_process_thread .daemon = True
9495 self ._last_flush = timeit .default_timer ()
@@ -105,18 +106,14 @@ def __init__(
def queue(self, event_type, data, flush=False):
    """Queue an event for the background processor thread.

    A "close" event or an explicit flush request must wake the processor
    immediately, so chilling is disabled for those; all other events may be
    chilled (batched) when the queue implementation supports it.

    If the queue is full, the event is dropped with a warning rather than
    blocking the caller.
    """
    try:
        self._flushed.clear()
        if self._is_chilled_queue:
            # only a ChilledQueue understands the "chill" keyword argument
            chill = event_type != "close" and not flush
            self._event_queue.put((event_type, data, flush), block=False, chill=chill)
        else:
            self._event_queue.put((event_type, data, flush), block=False)
    except compat.queue.Full:
        logger.warning("Event of type %s dropped due to full event queue", event_type)
112115 def _process_queue (self ):
113- def init_buffer ():
114- buffer = gzip .GzipFile (fileobj = compat .BytesIO (), mode = "w" , compresslevel = self ._compress_level )
115- data = (self ._json_serializer ({"metadata" : self ._metadata }) + "\n " ).encode ("utf-8" )
116- buffer .write (data )
117- return buffer
118-
119- buffer = init_buffer ()
116+ buffer = self ._init_buffer ()
120117 buffer_written = False
121118 # add some randomness to timeout to avoid stampedes of several workers that are booted at the same time
122119 max_flush_time = self ._max_flush_time * random .uniform (0.9 , 1.1 ) if self ._max_flush_time else None
@@ -166,11 +163,34 @@ def init_buffer():
166163 if buffer_written :
167164 self ._flush (buffer )
168165 self ._last_flush = timeit .default_timer ()
169- buffer = init_buffer ()
166+ buffer = self . _init_buffer ()
170167 buffer_written = False
171168 max_flush_time = self ._max_flush_time * random .uniform (0.9 , 1.1 ) if self ._max_flush_time else None
172169 self ._flushed .set ()
173170
def _init_buffer(self):
    """Return a new gzip write buffer seeded with the serialized metadata.

    The metadata document is written first, so every payload flushed from
    this buffer starts with it; callers can then append events directly.
    """
    metadata_json = self._json_serializer({"metadata": self._metadata})
    gzip_buffer = gzip.GzipFile(
        fileobj=compat.BytesIO(), mode="w", compresslevel=self._compress_level
    )
    gzip_buffer.write((metadata_json + "\n").encode("utf-8"))
    return gzip_buffer
def _init_event_queue(self, chill_until, max_chill_time):
    """Create and return the internal event queue.

    Some libraries like eventlet monkeypatch queue.Queue and switch out the
    implementation. In those cases we can't rely on internals of queue.Queue
    to be there, so we simply use their queue and forgo the optimizations of
    ChilledQueue. In the case of eventlet, this isn't really a loss, because
    the main reason for ChilledQueue (avoiding context switches due to the
    event processor thread being woken up all the time) is not an issue.

    :param chill_until: passed through to ChilledQueue as ``chill_until``
    :param max_chill_time: passed through to ChilledQueue as ``max_chill_time``
    """
    # "not_full", "not_empty" and "unfinished_tasks" are created in
    # Queue.__init__, not on the class, so they must be probed on an
    # instance: hasattr() on the class itself is always False and would
    # silently disable ChilledQueue even on stock CPython.
    probe = compat.queue.Queue()
    if all(hasattr(probe, attr) for attr in ("not_full", "not_empty", "unfinished_tasks")):
        return ChilledQueue(maxsize=10000, chill_until=chill_until, max_chill_time=max_chill_time)
    return compat.queue.Queue(maxsize=10000)
174194 def _flush (self , buffer ):
175195 """
176196 Flush the queue. This method should only be called from the event processing queue
0 commit comments