Skip to content

Commit d095ec5

Browse files
author
U-FINAL\odedz
committed
Add connection open/close callbacks
1 parent 7253e21 commit d095ec5

File tree

4 files changed

+56
-0
lines changed

4 files changed

+56
-0
lines changed

CHANGES.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGES
22
-------
33

4+
552.feature
5+
^^^^^^^^^^^
6+
7+
Add new callbacks for connection opened/close for consumer and producer
8+
49
523.feature
510
^^^^^^^^^^^
611

aiokafka/client.py

100644100755
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ def __init__(self, *, loop, bootstrap_servers='localhost',
137137
self._md_update_fut = None
138138
self._md_update_waiter = create_future(loop=self._loop)
139139
self._get_conn_lock = asyncio.Lock(loop=loop)
140+
self._on_connection_closed_callback = None
141+
self._on_connection_opened_callback = None
140142

141143
def __repr__(self):
142144
return '<AIOKafkaClient client_id=%s>' % self._client_id
@@ -371,7 +373,24 @@ def set_topics(self, topics):
371373
self._topics = set(topics)
372374
return res
373375

376+
def set_on_connection_closed_callback(self, on_connection_closed_callback):
377+
"""Set a callback function to invoke when a connection to Kafka is closed
378+
Arguments:
379+
on_connection_closed_callback: a callback function to call
380+
"""
381+
self._on_connection_closed_callback = on_connection_closed_callback;
382+
383+
def set_on_connection_opened_callback(self, on_connection_opened_callback):
384+
"""Set a callback function to invoke when a connection to Kafka is opened
385+
Arguments:
386+
on_connection_opened_callback: a callback function to call
387+
"""
388+
self._on_connection_opened_callback = on_connection_opened_callback;
389+
374390
def _on_connection_closed(self, conn, reason):
391+
if self._on_connection_closed_callback:
392+
self._on_connection_closed_callback(conn, reason)
393+
375394
""" Callback called when connection is closed
376395
"""
377396
# Connection failures imply that our metadata is stale, so let's
@@ -441,6 +460,9 @@ async def _get_conn(
441460
self.force_metadata_update()
442461
return None
443462
else:
463+
if self._on_connection_opened_callback:
464+
self._on_connection_opened_callback(self._conns[conn_id])
465+
444466
return self._conns[conn_id]
445467

446468
async def ready(self, node_id, *, group=ConnectionGroup.DEFAULT):

aiokafka/consumer/consumer.py

100644100755
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,21 @@ def __del__(self, _warnings=warnings):
321321
context['source_traceback'] = self._source_traceback
322322
self._loop.call_exception_handler(context)
323323

324+
def set_on_connection_closed_callback(self, on_connection_closed_callback):
325+
"""Set a callback function to invoke when a connection to Kafka is closed
326+
Arguments:
327+
on_connection_closed_callback: a callback function to call
328+
"""
329+
self._client.set_on_connection_closed_callback(on_connection_closed_callback);
330+
331+
def set_on_connection_opened_callback(self, on_connection_opened_callback):
332+
"""Set a callback function to invoke when a connection to Kafka is opened
333+
Arguments:
334+
on_connection_opened_callback: a callback function to call
335+
"""
336+
self._client.set_on_connection_opened_callback(on_connection_opened_callback);
337+
338+
324339
async def start(self):
325340
""" Connect to Kafka cluster. This will:
326341

aiokafka/producer/producer.py

100644100755
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,20 @@ async def send_offsets_to_transaction(self, offsets, group_id):
549549
fut = self._txn_manager.add_offsets_to_txn(formatted_offsets, group_id)
550550
await asyncio.shield(fut, loop=self._loop)
551551

552+
def set_on_connection_closed_callback(self, on_connection_closed_callback):
553+
"""Set a callback function to invoke when a connection to Kafka is closed
554+
Arguments:
555+
on_connection_closed_callback: a callback function to call
556+
"""
557+
self.client.set_on_connection_closed_callback(on_connection_closed_callback);
558+
559+
def set_on_connection_opened_callback(self, on_connection_opened_callback):
560+
"""Set a callback function to invoke when a connection to Kafka is opened
561+
Arguments:
562+
on_connection_opened_callback: a callback function to call
563+
"""
564+
self.client.set_on_connection_opened_callback(on_connection_opened_callback);
565+
552566

553567
class TransactionContext:
554568

0 commit comments

Comments
 (0)