Skip to content

Commit 8cfeac6

Browse files
authored
feat: add shutdown support to async client (#290)
1 parent af27edd commit 8cfeac6

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

pulsar/asyncio.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,16 @@ async def subscribe(self, topic: Union[str, List[str]],
777777
schema.attach_client(self._client)
778778
return Consumer(await future, schema)
779779

780+
def shutdown(self) -> None:
781+
"""
782+
Shutdown the client and all the associated producers and consumers
783+
784+
Raises
785+
------
786+
PulsarException
787+
"""
788+
self._client.shutdown()
789+
780790
async def get_topic_partitions(self, topic: str) -> List[str]:
781791
"""
782792
Get the list of partitions for a given topic in asynchronous mode.

tests/asyncio_test.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,17 @@ async def test_producer_is_connected(self):
227227
await producer.close()
228228
self.assertFalse(producer.is_connected())
229229

230+
async def test_shutdown_client(self):
231+
producer = await self._client.create_producer("persistent://public/default/partitioned_topic_name_test")
232+
await producer.send(b"hello")
233+
self._client.shutdown()
234+
235+
try:
236+
await producer.send(b"hello")
237+
self.fail("Expected AlreadyClosed exception after client shutdown")
238+
except PulsarException as e:
239+
self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
240+
230241
async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]:
231242
msg_ids = []
232243
for i in range(5):

0 commit comments

Comments
 (0)