Skip to content

Commit d6d717b

Browse files
Add stop method for MergeNamed and Merge channels (#68)
Both channels stores async tasks that listen on the receivers. If they are removed without stopping the receivers tasks, then user gets error: Task was destroyed but it is pending! `stop` method should be called when channel is no longer needed. Destructor can't be async, so we do that with extra method `stop`. Signed-off-by: ela-kotulska-frequenz <[email protected]>
2 parents 32a091a + dd5d3a7 commit d6d717b

File tree

3 files changed

+23
-8
lines changed

3 files changed

+23
-8
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,10 @@
22

33
## Summary
44

5-
<!-- Here goes a general summary of what this release is about -->
6-
75
## Upgrading
86

9-
<!-- Here goes notes on how to upgrade from previous versions, including if there are any depractions and what they should be replaced with -->
10-
117
## New Features
128

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
9+
* Add method to stop `Merge` and `MergeNamed`.
1410

1511
## Bug Fixes
16-
17-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->

src/frequenz/channels/util/_merge.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ class Merge(Receiver[T]):
2424
# do something with msg
2525
pass
2626
```
27+
28+
When `merge` is no longer needed, then it should be stopped using
29+
`self.stop()` method. This will cleanup any internal pending async tasks.
2730
"""
2831

2932
def __init__(self, *args: Receiver[T]) -> None:
@@ -44,6 +47,13 @@ def __del__(self) -> None:
4447
for task in self._pending:
4548
task.cancel()
4649

50+
async def stop(self) -> None:
51+
"""Stop the `Merge` instance and cleanup any pending tasks."""
52+
for task in self._pending:
53+
task.cancel()
54+
await asyncio.gather(*self._pending, return_exceptions=True)
55+
self._pending = set()
56+
4757
async def ready(self) -> None:
4858
"""Wait until the receiver is ready with a value.
4959

src/frequenz/channels/util/_merge_named.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111

1212

1313
class MergeNamed(Receiver[Tuple[str, T]]):
14-
"""Merge messages coming from multiple named channels into a single stream."""
14+
"""Merge messages coming from multiple named channels into a single stream.
15+
16+
When `MergeNamed` is no longer needed, then it should be stopped using
17+
`self.stop()` method. This will cleanup any internal pending async tasks.
18+
"""
1519

1620
def __init__(self, **kwargs: Receiver[T]) -> None:
1721
"""Create a `MergeNamed` instance.
@@ -31,6 +35,13 @@ def __del__(self) -> None:
3135
for task in self._pending:
3236
task.cancel()
3337

38+
async def stop(self) -> None:
39+
"""Stop the `MergeNamed` instance and cleanup any pending tasks."""
40+
for task in self._pending:
41+
task.cancel()
42+
await asyncio.gather(*self._pending, return_exceptions=True)
43+
self._pending = set()
44+
3445
async def ready(self) -> None:
3546
"""Wait until there's a message in any of the channels.
3647

0 commit comments

Comments
 (0)