@@ -132,27 +132,17 @@ def worker(self):
132132 # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
133133 # Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger then threshold.
134134 sleep_interrupted = self ._worker_awaken .wait (self ._schedule_delay )
135- print (
136- "In worker loop:{}, {}, {}" .format (
137- sleep_interrupted ,
138- self ._schedule_delay ,
139- self ._schedule_delay_millis ,
140- )
141- )
142135 if self ._shutdown :
143- print ("Shutdown is set..." )
144136 break
145137 self ._export (
146138 BatchExportStrategy .EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
147139 if sleep_interrupted
148140 else BatchExportStrategy .EXPORT_AT_LEAST_ONE_BATCH
149141 )
150142 self ._worker_awaken .clear ()
151- print ("last export bach..." )
152143 self ._export (BatchExportStrategy .EXPORT_ALL )
153144
154145 def _export (self , batch_strategy : BatchExportStrategy ) -> None :
155- print ("export started...:{}" .format (batch_strategy ))
156146 with self ._export_lock :
157147 iteration = 0
158148 # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
@@ -161,7 +151,6 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
161151 iteration += 1
162152 token = attach (set_value (_SUPPRESS_INSTRUMENTATION_KEY , True ))
163153 try :
164- print ("SIZE: {}" .format (len (self ._queue )))
165154 self ._exporter .export (
166155 [
167156 # Oldest records are at the back, so pop from there.
@@ -174,7 +163,6 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
174163 )
175164 ]
176165 )
177- print ("export succeded??" )
178166 except Exception : # pylint: disable=broad-exception-caught
179167 self ._logger .exception (
180168 "Exception while exporting %s." , self ._exporting
@@ -194,32 +182,16 @@ def emit(self, data: Telemetry) -> None:
194182 if len (self ._queue ) >= self ._max_export_batch_size :
195183 self ._worker_awaken .set ()
196184
197- # LoggerProvider calls shutdown without arguments currently, so the default is used.
198- def shutdown (self , timeout_millis = 30000 ):
185+ def shutdown (self ):
199186 if self ._shutdown :
200187 return
201188 # Prevents emit and force_flush from further calling export.
202189 self ._shutdown = True
203- # Interrupts sleep in the worker if it's sleeping.
190+ # Interrupts sleep in the worker, if it's sleeping.
204191 self ._worker_awaken .set ()
205- # Wait a tiny bit for the worker thread to wake and call export for a final time.
206- time .sleep (0.1 )
207- # We will force shutdown after 30 seconds.
208- for _ in range (10 ):
209- # If export is not being called, we can shutdown.
210- if not self ._export_lock .locked ():
211- break
212- time .sleep (timeout_millis / 1000 / 10 )
213- # We want to shutdown immediately because we already waited 30 seconds. Some exporter's shutdown support a timeout param.
214- if (
215- "timeout_millis"
216- in inspect .getfullargspec (self ._exporter .shutdown ).args
217- ):
218- self ._exporter .shutdown (timeout_millis = 0 ) # type: ignore
219- else :
220- self ._exporter .shutdown ()
221- # Worker thread should be finished at this point and return instantly.
192+ # Main worker loop should exit after one final export call with flush all strategy.
222193 self ._worker_thread .join ()
194+ self ._exporter .shutdown ()
223195
224196 # TODO: Fix force flush so the timeout is used https://github.com/open-telemetry/opentelemetry-python/issues/4568.
225197 def force_flush (self , timeout_millis : Optional [int ] = None ) -> bool :
0 commit comments