Skip to content

Commit 6d06c50

Browse files
committed
feat: add producer connectivity functionality
1 parent 36d6fd6 commit 6d06c50

File tree

2 files changed

+13
-0
lines changed

2 files changed

+13
-0
lines changed

pulsar/asyncio.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,12 @@ def last_sequence_id(self):
220220
message was ever published.
221221
"""
222222
return self._producer.last_sequence_id()
223+
224+
def is_connected(self) -> bool:
225+
"""
226+
Check if the producer is connected or not.
227+
"""
228+
return self._producer.is_connected()
223229

224230
class Consumer:
225231
"""

tests/asyncio_test.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,13 @@ async def test_close_producer(self):
156156
except PulsarException as e:
157157
self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
158158

159+
async def test_producer_is_connected(self):
160+
topic = f'asyncio-test-producer-is-connected-{time.time()}'
161+
producer = await self._client.create_producer(topic)
162+
self.assertTrue(producer.is_connected())
163+
await producer.close()
164+
self.assertFalse(producer.is_connected())
165+
159166
async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]:
160167
msg_ids = []
161168
for i in range(5):

0 commit comments

Comments
 (0)