Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
## New Features

- An optional `tick_at_start` parameter has been added to `Timer`. When `True`, the timer will trigger immediately after starting, and then wait for the interval before triggering again.
- Add `Receiver.fork` method to create independent clones of the receiver.
- Useful for scenarios where multiple consumers need to process the same stream of messages. Each forked receiver.
- Each forked receiver maintains its own independent message queue

## Bug Fixes

Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ classifiers = [
]
requires-python = ">= 3.11, < 4"
dependencies = [
"typing-extensions >= 4.5.0, < 5",
"typing-extensions >= 4.11.0, < 5",
"watchfiles >= 0.15.0, < 1.1.0",
]
dynamic = ["version"]
Expand All @@ -39,7 +39,7 @@ email = "[email protected]"
dev-flake8 = [
"flake8 == 7.1.1",
"flake8-docstrings == 1.7.0",
"flake8-pyproject == 1.2.3", # For reading the flake8 config from pyproject.toml
"flake8-pyproject == 1.2.3", # For reading the flake8 config from pyproject.toml
"pydoclint == 0.6.0",
"pydocstyle == 6.3.0",
]
Expand Down
19 changes: 19 additions & 0 deletions src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,25 @@ def close(self) -> None:
"""
self._closed = True

@override
def fork(self, *, name: str | None = None) -> "Receiver[_T]":
"""Create a new receiver that is a clone of this receiver.

Args:
name: An optional name for the new receiver. This is ignored as Anycast
receivers don't have names.

Returns:
A new receiver that is a clone of this receiver.

Raises:
ReceiverStoppedError: If the receiver is closed.
"""
if self._closed:
raise ReceiverStoppedError(self)

return self._channel.new_receiver()

def __str__(self) -> str:
"""Return a string representation of this receiver."""
return f"{self._channel}:{type(self).__name__}"
Expand Down
27 changes: 27 additions & 0 deletions src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,33 @@ def close(self) -> None:
hash(self), None
)

@override
def fork(self, *, name: str | None = None) -> "Receiver[_T]":
"""Create a new receiver that is a clone of this receiver.

Args:
name: An optional name for the new receiver. If None, a new name will be
generated based on the receiver's id.

Returns:
A new receiver that is a clone of this receiver.

Raises:
ReceiverStoppedError: If the receiver is closed.
"""
if self._closed:
raise ReceiverStoppedError(self)

limit = self._q.maxlen
assert limit is not None

fork_name = name if name is not None else None

# Create a new receiver with the same configuration
return self._channel.new_receiver(
name=fork_name, limit=limit, warn_on_overflow=self._warn_on_overflow
)

def __str__(self) -> str:
"""Return a string representation of this receiver."""
return f"{self._channel}:{type(self).__name__}"
Expand Down
29 changes: 29 additions & 0 deletions src/frequenz/channels/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,35 @@ def close(self) -> None:
for recv in self._receivers.values():
recv.close()

@override
def fork(self, *, name: str | None = None) -> "Merger[ReceiverMessageT_co]":
"""Create a new receiver that is a clone of this receiver.

Args:
name: An optional name for the new receiver. If None, the same naming
approach as the original merger will be used.

Returns:
A new receiver that is a clone of this receiver.
"""
# Fork all the underlying not stopped receivers

forked_receivers: list[Receiver[ReceiverMessageT_co]] = []
for recv_name, recv in self._receivers.items():
# Don't fork stopped receivers
try:
forked = recv.fork(name=recv_name)
except ReceiverStoppedError:
continue
else:
forked_receivers.append(forked)

# Use the provided name or the same approach as original
fork_name = name if name is not None else self._name

# Create a new merger with the forked receivers
return Merger(*forked_receivers, name=fork_name)

def __str__(self) -> str:
"""Return a string representation of this receiver."""
if len(self._receivers) > 3:
Expand Down
50 changes: 49 additions & 1 deletion src/frequenz/channels/_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,20 @@ def close(self) -> None:
"""
raise NotImplementedError("close() must be implemented by subclasses")

@abstractmethod
def fork(self, *, name: str | None = None) -> "Receiver[ReceiverMessageT_co]":
"""Create a new receiver that is a clone of this receiver.

Args:
name: An optional name for the new receiver.

Returns:
A new receiver that is a clone of this receiver.

Raises:
ReceiverStoppedError: If the receiver is stopped.
"""

def __aiter__(self) -> Self:
"""Get an async iterator over the received messages.

Expand Down Expand Up @@ -496,6 +510,24 @@ def close(self) -> None:
"""
self._receiver.close()

@override
def fork(
self, *, name: str | None = None
) -> "_Mapper[ReceiverMessageT_co, MappedMessageT_co]":
"""Create a new receiver that is a clone of this receiver.

Args:
name: An optional name for the new receiver. This is ignored since mapper
receivers don't have names.

Returns:
A new receiver that is a clone of this receiver.
"""
return _Mapper(
receiver=self._receiver.fork(name=name),
mapping_function=self._mapping_function,
)

def __str__(self) -> str:
"""Return a string representation of the mapper."""
return f"{type(self).__name__}:{self._receiver}:{self._mapping_function}"
Expand Down Expand Up @@ -573,7 +605,7 @@ def consume(self) -> ReceiverMessageT_co:
The next message that was received.

Raises:
ReceiverStoppedError: If the receiver stopped producing messages.
ReceiverStoppedError: If the receiver is stopped.
"""
if self._recv_closed:
raise ReceiverStoppedError(self)
Expand All @@ -595,6 +627,22 @@ def close(self) -> None:
"""
self._receiver.close()

@override
def fork(self, *, name: str | None = None) -> "_Filter[ReceiverMessageT_co]":
"""Create a new receiver that is a clone of this receiver.

Args:
name: An optional name for the new receiver. This is ignored since filter
receivers don't have names.

Returns:
A new receiver that is a clone of this receiver.
"""
return _Filter(
receiver=self._receiver.fork(name=name),
filter_function=self._filter_function,
)

def __str__(self) -> str:
"""Return a string representation of the filter."""
return f"{type(self).__name__}:{self._receiver}:{self._filter_function}"
Expand Down
19 changes: 19 additions & 0 deletions src/frequenz/channels/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,25 @@ def close(self) -> None:
"""Close this receiver."""
self.stop()

@override
def fork(self, *, name: str | None = None) -> "Event":
"""Create a new receiver that is a clone of this receiver.

Args:
name: An optional name for the new receiver. If None, an id-based name
will be used.

Returns:
A new Event receiver that is a clone of this receiver.

Raises:
ReceiverStoppedError: If this receiver is stopped.
"""
if self._is_stopped:
raise ReceiverStoppedError(self)

return Event(name=name)

def __str__(self) -> str:
"""Return a string representation of this event."""
return f"{type(self).__name__}({self._name!r})"
Expand Down
31 changes: 29 additions & 2 deletions src/frequenz/channels/file_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Event:
"""The path where the change was observed."""


class FileWatcher(Receiver[Event]):
class FileWatcher(Receiver[Event]): # pylint: disable=too-many-instance-attributes
"""A receiver that watches for file events.

# Usage
Expand Down Expand Up @@ -147,7 +147,8 @@ def __init__(
polling is enabled.
"""
self.event_types: frozenset[EventType] = frozenset(event_types)
"""The types of events to watch for."""
self._force_polling: bool = force_polling
self._polling_interval: timedelta = polling_interval

self._stop_event: asyncio.Event = asyncio.Event()
self._paths: list[pathlib.Path] = [
Expand Down Expand Up @@ -250,3 +251,29 @@ def __str__(self) -> str:
def __repr__(self) -> str:
"""Return a string representation of this receiver."""
return f"{type(self).__name__}({self._paths!r}, {self.event_types!r})"

@override
def fork(self, *, name: str | None = None) -> "FileWatcher":
"""Create a new receiver that is a clone of this receiver.

Args:
name: An optional name for the new receiver. This is ignored since FileWatcher
receivers don't have names.

Returns:
A new receiver that is a clone of this receiver.

Raises:
ReceiverStoppedError: If this receiver is stopped.
"""
if self._awatch_stopped_exc is not None:
raise ReceiverStoppedError(self)

return FileWatcher(
# list[pathlib.Path] is the correct type ( expected list[pathlib.Path | str] )
# but mypy doesn't know that
paths=self._paths, # type: ignore
event_types=self.event_types,
force_polling=self._force_polling,
polling_interval=self._polling_interval,
)
33 changes: 33 additions & 0 deletions src/frequenz/channels/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,3 +789,36 @@ def __repr__(self) -> str:
f"{type(self).__name__}<{self.interval=}, {self.missed_tick_policy=}, "
f"{self.loop=}, {self.is_running=}>"
)

@override
def fork(self, *, name: str | None = None) -> "Timer":
"""Create a new receiver that is a clone of this receiver.

Args:
name: An optional name for the new receiver. This is ignored since Timer
receivers don't have names.

Returns:
A new receiver that is a clone of this receiver.

Raises:
ReceiverStoppedError: If the timer was stopped via `stop()`.
"""
if self._stopped:
raise ReceiverStoppedError(self)

# Create a new timer with the same configuration
new_timer = Timer(
self.interval,
self.missed_tick_policy,
auto_start=self.is_running,
loop=self.loop,
)

# If the original timer has a next tick time set, sync the new timer
if self._next_tick_time is not None:
new_timer._next_tick_time = ( # pylint: disable=protected-access
self._next_tick_time
)

return new_timer
28 changes: 28 additions & 0 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@


import asyncio
from contextlib import closing
from dataclasses import dataclass
from typing import TypeGuard, assert_never

Expand Down Expand Up @@ -425,3 +426,30 @@ async def test_broadcast_close_receiver() -> None:

with pytest.raises(ReceiverStoppedError):
_ = await receiver_2.receive()


async def test_receiver_fork() -> None:
"""Ensure that a receiver can be forked."""
chan = Broadcast[int](name="input-chan")

with (
closing(Broadcast[int](name="input-chan")) as chan,
closing(chan.new_receiver()) as receiver,
closing(receiver.fork()) as forked_receiver,
):
sender = chan.new_sender()
await sender.send(1)

assert (await receiver.receive()) == 1
assert (await forked_receiver.receive()) == 1


async def test_fork_stopped_receiver() -> None:
"""Ensure that a receiver can be forked."""
chan = Broadcast[int](name="input-chan")

receiver = chan.new_receiver()
receiver.close()

with pytest.raises(ReceiverStoppedError):
_ = receiver.fork()