@@ -137,6 +137,8 @@ def __init__(self, *, loop, bootstrap_servers='localhost',
137137 self ._md_update_fut = None
138138 self ._md_update_waiter = create_future (loop = self ._loop )
139139 self ._get_conn_lock = asyncio .Lock (loop = loop )
140+ self ._on_connection_closed_callback = None
141+ self ._on_connection_opened_callback = None
140142
141143 def __repr__ (self ):
142144 return '<AIOKafkaClient client_id=%s>' % self ._client_id
@@ -371,7 +373,24 @@ def set_topics(self, topics):
371373 self ._topics = set (topics )
372374 return res
373375
376+ def set_on_connection_closed_callback (self , on_connection_closed_callback ):
377+ """Set a callback function to invoke when a connection to Kafka is closed
378+ Arguments:
379+ on_connection_closed_callback: a callback function to call
380+ """
381+ self ._on_connection_closed_callback = on_connection_closed_callback ;
382+
383+ def set_on_connection_opened_callback (self , on_connection_opened_callback ):
384+ """Set a callback function to invoke when a connection to Kafka is opened
385+ Arguments:
386+ on_connection_opened_callback: a callback function to call
387+ """
388+ self ._on_connection_opened_callback = on_connection_opened_callback ;
389+
374390 def _on_connection_closed (self , conn , reason ):
391+ if self ._on_connection_closed_callback :
392+ self ._on_connection_closed_callback (conn , reason )
393+
375394 """ Callback called when connection is closed
376395 """
377396 # Connection failures imply that our metadata is stale, so let's
@@ -441,6 +460,9 @@ async def _get_conn(
441460 self .force_metadata_update ()
442461 return None
443462 else :
463+ if self ._on_connection_opened_callback :
464+ self ._on_connection_opened_callback (self ._conns [conn_id ])
465+
444466 return self ._conns [conn_id ]
445467
446468 async def ready (self , node_id , * , group = ConnectionGroup .DEFAULT ):
0 commit comments