@@ -89,7 +89,6 @@ def init_patch(self, *args, **kwargs):
8989
9090 # Set up reporting interval
9191 report_interval = metadata .get ("report_interval_ms" ) if metadata else _DEFAULT_REPORT_INTERVAL_MS
92-
9392 # Create and register producer tracker
9493 tr = ProducerTracker (
9594 lib = "kafka-python" ,
@@ -123,6 +122,7 @@ def close_patch(*a, **kw):
123122 self ._superstream_closed = True
124123 tr .close ()
125124 Heartbeat .unregister_tracker (tr .uuid )
125+ logger .debug ("Superstream tracking stopped for kafka-python producer with client_id: {}" , client_id )
126126 return orig_close (* a , ** kw )
127127
128128 self .close = close_patch
@@ -239,6 +239,7 @@ async def stop_patch(*a, **kw):
239239 self ._superstream_closed = True
240240 tr .close ()
241241 Heartbeat .unregister_tracker (tr .uuid )
242+ logger .debug ("Superstream tracking stopped for aiokafka producer with client_id: {}" , client_id )
242243 await original_stop (* a , ** kw )
243244
244245 self .stop = stop_patch
@@ -386,21 +387,19 @@ def produce(self, topic, *args, **kwargs):
386387 self ._tracker .record_topic (topic )
387388 return self ._producer .produce (topic , * args , ** kwargs )
388389
389- def poll (self , * args , ** kwargs ):
390- """Wrapper for poll method."""
391- return self ._producer .poll (* args , ** kwargs )
392-
393- def flush (self , * args , ** kwargs ):
394- """Wrapper for flush method."""
395- return self ._producer .flush (* args , ** kwargs )
396-
397- def close (self , * args , ** kwargs ):
398- """Wrapper for close method that handles cleanup."""
390+ def __del__ (self ):
391+ """Destructor to automatically clean up when producer is garbage collected."""
399392 if hasattr (self , '_tracker' ) and not hasattr (self , '_superstream_closed' ):
400- self ._superstream_closed = True
401- self ._tracker .close ()
402- Heartbeat .unregister_tracker (self ._tracker .uuid )
403- return self ._producer .close (* args , ** kwargs )
393+ try :
394+ self ._superstream_closed = True
395+ self ._tracker .close ()
396+ Heartbeat .unregister_tracker (self ._tracker .uuid )
397+ logger .debug ("Superstream tracking stopped for confluent-kafka producer with client_id: {}" ,
398+ getattr (self ._tracker , 'client_id' , 'unknown' ))
399+ except Exception as e :
400+ logger .error ("Error during automatic cleanup: {}" , e )
401+ else :
402+ logger .debug ("Producer already cleaned up or no tracker found" )
404403
405404 def __getattr__ (self , name ):
406405 """Delegate all other attributes to the underlying producer."""
0 commit comments