Skip to content

Commit cf133d1

Browse files
committed
Make Receiver.close method async
This makes it consistent with the channel close method, which is async. It also makes it easy to fully stop any pending tasks, as is the case with `merge`. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 1b1cfd6 commit cf133d1

File tree

7 files changed

+16
-12
lines changed

7 files changed

+16
-12
lines changed

src/frequenz/channels/_anycast.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ def consume(self) -> _T:
456456
return next_val
457457

458458
@override
459-
def close(self) -> None:
459+
async def close(self) -> None:
460460
"""Close this receiver.
461461
462462
After closing, the receiver will not be able to receive any more messages.

src/frequenz/channels/_broadcast.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ def consume(self) -> _T:
496496
return self._q.popleft()
497497

498498
@override
499-
def close(self) -> None:
499+
async def close(self) -> None:
500500
"""Close the receiver.
501501
502502
After calling this method, new messages will not be received. Once the

src/frequenz/channels/_merge.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ def consume(self) -> ReceiverMessageT_co:
192192
return self._results.popleft()
193193

194194
@override
195-
def close(self) -> None:
195+
async def close(self) -> None:
196196
"""Close the receiver.
197197
198198
After calling this method, new messages will not be received. Once the
@@ -202,9 +202,13 @@ def close(self) -> None:
202202
for task in self._pending:
203203
if not task.done() and task.get_loop().is_running():
204204
task.cancel()
205+
try:
206+
await task
207+
except asyncio.CancelledError:
208+
pass
205209
self._pending = set()
206210
for recv in self._receivers.values():
207-
recv.close()
211+
await recv.close()
208212

209213
def __str__(self) -> str:
210214
"""Return a string representation of this receiver."""

src/frequenz/channels/_receiver.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def consume(self) -> ReceiverMessageT_co:
218218
"""
219219

220220
@abstractmethod
221-
def close(self) -> None:
221+
async def close(self) -> None:
222222
"""Close the receiver.
223223
224224
After calling this method, new messages will not be received. Once the
@@ -474,14 +474,14 @@ def consume(self) -> MappedMessageT_co: # noqa: DOC502
474474
return self._mapping_function(self._receiver.consume())
475475

476476
@override
477-
def close(self) -> None:
477+
async def close(self) -> None:
478478
"""Close the receiver.
479479
480480
After calling this method, new messages will not be received. Once the
481481
receiver's buffer is drained, trying to receive a message will raise a
482482
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
483483
"""
484-
self._receiver.close()
484+
await self._receiver.close()
485485

486486
def __str__(self) -> str:
487487
"""Return a string representation of the mapper."""
@@ -573,14 +573,14 @@ def consume(self) -> ReceiverMessageT_co:
573573
return message
574574

575575
@override
576-
def close(self) -> None:
576+
async def close(self) -> None:
577577
"""Close the receiver.
578578
579579
After calling this method, new messages will not be received. Once the
580580
receiver's buffer is drained, trying to receive a message will raise a
581581
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
582582
"""
583-
self._receiver.close()
583+
await self._receiver.close()
584584

585585
def __str__(self) -> str:
586586
"""Return a string representation of the filter."""

src/frequenz/channels/event.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ def consume(self) -> None:
173173
self._event.clear()
174174

175175
@override
176-
def close(self) -> None:
176+
async def close(self) -> None:
177177
"""Close this receiver."""
178178
self.stop()
179179

src/frequenz/channels/file_watcher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ def consume(self) -> Event:
233233
return Event(type=EventType(change), path=pathlib.Path(path_str))
234234

235235
@override
236-
def close(self) -> None:
236+
async def close(self) -> None:
237237
"""Close this receiver."""
238238
self._stop_event.set()
239239

src/frequenz/channels/timer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,7 @@ def consume(self) -> timedelta:
746746
return drift
747747

748748
@override
749-
def close(self) -> None:
749+
async def close(self) -> None:
750750
"""Close the timer."""
751751
self.stop()
752752

0 commit comments

Comments
 (0)