diff --git a/README.md b/README.md
index e3bd5f61..8ec5f93f 100644
--- a/README.md
+++ b/README.md
@@ -28,6 +28,11 @@ channels](https://tour.golang.org/concurrency/2) but it also borrows ideas from
+!!! Note inline end
+
+ Newer Python versions and other operating systems and architectures might
+ work too, but they are not automatically tested, so we cannot guarantee it.
+
The following platforms are officially supported (tested):
- **Python:** 3.11
@@ -38,30 +43,214 @@ The following platforms are officially supported (tested):
## Quick Start
-We assume you are on a system with Python available. If that is not the case,
-please [download and install Python](https://www.python.org/downloads/) first.
+### Installing
-To install Frequenz Channels, you probably want to create a new virtual
-environment first. For example, if you use a `sh` compatible shell, you can do
-this:
+
-```sh
-python3 -m venv .venv
-. .venv/bin/activate
-```
+!!! Tip inline end
+
+ For more details please read the [Installation
+ Guide](docs/user-guide/installation.md).
-Then, just install using `pip`:
+Assuming a [supported](#supported-platforms) working Python environment:
```sh
python3 -m pip install frequenz-channels
```
+
+
+### Examples
+
+#### Hello World
+
+
+
+```python
+import asyncio
+
+from frequenz.channels import Anycast
+
+
+async def main() -> None:
+ hello_channel = Anycast[str](name="hello-world-channel")
+ sender = hello_channel.new_sender()
+ receiver = hello_channel.new_receiver()
+
+ await sender.send("Hello World!")
+ msg = await receiver.receive()
+ print(msg)
+
+
+asyncio.run(main())
+```
+
+
+
+#### Showcase
+
+
+
+This is a comprehensive example that shows most of the main features of the
+library:
+
+```python
+import asyncio
+from dataclasses import dataclass
+from datetime import timedelta
+from enum import Enum, auto
+from typing import assert_never
+
+from frequenz.channels import (
+ Anycast,
+ Broadcast,
+ Receiver,
+ Sender,
+ merge,
+ select,
+ selected_from,
+)
+from frequenz.channels.timer import Timer
+
+
+class Command(Enum):
+ PING = auto()
+ STOP_SENDER = auto()
+
+
+class ReplyCommand(Enum):
+ PONG = auto()
+
+
+@dataclass(frozen=True)
+class Reply:
+ reply: ReplyCommand
+ source: str
+
+
+async def send(
+ sender: Sender[str],
+ control_command: Receiver[Command],
+ control_reply: Sender[Reply],
+) -> None:
+ """Send a counter value every second, until a stop command is received."""
+ print(f"{sender}: Starting")
+ timer = Timer.periodic(timedelta(seconds=1.0))
+ counter = 0
+ async for selected in select(timer, control_command):
+ if selected_from(selected, timer):
+ print(f"{sender}: Sending {counter}")
+ await sender.send(f"{sender}: {counter}")
+ counter += 1
+ elif selected_from(selected, control_command):
+ print(f"{sender}: Received command: {selected.value.name}")
+ match selected.value:
+ case Command.STOP_SENDER:
+ print(f"{sender}: Stopping")
+ break
+ case Command.PING:
+ print(f"{sender}: Ping received, reply with pong")
+ await control_reply.send(Reply(ReplyCommand.PONG, str(sender)))
+ case _ as unknown:
+ assert_never(unknown)
+ print(f"{sender}: Finished")
+
+
+async def receive(
+ receivers: list[Receiver[str]],
+ control_command: Receiver[Command],
+ control_reply: Sender[Reply],
+) -> None:
+ """Receive data from multiple channels, until no more data is received for 2 seconds."""
+ print("receive: Starting")
+ timer = Timer.timeout(timedelta(seconds=2.0))
+ print(f"{timer=}")
+ merged = merge(*receivers)
+ async for selected in select(merged, timer, control_command):
+ if selected_from(selected, merged):
+ message = selected.value
+ print(f"receive: Received {message=}")
+ timer.reset()
+ print(f"{timer=}")
+ elif selected_from(selected, control_command):
+ print(f"receive: received command: {selected.value.name}")
+ match selected.value:
+ case Command.PING:
+ print("receive: Ping received, reply with pong")
+ await control_reply.send(Reply(ReplyCommand.PONG, "receive"))
+ case Command.STOP_SENDER:
+ pass # Ignore
+ case _ as unknown:
+ assert_never(unknown)
+ elif selected_from(selected, timer):
+ drift = selected.value
+ print(
+ f"receive: No data received for {timer.interval + drift} seconds, "
+ "giving up"
+ )
+ break
+ print("receive: Finished")
+
+
+async def main() -> None:
+ data_channel_1 = Anycast[str](name="data-channel-1")
+ data_channel_2 = Anycast[str](name="data-channel-2")
+ command_channel = Broadcast[Command](name="control-channel") # (1)!
+ reply_channel = Anycast[Reply](name="reply-channel")
+
+ async with asyncio.TaskGroup() as tasks:
+ tasks.create_task(
+ send(
+ data_channel_1.new_sender(),
+ command_channel.new_receiver(),
+ reply_channel.new_sender(),
+ ),
+ name="send-channel-1",
+ )
+ tasks.create_task(
+ send(
+ data_channel_2.new_sender(),
+ command_channel.new_receiver(),
+ reply_channel.new_sender(),
+ ),
+ name="send-channel-2",
+ )
+ tasks.create_task(
+ receive(
+ [data_channel_1.new_receiver(), data_channel_2.new_receiver()],
+ command_channel.new_receiver(),
+ reply_channel.new_sender(),
+ ),
+ name="receive",
+ )
+
+ control_sender = command_channel.new_sender()
+ reply_receiver = reply_channel.new_receiver()
+
+ # Send a ping command to all tasks and wait for the replies
+ await control_sender.send(Command.PING)
+ print(f"main: {await reply_receiver.receive()}")
+ print(f"main: {await reply_receiver.receive()}")
+ print(f"main: {await reply_receiver.receive()}")
+
+ await asyncio.sleep(5.0)
+
+ # Stop senders, after 2 seconds not receiving any data,
+ # the receiver will stop too
+ await control_sender.send(Command.STOP_SENDER)
+
+
+asyncio.run(main())
+```
+
+
+
## Documentation
-For more information, please visit the [documentation
+For more information, please read the [documentation
website](https://frequenz-floss.github.io/frequenz-channels-python/).
## Contributing
If you want to know how to build this project and contribute to it, please
-check out the [Contributing Guide](CONTRIBUTING.md).
+check out the [Contributing Guide](docs/CONTRIBUTING.md).
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 3322bcbc..918d46a9 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -101,6 +101,8 @@
## New Features
+* A new User's Guide was added to the documentation and the documentation was greately improved in general.
+
* A new `merge()` function was added to replace `Merge`.
* `Anycast`
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
index 3755def6..6d7d06eb 100644
--- a/docs/SUMMARY.md
+++ b/docs/SUMMARY.md
@@ -1,3 +1,4 @@
* [Home](index.md)
+* [User Guide](user-guide/)
* [API Reference](reference/)
* [Contributing](CONTRIBUTING.md)
diff --git a/docs/index.md b/docs/index.md
index 17590073..cfe85510 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -16,66 +16,32 @@
end=""
%}
-## Installation
+## Quick Start
-First, you need to make sure you have Python installed (at least version 3.11):
+### Installing
-```console
-$ python3 --version
-Python 3.11.4
-```
-
-!!! note
-
- These instructions assume you are using a [POSIX compatible
- `sh`](https://pubs.opengroup.org/onlinepubs/9699919799/utilities/sh.html)
- shell.
-
-If that command doesn't print a version newer than 3.11.0, you'll need to
-[download and install Python](https://www.python.org/downloads/) first.
-
-To install Frequenz Channels, you probably want to create a new virtual
-environment first:
-
-```sh
-mkdir my-channels-project
-cd my-channels-project
-python3 -m venv .venv
-. .venv/bin/activate
-```
-
-!!! tip
-
- Using [`direnv`](https://direnv.net/) can greatly simplify this process as
- it automates the creation, activation, and deactivation of the virtual
- environment. The first time you enable `direnv`, the virtual environment
- will be created, and each time you enter or leave a subdirectory, it will be
- activated and deactivated, respectively.
-
- ```sh
- sudo apt install direnv # if you use Debian/Ubuntu
- mkdir my-channels-project
- cd my-channels-project
- echo "layout python python3" > .envrc
- direnv allow
- ```
+{%
+ include-markdown "../README.md"
+ start=""
+ end=""
+%}
- This will create the virtual environment and activate it automatically for you.
+### Examples
-Now you can install Frequenz Channels by using `pip` (if you don't have `pip` installed
-you can follow [the official instructions](https://pip.pypa.io/en/stable/installation/)):
+!!! Example "Hello World"
-```sh
-python3 -m pip install frequenz-channels
-```
+ {%
+ include-markdown "../README.md"
+ preserve-includer-indent=true
+ start=""
+ end=""
+ %}
-To verify that the installation worked, you can invoke the Python interpreter and
-import the `frequenz.channels` module:
+!!! Example "Showcase"
-```console
-$ python3
-Python 3.11.4 (main, Jun 7 2023, 10:13:09) [GCC 12.2.0] on linux
-Type "help", "copyright", "credits" or "license" for more information.
->>> import frequenz.channels
->>>
-```
+ {%
+ include-markdown "../README.md"
+ preserve-includer-indent=true
+ start=""
+ end=""
+ %}
diff --git a/docs/user-guide/SUMMARY.md b/docs/user-guide/SUMMARY.md
new file mode 100644
index 00000000..eada3abf
--- /dev/null
+++ b/docs/user-guide/SUMMARY.md
@@ -0,0 +1,7 @@
+* [Quick Start](quick-start.md)
+* [Installation](installation.md)
+* [Channels](channels/)
+* [Sending](sending.md)
+* [Receiving](receiving/)
+* [Error Handling](error-handling.md)
+* [Utilities](utilities/)
diff --git a/docs/user-guide/channels/anycast.md b/docs/user-guide/channels/anycast.md
new file mode 100644
index 00000000..eca01889
--- /dev/null
+++ b/docs/user-guide/channels/anycast.md
@@ -0,0 +1,10 @@
+# Anycast
+
+::: frequenz.channels._anycast.Anycast
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
diff --git a/docs/user-guide/channels/broadcast.md b/docs/user-guide/channels/broadcast.md
new file mode 100644
index 00000000..f759bce3
--- /dev/null
+++ b/docs/user-guide/channels/broadcast.md
@@ -0,0 +1,10 @@
+# Broadcast
+
+::: frequenz.channels._broadcast.Broadcast
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
diff --git a/docs/user-guide/channels/index.md b/docs/user-guide/channels/index.md
new file mode 100644
index 00000000..c6f2df64
--- /dev/null
+++ b/docs/user-guide/channels/index.md
@@ -0,0 +1,41 @@
+# Channels
+
+A channel is a communication mechanism that allows data (messages) to be
+transmitted between different coroutines. It consists of
+[senders](../sending.md), which send messages, and
+[receivers](../receiving/index.md), which receive those messages. The channel itself
+acts as a conduit for these messages.
+
+Conceptually, a channel looks like this:
+
+
+```bob
+.---------. Message .----------. Message .-----------.
+| Sender +------------>| Channel +------------>| Receiver |
+'---------' '----------' '-----------'
+```
+
+
+Besides this simple model, there are many variations of channels depending on
+various factors:
+
+* How many senders and receivers can a channel have?
+
+* Do all receivers receive all messages from all senders?
+
+* How many messages can a channel hold (buffered), or can it hold any messages
+ at all (unbuffered)?
+
+* What happens if a sender tries to send a message to a full channel?
+
+ * Does the send operation block until the channel has space again?
+ * Does it fail?
+ * Does it silently drop the message?
+
+Because these questions can have many answers, there are different types of
+channels. Frequenz Channels offers a few of them:
+
+* [Anycast](anycast.md)
+* [Broadcast](broadcast.md)
+
+More might be added in the future, and you can also create your own.
diff --git a/docs/user-guide/error-handling.md b/docs/user-guide/error-handling.md
new file mode 100644
index 00000000..fc48a0ca
--- /dev/null
+++ b/docs/user-guide/error-handling.md
@@ -0,0 +1,10 @@
+# Error Handling
+
+::: frequenz.channels._exceptions
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
diff --git a/docs/user-guide/installation.md b/docs/user-guide/installation.md
new file mode 100644
index 00000000..dced442e
--- /dev/null
+++ b/docs/user-guide/installation.md
@@ -0,0 +1,71 @@
+# Installation
+
+{%
+ include-markdown "../../README.md"
+ start=""
+ end=""
+%}
+
+## Installation
+
+First, you need to make sure you have Python installed (at least version 3.11):
+
+!!! Note inline end
+
+ These instructions assume you are using a [POSIX compatible
+ `sh`](https://pubs.opengroup.org/onlinepubs/9699919799/utilities/sh.html)
+ shell.
+
+```console
+$ python3 --version
+Python 3.11.4
+```
+
+If that command doesn't print a version newer than 3.11.0, you'll need to
+[download and install Python](https://www.python.org/downloads/) first.
+
+To install Frequenz Channels, you probably want to create a new virtual
+environment first:
+
+```sh
+mkdir my-channels-project
+cd my-channels-project
+python3 -m venv .venv
+. .venv/bin/activate
+```
+
+!!! Tip
+
+ Using [`direnv`](https://direnv.net/) can greatly simplify this process as
+ it automates the creation, activation, and deactivation of the virtual
+ environment. The first time you enable `direnv`, the virtual environment
+ will be created, and each time you enter or leave a subdirectory, it will be
+ activated and deactivated, respectively.
+
+ ```sh
+ sudo apt install direnv # if you use Debian/Ubuntu
+ mkdir my-channels-project
+ cd my-channels-project
+ echo "layout python python3" > .envrc
+ direnv allow
+ ```
+
+ This will create the virtual environment and activate it automatically for you.
+
+Now you can install Frequenz Channels by using `pip` (if you don't have `pip` installed
+you can follow [the official instructions](https://pip.pypa.io/en/stable/installation/)):
+
+```sh
+python3 -m pip install frequenz-channels
+```
+
+To verify that the installation worked, you can invoke the Python interpreter and
+import the `frequenz.channels` module:
+
+```console
+$ python3
+Python 3.11.4 (main, Jun 7 2023, 10:13:09) [GCC 12.2.0] on linux
+Type "help", "copyright", "credits" or "license" for more information.
+>>> import frequenz.channels
+>>>
+```
diff --git a/docs/user-guide/quick-start.md b/docs/user-guide/quick-start.md
new file mode 100644
index 00000000..95f7bb0a
--- /dev/null
+++ b/docs/user-guide/quick-start.md
@@ -0,0 +1,29 @@
+# Quick Start
+
+## Installing
+
+{%
+ include-markdown "../../README.md"
+ start=""
+ end=""
+%}
+
+## Examples
+
+!!! Example "Hello World"
+
+ {%
+ include-markdown "../../README.md"
+ preserve-includer-indent=true
+ start=""
+ end=""
+ %}
+
+!!! Example "Showcase"
+
+ {%
+ include-markdown "../../README.md"
+ preserve-includer-indent=true
+ start=""
+ end=""
+ %}
diff --git a/docs/user-guide/receiving/index.md b/docs/user-guide/receiving/index.md
new file mode 100644
index 00000000..9dc06a95
--- /dev/null
+++ b/docs/user-guide/receiving/index.md
@@ -0,0 +1,10 @@
+# Receiving
+
+::: frequenz.channels._receiver
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
diff --git a/docs/user-guide/receiving/multiple-sources/index.md b/docs/user-guide/receiving/multiple-sources/index.md
new file mode 100644
index 00000000..88b05680
--- /dev/null
+++ b/docs/user-guide/receiving/multiple-sources/index.md
@@ -0,0 +1,25 @@
+# Multiple Sources
+
+If you need to receive values from multiple sources it can be complicated, as
+you probably want to get the first message of each receiver as soon as it is
+available. A naive approach like this will not work:
+
+```python
+receiver1: Receiver[int] = channel1.new_receiver()
+receiver2: Receiver[int] = channel2.new_receiver()
+
+msg = await receiver1.receive()
+print(f"Received from channel1: {msg}")
+
+msg = await receiver2.receive()
+print(f"Received from channel2: {msg}")
+```
+
+The problem is that if the first message is not available in `channel1` but in
+`channel2`, the program will be blocked until a message is available in
+`channel1`, but you probably want to receive the first message from `channel2`
+as soon as it is available.
+
+Frequenz Channels provides two tools to solve this issue:
+[`merge()`][frequenz.channels.merge] and
+[`select()`][frequenz.channels.select].
diff --git a/docs/user-guide/receiving/multiple-sources/merge.md b/docs/user-guide/receiving/multiple-sources/merge.md
new file mode 100644
index 00000000..42fc65a4
--- /dev/null
+++ b/docs/user-guide/receiving/multiple-sources/merge.md
@@ -0,0 +1,10 @@
+# Merging
+
+::: frequenz.channels._merge
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
diff --git a/docs/user-guide/receiving/multiple-sources/select.md b/docs/user-guide/receiving/multiple-sources/select.md
new file mode 100644
index 00000000..514ba723
--- /dev/null
+++ b/docs/user-guide/receiving/multiple-sources/select.md
@@ -0,0 +1,11 @@
+# Selecting
+
+::: frequenz.channels._select
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
+
diff --git a/docs/user-guide/sending.md b/docs/user-guide/sending.md
new file mode 100644
index 00000000..38afae2f
--- /dev/null
+++ b/docs/user-guide/sending.md
@@ -0,0 +1,10 @@
+# Sending
+
+::: frequenz.channels._sender
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
diff --git a/docs/user-guide/utilities/events.md b/docs/user-guide/utilities/events.md
new file mode 100644
index 00000000..6d4594bc
--- /dev/null
+++ b/docs/user-guide/utilities/events.md
@@ -0,0 +1,10 @@
+# Events
+
+::: frequenz.channels.event.Event
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
diff --git a/docs/user-guide/utilities/file-watchers.md b/docs/user-guide/utilities/file-watchers.md
new file mode 100644
index 00000000..c9853a57
--- /dev/null
+++ b/docs/user-guide/utilities/file-watchers.md
@@ -0,0 +1,10 @@
+# File Watchers
+
+::: frequenz.channels.file_watcher.FileWatcher
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
diff --git a/docs/user-guide/utilities/timers.md b/docs/user-guide/utilities/timers.md
new file mode 100644
index 00000000..5a82364f
--- /dev/null
+++ b/docs/user-guide/utilities/timers.md
@@ -0,0 +1,116 @@
+# Timers
+
+::: frequenz.channels.timer
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
+
+### Policies
+
+#### Skip Missed And Drift
+
+::: frequenz.channels.timer.SkipMissedAndDrift
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
+
+#### Skip Missed And Re-Sync
+
+::: frequenz.channels.timer.SkipMissedAndResync
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
+
+#### Trigger All Missed
+
+::: frequenz.channels.timer.TriggerAllMissed
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
+
+## High-level Interface
+
+::: frequenz.channels.timer.Timer
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
+
+### Periodic Timers
+
+::: frequenz.channels.timer.Timer.periodic
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
+ show_docstring_attributes: false
+ show_docstring_functions: false
+ show_docstring_classes: false
+ show_docstring_other_parameters: false
+ show_docstring_parameters: false
+ show_docstring_raises: false
+ show_docstring_receives: false
+ show_docstring_returns: false
+ show_docstring_warns: false
+ show_docstring_yields: false
+
+### Timeouts
+
+::: frequenz.channels.timer.Timer.timeout
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
+ show_docstring_attributes: false
+ show_docstring_functions: false
+ show_docstring_classes: false
+ show_docstring_other_parameters: false
+ show_docstring_parameters: false
+ show_docstring_raises: false
+ show_docstring_receives: false
+ show_docstring_returns: false
+ show_docstring_warns: false
+ show_docstring_yields: false
+
+## Low-level Interface
+
+A [`Timer`][frequenz.channels.timer.Timer] can be created using an arbitrary missed
+ticks policy by calling the [low-level
+constructor][frequenz.channels.timer.Timer.__init__] and passing the policy via the
+[`missed_tick_policy`][frequenz.channels.timer.Timer.missed_tick_policy] argument.
+
+### Custom Missed Tick Policies
+
+::: frequenz.channels.timer.MissedTickPolicy
+ options:
+ inherited_members: []
+ members: []
+ show_bases: false
+ show_root_heading: false
+ show_root_toc_entry: false
+ show_source: false
diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py
index 2a5ac56f..453caf84 100644
--- a/src/frequenz/channels/_anycast.py
+++ b/src/frequenz/channels/_anycast.py
@@ -20,54 +20,180 @@
class Anycast(Generic[_T]):
- """A channel for sending data across async tasks.
+ """A channel that delivers each message to exactly one receiver.
- Anycast channels support multiple senders and multiple receivers. A message sent
- through a sender will be received by exactly one receiver.
+ # Description
+
+ !!! Tip inline end
+
+ [Anycast][frequenz.channels.Anycast] channels behave like the
+ [Golang](https://golang.org/) [channels](https://go.dev/ref/spec#Channel_types).
+
+ [Anycast][frequenz.channels.Anycast] channels support multiple
+ [senders][frequenz.channels.Sender] and multiple
+ [receivers][frequenz.channels.Receiver]. Each message sent through any of the
+ senders will be received by exactly one receiver (but **any** receiver).
+
+
+ ```bob
+ .---------. msg1 msg1 .-----------.
+ | Sender +------. .------>| Receiver |
+ '---------' | .----------. | '-----------'
+ +----->| Channel +-----+
+ .---------. | '----------' | .-----------.
+ | Sender +------' '------>| Receiver |
+ '---------' msg2 msg2 '-----------'
+ ```
+
+
+ !!! Note inline end "Characteristics"
+
+ * **Buffered:** Yes, with a global channel buffer
+ * **Buffer full policy:** Block senders
+ * **Multiple receivers:** Yes
+ * **Multiple senders:** Yes
+ * **Thread-safe:** No
This channel is buffered, and if the senders are faster than the receivers, then the
channel's buffer will fill up. In that case, the senders will block at the
- [send()][frequenz.channels.Sender.send] method until the receivers consume the
+ [`send()`][frequenz.channels.Sender.send] method until the receivers consume the
messages in the channel's buffer. The channel's buffer size can be configured at
creation time via the `limit` argument.
+ The first receiver that is awaited will get the next message. When multiple
+ receivers are waiting, the [asyncio][] loop scheduler picks a receiver for each next
+ massage.
+
+ This means that, in practice, there might be only one receiver receiving all the
+ messages, depending on how tasks are schduled.
+
+ If you need to ensure some delivery policy (like round-robin or uniformly random),
+ then you will have to implement it yourself.
+
+ To create a new [senders][frequenz.channels.Sender] and
+ [receivers][frequenz.channels.Receiver] you can use the
+ [`new_sender()`][frequenz.channels.Broadcast.new_sender] and
+ [`new_receiver()`][frequenz.channels.Broadcast.new_receiver] methods
+ respectively.
+
+ When the channel is not needed anymore, it should be closed with the
+ [`close()`][frequenz.channels.Anycast.close] method. This will prevent further
+ attempts to [`send()`][frequenz.channels.Sender.send] data. Receivers will still be
+ able to drain the pending values on the channel, but after that, subsequent
+ [`receive()`][frequenz.channels.Receiver.receive] calls will raise a
+ [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception.
+
+ This channel is useful, for example, to distribute work across multiple workers.
+
In cases where each message need to be received by every receiver, a
- [Broadcast][frequenz.channels.Broadcast] channel may be used.
+ [broadcast][frequenz.channels.Broadcast] channel may be used.
+
+ # Examples
+
+ Example: Send a few numbers to a receiver
+ This is a very simple example that sends a few numbers from a single sender to
+ a single receiver.
+
+ ```python
+ import asyncio
+
+ from frequenz.channels import Anycast, Sender
+
- Uses an [deque][collections.deque] internally, so Anycast channels are not
- thread-safe.
+ async def send(sender: Sender[int]) -> None:
+ for msg in range(3):
+ print(f"sending {msg}")
+ await sender.send(msg)
- When there are multiple channel receivers, they can be awaited
- simultaneously using [select][frequenz.channels.select] or
- [merge][frequenz.channels.merge].
- Example:
- ``` python
- async def send(sender: channel.Sender) -> None:
- while True:
- next = random.randint(3, 17)
- print(f"sending: {next}")
- await sender.send(next)
+ async def main() -> None:
+ channel = Anycast[int](name="numbers")
+ sender = channel.new_sender()
+ receiver = channel.new_receiver()
- async def recv(id: int, receiver: channel.Receiver) -> None:
- while True:
- next = await receiver.receive()
- print(f"receiver_{id} received {next}")
- await asyncio.sleep(0.1) # sleep (or work) with the data
+ async with asyncio.TaskGroup() as task_group:
+ task_group.create_task(send(sender))
+ for _ in range(3):
+ msg = await receiver.receive()
+ print(f"received {msg}")
+ await asyncio.sleep(0.1) # sleep (or work) with the data
- acast = channel.Anycast()
+ asyncio.run(main())
+ ```
+
+ The output should look something like (although the sending and received might
+ appear more interleaved):
+
+ ```
+ sending 0
+ sending 1
+ sending 2
+ received 0
+ received 1
+ received 2
+ ```
+
+ Example: Send a few number from multiple senders to multiple receivers
+ This is a more complex example that sends a few numbers from multiple senders to
+ multiple receivers, using a small buffer to force the senders to block.
+
+ ```python
+ import asyncio
+
+ from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender
+
- sender = acast.new_sender()
- receiver_1 = acast.new_receiver()
+ async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
+ for msg in range(start, stop):
+ print(f"{name} sending {msg}")
+ await sender.send(msg)
- asyncio.create_task(send(sender))
- await recv(1, receiver_1)
+ async def recv(name: str, receiver: Receiver[int]) -> None:
+ try:
+ async for msg in receiver:
+ print(f"{name} received {msg}")
+ await asyncio.sleep(0.1) # sleep (or work) with the data
+ except ReceiverStoppedError:
+ pass
+
+
+ async def main() -> None:
+ acast = Anycast[int](name="numbers", limit=2)
+
+ async with asyncio.TaskGroup() as task_group:
+ task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
+ task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
+ task_group.create_task(recv("receiver_1", acast.new_receiver()))
+ task_group.create_task(recv("receiver_2", acast.new_receiver()))
+
+
+ asyncio.run(main())
```
- Check the `tests` and `benchmarks` directories for more examples.
+ The output should look something like this(although the sending and received
+ might appear interleaved in a different way):
+
+ ```
+ sender_1 sending 10
+ sender_1 sending 11
+ sender_1 sending 12
+ Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
+ consumes a value
+ sender_2 sending 20
+ Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
+ consumes a value
+ receiver_1 received 10
+ receiver_1 received 11
+ sender_2 sending 21
+ Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
+ consumes a value
+ receiver_1 received 12
+ receiver_1 received 20
+ receiver_1 received 21
+ ```
"""
def __init__(self, *, name: str, limit: int = 10) -> None:
diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py
index 8da322b1..f7fb1fe0 100644
--- a/src/frequenz/channels/_broadcast.py
+++ b/src/frequenz/channels/_broadcast.py
@@ -21,47 +21,166 @@
class Broadcast(Generic[_T]):
- """A channel to broadcast messages to multiple receivers.
+ """A channel that deliver all messages to all receivers.
- `Broadcast` channels can have multiple senders and multiple receivers. Each
- message sent through any of the senders is received by all of the
- receivers.
+ # Description
- Internally, a broadcast receiver's buffer is implemented with just
- append/pop operations on either side of a [deque][collections.deque], which
- are thread-safe. Because of this, `Broadcast` channels are thread-safe.
+ [Broadcast][frequenz.channels.Broadcast] channels can have multiple
+ [senders][frequenz.channels.Sender] and multiple
+ [receivers][frequenz.channels.Receiver]. Each message sent through any of the
+ senders will be received by all receivers.
- When there are multiple channel receivers, they can be awaited
- simultaneously using [select][frequenz.channels.select] or
- [merge][frequenz.channels.merge].
+
+ ```bob
+ .---------. msg1 msg1,msg2 .-----------.
+ | Sender +------. .---------->| Receiver |
+ '---------' | .----------. | '-----------'
+ +----->| Channel +-----+
+ .---------. | '----------' | .-----------.
+ | Sender +------' '----------->| Receiver |
+ '---------' msg2 msg1,msg2 '-----------'
+ ```
+
- Example:
- ``` python
- async def send(sender: channel.Sender) -> None:
- while True:
- next = random.randint(3, 17)
- print(f"sending: {next}")
- await sender.send(next)
+ !!! Note inline end "Characteristics"
+ * **Buffered:** Yes, with one buffer per receiver
+ * **Buffer full policy:** Drop oldest message
+ * **Multiple receivers:** Yes
+ * **Multiple senders:** Yes
+ * **Thread-safe:** No
- async def recv(id: int, receiver: channel.Receiver) -> None:
- while True:
- next = await receiver.receive()
- print(f"receiver_{id} received {next}")
- await asyncio.sleep(0.1) # sleep (or work) with the data
+ This channel is buffered, and when messages are not being consumed fast
+ enough and the buffer fills up, old messages will get dropped.
+ Each receiver has its own buffer, so messages will only be dropped for
+ receivers that can't keep up with the senders, and not for the whole
+ channel.
- bcast = channel.Broadcast()
+ To create a new [senders][frequenz.channels.Sender] and
+ [receivers][frequenz.channels.Receiver] you can use the
+ [`new_sender()`][frequenz.channels.Broadcast.new_sender] and
+ [`new_receiver()`][frequenz.channels.Broadcast.new_receiver] methods
+ respectively.
- sender = bcast.new_sender()
- receiver_1 = bcast.new_receiver()
+ When a channel is not needed anymore, it should be closed with
+ [`close()`][frequenz.channels.Broadcast.close]. This will prevent further
+ attempts to [`send()`][frequenz.channels.Sender.send] data, and will allow
+ receivers to drain the pending items on their queues, but after that,
+ subsequent [receive()][frequenz.channels.Receiver.receive] calls will
+ raise a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
- asyncio.create_task(send(sender))
+ This channel is useful, for example, to implement a pub/sub pattern, where
+ multiple receivers can subscribe to a channel to receive all messages.
- await recv(1, receiver_1)
+ In cases where each message needs to be delivered only to one receiver, an
+ [anycast][frequenz.channels.Anycast] channel may be used.
+
+ # Examples
+
+ Example: Send a few numbers to a receiver
+ This is a very simple example that sends a few numbers from a single sender to
+ a single receiver.
+
+ ```python
+ import asyncio
+
+ from frequenz.channels import Broadcast, Sender
+
+
+ async def send(sender: Sender[int]) -> None:
+ for msg in range(3):
+ print(f"sending {msg}")
+ await sender.send(msg)
+
+
+ async def main() -> None:
+ channel = Broadcast[int](name="numbers")
+
+ sender = channel.new_sender()
+ receiver = channel.new_receiver()
+
+ async with asyncio.TaskGroup() as task_group:
+ task_group.create_task(send(sender))
+ for _ in range(3):
+ msg = await receiver.receive()
+ print(f"received {msg}")
+ await asyncio.sleep(0.1) # sleep (or work) with the data
+
+
+ asyncio.run(main())
+ ```
+
+ The output should look something like (although the sending and received might
+ appear more interleaved):
+
+ ```
+ sending 0
+ sending 1
+ sending 2
+ received 0
+ received 1
+ received 2
```
- Check the `tests` and `benchmarks` directories for more examples.
+ Example: Send a few number from multiple senders to multiple receivers
+ This is a more complex example that sends a few numbers from multiple senders to
+ multiple receivers, using a small buffer to force the senders to block.
+
+ ```python
+ import asyncio
+
+ from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, Sender
+
+
+ async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
+ for msg in range(start, stop):
+ print(f"{name} sending {msg}")
+ await sender.send(msg)
+
+
+ async def recv(name: str, receiver: Receiver[int]) -> None:
+ try:
+ async for msg in receiver:
+ print(f"{name} received {msg}")
+ await asyncio.sleep(0.1) # sleep (or work) with the data
+ except ReceiverStoppedError:
+ pass
+
+
+ async def main() -> None:
+ acast = Broadcast[int](name="numbers")
+
+ async with asyncio.TaskGroup() as task_group:
+ task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
+ task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
+ task_group.create_task(recv("receiver_1", acast.new_receiver()))
+ task_group.create_task(recv("receiver_2", acast.new_receiver()))
+
+
+ asyncio.run(main())
+ ```
+
+ The output should look something like this(although the sending and received
+ might appear interleaved in a different way):
+
+ ```
+ sender_1 sending 10
+ sender_1 sending 11
+ sender_1 sending 12
+ sender_2 sending 20
+ sender_2 sending 21
+ receiver_1 received 10
+ receiver_1 received 11
+ receiver_1 received 12
+ receiver_1 received 20
+ receiver_1 received 21
+ receiver_2 received 10
+ receiver_2 received 11
+ receiver_2 received 12
+ receiver_2 received 20
+ receiver_2 received 21
+ ```
"""
def __init__(self, *, name: str, resend_latest: bool = False) -> None:
diff --git a/src/frequenz/channels/_exceptions.py b/src/frequenz/channels/_exceptions.py
index 559b2df7..719b26da 100644
--- a/src/frequenz/channels/_exceptions.py
+++ b/src/frequenz/channels/_exceptions.py
@@ -1,7 +1,70 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
-"""Base exception classes."""
+"""Base exception classes.
+
+# Exceptions
+
+All exceptions generated by this library inherit from the
+[`Error`][frequenz.channels.Error] exception.
+
+Exceptions generated by channels inherit from the
+[`ChannelError`][frequenz.channels.ChannelError] exception. When there is an attempt to
+use a closed channel, a [`ChannelClosedError`][frequenz.channels.ChannelClosedError]
+exception is raised.
+
+# Causes
+
+When a exception is caused by another exception, for example if the underlying channel
+was closed while seding or receiving a message, the original exception will be
+available as the cause of the exception:
+
+```python
+from frequenz.channels import Anycast, ChannelClosedError, SenderError
+
+channel = Anycast[int](name="test-channel")
+sender = channel.new_sender()
+
+try:
+ await sender.send(42)
+except SenderError as error:
+ match error.__cause__:
+ case None:
+ print("The message couldn't be sent for an known reason")
+ case ChannelClosedError() as closed_error:
+ print(f"The message couldn't be sent, channel closed: {closed_error}")
+ case _ as unknown_error:
+ print(f"The message couldn't be sent: {unknown_error}")
+```
+
+Tip:
+ If you are using the async iteration interface for receivers, then you can
+ access the cause of the
+ [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception by
+ explicitly calling [`receive()`][frequenz.channels.Receiver.receive] on the
+ receiver after the iteration is done:
+
+ ```python
+ from frequenz.channels import Anycast, ChannelClosedError, ReceiverStoppedError
+
+ channel = Anycast[int](name="test-channel")
+ receiver = channel.new_receiver()
+
+ async for value in receiver:
+ print(value)
+ try:
+ await receiver.receive()
+ except ReceiverStoppedError as error:
+ print("The receiver was stopped")
+ match error.__cause__:
+ case None:
+ print("The receiver was stopped without a known reason")
+ case ChannelClosedError() as closed_error:
+ print(f"The channel was closed with error: {closed_error}")
+ case _ as unknown_error:
+ print(f"The receiver was stopped due to an unknown error: {unknown_error}")
+ ```
+"""
from typing import Any
diff --git a/src/frequenz/channels/_merge.py b/src/frequenz/channels/_merge.py
index ff6f00c2..771bf5d5 100644
--- a/src/frequenz/channels/_merge.py
+++ b/src/frequenz/channels/_merge.py
@@ -1,7 +1,51 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
-"""Merge messages coming from channels into a single stream."""
+"""Merge messages coming from multiple receivers into a single receiver.
+
+# Usage
+
+If you just need to receive the same type of messages but from multiple sources in one
+stream, you can use [`merge()`][frequenz.channels.merge] to create a new receiver that
+will receive messages from all the given receivers:
+
+```python
+from frequenz.channels import Anycast, Receiver, merge
+
+channel1: Anycast[int] = Anycast(name="channel1")
+channel2: Anycast[int] = Anycast(name="channel2")
+receiver1 = channel1.new_receiver()
+receiver2 = channel2.new_receiver()
+
+async for value in merge(receiver1, receiver2):
+ print(value)
+```
+
+If the first message comes from `channel2` and the second message from `channel1`, the
+first message will be received immediately, and the second message will be received as
+soon as it is available.
+
+This can be helpful when you just need to receive messages and don't care about
+where are they coming from specifically. If you need to know where the message came
+from, you can use [`select()`][frequenz.channels.select] instead.
+
+# Stopping
+
+A merge receiver will be stopped automatically when all the receivers that it merges are
+stopped. When using the async iterator interface, this means that the iterator will stop
+as soon as all the receivers are stopped. When using
+[`receive()`][frequenz.channels.Receiver.receive], this means that the method will raise
+a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception as soon as
+all the receivers are stopped.
+
+If you want to stop a merge receiver manually, you can use the
+[`stop()`][frequenz.channels.Merger.stop] method.
+
+When using [`receive()`][frequenz.channels.Receiver.receive], you should make sure to
+either stop all the receivers that you are merging, or to stop the merge receiver
+manually. This is to make sure that all the tasks created by the merge receiver are
+cleaned up properly.
+"""
from __future__ import annotations
diff --git a/src/frequenz/channels/_receiver.py b/src/frequenz/channels/_receiver.py
index d3308837..58e5edd0 100644
--- a/src/frequenz/channels/_receiver.py
+++ b/src/frequenz/channels/_receiver.py
@@ -1,7 +1,136 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
-"""Channel receiver and associated exceptions."""
+"""Receiver interface and related exceptions.
+
+# Receivers
+
+Messages are received from [channels](/user-guide/channels/index.md) through
+[Receiver][frequenz.channels.Receiver] objects. [Receivers][frequenz.channels.Receiver]
+are usually created by calling `channel.new_receiver()` and are [async
+iterators][typing.AsyncIterator], so the easiest way to receive values from them as
+a stream is to use `async for`:
+
+```python
+from frequenz.channels import Anycast
+
+channel = Anycast[int](name="test-channel")
+receiver = channel.new_receiver()
+
+async for value in receiver:
+ print(value)
+```
+
+If you need to receive values in different places or expecting a particular
+sequence, you can use the [`receive()`][frequenz.channels.Receiver.receive] method:
+
+```python
+from frequenz.channels import Anycast
+
+channel = Anycast[int](name="test-channel")
+receiver = channel.new_receiver()
+
+first_value = await receiver.receive()
+print(f"First value: {first_value}")
+
+second_value = await receiver.receive()
+print(f"Second value: {second_value}")
+```
+
+# Value Transformation
+
+If you need to transform the received values, receivers provide a
+[`map()`][frequenz.channels.Receiver.map] method to easily do so:
+
+```python
+from frequenz.channels import Anycast
+
+channel = Anycast[int](name="test-channel")
+receiver = channel.new_receiver()
+
+async for value in receiver.map(lambda x: x + 1):
+ print(value)
+```
+
+[`map()`][frequenz.channels.Receiver.map] returns a new full receiver, so you can
+use it in any of the ways described above.
+
+# Error Handling
+
+!!! Tip inline end
+
+ For more information about handling errors, please refer to the
+ [Error Handling](/user-guide/error-handling/) section of the user guide.
+
+If there is an error while receiving a message,
+a [`ReceiverError`][frequenz.channels.ReceiverError] exception is raised for both
+[`receive()`][frequenz.channels.Receiver.receive] method and async iteration
+interface.
+
+If the receiver has completely stopped (for example the underlying channel was
+closed), a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception
+is raised by [`receive()`][frequenz.channels.Receiver.receive] method.
+
+```python
+from frequenz.channels import Anycast
+
+channel = Anycast[int](name="test-channel")
+receiver = channel.new_receiver()
+
+try:
+ await receiver.receive()
+except ReceiverStoppedError as error:
+ print("The receiver was stopped")
+except ReceiverError as error:
+ print(f"There was an error trying to receive: {error}")
+```
+
+When used as an async iterator, the iteration will just stop without raising an
+exception:
+
+```python
+from frequenz.channels import Anycast
+
+channel = Anycast[int](name="test-channel")
+receiver = channel.new_receiver()
+
+try:
+ async for value in receiver:
+ print(value)
+except ReceiverStoppedError as error:
+ print("Will never happen")
+except ReceiverError as error:
+ print(f"There was an error trying to receive: {error}")
+# If we get here, the receiver was stopped
+```
+
+# Advanced Usage
+
+!!! Warning inline end
+
+ This section is intended for library developers that want to build other low-level
+ abstractions on top of channels. If you are just using channels, you can safely
+ ignore this section.
+
+Receivers extend on the [async iterator protocol][typing.AsyncIterator] by providing
+a [`ready()`][frequenz.channels.Receiver.ready] and
+a [`consume()`][frequenz.channels.Receiver.consume] method.
+
+The [`ready()`][frequenz.channels.Receiver.ready] method is used to await until the
+receiver has a new value available, but without actually consuming it. The
+[`consume()`][frequenz.channels.Receiver.consume] method consumes the next available
+value and returns it.
+
+[`ready()`][frequenz.channels.Receiver.ready] can be called multiple times, and it
+will return immediately if the receiver is already ready.
+[`consume()`][frequenz.channels.Receiver.consume] must be called only after
+[`ready()`][frequenz.channels.Receiver.ready] is done and only once, until the next
+call to [`ready()`][frequenz.channels.Receiver.ready].
+
+Exceptions are never raised by [`ready()`][frequenz.channels.Receiver.ready], they
+are always delayed until [`consume()`][frequenz.channels.Receiver.consume] is
+called.
+"""
from __future__ import annotations
@@ -16,7 +145,7 @@
class Receiver(ABC, Generic[_T]):
- """A channel Receiver."""
+ """An endpoint to receive messages."""
async def __anext__(self) -> _T:
"""Await the next value in the async iteration over received values.
diff --git a/src/frequenz/channels/_select.py b/src/frequenz/channels/_select.py
index 2d30bd81..e3de99af 100644
--- a/src/frequenz/channels/_select.py
+++ b/src/frequenz/channels/_select.py
@@ -3,9 +3,137 @@
"""Select the first among multiple Receivers.
-Expects Receiver class to raise `StopAsyncIteration`
-exception once no more messages are expected or the channel
-is closed in case of `Receiver` class.
+# Usage
+
+If you need to receiver different types of messages from different receivers, you need
+to know the source of a particular received value to know the type of the value.
+
+[`select()`][frequenz.channels.select] allows you to do that. It is an
+[async iterator][typing.AsyncIterator] that will iterate over the values of all
+receivers as they receive new values.
+
+It yields a [`Selected`][frequenz.channels.Selected] object that will tell you the
+source of the received message. To make sure the received value is *cast* to the
+correct type, you need to use the [`selected_from()`][frequenz.channels.selected_from]
+function to check the source of the message, and the
+[`value`][frequenz.channels.Selected.value] attribute to access the message:
+
+```python
+from frequenz.channels import Anycast, ReceiverStoppedError, select, selected_from
+
+channel1: Anycast[int] = Anycast(name="channel1")
+channel2: Anycast[str] = Anycast(name="channel2")
+receiver1 = channel1.new_receiver()
+receiver2 = channel2.new_receiver()
+
+async for selected in select(receiver1, receiver2):
+ if selected_from(selected, receiver1):
+ print(f"Received from receiver1, next number: {selected.value + 1}")
+ elif selected_from(selected, receiver2):
+ print(f"Received from receiver2, length: {len(selected.value)}")
+ else:
+ assert False, "Unknown source, this should never happen"
+```
+
+Tip:
+ To prevent common bugs, like when a new receiver is added to the select loop but
+ the handling code is forgotten, [`select()`][frequenz.channels.select] will check
+ that all the selected receivers are handled in the if-chain.
+
+ If this happens, it will raise an
+ [`UnhandledSelectedError`][frequenz.channels.UnhandledSelectedError] exception.
+
+ Not handling a receiver is considered a programming error. Because of this, the
+ exception is a subclass of [`BaseException`][BaseException] instead of
+ [`Exception`][Exception]. This means that it will not be caught by [`except
+ Exception`][Exception] blocks.
+
+ If for some reason you want to ignore a received value, just add the receiver to
+ the if-chain and do nothing with the value:
+
+ ```python
+ from frequenz.channels import Anycast, select, selected_from
+
+ channel1: Anycast[int] = Anycast(name="channel1")
+ channel2: Anycast[str] = Anycast(name="channel2")
+ receiver1 = channel1.new_receiver()
+ receiver2 = channel2.new_receiver()
+
+ async for selected in select(receiver1, receiver2):
+ if selected_from(selected, receiver1):
+ continue
+ if selected_from(selected, receiver2):
+ print(f"Received from receiver2, length: {len(selected.value)}")
+ ```
+
+# Stopping
+
+The `select()` async iterator will stop as soon as all the receivers are stopped. You
+can also end the iteration early by breaking out of the loop as normal.
+
+When a single [receiver][frequenz.channels.Receiver] is stopped, it will be reported
+via the [`Selected`][frequenz.channels.Selected] object. You can use the
+[`was_stopped()`][frequenz.channels.Selected.was_stopped] method to check if the
+selected [receiver][frequenz.channels.Receiver] was stopped:
+
+```python
+from frequenz.channels import Anycast, select, selected_from
+
+channel1: Anycast[int] = Anycast(name="channel1")
+channel2: Anycast[str] = Anycast(name="channel2")
+receiver1 = channel1.new_receiver()
+receiver2 = channel2.new_receiver()
+
+async for selected in select(receiver1, receiver2):
+ if selected_from(selected, receiver1):
+ if selected.was_stopped():
+ print("receiver1 was stopped")
+ continue
+ print(f"Received from receiver1, the next number is: {selected.value + 1}")
+ # ...
+```
+
+Tip:
+ The [`was_stopped()`][frequenz.channels.Selected.was_stopped] method is a
+ convenience method that is equivalent to checking if the
+ [`exception`][frequenz.channels.Selected.exception] attribute is an instance of
+ [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
+
+# Error Handling
+
+Tip:
+ For more information about handling errors, please refer to the
+ [Error Handling](/user-guide/error-handling/) section of the user guide.
+
+If a receiver raises an exception while receiving a value, the exception will be
+raised by the [`value`][frequenz.channels.Selected.value] attribute of the
+[`Selected`][frequenz.channels.Selected] object.
+
+You can use a try-except block to handle exceptions as usual:
+
+```python
+from frequenz.channels import Anycast, ReceiverStoppedError, select, selected_from
+
+channel1: Anycast[int] = Anycast(name="channel1")
+channel2: Anycast[str] = Anycast(name="channel2")
+receiver1 = channel1.new_receiver()
+receiver2 = channel2.new_receiver()
+
+async for selected in select(receiver1, receiver2):
+ if selected_from(selected, receiver1):
+ try:
+ print(f"Received from receiver1, next number: {selected.value + 1}")
+ except ReceiverStoppedError:
+ print("receiver1 was stopped")
+ except ValueError as value_error:
+ print(f"receiver1 raised a ValueError: {value_error}")
+ # ...
+ # ...
+```
+
+The [`Selected`][frequenz.channels.Selected] object also has a
+[`exception`][frequenz.channels.Selected.exception] attribute that will contain the
+exception that was raised by the receiver.
"""
import asyncio
@@ -297,9 +425,7 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]:
print(f"timer2: exception={exception}")
case None:
# All good, no exception, we can use `selected.value` safely
- print(
- f"timer2: now={datetime.datetime.now()} drift={selected.value}"
- )
+ print(f"timer2: now={datetime.datetime.now()} drift={selected.value}")
case _ as unhanded:
assert_never(unhanded)
else:
diff --git a/src/frequenz/channels/_sender.py b/src/frequenz/channels/_sender.py
index 8cf00f86..41f8ef86 100644
--- a/src/frequenz/channels/_sender.py
+++ b/src/frequenz/channels/_sender.py
@@ -1,7 +1,53 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
-"""Channel sender and associated exceptions."""
+"""Sender interface and related exceptions.
+
+# Senders
+
+Messages are sent to a [channel](/user-guide/channels) through
+[Sender][frequenz.channels.Sender] objects. [Senders][frequenz.channels.Sender] are
+usually created by calling `channel.new_sender()`, and are a very simple abstraction
+that only provides a single [`send()`][frequenz.channels.Sender.send] method:
+
+```python
+from frequenz.channels import Anycast
+
+channel = Anycast[int](name="test-channel")
+sender = channel.new_sender()
+
+await sender.send("Hello, world!")
+```
+
+Although [`send()`][frequenz.channels.Sender.send] is an asynchronous method, some
+channels may implement it in a synchronous, non-blocking way. For example, buffered
+channels that drop messages when the buffer is full could guarantee that
+[`send()`][frequenz.channels.Sender.send] never blocks. However, please keep in mind
+that the [asyncio][] event loop could give control to another task at any time,
+effectively making the [`send()`][frequenz.channels.Sender.send] method blocking.
+
+# Error Handling
+
+!!! Tip inline end
+
+ For more information about handling errors, please refer to the
+ [Error Handling](/user-guide/error-handling/) section of the user guide.
+
+If there is any failure sending a message,
+a [SenderError][frequenz.channels.SenderError] exception is raised.
+
+```python
+from frequenz.channels import Anycast
+
+channel = Anycast[int](name="test-channel")
+sender = channel.new_sender()
+
+try:
+ await sender.send("Hello, world!")
+except SenderError as error:
+ print(f"Error sending message: {error}")
+```
+"""
from abc import ABC, abstractmethod
from typing import Generic, TypeVar
@@ -12,7 +58,7 @@
class Sender(ABC, Generic[_T]):
- """A channel Sender."""
+ """An endpoint to sends messages."""
@abstractmethod
async def send(self, msg: _T) -> None:
diff --git a/src/frequenz/channels/event.py b/src/frequenz/channels/event.py
index f546555e..87f91496 100644
--- a/src/frequenz/channels/event.py
+++ b/src/frequenz/channels/event.py
@@ -1,8 +1,18 @@
# License: MIT
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
-"""A receiver that can be made ready through an event."""
+"""A receiver that can be made ready directly.
+!!! Tip inline end
+
+ Read the [`Event`][frequenz.channels.event.Event] documentation for more
+ information.
+
+This module contains the following:
+
+* [`Event`][frequenz.channels.event.Event]:
+ {{docstring_summary("frequenz.channels.event.Event")}}
+"""
import asyncio as _asyncio
@@ -10,38 +20,66 @@
class Event(_receiver.Receiver[None]):
- """A receiver that can be made ready through an event.
+ """A receiver that can be made ready directly.
+
+ # Usage
- The receiver (the [`ready()`][frequenz.channels.event.Event.ready] method) will wait
- until [`set()`][frequenz.channels.event.Event.set] is called. At that point the
- receiver will wait again after the event is
- [`consume()`][frequenz.channels.Receiver.consume]d.
+ There are cases where it is useful to be able to send a signal to
+ a [`select()`][frequenz.channels.select] loop, for example, to stop a loop from
+ outside the loop itself.
- The receiver can be completely stopped by calling
+ To do that, you can use an [`Event`][frequenz.channels.event.Event] receiver and
+ call [`set()`][frequenz.channels.event.Event.set] on it when you want to make it
+ ready.
+
+ # Stopping
+
+ The receiver will be re-activated (will keep blocking) after the current set
+ event is received. To stop the receiver completely, you can call
[`stop()`][frequenz.channels.event.Event.stop].
- Example:
+ # Example
+
+ Example: Exit after printing the first 5 numbers
```python
import asyncio
- from frequenz.channels import Receiver, select, selected_from
+
+ from frequenz.channels import Anycast, select, selected_from
from frequenz.channels.event import Event
- other_receiver: Receiver[int] = ...
- exit_event = Event()
+ channel: Anycast[int] = Anycast(name="channel")
+ receiver = channel.new_receiver()
+ sender = channel.new_sender()
+ stop_event = Event(name="stop")
+
+
+ async def do_work() -> None:
+ async for selected in select(receiver, stop_event):
+ if selected_from(selected, receiver):
+ print(selected.value)
+ elif selected_from(selected, stop_event):
+ print("Stop event triggered")
+ stop_event.stop()
+ break
+
+
+ async def send_stuff() -> None:
+ for i in range(10):
+ if stop_event.is_stopped:
+ break
+ await asyncio.sleep(1)
+ await sender.send(i)
+
- async def exit_after_10_seconds() -> None:
- asyncio.sleep(10)
- exit_event.set()
+ async def main() -> None:
+ async with asyncio.TaskGroup() as task_group:
+ task_group.create_task(do_work(), name="do_work")
+ task_group.create_task(send_stuff(), name="send_stuff")
+ await asyncio.sleep(5.5)
+ stop_event.set()
- asyncio.ensure_future(exit_after_10_seconds())
- async for selected in select(exit_event, other_receiver):
- if selected_from(selected, exit_event):
- break
- if selected_from(selected, other_receiver):
- print(selected.value)
- else:
- assert False, "Unknown receiver selected"
+ asyncio.run(main())
```
"""
diff --git a/src/frequenz/channels/file_watcher.py b/src/frequenz/channels/file_watcher.py
index 64bc62e1..bc2fedb1 100644
--- a/src/frequenz/channels/file_watcher.py
+++ b/src/frequenz/channels/file_watcher.py
@@ -1,7 +1,22 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
-"""A Channel receiver for watching for new, modified or deleted files."""
+"""A receiver for watching for new, modified or deleted files.
+
+!!! Tip inline end
+
+ Read the [`FileWatcher`][frequenz.channels.file_watcher.FileWatcher]
+ documentation for more information.
+
+This module contains the following:
+
+* [`FileWatcher`][frequenz.channels.file_watcher.FileWatcher]:
+ {{docstring_summary("frequenz.channels.file_watcher.FileWatcher")}}
+* [`Event`][frequenz.channels.file_watcher.Event]:
+ {{docstring_summary("frequenz.channels.file_watcher.Event")}}
+* [`EventType`][frequenz.channels.file_watcher.EventType]:
+ {{docstring_summary("frequenz.channels.file_watcher.EventType")}}
+"""
import asyncio
import pathlib
@@ -16,16 +31,16 @@
class EventType(Enum):
- """Available types of changes to watch for."""
+ """The types of file events that can be observed."""
CREATE = Change.added
- """A new file was created."""
+ """The file was created."""
MODIFY = Change.modified
- """An existing file was modified."""
+ """The file was modified."""
DELETE = Change.deleted
- """An existing file was deleted."""
+ """The file was deleted."""
@dataclass(frozen=True)
@@ -34,12 +49,70 @@ class Event:
type: EventType
"""The type of change that was observed."""
+
path: pathlib.Path
"""The path where the change was observed."""
class FileWatcher(Receiver[Event]):
- """A channel receiver that watches for file events."""
+ """A receiver that watches for file events.
+
+ # Usage
+
+ A [`FileWatcher`][frequenz.channels.file_watcher.FileWatcher] receiver can be used
+ to watch for changes in a set of files. It will generate an
+ [`Event`][frequenz.channels.file_watcher.Event] message every time a file is
+ created, modified or deleted, depending on the type of events that it is configured
+ to watch for.
+
+ The [event][frequenz.channels.file_watcher.EventType] message contains the
+ [`type`][frequenz.channels.file_watcher.Event.type] of change that was observed and
+ the [`path`][frequenz.channels.file_watcher.Event.path] where the change was
+ observed.
+
+ # Event Types
+
+ The following event types are available:
+
+ * [`CREATE`][frequenz.channels.file_watcher.EventType.CREATE]:
+ {{docstring_summary("frequenz.channels.file_watcher.EventType.CREATE")}}
+ * [`MODIFY`][frequenz.channels.file_watcher.EventType.MODIFY]:
+ {{docstring_summary("frequenz.channels.file_watcher.EventType.MODIFY")}}
+ * [`DELETE`][frequenz.channels.file_watcher.EventType.DELETE]:
+ {{docstring_summary("frequenz.channels.file_watcher.EventType.DELETE")}}
+
+ # Example
+
+ Example: Watch for changes and exit after the file is modified
+ ```python
+ import asyncio
+
+ from frequenz.channels.file_watcher import EventType, FileWatcher
+
+ PATH = "/tmp/test.txt"
+ file_watcher = FileWatcher(paths=[PATH], event_types=[EventType.MODIFY])
+
+
+ async def update_file() -> None:
+ await asyncio.sleep(1)
+ with open(PATH, "w", encoding="utf-8") as file:
+ file.write("Hello, world!")
+
+
+ async def main() -> None:
+ # Create file
+ with open(PATH, "w", encoding="utf-8") as file:
+ file.write("Hello, world!")
+ async with asyncio.TaskGroup() as group:
+ group.create_task(update_file())
+ async for event in file_watcher:
+ print(f"File {event.path}: {event.type.name}")
+ break
+
+
+ asyncio.run(main())
+ ```
+ """
def __init__(
self,
diff --git a/src/frequenz/channels/timer.py b/src/frequenz/channels/timer.py
index c4d65bc9..44a0c472 100644
--- a/src/frequenz/channels/timer.py
+++ b/src/frequenz/channels/timer.py
@@ -1,14 +1,90 @@
# License: MIT
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
-"""A timer receiver that ticks every `interval`.
+"""A receiver that sends a message regularly.
-Note:
- This module always use `int`s to represent time. The time is always in
- microseconds, so the timer resolution is 1 microsecond.
+# Quick Start
- This is to avoid floating point errors when performing calculations with
- time, which can lead to very hard to reproduce, and debug, issues.
+If you need to do something as periodically as possible (avoiding
+[drifts](#missed-ticks-and-drifting)), you can use use
+a [`periodic()`][frequenz.channels.timer.Timer.periodic] timer.
+
+Example: Periodic Timer
+ ```python
+ import asyncio
+ from datetime import datetime, timedelta
+
+ from frequenz.channels.timer import Timer
+
+
+ async def main() -> None:
+ async for drift in Timer.periodic(timedelta(seconds=1.0)):
+ print(f"The timer has triggered at {datetime.now()} with a drift of {drift}")
+
+
+ asyncio.run(main())
+ ```
+
+If, instead, you need a timeout, for example to abort waiting for other receivers after
+a certain amount of time, you can use
+a [`timeout()`][frequenz.channels.timer.Timer.timeout] timer.
+
+Example: Timeout
+ ```python
+ import asyncio
+ from datetime import timedelta
+
+ from frequenz.channels import Anycast, select, selected_from
+ from frequenz.channels.timer import Timer
+
+
+ async def main() -> None:
+ channel = Anycast[int](name="data-channel")
+ data_receiver = channel.new_receiver()
+
+ timer = Timer.timeout(timedelta(seconds=1.0))
+
+ async for selected in select(data_receiver, timer):
+ if selected_from(selected, data_receiver):
+ print(f"Received data: {selected.value}")
+ timer.reset()
+ elif selected_from(selected, timer):
+ drift = selected.value
+ print(f"No data received for {timer.interval + drift} seconds, giving up")
+ break
+
+
+ asyncio.run(main())
+ ```
+
+ This timer will *rearm* itself automatically after it was triggered, so it will trigger
+ again after the selected interval, no matter what the current drift was.
+
+Tip:
+ It is extremely important to understand how timers behave when they are
+ delayed, we recommned emphatically to read about [missed ticks and
+ drifting](#missed-ticks-and-drifting) before using timers in production.
+
+# Missed Ticks And Drifting
+
+A [`Timer`][frequenz.channels.timer.Timer] can be used to send a messages at regular
+time intervals, but there is one fundamental issue with timers in the [asyncio][] world:
+the event loop could give control to another task at any time, and that task can take
+a long time to finish, making the time it takes the next timer message to be received
+longer than the desired interval.
+
+Because of this, it is very important for users to be able to understand and control
+how timers behave when they are delayed. Timers will handle missed ticks according to
+a *missing tick policy*.
+
+The following built-in policies are available:
+
+* [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift]:
+ {{docstring_summary("frequenz.channels.timer.SkipMissedAndDrift")}}
+* [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync]:
+ {{docstring_summary("frequenz.channels.timer.SkipMissedAndResync")}}
+* [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed]:
+ {{docstring_summary("frequenz.channels.timer.TriggerAllMissed")}}
"""
from __future__ import annotations
@@ -38,9 +114,34 @@ def _to_microseconds(time: float | timedelta) -> int:
class MissedTickPolicy(abc.ABC):
"""A policy to handle timer missed ticks.
- This is only relevant if the timer is not ready to trigger when it should
- (an interval passed) which can happen if the event loop is busy processing
- other tasks.
+ To implement a custom policy you need to subclass
+ [`MissedTickPolicy`][frequenz.channels.timer.MissedTickPolicy] and implement the
+ [`calculate_next_tick_time`][frequenz.channels.timer.MissedTickPolicy.calculate_next_tick_time]
+ method.
+
+ Example:
+ This policy will just wait one more second than the original interval if a
+ tick is missed:
+
+ ```python
+ class WaitOneMoreSecond(MissedTickPolicy):
+ def calculate_next_tick_time(
+ self, *, interval: int, scheduled_tick_time: int, now: int
+ ) -> int:
+ return scheduled_tick_time + interval + 1_000_000
+
+
+ async def main() -> None:
+ timer = Timer(
+ interval=timedelta(seconds=1),
+ missed_tick_policy=WaitOneMoreSecond(),
+ )
+
+ async for drift in timer:
+ print(f"The timer has triggered with a drift of {drift}")
+
+ asyncio.run(main())
+ ```
"""
@abc.abstractmethod
@@ -76,16 +177,36 @@ def __repr__(self) -> str:
class TriggerAllMissed(MissedTickPolicy):
"""A policy that triggers all the missed ticks immediately until it catches up.
+ The [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] policy will
+ trigger all missed ticks immediately until it catches up with the current time.
+ This means that if the timer is delayed for any reason, when it finally gets some
+ time to run, it will trigger all the missed ticks in a burst. The drift is not
+ accumulated and the next tick will be scheduled according to the original start
+ time.
+
Example:
- Assume a timer with interval 1 second, the tick `T0` happens exactly
- at time 0, the second tick, `T1`, happens at time 1.2 (0.2 seconds
- late), so it triggers immediately. The third tick, `T2`, happens at
- time 2.3 (0.3 seconds late), so it also triggers immediately. The
- fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also
- triggers immediately as well as the fifth tick, `T4`, which was also
- already delayed (by 0.3 seconds), so it catches up. The sixth tick,
- `T5`, happens at 5.1 (0.1 seconds late), so it triggers immediately
- again. The seventh tick, `T6`, happens at 6.0, right on time.
+ This example represents a timer with interval 1 second.
+
+ 1. The first tick, `T0` happens exactly at time 0.
+
+ 2. The second tick, `T1`, happens at time 1.2 (0.2 seconds late), so it triggers
+ immediately. But it re-syncs, so the next tick is still expected at
+ 2 seconds. This re-sync happens on every tick, so all ticks are expected at
+ multiples of 1 second, not matter how delayed they were.
+
+ 3. The third tick, `T2`, happens at time 2.3 (0.3 seconds late), so it also
+ triggers immediately.
+
+ 4. The fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also
+ triggers immediately.
+
+ 5. The fifth tick, `T4`, which was also already delayed (by 0.3 seconds),
+ triggers immediately too, resulting in a small *catch-up* burst.
+
+ 6. The sixth tick, `T5`, happens at 5.1 (0.1 seconds late), so it triggers
+ immediately again.
+
+ 7. The seventh tick, `T6`, happens at 6.0, right on time.
```bob
@@ -122,21 +243,34 @@ def calculate_next_tick_time(
class SkipMissedAndResync(MissedTickPolicy):
"""A policy that drops all the missed ticks, triggers immediately and resyncs.
- If ticks are missed, the timer will trigger immediately returning the drift
- and it will schedule to trigger again on the next multiple of `interval`,
- effectively skipping any missed ticks, but resyncing with the original start
- time.
+ If ticks are missed, the
+ [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] policy will
+ make the [`Timer`][frequenz.channels.timer.Timer] trigger immediately and it will
+ schedule to trigger again on the next multiple of the
+ [interval][frequenz.channels.timer.Timer.interval], effectively skipping any missed
+ ticks, but re-syncing with the original start time.
Example:
- Assume a timer with interval 1 second, the tick `T0` happens exactly
- at time 0, the second tick, `T1`, happens at time 1.2 (0.2 seconds
- late), so it triggers immediately. The third tick, `T2`, happens at
- time 2.3 (0.3 seconds late), so it also triggers immediately. The
- fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also
- triggers immediately but the fifth tick, `T4`, which was also
- already delayed (by 0.3 seconds) is skipped. The sixth tick,
- `T5`, happens at 5.1 (0.1 seconds late), so it triggers immediately
- again. The seventh tick, `T6`, happens at 6.0, right on time.
+ This example represents a timer with interval 1 second.
+
+ 1. The first tick `T0` happens exactly at time 0.
+
+ 2. The second tick, `T1`, happens at time 1.2 (0.2 seconds late), so it triggers
+ immediately. But it re-syncs, so the next tick is still expected at
+ 2 seconds. This re-sync happens on every tick, so all ticks are expected at
+ multiples of 1 second, not matter how delayed they were.
+
+ 3. The third tick, `T2`, happens at time 2.3 (0.3 seconds late), so it also
+ triggers immediately.
+
+ 4. The fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also
+ triggers immediately, but there was also a tick expected at 4 seconds, `T4`,
+ which is skipped according to this policy to avoid bursts of ticks.
+
+ 6. The sixth tick, `T5`, happens at 5.1 (0.1 seconds late), so it triggers
+ immediately again.
+
+ 7. The seventh tick, `T6`, happens at 6.0, right on time.
```bob
@@ -176,26 +310,36 @@ def calculate_next_tick_time(
class SkipMissedAndDrift(MissedTickPolicy):
"""A policy that drops all the missed ticks, triggers immediately and resets.
- This will behave effectively as if the timer was `reset()` at the time it
- had triggered last, so the start time will change (and the drift will be
- accumulated each time a tick is delayed, but only the relative drift will
- be returned on each tick).
+ The [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy will
+ behave effectively as if the timer was
+ [reset][frequenz.channels.timer.Timer.reset] every time it is triggered. This means
+ the start time will change and the drift will be accumulated each time a tick is
+ delayed. Only the relative drift will be returned on each tick.
- The reset happens only if the delay is larger than `delay_tolerance`, so
- it is possible to ignore small delays and not drift in those cases.
+ The reset happens only if the delay is larger than the
+ [delay tolerance][frequenz.channels.timer.SkipMissedAndDrift.delay_tolerance], so it
+ is possible to ignore small delays and not drift in those cases.
Example:
- Assume a timer with interval 1 second and `delay_tolerance=0.1`, the
- first tick, `T0`, happens exactly at time 0, the second tick, `T1`,
- happens at time 1.2 (0.2 seconds late), so the timer triggers
- immediately but drifts a bit. The next tick, `T2.2`, happens at 2.3 seconds
- (0.1 seconds late), so it also triggers immediately but it doesn't
- drift because the delay is under the `delay_tolerance`. The next tick,
- `T3.2`, triggers at 4.3 seconds (1.1 seconds late), so it also triggers
- immediately but the timer drifts by 1.1 seconds and the tick `T4.2` is
- skipped (not triggered). The next tick, `T5.3`, triggers at 5.3 seconds
- so is right on time (no drift) and the same happens for tick `T6.3`,
- which triggers at 6.3 seconds.
+ This example represents a timer with interval 1 second and delay tolerance of
+ 0.1 seconds.
+
+ 1. The first tick, `T0`, happens exactly at time 0.
+
+ 2. The second tick, `T1.2`, happens at time 1.2 (0.2 seconds late), so the timer
+ triggers immediately but drifts a bit (0.2 seconds), so the next tick is
+ expected at 2.2 seconds.
+
+ 3. The third tick, `T2.2`, happens at 2.3 seconds (0.1 seconds late), so it also
+ triggers immediately but **it doesn't drift** because the delay is **under
+ the `delay_tolerance`**. The next tick is expected at 3.2 seconds.
+
+ 4. The fourth tick, `T4.2`, triggers at 4.3 seconds (1.1 seconds late), so it
+ also triggers immediately but the timer has drifted by 1.1 seconds, so a
+ potential tick `T3.2` is skipped (not triggered).
+
+ 5. The fifth tick, `T5.3`, triggers at 5.3 seconds so it is right on time (no
+ drift) and the same happens for tick `T6.3`, which triggers at 6.3 seconds.
```bob
@@ -282,116 +426,60 @@ def __repr__(self) -> str:
class Timer(Receiver[timedelta]):
- """A timer receiver that triggers every `interval` time.
+ """A receiver that sends a message regularly.
+
+ [`Timer`][frequenz.channels.timer.Timer]s are started by default after they are
+ created. This can be disabled by using `auto_start=False` option when creating
+ them. In this case, the timer will not be started until
+ [`reset()`][frequenz.channels.timer.Timer.reset] is called. Receiving from the timer
+ (either using [`receive()`][frequenz.channels.timer.Timer.receive] or using the
+ async iterator interface) will also start the timer at that point.
+
+ Timers need to be created in a context where
+ a [`asyncio`][] loop is already running. If no
+ [`loop`][frequenz.channels.timer.Timer.loop] is specified, the current running loop
+ is used.
+
+ Timers can be stopped by calling [`stop()`][frequenz.channels.timer.Timer.stop].
+ A stopped timer will raise
+ a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] or stop the async
+ iteration as usual.
- The timer has microseconds resolution, so the
- [`interval`][frequenz.channels.timer.Timer.interval] must be at least
- 1 microsecond.
+ Once a timer is explicitly stopped, it can only be started again by explicitly
+ calling [`reset()`][frequenz.channels.timer.Timer.reset] (trying to receive from it
+ or using the async iterator interface will keep failing).
- The message it produces is a [`timedelta`][datetime.timedelta] containing the drift
- of the timer, i.e. the difference between when the timer should have triggered and
- the time when it actually triggered.
+ Timer messages are [`timedelta`][datetime.timedelta]s containing the drift of the
+ timer, i.e. the difference between when the timer should have triggered and the time
+ when it actually triggered.
This drift will likely never be `0`, because if there is a task that is
running when it should trigger, the timer will be delayed. In this case the
drift will be positive. A negative drift should be technically impossible,
- as the timer uses [`asyncio`][asyncio]s loop monotonic clock.
+ as the timer uses [`asyncio`][]s loop monotonic clock.
+
+ Warning:
+ Even when the [`asyncio`][] loop's monotonic clock is a [`float`][], timers use
+ `int`s to represent time internally. The internal time is tracked in
+ microseconds, so the timer resolution is 1 microsecond
+ ([`interval`][frequenz.channels.timer.Timer.interval] must be at least
+ 1 microsecond).
+
+ This is to avoid floating point errors when performing calculations with time,
+ which can lead to issues that are very hard to reproduce and debug.
If the timer is delayed too much, then it will behave according to the
[`missed_tick_policy`][frequenz.channels.timer.Timer.missed_tick_policy]. Missing
ticks might or might not trigger a message and the drift could be accumulated or not
depending on the chosen policy.
- These are the currently built-in available policies:
-
- * [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift]
- * [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync]
- * [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed]
-
For the most common cases, a specialized constructor is provided:
- * [`periodic()`][frequenz.channels.timer.Timer.periodic] (uses the
- [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] or
- [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] policy)
- * [`timeout()`][frequenz.channels.timer.Timer.timeout] (uses the
- [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy)
-
- The timer accepts an optional [`loop`][frequenz.channels.timer.Timer.loop], which
- will be used to track the time. If `loop` is `None`, then the running loop will be
- used (if there is no running loop most calls will raise
- a [`RuntimeError`][RuntimeError]).
-
- Starting the timer can be delayed if necessary by using `auto_start=False`
- (for example until we have a running loop). A call to
- [`reset()`][frequenz.channels.timer.Timer.reset],
- [`ready()`][frequenz.channels.timer.Timer.ready],
- [`receive()`][frequenz.channels.timer.Timer.receive] or the async iterator interface
- to await for a new message will start the timer.
-
- Example: Periodic timer example
- ```python
- async for drift in Timer.periodic(timedelta(seconds=1.0)):
- print(f"The timer has triggered {drift=}")
- ```
-
- But you can also use a [`select`][frequenz.channels.select] to combine
- it with other receivers, and even start it (semi) manually:
-
- ```python
- from frequenz.channels import Broadcast, select, selected_from
-
- timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
- chan = Broadcast[int](name="input-chan")
- battery_data = chan.new_receiver()
-
- timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
- # Do some other initialization, the timer will start automatically if
- # a message is awaited (or manually via `reset()`).
- async for selected in select(battery_data, timer):
- if selected_from(selected, battery_data):
- if selected.was_closed():
- print("battery channel closed")
- continue
- battery_soc = selected.value
- elif selected_from(selected, timer):
- # Print some regular battery data
- print(f"Battery is charged at {battery_soc}%")
- ```
-
- Example: Timeout example
- ```python
- from frequenz.channels import Broadcast, select, selected_from
-
- def process_data(data: int):
- print(f"Processing data: {data}")
-
- def do_heavy_processing(data: int):
- print(f"Heavy processing data: {data}")
-
- timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
- chan1 = Broadcast[int](name="input-chan-1")
- chan2 = Broadcast[int](name="input-chan-2")
- battery_data = chan1.new_receiver()
- heavy_process = chan2.new_receiver()
- async for selected in select(battery_data, heavy_process, timer):
- if selected_from(selected, battery_data):
- if selected.was_closed():
- print("battery channel closed")
- continue
- process_data(selected.value)
- timer.reset()
- elif selected_from(selected, heavy_process):
- if selected.was_closed():
- print("processing channel closed")
- continue
- do_heavy_processing(selected.value)
- elif selected_from(selected, timer):
- print("No data received in time")
- ```
+ * [`periodic()`][frequenz.channels.timer.Timer.periodic]:
+ {{docstring_summary("frequenz.channels.timer.Timer.periodic")}}
- In this case `do_heavy_processing` might take 2 seconds, and we don't
- want our timeout timer to trigger for the missed ticks, and want the
- next tick to be relative to the time timer was last triggered.
+ * [`timeout()`][frequenz.channels.timer.Timer.timeout]:
+ {{docstring_summary("frequenz.channels.timer.Timer.timeout")}}
"""
def __init__( # pylint: disable=too-many-arguments
@@ -506,10 +594,43 @@ def timeout( # noqa: DOC502
) -> Timer:
"""Create a timer useful for tracking timeouts.
- This is basically a shortcut to create a timer with
- `SkipMissedAndDrift(delay_tolerance=timedelta(0))` as the missed tick policy.
+ A [timeout][frequenz.channels.timer.Timer.timeout] is
+ a [`Timer`][frequenz.channels.timer.Timer] that
+ [resets][frequenz.channels.timer.Timer.reset] automatically after it triggers,
+ so it will trigger again after the selected interval, no matter what the current
+ drift was. This means timeout timers will accumulate drift.
- See the class documentation for details.
+ Tip:
+ Timeouts are a shortcut to create
+ a [`Timer`][frequenz.channels.timer.Timer] with the
+ [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy.
+
+ Example: Timeout example
+ ```python
+ import asyncio
+ from datetime import timedelta
+
+ from frequenz.channels import Anycast, select, selected_from
+ from frequenz.channels.timer import Timer
+
+
+ async def main() -> None:
+ channel = Anycast[int](name="data-channel")
+ data_receiver = channel.new_receiver()
+
+ timer = Timer.timeout(timedelta(seconds=1.0))
+
+ async for selected in select(data_receiver, timer):
+ if selected_from(selected, data_receiver):
+ print(f"Received data: {selected.value}")
+ elif selected_from(selected, timer):
+ drift = selected.value
+ print(f"No data received for {timer.interval + drift} seconds, giving up")
+ break
+
+
+ asyncio.run(main())
+ ```
Args:
delay: The time until the timer ticks. Must be at least
@@ -557,11 +678,40 @@ def periodic( # noqa: DOC502 pylint: disable=too-many-arguments
) -> Timer:
"""Create a periodic timer.
- This is basically a shortcut to create a timer with either
- `TriggerAllMissed()` or `SkipMissedAndResync()` as the missed tick policy
- (depending on `skip_missed_ticks`).
+ A [periodic timer][frequenz.channels.timer.Timer.periodic] is
+ a [`Timer`][frequenz.channels.timer.Timer] that tries as hard as possible to
+ trigger at regular intervals. This means that if the timer is delayed for any
+ reason, it will trigger immediately and then try to catch up with the original
+ schedule.
- See the class documentation for details.
+ Optionally, a periodic timer can be configured to skip missed ticks and re-sync
+ with the original schedule (`skip_missed_ticks` argument). This could be useful
+ if you want the timer is as periodic as possible but if there are big delays you
+ don't end up with big bursts.
+
+ Tip:
+ Periodic timers are a shortcut to create
+ a [`Timer`][frequenz.channels.timer.Timer] with either the
+ [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] policy (when
+ `skip_missed_ticks` is `False`) or
+ [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync]
+ otherwise.
+
+ Example:
+ ```python
+ import asyncio
+ from datetime import datetime, timedelta
+
+ from frequenz.channels.timer import Timer
+
+
+ async def main() -> None:
+ async for drift in Timer.periodic(timedelta(seconds=1.0)):
+ print(f"The timer has triggered at {datetime.now()} with a drift of {drift}")
+
+
+ asyncio.run(main())
+ ```
Args:
period: The time between timer ticks. Must be at least