Skip to content

Commit dd5d3a7

Browse files
Add stop method for MergeNamed and Merge channels
Both channels stores async tasks that listen on the receivers. If they are removed without stopping the receivers tasks, then user gets error: Task was destroyed but it is pending! `stop` method should be called when channel is no longer needed. Destructor can't be async, so we do that with extra method `stop`. Signed-off-by: ela-kotulska-frequenz <[email protected]>
1 parent 32a091a commit dd5d3a7

File tree

3 files changed

+23
-8
lines changed

3 files changed

+23
-8
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,10 @@
22

33
## Summary
44

5-
<!-- Here goes a general summary of what this release is about -->
6-
75
## Upgrading
86

9-
<!-- Here goes notes on how to upgrade from previous versions, including if there are any depractions and what they should be replaced with -->
10-
117
## New Features
128

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
9+
* Add method to stop `Merge` and `MergeNamed`.
1410

1511
## Bug Fixes
16-
17-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->

src/frequenz/channels/util/_merge.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ class Merge(Receiver[T]):
2424
# do something with msg
2525
pass
2626
```
27+
28+
When `merge` is no longer needed, then it should be stopped using
29+
`self.stop()` method. This will cleanup any internal pending async tasks.
2730
"""
2831

2932
def __init__(self, *args: Receiver[T]) -> None:
@@ -44,6 +47,13 @@ def __del__(self) -> None:
4447
for task in self._pending:
4548
task.cancel()
4649

50+
async def stop(self) -> None:
51+
"""Stop the `Merge` instance and cleanup any pending tasks."""
52+
for task in self._pending:
53+
task.cancel()
54+
await asyncio.gather(*self._pending, return_exceptions=True)
55+
self._pending = set()
56+
4757
async def ready(self) -> None:
4858
"""Wait until the receiver is ready with a value.
4959

src/frequenz/channels/util/_merge_named.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111

1212

1313
class MergeNamed(Receiver[Tuple[str, T]]):
14-
"""Merge messages coming from multiple named channels into a single stream."""
14+
"""Merge messages coming from multiple named channels into a single stream.
15+
16+
When `MergeNamed` is no longer needed, then it should be stopped using
17+
`self.stop()` method. This will cleanup any internal pending async tasks.
18+
"""
1519

1620
def __init__(self, **kwargs: Receiver[T]) -> None:
1721
"""Create a `MergeNamed` instance.
@@ -31,6 +35,13 @@ def __del__(self) -> None:
3135
for task in self._pending:
3236
task.cancel()
3337

38+
async def stop(self) -> None:
39+
"""Stop the `MergeNamed` instance and cleanup any pending tasks."""
40+
for task in self._pending:
41+
task.cancel()
42+
await asyncio.gather(*self._pending, return_exceptions=True)
43+
self._pending = set()
44+
3445
async def ready(self) -> None:
3546
"""Wait until there's a message in any of the channels.
3647

0 commit comments

Comments
 (0)