Skip to content

Commit a3dc641

Browse files
Implement fork method in the Receiver
This is useful for scenarios where multiple consumers need to process the same stream of messages.
1 parent 429378b commit a3dc641

File tree

10 files changed

+238
-5
lines changed

10 files changed

+238
-5
lines changed

RELEASE_NOTES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
## New Features
1212

1313
- An optional `tick_at_start` parameter has been added to `Timer`. When `True`, the timer will trigger immediately after starting, and then wait for the interval before triggering again.
14+
- Add `Receiver.fork` method to create independent clones of the receiver.
15+
- Useful for scenarios where multiple consumers need to process the same stream of messages. Each forked receiver.
16+
- Each forked receiver maintains its own independent message queue
1417

1518
## Bug Fixes
1619

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ classifiers = [
2626
]
2727
requires-python = ">= 3.11, < 4"
2828
dependencies = [
29-
"typing-extensions >= 4.5.0, < 5",
29+
"typing-extensions >= 4.11.0, < 5",
3030
"watchfiles >= 0.15.0, < 1.1.0",
3131
]
3232
dynamic = ["version"]
@@ -39,7 +39,7 @@ email = "[email protected]"
3939
dev-flake8 = [
4040
"flake8 == 7.1.1",
4141
"flake8-docstrings == 1.7.0",
42-
"flake8-pyproject == 1.2.3", # For reading the flake8 config from pyproject.toml
42+
"flake8-pyproject == 1.2.3", # For reading the flake8 config from pyproject.toml
4343
"pydoclint == 0.6.0",
4444
"pydocstyle == 6.3.0",
4545
]

src/frequenz/channels/_anycast.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,25 @@ def close(self) -> None:
463463
"""
464464
self._closed = True
465465

466+
@override
467+
def fork(self, *, name: str | None = None) -> "Receiver[_T]":
468+
"""Create a new receiver that is a clone of this receiver.
469+
470+
Args:
471+
name: An optional name for the new receiver. This is ignored as Anycast
472+
receivers don't have names.
473+
474+
Returns:
475+
A new receiver that is a clone of this receiver.
476+
477+
Raises:
478+
ReceiverStoppedError: If the receiver is closed.
479+
"""
480+
if self._closed:
481+
raise ReceiverStoppedError(self)
482+
483+
return self._channel.new_receiver()
484+
466485
def __str__(self) -> str:
467486
"""Return a string representation of this receiver."""
468487
return f"{self._channel}:{type(self).__name__}"

src/frequenz/channels/_broadcast.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,33 @@ def close(self) -> None:
508508
hash(self), None
509509
)
510510

511+
@override
512+
def fork(self, *, name: str | None = None) -> "Receiver[_T]":
513+
"""Create a new receiver that is a clone of this receiver.
514+
515+
Args:
516+
name: An optional name for the new receiver. If None, a new name will be
517+
generated based on the receiver's id.
518+
519+
Returns:
520+
A new receiver that is a clone of this receiver.
521+
522+
Raises:
523+
ReceiverStoppedError: If the receiver is closed.
524+
"""
525+
if self._closed:
526+
raise ReceiverStoppedError(self)
527+
528+
limit = self._q.maxlen
529+
assert limit is not None
530+
531+
fork_name = name if name is not None else None
532+
533+
# Create a new receiver with the same configuration
534+
return self._channel.new_receiver(
535+
name=fork_name, limit=limit, warn_on_overflow=self._warn_on_overflow
536+
)
537+
511538
def __str__(self) -> str:
512539
"""Return a string representation of this receiver."""
513540
return f"{self._channel}:{type(self).__name__}"

src/frequenz/channels/_merge.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,35 @@ def close(self) -> None:
206206
for recv in self._receivers.values():
207207
recv.close()
208208

209+
@override
210+
def fork(self, *, name: str | None = None) -> "Merger[ReceiverMessageT_co]":
211+
"""Create a new receiver that is a clone of this receiver.
212+
213+
Args:
214+
name: An optional name for the new receiver. If None, the same naming
215+
approach as the original merger will be used.
216+
217+
Returns:
218+
A new receiver that is a clone of this receiver.
219+
"""
220+
# Fork all the underlying not stopped receivers
221+
222+
forked_receivers: list[Receiver[ReceiverMessageT_co]] = []
223+
for recv_name, recv in self._receivers.items():
224+
# Don't fork stopped receivers
225+
try:
226+
forked = recv.fork(name=recv_name)
227+
except ReceiverStoppedError:
228+
continue
229+
else:
230+
forked_receivers.append(forked)
231+
232+
# Use the provided name or the same approach as original
233+
fork_name = name if name is not None else self._name
234+
235+
# Create a new merger with the forked receivers
236+
return Merger(*forked_receivers, name=fork_name)
237+
209238
def __str__(self) -> str:
210239
"""Return a string representation of this receiver."""
211240
if len(self._receivers) > 3:

src/frequenz/channels/_receiver.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,20 @@ def close(self) -> None:
239239
"""
240240
raise NotImplementedError("close() must be implemented by subclasses")
241241

242+
@abstractmethod
243+
def fork(self, *, name: str | None = None) -> "Receiver[ReceiverMessageT_co]":
244+
"""Create a new receiver that is a clone of this receiver.
245+
246+
Args:
247+
name: An optional name for the new receiver.
248+
249+
Returns:
250+
A new receiver that is a clone of this receiver.
251+
252+
Raises:
253+
ReceiverStoppedError: If the receiver is stopped.
254+
"""
255+
242256
def __aiter__(self) -> Self:
243257
"""Get an async iterator over the received messages.
244258
@@ -496,6 +510,24 @@ def close(self) -> None:
496510
"""
497511
self._receiver.close()
498512

513+
@override
514+
def fork(
515+
self, *, name: str | None = None
516+
) -> "_Mapper[ReceiverMessageT_co, MappedMessageT_co]":
517+
"""Create a new receiver that is a clone of this receiver.
518+
519+
Args:
520+
name: An optional name for the new receiver. This is ignored since mapper
521+
receivers don't have names.
522+
523+
Returns:
524+
A new receiver that is a clone of this receiver.
525+
"""
526+
return _Mapper(
527+
receiver=self._receiver.fork(name=name),
528+
mapping_function=self._mapping_function,
529+
)
530+
499531
def __str__(self) -> str:
500532
"""Return a string representation of the mapper."""
501533
return f"{type(self).__name__}:{self._receiver}:{self._mapping_function}"
@@ -573,7 +605,7 @@ def consume(self) -> ReceiverMessageT_co:
573605
The next message that was received.
574606
575607
Raises:
576-
ReceiverStoppedError: If the receiver stopped producing messages.
608+
ReceiverStoppedError: If the receiver is stopped.
577609
"""
578610
if self._recv_closed:
579611
raise ReceiverStoppedError(self)
@@ -595,6 +627,22 @@ def close(self) -> None:
595627
"""
596628
self._receiver.close()
597629

630+
@override
631+
def fork(self, *, name: str | None = None) -> "_Filter[ReceiverMessageT_co]":
632+
"""Create a new receiver that is a clone of this receiver.
633+
634+
Args:
635+
name: An optional name for the new receiver. This is ignored since filter
636+
receivers don't have names.
637+
638+
Returns:
639+
A new receiver that is a clone of this receiver.
640+
"""
641+
return _Filter(
642+
receiver=self._receiver.fork(name=name),
643+
filter_function=self._filter_function,
644+
)
645+
598646
def __str__(self) -> str:
599647
"""Return a string representation of the filter."""
600648
return f"{type(self).__name__}:{self._receiver}:{self._filter_function}"

src/frequenz/channels/event.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,25 @@ def close(self) -> None:
177177
"""Close this receiver."""
178178
self.stop()
179179

180+
@override
181+
def fork(self, *, name: str | None = None) -> "Event":
182+
"""Create a new receiver that is a clone of this receiver.
183+
184+
Args:
185+
name: An optional name for the new receiver. If None, an id-based name
186+
will be used.
187+
188+
Returns:
189+
A new Event receiver that is a clone of this receiver.
190+
191+
Raises:
192+
ReceiverStoppedError: If this receiver is stopped.
193+
"""
194+
if self._is_stopped:
195+
raise ReceiverStoppedError(self)
196+
197+
return Event(name=name)
198+
180199
def __str__(self) -> str:
181200
"""Return a string representation of this event."""
182201
return f"{type(self).__name__}({self._name!r})"

src/frequenz/channels/file_watcher.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class Event:
5656
"""The path where the change was observed."""
5757

5858

59-
class FileWatcher(Receiver[Event]):
59+
class FileWatcher(Receiver[Event]): # pylint: disable=too-many-instance-attributes
6060
"""A receiver that watches for file events.
6161
6262
# Usage
@@ -147,7 +147,8 @@ def __init__(
147147
polling is enabled.
148148
"""
149149
self.event_types: frozenset[EventType] = frozenset(event_types)
150-
"""The types of events to watch for."""
150+
self._force_polling: bool = force_polling
151+
self._polling_interval: timedelta = polling_interval
151152

152153
self._stop_event: asyncio.Event = asyncio.Event()
153154
self._paths: list[pathlib.Path] = [
@@ -250,3 +251,29 @@ def __str__(self) -> str:
250251
def __repr__(self) -> str:
251252
"""Return a string representation of this receiver."""
252253
return f"{type(self).__name__}({self._paths!r}, {self.event_types!r})"
254+
255+
@override
256+
def fork(self, *, name: str | None = None) -> "FileWatcher":
257+
"""Create a new receiver that is a clone of this receiver.
258+
259+
Args:
260+
name: An optional name for the new receiver. This is ignored since FileWatcher
261+
receivers don't have names.
262+
263+
Returns:
264+
A new receiver that is a clone of this receiver.
265+
266+
Raises:
267+
ReceiverStoppedError: If this receiver is stopped.
268+
"""
269+
if self._awatch_stopped_exc is not None:
270+
raise ReceiverStoppedError(self)
271+
272+
return FileWatcher(
273+
# list[pathlib.Path] is the correct type ( expected list[pathlib.Path | str] )
274+
# but mypy doesn't know that
275+
paths=self._paths, # type: ignore
276+
event_types=self.event_types,
277+
force_polling=self._force_polling,
278+
polling_interval=self._polling_interval,
279+
)

src/frequenz/channels/timer.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,3 +789,36 @@ def __repr__(self) -> str:
789789
f"{type(self).__name__}<{self.interval=}, {self.missed_tick_policy=}, "
790790
f"{self.loop=}, {self.is_running=}>"
791791
)
792+
793+
@override
794+
def fork(self, *, name: str | None = None) -> "Timer":
795+
"""Create a new receiver that is a clone of this receiver.
796+
797+
Args:
798+
name: An optional name for the new receiver. This is ignored since Timer
799+
receivers don't have names.
800+
801+
Returns:
802+
A new receiver that is a clone of this receiver.
803+
804+
Raises:
805+
ReceiverStoppedError: If the timer was stopped via `stop()`.
806+
"""
807+
if self._stopped:
808+
raise ReceiverStoppedError(self)
809+
810+
# Create a new timer with the same configuration
811+
new_timer = Timer(
812+
self.interval,
813+
self.missed_tick_policy,
814+
auto_start=self.is_running,
815+
loop=self.loop,
816+
)
817+
818+
# If the original timer has a next tick time set, sync the new timer
819+
if self._next_tick_time is not None:
820+
new_timer._next_tick_time = ( # pylint: disable=protected-access
821+
self._next_tick_time
822+
)
823+
824+
return new_timer

tests/test_broadcast.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66

77
import asyncio
8+
from contextlib import closing
89
from dataclasses import dataclass
910
from typing import TypeGuard, assert_never
1011

@@ -425,3 +426,30 @@ async def test_broadcast_close_receiver() -> None:
425426

426427
with pytest.raises(ReceiverStoppedError):
427428
_ = await receiver_2.receive()
429+
430+
431+
async def test_receiver_fork() -> None:
432+
"""Ensure that a receiver can be forked."""
433+
chan = Broadcast[int](name="input-chan")
434+
435+
with (
436+
closing(Broadcast[int](name="input-chan")) as chan,
437+
closing(chan.new_receiver()) as receiver,
438+
closing(receiver.fork()) as forked_receiver,
439+
):
440+
sender = chan.new_sender()
441+
await sender.send(1)
442+
443+
assert (await receiver.receive()) == 1
444+
assert (await forked_receiver.receive()) == 1
445+
446+
447+
async def test_fork_stopped_receiver() -> None:
448+
"""Ensure that a receiver can be forked."""
449+
chan = Broadcast[int](name="input-chan")
450+
451+
receiver = chan.new_receiver()
452+
receiver.close()
453+
454+
with pytest.raises(ReceiverStoppedError):
455+
_ = receiver.fork()

0 commit comments

Comments
 (0)