@@ -93,7 +93,6 @@ def __init__(
9393 self ._shutdown = False
9494 self ._shutdown_timeout_exceeded = False
9595 self ._export_lock = threading .Lock ()
96- self ._final_export_finished = threading .Event ()
9796 self ._worker_awaken = threading .Event ()
9897 self ._worker_thread .start ()
9998 if hasattr (os , "register_at_fork" ):
@@ -118,7 +117,6 @@ def _should_export_batch(
118117 def _at_fork_reinit (self ):
119118 self ._export_lock = threading .Lock ()
120119 self ._worker_awaken = threading .Event ()
121- self ._export_event = threading .Event ()
122120 self ._queue .clear ()
123121 self ._worker_thread = threading .Thread (
124122 name = f"OtelBatch{ self ._exporting } RecordProcessor" ,
@@ -170,9 +168,6 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
170168 "Exception while exporting %s." , self ._exporting
171169 )
172170 detach (token )
173- # This is the final export. Set the signal to shutdown that export is done.
174- if batch_strategy == BatchExportStrategy .EXPORT_ALL and self .shutdown :
175- self ._final_export_finished .set ()
176171
177172 # Do not add any logging.log statements to this function, they can be being routed back to this `emit` function,
178173 # resulting in endless recursive calls that crash the program.
@@ -194,7 +189,7 @@ def shutdown(self, timeout_millis: int = 30000):
194189 self ._shutdown = True
195190 # Interrupts sleep in the worker if it's sleeping.
196191 self ._worker_awaken .set ()
197- self ._final_export_finished . wait (timeout_millis / 1000 )
192+ self ._worker_thread . join (timeout_millis / 1000 )
198193 # Stops worker thread from calling export again if queue is still not empty.
199194 self ._shutdown_timeout_exceeded = True
200195 # We want to shutdown immediately because we already waited `timeout_millis`.
0 commit comments