diff --git a/CHANGES/1000.bugfix b/CHANGES/1000.bugfix new file mode 100644 index 00000000..99fb2a58 --- /dev/null +++ b/CHANGES/1000.bugfix @@ -0,0 +1 @@ +Fix CancelledError being raised from various .close() methods \ No newline at end of file diff --git a/aiokafka/consumer/fetcher.py b/aiokafka/consumer/fetcher.py index 131ad135..7158c52c 100644 --- a/aiokafka/consumer/fetcher.py +++ b/aiokafka/consumer/fetcher.py @@ -454,7 +454,8 @@ async def close(self): for x in self._pending_tasks: x.cancel() - await x + with contextlib.suppress(asyncio.CancelledError): + await x def _notify(self, future): if future is not None and not future.done(): @@ -512,7 +513,7 @@ def on_done(fut, self=self): # cancellation if not task.done(): task.cancel() - await task + await task self._pending_tasks.clear() self._records.clear() diff --git a/aiokafka/consumer/group_coordinator.py b/aiokafka/consumer/group_coordinator.py index 36d79c10..ac106287 100644 --- a/aiokafka/consumer/group_coordinator.py +++ b/aiokafka/consumer/group_coordinator.py @@ -1,5 +1,6 @@ import asyncio import collections +import contextlib import copy import logging import time @@ -180,7 +181,9 @@ def _group_subscription(self): async def close(self): self._reset_committed_task.cancel() - await self._reset_committed_task + with contextlib.suppress(asyncio.CancelledError): + await self._reset_committed_task + self._reset_committed_task = None def check_errors(self): @@ -759,7 +762,8 @@ async def _stop_heartbeat_task(self): if self._heartbeat_task is not None: if not self._heartbeat_task.done(): self._heartbeat_task.cancel() - await self._heartbeat_task + with contextlib.suppress(asyncio.CancelledError): + await self._heartbeat_task self._heartbeat_task = None async def _heartbeat_routine(self): diff --git a/aiokafka/producer/sender.py b/aiokafka/producer/sender.py index 519b5d34..a75b4f31 100644 --- a/aiokafka/producer/sender.py +++ b/aiokafka/producer/sender.py @@ -1,5 +1,6 @@ import asyncio import collections +import contextlib import logging import time @@ -96,7 +97,8 @@ def sender_task(self): async def close(self): if self._sender_task is not None and not self._sender_task.done(): self._sender_task.cancel() - await self._sender_task + with contextlib.suppress(asyncio.CancelledError): + await self._sender_task async def _sender_routine(self): """Background task, that sends pending batches to leader nodes for