From 66cc6dd6f2b33c13d9bf13a3eb92d4b04e73eb73 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Thu, 23 Nov 2023 15:56:02 +0100 Subject: [PATCH 01/18] Add note about other platforms in the README Signed-off-by: Leandro Lucarella --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index e3bd5f61..bf594a17 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,11 @@ channels](https://tour.golang.org/concurrency/2) but it also borrows ideas from +!!! Note inline end + + Newer Python versions and other operating systems and architectures might + work too, but they are not automatically tested, so we cannot guarantee it. + The following platforms are officially supported (tested): - **Python:** 3.11 From 2a8494a5b6119e965b0f9ba9710deeb5dc1feb42 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Thu, 23 Nov 2023 16:17:09 +0100 Subject: [PATCH 02/18] Add a quick start section to the README This section includes basic installation instructions and some quick examples so users can have a very broad overview of what the library is about. Signed-off-by: Leandro Lucarella --- README.md | 185 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 183 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index bf594a17..fc0086e5 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,10 @@ The following platforms are officially supported (tested): ## Quick Start + + +### Installing + We assume you are on a system with Python available. If that is not the case, please [download and install Python](https://www.python.org/downloads/) first. @@ -61,12 +65,189 @@ Then, just install using `pip`: python3 -m pip install frequenz-channels ``` +### Examples + +!!! Example "Hello World" + ```python + import asyncio + + from frequenz.channels import Anycast + + + async def main() -> None: + hello_channel = Anycast[str](name="hello-world-channel") + sender = hello_channel.new_sender() + receiver = hello_channel.new_receiver() + + await sender.send("Hello World!") + msg = await receiver.receive() + print(msg) + + + asyncio.run(main()) + ``` + +!!! Example "Showcase" + This is a comprehensive example that shows most of the main features of the + library: + + ```python + import asyncio + from dataclasses import dataclass + from datetime import timedelta + from enum import Enum, auto + from typing import assert_never + + from frequenz.channels import ( + Anycast, + Broadcast, + Receiver, + Sender, + merge, + select, + selected_from, + ) + from frequenz.channels.timer import Timer + + + class Command(Enum): + PING = auto() + STOP_SENDER = auto() + + + class ReplyCommand(Enum): + PONG = auto() + + + @dataclass(frozen=True) + class Reply: + reply: ReplyCommand + source: str + + + async def send( + sender: Sender[str], + control_command: Receiver[Command], + control_reply: Sender[Reply], + ) -> None: + """Send a counter value every second, until a stop command is received.""" + print(f"{sender}: Starting") + timer = Timer.periodic(timedelta(seconds=1.0)) + counter = 0 + async for selected in select(timer, control_command): + if selected_from(selected, timer): + print(f"{sender}: Sending {counter}") + await sender.send(f"{sender}: {counter}") + counter += 1 + elif selected_from(selected, control_command): + print(f"{sender}: Received command: {selected.value.name}") + match selected.value: + case Command.STOP_SENDER: + print(f"{sender}: Stopping") + break + case Command.PING: + print(f"{sender}: Ping received, reply with pong") + await control_reply.send(Reply(ReplyCommand.PONG, str(sender))) + case _ as unknown: + assert_never(unknown) + print(f"{sender}: Finished") + + + async def receive( + receivers: list[Receiver[str]], + control_command: Receiver[Command], + control_reply: Sender[Reply], + ) -> None: + """Receive data from multiple channels, until no more data is received for 2 seconds.""" + print("receive: Starting") + timer = Timer.timeout(timedelta(seconds=2.0)) + print(f"{timer=}") + merged = merge(*receivers) + async for selected in select(merged, timer, control_command): + if selected_from(selected, merged): + message = selected.value + print(f"receive: Received {message=}") + timer.reset() + print(f"{timer=}") + elif selected_from(selected, control_command): + print(f"receive: received command: {selected.value.name}") + match selected.value: + case Command.PING: + print("receive: Ping received, reply with pong") + await control_reply.send(Reply(ReplyCommand.PONG, "receive")) + case Command.STOP_SENDER: + pass # Ignore + case _ as unknown: + assert_never(unknown) + elif selected_from(selected, timer): + drift = selected.value + print( + f"receive: No data received for {timer.interval + drift} seconds, " + "giving up" + ) + break + print("receive: Finished") + + + async def main() -> None: + data_channel_1 = Anycast[str](name="data-channel-1") + data_channel_2 = Anycast[str](name="data-channel-2") + command_channel = Broadcast[Command](name="control-channel") # (1)! + reply_channel = Anycast[Reply](name="reply-channel") + + async with asyncio.TaskGroup() as tasks: + tasks.create_task( + send( + data_channel_1.new_sender(), + command_channel.new_receiver(), + reply_channel.new_sender(), + ), + name="send-channel-1", + ) + tasks.create_task( + send( + data_channel_2.new_sender(), + command_channel.new_receiver(), + reply_channel.new_sender(), + ), + name="send-channel-2", + ) + tasks.create_task( + receive( + [data_channel_1.new_receiver(), data_channel_2.new_receiver()], + command_channel.new_receiver(), + reply_channel.new_sender(), + ), + name="receive", + ) + + control_sender = command_channel.new_sender() + reply_receiver = reply_channel.new_receiver() + + # Send a ping command to all tasks and wait for the replies + await control_sender.send(Command.PING) + print(f"main: {await reply_receiver.receive()}") + print(f"main: {await reply_receiver.receive()}") + print(f"main: {await reply_receiver.receive()}") + + await asyncio.sleep(5.0) + + # Stop senders, after 2 seconds not receiving any data, + # the receiver will stop too + await control_sender.send(Command.STOP_SENDER) + + + asyncio.run(main()) + ``` + + + ## Documentation -For more information, please visit the [documentation +For more information, please read the [documentation website](https://frequenz-floss.github.io/frequenz-channels-python/). ## Contributing If you want to know how to build this project and contribute to it, please -check out the [Contributing Guide](CONTRIBUTING.md). +check out the [Contributing Guide](docs/CONTRIBUTING.md). From 483d23d15d0c7212e98b506776d651c561846c19 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Thu, 23 Nov 2023 16:23:15 +0100 Subject: [PATCH 03/18] Add a User Guide top level section with a Quick Start The quick start section is reused from the README. Signed-off-by: Leandro Lucarella --- docs/SUMMARY.md | 1 + docs/user-guide/SUMMARY.md | 1 + docs/user-guide/quick-start.md | 7 +++++++ 3 files changed, 9 insertions(+) create mode 100644 docs/user-guide/SUMMARY.md create mode 100644 docs/user-guide/quick-start.md diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 3755def6..6d7d06eb 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -1,3 +1,4 @@ * [Home](index.md) +* [User Guide](user-guide/) * [API Reference](reference/) * [Contributing](CONTRIBUTING.md) diff --git a/docs/user-guide/SUMMARY.md b/docs/user-guide/SUMMARY.md new file mode 100644 index 00000000..485e8902 --- /dev/null +++ b/docs/user-guide/SUMMARY.md @@ -0,0 +1 @@ +* [Quick Start](quick-start.md) diff --git a/docs/user-guide/quick-start.md b/docs/user-guide/quick-start.md new file mode 100644 index 00000000..8b951952 --- /dev/null +++ b/docs/user-guide/quick-start.md @@ -0,0 +1,7 @@ +# Quick Start + +{% + include-markdown "../../README.md" + start="" + end="" +%} From 740566146a4449ec071b14a51f86a3d99faf798b Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Thu, 23 Nov 2023 16:25:17 +0100 Subject: [PATCH 04/18] Add the Quick Start also to the docs home page We want users to be able to quickly get a feeling of the library, so it is better to also have this quick start guide in the home of the documentation. Signed-off-by: Leandro Lucarella --- docs/index.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/index.md b/docs/index.md index 17590073..a45408e1 100644 --- a/docs/index.md +++ b/docs/index.md @@ -16,6 +16,14 @@ end="" %} +## Quick Start + +{% + include-markdown "../README.md" + start="" + end="" +%} + ## Installation First, you need to make sure you have Python installed (at least version 3.11): From 2907ef5cd8a1e10c214866e2790690660a34fdb7 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Thu, 23 Nov 2023 16:29:10 +0100 Subject: [PATCH 05/18] Move the installation instructions to the user guide Now the installation instruction have their own page in the user guide. We also add a tip linking to the full installation instructions in the README. Signed-off-by: Leandro Lucarella --- README.md | 16 ++------ docs/index.md | 64 ----------------------------- docs/user-guide/SUMMARY.md | 1 + docs/user-guide/installation.md | 71 +++++++++++++++++++++++++++++++++ 4 files changed, 76 insertions(+), 76 deletions(-) create mode 100644 docs/user-guide/installation.md diff --git a/README.md b/README.md index fc0086e5..680ed09a 100644 --- a/README.md +++ b/README.md @@ -47,19 +47,11 @@ The following platforms are officially supported (tested): ### Installing -We assume you are on a system with Python available. If that is not the case, -please [download and install Python](https://www.python.org/downloads/) first. +!!! Tip inline end + For more details please read the [Installation + Guide](docs/user-guide/installation.md). -To install Frequenz Channels, you probably want to create a new virtual -environment first. For example, if you use a `sh` compatible shell, you can do -this: - -```sh -python3 -m venv .venv -. .venv/bin/activate -``` - -Then, just install using `pip`: +Assuming a [supported](#supported-platforms) working Python environment: ```sh python3 -m pip install frequenz-channels diff --git a/docs/index.md b/docs/index.md index a45408e1..10bcbed6 100644 --- a/docs/index.md +++ b/docs/index.md @@ -23,67 +23,3 @@ start="" end="" %} - -## Installation - -First, you need to make sure you have Python installed (at least version 3.11): - -```console -$ python3 --version -Python 3.11.4 -``` - -!!! note - - These instructions assume you are using a [POSIX compatible - `sh`](https://pubs.opengroup.org/onlinepubs/9699919799/utilities/sh.html) - shell. - -If that command doesn't print a version newer than 3.11.0, you'll need to -[download and install Python](https://www.python.org/downloads/) first. - -To install Frequenz Channels, you probably want to create a new virtual -environment first: - -```sh -mkdir my-channels-project -cd my-channels-project -python3 -m venv .venv -. .venv/bin/activate -``` - -!!! tip - - Using [`direnv`](https://direnv.net/) can greatly simplify this process as - it automates the creation, activation, and deactivation of the virtual - environment. The first time you enable `direnv`, the virtual environment - will be created, and each time you enter or leave a subdirectory, it will be - activated and deactivated, respectively. - - ```sh - sudo apt install direnv # if you use Debian/Ubuntu - mkdir my-channels-project - cd my-channels-project - echo "layout python python3" > .envrc - direnv allow - ``` - - This will create the virtual environment and activate it automatically for you. - -Now you can install Frequenz Channels by using `pip` (if you don't have `pip` installed -you can follow [the official instructions](https://pip.pypa.io/en/stable/installation/)): - -```sh -python3 -m pip install frequenz-channels -``` - -To verify that the installation worked, you can invoke the Python interpreter and -import the `frequenz.channels` module: - -```console -$ python3 -Python 3.11.4 (main, Jun 7 2023, 10:13:09) [GCC 12.2.0] on linux -Type "help", "copyright", "credits" or "license" for more information. ->>> import frequenz.channels ->>> -``` diff --git a/docs/user-guide/SUMMARY.md b/docs/user-guide/SUMMARY.md index 485e8902..b14c9bd8 100644 --- a/docs/user-guide/SUMMARY.md +++ b/docs/user-guide/SUMMARY.md @@ -1 +1,2 @@ * [Quick Start](quick-start.md) +* [Installation](installation.md) diff --git a/docs/user-guide/installation.md b/docs/user-guide/installation.md new file mode 100644 index 00000000..dced442e --- /dev/null +++ b/docs/user-guide/installation.md @@ -0,0 +1,71 @@ +# Installation + +{% + include-markdown "../../README.md" + start="" + end="" +%} + +## Installation + +First, you need to make sure you have Python installed (at least version 3.11): + +!!! Note inline end + + These instructions assume you are using a [POSIX compatible + `sh`](https://pubs.opengroup.org/onlinepubs/9699919799/utilities/sh.html) + shell. + +```console +$ python3 --version +Python 3.11.4 +``` + +If that command doesn't print a version newer than 3.11.0, you'll need to +[download and install Python](https://www.python.org/downloads/) first. + +To install Frequenz Channels, you probably want to create a new virtual +environment first: + +```sh +mkdir my-channels-project +cd my-channels-project +python3 -m venv .venv +. .venv/bin/activate +``` + +!!! Tip + + Using [`direnv`](https://direnv.net/) can greatly simplify this process as + it automates the creation, activation, and deactivation of the virtual + environment. The first time you enable `direnv`, the virtual environment + will be created, and each time you enter or leave a subdirectory, it will be + activated and deactivated, respectively. + + ```sh + sudo apt install direnv # if you use Debian/Ubuntu + mkdir my-channels-project + cd my-channels-project + echo "layout python python3" > .envrc + direnv allow + ``` + + This will create the virtual environment and activate it automatically for you. + +Now you can install Frequenz Channels by using `pip` (if you don't have `pip` installed +you can follow [the official instructions](https://pip.pypa.io/en/stable/installation/)): + +```sh +python3 -m pip install frequenz-channels +``` + +To verify that the installation worked, you can invoke the Python interpreter and +import the `frequenz.channels` module: + +```console +$ python3 +Python 3.11.4 (main, Jun 7 2023, 10:13:09) [GCC 12.2.0] on linux +Type "help", "copyright", "credits" or "license" for more information. +>>> import frequenz.channels +>>> +``` From a2b446299336b2732f29f2528ea9b2aec5725c07 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Thu, 23 Nov 2023 16:33:37 +0100 Subject: [PATCH 06/18] Add the user guide structure These are mainly pages including docstrings from the source code. There are only a few pages with content of their own, and they are usually just glue to put some related docstrings together. There is one notable exception, the general channel concepts, that don't have a better home. The content of the docstring will be improved later to make the user guide more cohesive. Signed-off-by: Leandro Lucarella --- docs/user-guide/SUMMARY.md | 5 + docs/user-guide/channels/anycast.md | 10 ++ docs/user-guide/channels/broadcast.md | 10 ++ docs/user-guide/channels/index.md | 41 +++++++ docs/user-guide/error-handling.md | 10 ++ docs/user-guide/receiving/index.md | 10 ++ .../receiving/multiple-sources/index.md | 25 ++++ .../receiving/multiple-sources/merge.md | 10 ++ .../receiving/multiple-sources/select.md | 11 ++ docs/user-guide/sending.md | 10 ++ docs/user-guide/utilities/events.md | 10 ++ docs/user-guide/utilities/file-watchers.md | 10 ++ docs/user-guide/utilities/timers.md | 116 ++++++++++++++++++ 13 files changed, 278 insertions(+) create mode 100644 docs/user-guide/channels/anycast.md create mode 100644 docs/user-guide/channels/broadcast.md create mode 100644 docs/user-guide/channels/index.md create mode 100644 docs/user-guide/error-handling.md create mode 100644 docs/user-guide/receiving/index.md create mode 100644 docs/user-guide/receiving/multiple-sources/index.md create mode 100644 docs/user-guide/receiving/multiple-sources/merge.md create mode 100644 docs/user-guide/receiving/multiple-sources/select.md create mode 100644 docs/user-guide/sending.md create mode 100644 docs/user-guide/utilities/events.md create mode 100644 docs/user-guide/utilities/file-watchers.md create mode 100644 docs/user-guide/utilities/timers.md diff --git a/docs/user-guide/SUMMARY.md b/docs/user-guide/SUMMARY.md index b14c9bd8..eada3abf 100644 --- a/docs/user-guide/SUMMARY.md +++ b/docs/user-guide/SUMMARY.md @@ -1,2 +1,7 @@ * [Quick Start](quick-start.md) * [Installation](installation.md) +* [Channels](channels/) +* [Sending](sending.md) +* [Receiving](receiving/) +* [Error Handling](error-handling.md) +* [Utilities](utilities/) diff --git a/docs/user-guide/channels/anycast.md b/docs/user-guide/channels/anycast.md new file mode 100644 index 00000000..eca01889 --- /dev/null +++ b/docs/user-guide/channels/anycast.md @@ -0,0 +1,10 @@ +# Anycast + +::: frequenz.channels._anycast.Anycast + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false diff --git a/docs/user-guide/channels/broadcast.md b/docs/user-guide/channels/broadcast.md new file mode 100644 index 00000000..f759bce3 --- /dev/null +++ b/docs/user-guide/channels/broadcast.md @@ -0,0 +1,10 @@ +# Broadcast + +::: frequenz.channels._broadcast.Broadcast + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false diff --git a/docs/user-guide/channels/index.md b/docs/user-guide/channels/index.md new file mode 100644 index 00000000..c6f2df64 --- /dev/null +++ b/docs/user-guide/channels/index.md @@ -0,0 +1,41 @@ +# Channels + +A channel is a communication mechanism that allows data (messages) to be +transmitted between different coroutines. It consists of +[senders](../sending.md), which send messages, and +[receivers](../receiving/index.md), which receive those messages. The channel itself +acts as a conduit for these messages. + +Conceptually, a channel looks like this: + +
+```bob +.---------. Message .----------. Message .-----------. +| Sender +------------>| Channel +------------>| Receiver | +'---------' '----------' '-----------' +``` +
+ +Besides this simple model, there are many variations of channels depending on +various factors: + +* How many senders and receivers can a channel have? + +* Do all receivers receive all messages from all senders? + +* How many messages can a channel hold (buffered), or can it hold any messages + at all (unbuffered)? + +* What happens if a sender tries to send a message to a full channel? + + * Does the send operation block until the channel has space again? + * Does it fail? + * Does it silently drop the message? + +Because these questions can have many answers, there are different types of +channels. Frequenz Channels offers a few of them: + +* [Anycast](anycast.md) +* [Broadcast](broadcast.md) + +More might be added in the future, and you can also create your own. diff --git a/docs/user-guide/error-handling.md b/docs/user-guide/error-handling.md new file mode 100644 index 00000000..fc48a0ca --- /dev/null +++ b/docs/user-guide/error-handling.md @@ -0,0 +1,10 @@ +# Error Handling + +::: frequenz.channels._exceptions + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false diff --git a/docs/user-guide/receiving/index.md b/docs/user-guide/receiving/index.md new file mode 100644 index 00000000..9dc06a95 --- /dev/null +++ b/docs/user-guide/receiving/index.md @@ -0,0 +1,10 @@ +# Receiving + +::: frequenz.channels._receiver + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false diff --git a/docs/user-guide/receiving/multiple-sources/index.md b/docs/user-guide/receiving/multiple-sources/index.md new file mode 100644 index 00000000..88b05680 --- /dev/null +++ b/docs/user-guide/receiving/multiple-sources/index.md @@ -0,0 +1,25 @@ +# Multiple Sources + +If you need to receive values from multiple sources it can be complicated, as +you probably want to get the first message of each receiver as soon as it is +available. A naive approach like this will not work: + +```python +receiver1: Receiver[int] = channel1.new_receiver() +receiver2: Receiver[int] = channel2.new_receiver() + +msg = await receiver1.receive() +print(f"Received from channel1: {msg}") + +msg = await receiver2.receive() +print(f"Received from channel2: {msg}") +``` + +The problem is that if the first message is not available in `channel1` but in +`channel2`, the program will be blocked until a message is available in +`channel1`, but you probably want to receive the first message from `channel2` +as soon as it is available. + +Frequenz Channels provides two tools to solve this issue: +[`merge()`][frequenz.channels.merge] and +[`select()`][frequenz.channels.select]. diff --git a/docs/user-guide/receiving/multiple-sources/merge.md b/docs/user-guide/receiving/multiple-sources/merge.md new file mode 100644 index 00000000..42fc65a4 --- /dev/null +++ b/docs/user-guide/receiving/multiple-sources/merge.md @@ -0,0 +1,10 @@ +# Merging + +::: frequenz.channels._merge + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false diff --git a/docs/user-guide/receiving/multiple-sources/select.md b/docs/user-guide/receiving/multiple-sources/select.md new file mode 100644 index 00000000..514ba723 --- /dev/null +++ b/docs/user-guide/receiving/multiple-sources/select.md @@ -0,0 +1,11 @@ +# Selecting + +::: frequenz.channels._select + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false + diff --git a/docs/user-guide/sending.md b/docs/user-guide/sending.md new file mode 100644 index 00000000..38afae2f --- /dev/null +++ b/docs/user-guide/sending.md @@ -0,0 +1,10 @@ +# Sending + +::: frequenz.channels._sender + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false diff --git a/docs/user-guide/utilities/events.md b/docs/user-guide/utilities/events.md new file mode 100644 index 00000000..6d4594bc --- /dev/null +++ b/docs/user-guide/utilities/events.md @@ -0,0 +1,10 @@ +# Events + +::: frequenz.channels.event.Event + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false diff --git a/docs/user-guide/utilities/file-watchers.md b/docs/user-guide/utilities/file-watchers.md new file mode 100644 index 00000000..c9853a57 --- /dev/null +++ b/docs/user-guide/utilities/file-watchers.md @@ -0,0 +1,10 @@ +# File Watchers + +::: frequenz.channels.file_watcher.FileWatcher + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false diff --git a/docs/user-guide/utilities/timers.md b/docs/user-guide/utilities/timers.md new file mode 100644 index 00000000..5a82364f --- /dev/null +++ b/docs/user-guide/utilities/timers.md @@ -0,0 +1,116 @@ +# Timers + +::: frequenz.channels.timer + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false + +### Policies + +#### Skip Missed And Drift + +::: frequenz.channels.timer.SkipMissedAndDrift + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false + +#### Skip Missed And Re-Sync + +::: frequenz.channels.timer.SkipMissedAndResync + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false + +#### Trigger All Missed + +::: frequenz.channels.timer.TriggerAllMissed + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false + +## High-level Interface + +::: frequenz.channels.timer.Timer + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false + +### Periodic Timers + +::: frequenz.channels.timer.Timer.periodic + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false + show_docstring_attributes: false + show_docstring_functions: false + show_docstring_classes: false + show_docstring_other_parameters: false + show_docstring_parameters: false + show_docstring_raises: false + show_docstring_receives: false + show_docstring_returns: false + show_docstring_warns: false + show_docstring_yields: false + +### Timeouts + +::: frequenz.channels.timer.Timer.timeout + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false + show_docstring_attributes: false + show_docstring_functions: false + show_docstring_classes: false + show_docstring_other_parameters: false + show_docstring_parameters: false + show_docstring_raises: false + show_docstring_receives: false + show_docstring_returns: false + show_docstring_warns: false + show_docstring_yields: false + +## Low-level Interface + +A [`Timer`][frequenz.channels.timer.Timer] can be created using an arbitrary missed +ticks policy by calling the [low-level +constructor][frequenz.channels.timer.Timer.__init__] and passing the policy via the +[`missed_tick_policy`][frequenz.channels.timer.Timer.missed_tick_policy] argument. + +### Custom Missed Tick Policies + +::: frequenz.channels.timer.MissedTickPolicy + options: + inherited_members: [] + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false From fbe7ab8665158bb2178abf9b821026725f6d4bfc Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Thu, 23 Nov 2023 16:33:51 +0100 Subject: [PATCH 07/18] Improve `Anycast` class documentation Add a "Characteristics" summary with the most important properties of this channel type, a diagram to show how messages are routed from multiple senders to multiple receivers, explain a bit more about what happens when buffers are full and improve and add a new example. The formatting is done having in mind this documentation is now included in the User Guide. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_anycast.py | 182 +++++++++++++++++++++++++----- 1 file changed, 154 insertions(+), 28 deletions(-) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 2a5ac56f..453caf84 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -20,54 +20,180 @@ class Anycast(Generic[_T]): - """A channel for sending data across async tasks. + """A channel that delivers each message to exactly one receiver. - Anycast channels support multiple senders and multiple receivers. A message sent - through a sender will be received by exactly one receiver. + # Description + + !!! Tip inline end + + [Anycast][frequenz.channels.Anycast] channels behave like the + [Golang](https://golang.org/) [channels](https://go.dev/ref/spec#Channel_types). + + [Anycast][frequenz.channels.Anycast] channels support multiple + [senders][frequenz.channels.Sender] and multiple + [receivers][frequenz.channels.Receiver]. Each message sent through any of the + senders will be received by exactly one receiver (but **any** receiver). + +
+ ```bob + .---------. msg1 msg1 .-----------. + | Sender +------. .------>| Receiver | + '---------' | .----------. | '-----------' + +----->| Channel +-----+ + .---------. | '----------' | .-----------. + | Sender +------' '------>| Receiver | + '---------' msg2 msg2 '-----------' + ``` +
+ + !!! Note inline end "Characteristics" + + * **Buffered:** Yes, with a global channel buffer + * **Buffer full policy:** Block senders + * **Multiple receivers:** Yes + * **Multiple senders:** Yes + * **Thread-safe:** No This channel is buffered, and if the senders are faster than the receivers, then the channel's buffer will fill up. In that case, the senders will block at the - [send()][frequenz.channels.Sender.send] method until the receivers consume the + [`send()`][frequenz.channels.Sender.send] method until the receivers consume the messages in the channel's buffer. The channel's buffer size can be configured at creation time via the `limit` argument. + The first receiver that is awaited will get the next message. When multiple + receivers are waiting, the [asyncio][] loop scheduler picks a receiver for each next + massage. + + This means that, in practice, there might be only one receiver receiving all the + messages, depending on how tasks are schduled. + + If you need to ensure some delivery policy (like round-robin or uniformly random), + then you will have to implement it yourself. + + To create a new [senders][frequenz.channels.Sender] and + [receivers][frequenz.channels.Receiver] you can use the + [`new_sender()`][frequenz.channels.Broadcast.new_sender] and + [`new_receiver()`][frequenz.channels.Broadcast.new_receiver] methods + respectively. + + When the channel is not needed anymore, it should be closed with the + [`close()`][frequenz.channels.Anycast.close] method. This will prevent further + attempts to [`send()`][frequenz.channels.Sender.send] data. Receivers will still be + able to drain the pending values on the channel, but after that, subsequent + [`receive()`][frequenz.channels.Receiver.receive] calls will raise a + [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception. + + This channel is useful, for example, to distribute work across multiple workers. + In cases where each message need to be received by every receiver, a - [Broadcast][frequenz.channels.Broadcast] channel may be used. + [broadcast][frequenz.channels.Broadcast] channel may be used. + + # Examples + + Example: Send a few numbers to a receiver + This is a very simple example that sends a few numbers from a single sender to + a single receiver. + + ```python + import asyncio + + from frequenz.channels import Anycast, Sender + - Uses an [deque][collections.deque] internally, so Anycast channels are not - thread-safe. + async def send(sender: Sender[int]) -> None: + for msg in range(3): + print(f"sending {msg}") + await sender.send(msg) - When there are multiple channel receivers, they can be awaited - simultaneously using [select][frequenz.channels.select] or - [merge][frequenz.channels.merge]. - Example: - ``` python - async def send(sender: channel.Sender) -> None: - while True: - next = random.randint(3, 17) - print(f"sending: {next}") - await sender.send(next) + async def main() -> None: + channel = Anycast[int](name="numbers") + sender = channel.new_sender() + receiver = channel.new_receiver() - async def recv(id: int, receiver: channel.Receiver) -> None: - while True: - next = await receiver.receive() - print(f"receiver_{id} received {next}") - await asyncio.sleep(0.1) # sleep (or work) with the data + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send(sender)) + for _ in range(3): + msg = await receiver.receive() + print(f"received {msg}") + await asyncio.sleep(0.1) # sleep (or work) with the data - acast = channel.Anycast() + asyncio.run(main()) + ``` + + The output should look something like (although the sending and received might + appear more interleaved): + + ``` + sending 0 + sending 1 + sending 2 + received 0 + received 1 + received 2 + ``` + + Example: Send a few number from multiple senders to multiple receivers + This is a more complex example that sends a few numbers from multiple senders to + multiple receivers, using a small buffer to force the senders to block. + + ```python + import asyncio + + from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender + - sender = acast.new_sender() - receiver_1 = acast.new_receiver() + async def send(name: str, sender: Sender[int], start: int, stop: int) -> None: + for msg in range(start, stop): + print(f"{name} sending {msg}") + await sender.send(msg) - asyncio.create_task(send(sender)) - await recv(1, receiver_1) + async def recv(name: str, receiver: Receiver[int]) -> None: + try: + async for msg in receiver: + print(f"{name} received {msg}") + await asyncio.sleep(0.1) # sleep (or work) with the data + except ReceiverStoppedError: + pass + + + async def main() -> None: + acast = Anycast[int](name="numbers", limit=2) + + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send("sender_1", acast.new_sender(), 10, 13)) + task_group.create_task(send("sender_2", acast.new_sender(), 20, 22)) + task_group.create_task(recv("receiver_1", acast.new_receiver())) + task_group.create_task(recv("receiver_2", acast.new_receiver())) + + + asyncio.run(main()) ``` - Check the `tests` and `benchmarks` directories for more examples. + The output should look something like this(although the sending and received + might appear interleaved in a different way): + + ``` + sender_1 sending 10 + sender_1 sending 11 + sender_1 sending 12 + Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver + consumes a value + sender_2 sending 20 + Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver + consumes a value + receiver_1 received 10 + receiver_1 received 11 + sender_2 sending 21 + Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver + consumes a value + receiver_1 received 12 + receiver_1 received 20 + receiver_1 received 21 + ``` """ def __init__(self, *, name: str, limit: int = 10) -> None: From 5b340e5c2f8fff7839a8e23372f1f3d4b31824c5 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 24 Nov 2023 10:16:27 +0100 Subject: [PATCH 08/18] Improve `Broadcast` class documentation Add a "Characteristics" summary with the most important properties of this channel type, a diagram to show how messages are routed from multiple senders to multiple receivers, explain a bit more about what happens when buffers are full and improve and add a new example. The formatting is done having in mind this documentation is now included in the User Guide. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_broadcast.py | 175 +++++++++++++++++++++++----- 1 file changed, 147 insertions(+), 28 deletions(-) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 8da322b1..f7fb1fe0 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -21,47 +21,166 @@ class Broadcast(Generic[_T]): - """A channel to broadcast messages to multiple receivers. + """A channel that deliver all messages to all receivers. - `Broadcast` channels can have multiple senders and multiple receivers. Each - message sent through any of the senders is received by all of the - receivers. + # Description - Internally, a broadcast receiver's buffer is implemented with just - append/pop operations on either side of a [deque][collections.deque], which - are thread-safe. Because of this, `Broadcast` channels are thread-safe. + [Broadcast][frequenz.channels.Broadcast] channels can have multiple + [senders][frequenz.channels.Sender] and multiple + [receivers][frequenz.channels.Receiver]. Each message sent through any of the + senders will be received by all receivers. - When there are multiple channel receivers, they can be awaited - simultaneously using [select][frequenz.channels.select] or - [merge][frequenz.channels.merge]. +
+ ```bob + .---------. msg1 msg1,msg2 .-----------. + | Sender +------. .---------->| Receiver | + '---------' | .----------. | '-----------' + +----->| Channel +-----+ + .---------. | '----------' | .-----------. + | Sender +------' '----------->| Receiver | + '---------' msg2 msg1,msg2 '-----------' + ``` +
- Example: - ``` python - async def send(sender: channel.Sender) -> None: - while True: - next = random.randint(3, 17) - print(f"sending: {next}") - await sender.send(next) + !!! Note inline end "Characteristics" + * **Buffered:** Yes, with one buffer per receiver + * **Buffer full policy:** Drop oldest message + * **Multiple receivers:** Yes + * **Multiple senders:** Yes + * **Thread-safe:** No - async def recv(id: int, receiver: channel.Receiver) -> None: - while True: - next = await receiver.receive() - print(f"receiver_{id} received {next}") - await asyncio.sleep(0.1) # sleep (or work) with the data + This channel is buffered, and when messages are not being consumed fast + enough and the buffer fills up, old messages will get dropped. + Each receiver has its own buffer, so messages will only be dropped for + receivers that can't keep up with the senders, and not for the whole + channel. - bcast = channel.Broadcast() + To create a new [senders][frequenz.channels.Sender] and + [receivers][frequenz.channels.Receiver] you can use the + [`new_sender()`][frequenz.channels.Broadcast.new_sender] and + [`new_receiver()`][frequenz.channels.Broadcast.new_receiver] methods + respectively. - sender = bcast.new_sender() - receiver_1 = bcast.new_receiver() + When a channel is not needed anymore, it should be closed with + [`close()`][frequenz.channels.Broadcast.close]. This will prevent further + attempts to [`send()`][frequenz.channels.Sender.send] data, and will allow + receivers to drain the pending items on their queues, but after that, + subsequent [receive()][frequenz.channels.Receiver.receive] calls will + raise a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError]. - asyncio.create_task(send(sender)) + This channel is useful, for example, to implement a pub/sub pattern, where + multiple receivers can subscribe to a channel to receive all messages. - await recv(1, receiver_1) + In cases where each message needs to be delivered only to one receiver, an + [anycast][frequenz.channels.Anycast] channel may be used. + + # Examples + + Example: Send a few numbers to a receiver + This is a very simple example that sends a few numbers from a single sender to + a single receiver. + + ```python + import asyncio + + from frequenz.channels import Broadcast, Sender + + + async def send(sender: Sender[int]) -> None: + for msg in range(3): + print(f"sending {msg}") + await sender.send(msg) + + + async def main() -> None: + channel = Broadcast[int](name="numbers") + + sender = channel.new_sender() + receiver = channel.new_receiver() + + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send(sender)) + for _ in range(3): + msg = await receiver.receive() + print(f"received {msg}") + await asyncio.sleep(0.1) # sleep (or work) with the data + + + asyncio.run(main()) + ``` + + The output should look something like (although the sending and received might + appear more interleaved): + + ``` + sending 0 + sending 1 + sending 2 + received 0 + received 1 + received 2 ``` - Check the `tests` and `benchmarks` directories for more examples. + Example: Send a few number from multiple senders to multiple receivers + This is a more complex example that sends a few numbers from multiple senders to + multiple receivers, using a small buffer to force the senders to block. + + ```python + import asyncio + + from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, Sender + + + async def send(name: str, sender: Sender[int], start: int, stop: int) -> None: + for msg in range(start, stop): + print(f"{name} sending {msg}") + await sender.send(msg) + + + async def recv(name: str, receiver: Receiver[int]) -> None: + try: + async for msg in receiver: + print(f"{name} received {msg}") + await asyncio.sleep(0.1) # sleep (or work) with the data + except ReceiverStoppedError: + pass + + + async def main() -> None: + acast = Broadcast[int](name="numbers") + + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send("sender_1", acast.new_sender(), 10, 13)) + task_group.create_task(send("sender_2", acast.new_sender(), 20, 22)) + task_group.create_task(recv("receiver_1", acast.new_receiver())) + task_group.create_task(recv("receiver_2", acast.new_receiver())) + + + asyncio.run(main()) + ``` + + The output should look something like this(although the sending and received + might appear interleaved in a different way): + + ``` + sender_1 sending 10 + sender_1 sending 11 + sender_1 sending 12 + sender_2 sending 20 + sender_2 sending 21 + receiver_1 received 10 + receiver_1 received 11 + receiver_1 received 12 + receiver_1 received 20 + receiver_1 received 21 + receiver_2 received 10 + receiver_2 received 11 + receiver_2 received 12 + receiver_2 received 20 + receiver_2 received 21 + ``` """ def __init__(self, *, name: str, resend_latest: bool = False) -> None: From 89f84faf47db2bfd600e6ac2688d4f6197a88537 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 24 Nov 2023 10:18:12 +0100 Subject: [PATCH 09/18] Improve the `_exceptions` module description Explain the base exceptions and the basics of error handling, including how to get the cause of an exception. This module is not publicly available, so users won't be able to access the information in IDEs for example, but it will be rendered as part of the User Guide. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_exceptions.py | 65 +++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/src/frequenz/channels/_exceptions.py b/src/frequenz/channels/_exceptions.py index 559b2df7..719b26da 100644 --- a/src/frequenz/channels/_exceptions.py +++ b/src/frequenz/channels/_exceptions.py @@ -1,7 +1,70 @@ # License: MIT # Copyright © 2022 Frequenz Energy-as-a-Service GmbH -"""Base exception classes.""" +"""Base exception classes. + +# Exceptions + +All exceptions generated by this library inherit from the +[`Error`][frequenz.channels.Error] exception. + +Exceptions generated by channels inherit from the +[`ChannelError`][frequenz.channels.ChannelError] exception. When there is an attempt to +use a closed channel, a [`ChannelClosedError`][frequenz.channels.ChannelClosedError] +exception is raised. + +# Causes + +When a exception is caused by another exception, for example if the underlying channel +was closed while seding or receiving a message, the original exception will be +available as the cause of the exception: + +```python +from frequenz.channels import Anycast, ChannelClosedError, SenderError + +channel = Anycast[int](name="test-channel") +sender = channel.new_sender() + +try: + await sender.send(42) +except SenderError as error: + match error.__cause__: + case None: + print("The message couldn't be sent for an known reason") + case ChannelClosedError() as closed_error: + print(f"The message couldn't be sent, channel closed: {closed_error}") + case _ as unknown_error: + print(f"The message couldn't be sent: {unknown_error}") +``` + +Tip: + If you are using the async iteration interface for receivers, then you can + access the cause of the + [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception by + explicitly calling [`receive()`][frequenz.channels.Receiver.receive] on the + receiver after the iteration is done: + + ```python + from frequenz.channels import Anycast, ChannelClosedError, ReceiverStoppedError + + channel = Anycast[int](name="test-channel") + receiver = channel.new_receiver() + + async for value in receiver: + print(value) + try: + await receiver.receive() + except ReceiverStoppedError as error: + print("The receiver was stopped") + match error.__cause__: + case None: + print("The receiver was stopped without a known reason") + case ChannelClosedError() as closed_error: + print(f"The channel was closed with error: {closed_error}") + case _ as unknown_error: + print(f"The receiver was stopped due to an unknown error: {unknown_error}") + ``` +""" from typing import Any From b8e4624087f90bdc9dd6117ab2eafd0ac5a38c14 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 24 Nov 2023 10:19:50 +0100 Subject: [PATCH 10/18] Improve the `_merge` module description Explain how to use the `merge()` function and the `Merger` type on a higher level than the function documentation. This module is not publicly available, so users won't be able to access the information in IDEs for example, but it will be rendered as part of the User Guide. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_merge.py | 46 ++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/src/frequenz/channels/_merge.py b/src/frequenz/channels/_merge.py index ff6f00c2..771bf5d5 100644 --- a/src/frequenz/channels/_merge.py +++ b/src/frequenz/channels/_merge.py @@ -1,7 +1,51 @@ # License: MIT # Copyright © 2022 Frequenz Energy-as-a-Service GmbH -"""Merge messages coming from channels into a single stream.""" +"""Merge messages coming from multiple receivers into a single receiver. + +# Usage + +If you just need to receive the same type of messages but from multiple sources in one +stream, you can use [`merge()`][frequenz.channels.merge] to create a new receiver that +will receive messages from all the given receivers: + +```python +from frequenz.channels import Anycast, Receiver, merge + +channel1: Anycast[int] = Anycast(name="channel1") +channel2: Anycast[int] = Anycast(name="channel2") +receiver1 = channel1.new_receiver() +receiver2 = channel2.new_receiver() + +async for value in merge(receiver1, receiver2): + print(value) +``` + +If the first message comes from `channel2` and the second message from `channel1`, the +first message will be received immediately, and the second message will be received as +soon as it is available. + +This can be helpful when you just need to receive messages and don't care about +where are they coming from specifically. If you need to know where the message came +from, you can use [`select()`][frequenz.channels.select] instead. + +# Stopping + +A merge receiver will be stopped automatically when all the receivers that it merges are +stopped. When using the async iterator interface, this means that the iterator will stop +as soon as all the receivers are stopped. When using +[`receive()`][frequenz.channels.Receiver.receive], this means that the method will raise +a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception as soon as +all the receivers are stopped. + +If you want to stop a merge receiver manually, you can use the +[`stop()`][frequenz.channels.Merger.stop] method. + +When using [`receive()`][frequenz.channels.Receiver.receive], you should make sure to +either stop all the receivers that you are merging, or to stop the merge receiver +manually. This is to make sure that all the tasks created by the merge receiver are +cleaned up properly. +""" from __future__ import annotations From 55f3608416d143cfc4f583841fceaf09fa234ddb Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 24 Nov 2023 10:21:39 +0100 Subject: [PATCH 11/18] Improve the `_receiver` module description Give a general introduction to receivers, explaining how one usually create them, how to map values, how to handle receiving errors and how to use the low-level methods. This module is not publicly available, so users won't be able to access the information in IDEs for example, but it will be rendered as part of the User Guide. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_receiver.py | 133 ++++++++++++++++++++++++++++- 1 file changed, 131 insertions(+), 2 deletions(-) diff --git a/src/frequenz/channels/_receiver.py b/src/frequenz/channels/_receiver.py index d3308837..58e5edd0 100644 --- a/src/frequenz/channels/_receiver.py +++ b/src/frequenz/channels/_receiver.py @@ -1,7 +1,136 @@ # License: MIT # Copyright © 2022 Frequenz Energy-as-a-Service GmbH -"""Channel receiver and associated exceptions.""" +"""Receiver interface and related exceptions. + +# Receivers + +Messages are received from [channels](/user-guide/channels/index.md) through +[Receiver][frequenz.channels.Receiver] objects. [Receivers][frequenz.channels.Receiver] +are usually created by calling `channel.new_receiver()` and are [async +iterators][typing.AsyncIterator], so the easiest way to receive values from them as +a stream is to use `async for`: + +```python +from frequenz.channels import Anycast + +channel = Anycast[int](name="test-channel") +receiver = channel.new_receiver() + +async for value in receiver: + print(value) +``` + +If you need to receive values in different places or expecting a particular +sequence, you can use the [`receive()`][frequenz.channels.Receiver.receive] method: + +```python +from frequenz.channels import Anycast + +channel = Anycast[int](name="test-channel") +receiver = channel.new_receiver() + +first_value = await receiver.receive() +print(f"First value: {first_value}") + +second_value = await receiver.receive() +print(f"Second value: {second_value}") +``` + +# Value Transformation + +If you need to transform the received values, receivers provide a +[`map()`][frequenz.channels.Receiver.map] method to easily do so: + +```python +from frequenz.channels import Anycast + +channel = Anycast[int](name="test-channel") +receiver = channel.new_receiver() + +async for value in receiver.map(lambda x: x + 1): + print(value) +``` + +[`map()`][frequenz.channels.Receiver.map] returns a new full receiver, so you can +use it in any of the ways described above. + +# Error Handling + +!!! Tip inline end + + For more information about handling errors, please refer to the + [Error Handling](/user-guide/error-handling/) section of the user guide. + +If there is an error while receiving a message, +a [`ReceiverError`][frequenz.channels.ReceiverError] exception is raised for both +[`receive()`][frequenz.channels.Receiver.receive] method and async iteration +interface. + +If the receiver has completely stopped (for example the underlying channel was +closed), a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception +is raised by [`receive()`][frequenz.channels.Receiver.receive] method. + +```python +from frequenz.channels import Anycast + +channel = Anycast[int](name="test-channel") +receiver = channel.new_receiver() + +try: + await receiver.receive() +except ReceiverStoppedError as error: + print("The receiver was stopped") +except ReceiverError as error: + print(f"There was an error trying to receive: {error}") +``` + +When used as an async iterator, the iteration will just stop without raising an +exception: + +```python +from frequenz.channels import Anycast + +channel = Anycast[int](name="test-channel") +receiver = channel.new_receiver() + +try: + async for value in receiver: + print(value) +except ReceiverStoppedError as error: + print("Will never happen") +except ReceiverError as error: + print(f"There was an error trying to receive: {error}") +# If we get here, the receiver was stopped +``` + +# Advanced Usage + +!!! Warning inline end + + This section is intended for library developers that want to build other low-level + abstractions on top of channels. If you are just using channels, you can safely + ignore this section. + +Receivers extend on the [async iterator protocol][typing.AsyncIterator] by providing +a [`ready()`][frequenz.channels.Receiver.ready] and +a [`consume()`][frequenz.channels.Receiver.consume] method. + +The [`ready()`][frequenz.channels.Receiver.ready] method is used to await until the +receiver has a new value available, but without actually consuming it. The +[`consume()`][frequenz.channels.Receiver.consume] method consumes the next available +value and returns it. + +[`ready()`][frequenz.channels.Receiver.ready] can be called multiple times, and it +will return immediately if the receiver is already ready. +[`consume()`][frequenz.channels.Receiver.consume] must be called only after +[`ready()`][frequenz.channels.Receiver.ready] is done and only once, until the next +call to [`ready()`][frequenz.channels.Receiver.ready]. + +Exceptions are never raised by [`ready()`][frequenz.channels.Receiver.ready], they +are always delayed until [`consume()`][frequenz.channels.Receiver.consume] is +called. +""" from __future__ import annotations @@ -16,7 +145,7 @@ class Receiver(ABC, Generic[_T]): - """A channel Receiver.""" + """An endpoint to receive messages.""" async def __anext__(self) -> _T: """Await the next value in the async iteration over received values. From d040ba75c210547aa088c5a3dea4b370f48293a4 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 24 Nov 2023 11:07:00 +0100 Subject: [PATCH 12/18] Improve the `_select` module description Explaining how `select()` should be used, including how to stop the iteration, how to handle errors and mentioning the exhaustiveness check. This module is not publicly available, so users won't be able to access the information in IDEs for example, but it will be rendered as part of the User Guide. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_select.py | 138 +++++++++++++++++++++++++++++-- 1 file changed, 132 insertions(+), 6 deletions(-) diff --git a/src/frequenz/channels/_select.py b/src/frequenz/channels/_select.py index 2d30bd81..e3de99af 100644 --- a/src/frequenz/channels/_select.py +++ b/src/frequenz/channels/_select.py @@ -3,9 +3,137 @@ """Select the first among multiple Receivers. -Expects Receiver class to raise `StopAsyncIteration` -exception once no more messages are expected or the channel -is closed in case of `Receiver` class. +# Usage + +If you need to receiver different types of messages from different receivers, you need +to know the source of a particular received value to know the type of the value. + +[`select()`][frequenz.channels.select] allows you to do that. It is an +[async iterator][typing.AsyncIterator] that will iterate over the values of all +receivers as they receive new values. + +It yields a [`Selected`][frequenz.channels.Selected] object that will tell you the +source of the received message. To make sure the received value is *cast* to the +correct type, you need to use the [`selected_from()`][frequenz.channels.selected_from] +function to check the source of the message, and the +[`value`][frequenz.channels.Selected.value] attribute to access the message: + +```python +from frequenz.channels import Anycast, ReceiverStoppedError, select, selected_from + +channel1: Anycast[int] = Anycast(name="channel1") +channel2: Anycast[str] = Anycast(name="channel2") +receiver1 = channel1.new_receiver() +receiver2 = channel2.new_receiver() + +async for selected in select(receiver1, receiver2): + if selected_from(selected, receiver1): + print(f"Received from receiver1, next number: {selected.value + 1}") + elif selected_from(selected, receiver2): + print(f"Received from receiver2, length: {len(selected.value)}") + else: + assert False, "Unknown source, this should never happen" +``` + +Tip: + To prevent common bugs, like when a new receiver is added to the select loop but + the handling code is forgotten, [`select()`][frequenz.channels.select] will check + that all the selected receivers are handled in the if-chain. + + If this happens, it will raise an + [`UnhandledSelectedError`][frequenz.channels.UnhandledSelectedError] exception. + + Not handling a receiver is considered a programming error. Because of this, the + exception is a subclass of [`BaseException`][BaseException] instead of + [`Exception`][Exception]. This means that it will not be caught by [`except + Exception`][Exception] blocks. + + If for some reason you want to ignore a received value, just add the receiver to + the if-chain and do nothing with the value: + + ```python + from frequenz.channels import Anycast, select, selected_from + + channel1: Anycast[int] = Anycast(name="channel1") + channel2: Anycast[str] = Anycast(name="channel2") + receiver1 = channel1.new_receiver() + receiver2 = channel2.new_receiver() + + async for selected in select(receiver1, receiver2): + if selected_from(selected, receiver1): + continue + if selected_from(selected, receiver2): + print(f"Received from receiver2, length: {len(selected.value)}") + ``` + +# Stopping + +The `select()` async iterator will stop as soon as all the receivers are stopped. You +can also end the iteration early by breaking out of the loop as normal. + +When a single [receiver][frequenz.channels.Receiver] is stopped, it will be reported +via the [`Selected`][frequenz.channels.Selected] object. You can use the +[`was_stopped()`][frequenz.channels.Selected.was_stopped] method to check if the +selected [receiver][frequenz.channels.Receiver] was stopped: + +```python +from frequenz.channels import Anycast, select, selected_from + +channel1: Anycast[int] = Anycast(name="channel1") +channel2: Anycast[str] = Anycast(name="channel2") +receiver1 = channel1.new_receiver() +receiver2 = channel2.new_receiver() + +async for selected in select(receiver1, receiver2): + if selected_from(selected, receiver1): + if selected.was_stopped(): + print("receiver1 was stopped") + continue + print(f"Received from receiver1, the next number is: {selected.value + 1}") + # ... +``` + +Tip: + The [`was_stopped()`][frequenz.channels.Selected.was_stopped] method is a + convenience method that is equivalent to checking if the + [`exception`][frequenz.channels.Selected.exception] attribute is an instance of + [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError]. + +# Error Handling + +Tip: + For more information about handling errors, please refer to the + [Error Handling](/user-guide/error-handling/) section of the user guide. + +If a receiver raises an exception while receiving a value, the exception will be +raised by the [`value`][frequenz.channels.Selected.value] attribute of the +[`Selected`][frequenz.channels.Selected] object. + +You can use a try-except block to handle exceptions as usual: + +```python +from frequenz.channels import Anycast, ReceiverStoppedError, select, selected_from + +channel1: Anycast[int] = Anycast(name="channel1") +channel2: Anycast[str] = Anycast(name="channel2") +receiver1 = channel1.new_receiver() +receiver2 = channel2.new_receiver() + +async for selected in select(receiver1, receiver2): + if selected_from(selected, receiver1): + try: + print(f"Received from receiver1, next number: {selected.value + 1}") + except ReceiverStoppedError: + print("receiver1 was stopped") + except ValueError as value_error: + print(f"receiver1 raised a ValueError: {value_error}") + # ... + # ... +``` + +The [`Selected`][frequenz.channels.Selected] object also has a +[`exception`][frequenz.channels.Selected.exception] attribute that will contain the +exception that was raised by the receiver. """ import asyncio @@ -297,9 +425,7 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: print(f"timer2: exception={exception}") case None: # All good, no exception, we can use `selected.value` safely - print( - f"timer2: now={datetime.datetime.now()} drift={selected.value}" - ) + print(f"timer2: now={datetime.datetime.now()} drift={selected.value}") case _ as unhanded: assert_never(unhanded) else: From 28666a1efaddb1490a3992792556f9cdfd020e64 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 24 Nov 2023 11:09:22 +0100 Subject: [PATCH 13/18] Improve the `_sender` module description Give a short introduction about how `Sender`s are created and used, including how to handle errors. This module is not publicly available, so users won't be able to access the information in IDEs for example, but it will be rendered as part of the User Guide. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_sender.py | 50 ++++++++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/src/frequenz/channels/_sender.py b/src/frequenz/channels/_sender.py index 8cf00f86..41f8ef86 100644 --- a/src/frequenz/channels/_sender.py +++ b/src/frequenz/channels/_sender.py @@ -1,7 +1,53 @@ # License: MIT # Copyright © 2022 Frequenz Energy-as-a-Service GmbH -"""Channel sender and associated exceptions.""" +"""Sender interface and related exceptions. + +# Senders + +Messages are sent to a [channel](/user-guide/channels) through +[Sender][frequenz.channels.Sender] objects. [Senders][frequenz.channels.Sender] are +usually created by calling `channel.new_sender()`, and are a very simple abstraction +that only provides a single [`send()`][frequenz.channels.Sender.send] method: + +```python +from frequenz.channels import Anycast + +channel = Anycast[int](name="test-channel") +sender = channel.new_sender() + +await sender.send("Hello, world!") +``` + +Although [`send()`][frequenz.channels.Sender.send] is an asynchronous method, some +channels may implement it in a synchronous, non-blocking way. For example, buffered +channels that drop messages when the buffer is full could guarantee that +[`send()`][frequenz.channels.Sender.send] never blocks. However, please keep in mind +that the [asyncio][] event loop could give control to another task at any time, +effectively making the [`send()`][frequenz.channels.Sender.send] method blocking. + +# Error Handling + +!!! Tip inline end + + For more information about handling errors, please refer to the + [Error Handling](/user-guide/error-handling/) section of the user guide. + +If there is any failure sending a message, +a [SenderError][frequenz.channels.SenderError] exception is raised. + +```python +from frequenz.channels import Anycast + +channel = Anycast[int](name="test-channel") +sender = channel.new_sender() + +try: + await sender.send("Hello, world!") +except SenderError as error: + print(f"Error sending message: {error}") +``` +""" from abc import ABC, abstractmethod from typing import Generic, TypeVar @@ -12,7 +58,7 @@ class Sender(ABC, Generic[_T]): - """A channel Sender.""" + """An endpoint to sends messages.""" @abstractmethod async def send(self, msg: _T) -> None: From 90d596df25399758175126cf61ec48c690bc28a2 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 24 Nov 2023 11:12:23 +0100 Subject: [PATCH 14/18] Improve the `event` module documentation Add a summary of the contents of the module in the module documentation and improve the `Event` class documentation and example. The formatting is done having in mind this documentation is now included in the User Guide. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/event.py | 82 +++++++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 22 deletions(-) diff --git a/src/frequenz/channels/event.py b/src/frequenz/channels/event.py index f546555e..87f91496 100644 --- a/src/frequenz/channels/event.py +++ b/src/frequenz/channels/event.py @@ -1,8 +1,18 @@ # License: MIT # Copyright © 2023 Frequenz Energy-as-a-Service GmbH -"""A receiver that can be made ready through an event.""" +"""A receiver that can be made ready directly. +!!! Tip inline end + + Read the [`Event`][frequenz.channels.event.Event] documentation for more + information. + +This module contains the following: + +* [`Event`][frequenz.channels.event.Event]: + {{docstring_summary("frequenz.channels.event.Event")}} +""" import asyncio as _asyncio @@ -10,38 +20,66 @@ class Event(_receiver.Receiver[None]): - """A receiver that can be made ready through an event. + """A receiver that can be made ready directly. + + # Usage - The receiver (the [`ready()`][frequenz.channels.event.Event.ready] method) will wait - until [`set()`][frequenz.channels.event.Event.set] is called. At that point the - receiver will wait again after the event is - [`consume()`][frequenz.channels.Receiver.consume]d. + There are cases where it is useful to be able to send a signal to + a [`select()`][frequenz.channels.select] loop, for example, to stop a loop from + outside the loop itself. - The receiver can be completely stopped by calling + To do that, you can use an [`Event`][frequenz.channels.event.Event] receiver and + call [`set()`][frequenz.channels.event.Event.set] on it when you want to make it + ready. + + # Stopping + + The receiver will be re-activated (will keep blocking) after the current set + event is received. To stop the receiver completely, you can call [`stop()`][frequenz.channels.event.Event.stop]. - Example: + # Example + + Example: Exit after printing the first 5 numbers ```python import asyncio - from frequenz.channels import Receiver, select, selected_from + + from frequenz.channels import Anycast, select, selected_from from frequenz.channels.event import Event - other_receiver: Receiver[int] = ... - exit_event = Event() + channel: Anycast[int] = Anycast(name="channel") + receiver = channel.new_receiver() + sender = channel.new_sender() + stop_event = Event(name="stop") + + + async def do_work() -> None: + async for selected in select(receiver, stop_event): + if selected_from(selected, receiver): + print(selected.value) + elif selected_from(selected, stop_event): + print("Stop event triggered") + stop_event.stop() + break + + + async def send_stuff() -> None: + for i in range(10): + if stop_event.is_stopped: + break + await asyncio.sleep(1) + await sender.send(i) + - async def exit_after_10_seconds() -> None: - asyncio.sleep(10) - exit_event.set() + async def main() -> None: + async with asyncio.TaskGroup() as task_group: + task_group.create_task(do_work(), name="do_work") + task_group.create_task(send_stuff(), name="send_stuff") + await asyncio.sleep(5.5) + stop_event.set() - asyncio.ensure_future(exit_after_10_seconds()) - async for selected in select(exit_event, other_receiver): - if selected_from(selected, exit_event): - break - if selected_from(selected, other_receiver): - print(selected.value) - else: - assert False, "Unknown receiver selected" + asyncio.run(main()) ``` """ From 7f51085a03dc6a7ef48b8cd26d38047dce6a3f3c Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 24 Nov 2023 11:16:40 +0100 Subject: [PATCH 15/18] Improve the `file_watcher` module documentation Add a summary of the contents of the module in the module documentation and improve the `EventType` enum and `FileWatcher` class documentation and example. The formatting is done having in mind this documentation is now included in the User Guide. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/file_watcher.py | 85 +++++++++++++++++++++++++-- 1 file changed, 79 insertions(+), 6 deletions(-) diff --git a/src/frequenz/channels/file_watcher.py b/src/frequenz/channels/file_watcher.py index 64bc62e1..bc2fedb1 100644 --- a/src/frequenz/channels/file_watcher.py +++ b/src/frequenz/channels/file_watcher.py @@ -1,7 +1,22 @@ # License: MIT # Copyright © 2022 Frequenz Energy-as-a-Service GmbH -"""A Channel receiver for watching for new, modified or deleted files.""" +"""A receiver for watching for new, modified or deleted files. + +!!! Tip inline end + + Read the [`FileWatcher`][frequenz.channels.file_watcher.FileWatcher] + documentation for more information. + +This module contains the following: + +* [`FileWatcher`][frequenz.channels.file_watcher.FileWatcher]: + {{docstring_summary("frequenz.channels.file_watcher.FileWatcher")}} +* [`Event`][frequenz.channels.file_watcher.Event]: + {{docstring_summary("frequenz.channels.file_watcher.Event")}} +* [`EventType`][frequenz.channels.file_watcher.EventType]: + {{docstring_summary("frequenz.channels.file_watcher.EventType")}} +""" import asyncio import pathlib @@ -16,16 +31,16 @@ class EventType(Enum): - """Available types of changes to watch for.""" + """The types of file events that can be observed.""" CREATE = Change.added - """A new file was created.""" + """The file was created.""" MODIFY = Change.modified - """An existing file was modified.""" + """The file was modified.""" DELETE = Change.deleted - """An existing file was deleted.""" + """The file was deleted.""" @dataclass(frozen=True) @@ -34,12 +49,70 @@ class Event: type: EventType """The type of change that was observed.""" + path: pathlib.Path """The path where the change was observed.""" class FileWatcher(Receiver[Event]): - """A channel receiver that watches for file events.""" + """A receiver that watches for file events. + + # Usage + + A [`FileWatcher`][frequenz.channels.file_watcher.FileWatcher] receiver can be used + to watch for changes in a set of files. It will generate an + [`Event`][frequenz.channels.file_watcher.Event] message every time a file is + created, modified or deleted, depending on the type of events that it is configured + to watch for. + + The [event][frequenz.channels.file_watcher.EventType] message contains the + [`type`][frequenz.channels.file_watcher.Event.type] of change that was observed and + the [`path`][frequenz.channels.file_watcher.Event.path] where the change was + observed. + + # Event Types + + The following event types are available: + + * [`CREATE`][frequenz.channels.file_watcher.EventType.CREATE]: + {{docstring_summary("frequenz.channels.file_watcher.EventType.CREATE")}} + * [`MODIFY`][frequenz.channels.file_watcher.EventType.MODIFY]: + {{docstring_summary("frequenz.channels.file_watcher.EventType.MODIFY")}} + * [`DELETE`][frequenz.channels.file_watcher.EventType.DELETE]: + {{docstring_summary("frequenz.channels.file_watcher.EventType.DELETE")}} + + # Example + + Example: Watch for changes and exit after the file is modified + ```python + import asyncio + + from frequenz.channels.file_watcher import EventType, FileWatcher + + PATH = "/tmp/test.txt" + file_watcher = FileWatcher(paths=[PATH], event_types=[EventType.MODIFY]) + + + async def update_file() -> None: + await asyncio.sleep(1) + with open(PATH, "w", encoding="utf-8") as file: + file.write("Hello, world!") + + + async def main() -> None: + # Create file + with open(PATH, "w", encoding="utf-8") as file: + file.write("Hello, world!") + async with asyncio.TaskGroup() as group: + group.create_task(update_file()) + async for event in file_watcher: + print(f"File {event.path}: {event.type.name}") + break + + + asyncio.run(main()) + ``` + """ def __init__( self, From 5a7e63c5dc0cda0574bd7352441000ca74de70f5 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 24 Nov 2023 11:22:26 +0100 Subject: [PATCH 16/18] Improve the `timer` module documentation Add a quick start guide with a couple of examples in the module documentation, as well as a general explanation about missing ticks. Improve documentation of the `MissedTickPolicy` class to explain how to create custom policies, including an example. Also improve the documentation of each missed tick policy provided by the module. Improve the `Timer` class documentation to focus on how to use timers more generally, referring to the `timeout()` and `periodic()` constructors for more information, and improve both constructors documentation to explain what they are useful for and include examples. The formatting is done having in mind this documentation is now included in the User Guide. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/timer.py | 450 ++++++++++++++++++++++----------- 1 file changed, 300 insertions(+), 150 deletions(-) diff --git a/src/frequenz/channels/timer.py b/src/frequenz/channels/timer.py index c4d65bc9..44a0c472 100644 --- a/src/frequenz/channels/timer.py +++ b/src/frequenz/channels/timer.py @@ -1,14 +1,90 @@ # License: MIT # Copyright © 2023 Frequenz Energy-as-a-Service GmbH -"""A timer receiver that ticks every `interval`. +"""A receiver that sends a message regularly. -Note: - This module always use `int`s to represent time. The time is always in - microseconds, so the timer resolution is 1 microsecond. +# Quick Start - This is to avoid floating point errors when performing calculations with - time, which can lead to very hard to reproduce, and debug, issues. +If you need to do something as periodically as possible (avoiding +[drifts](#missed-ticks-and-drifting)), you can use use +a [`periodic()`][frequenz.channels.timer.Timer.periodic] timer. + +Example: Periodic Timer + ```python + import asyncio + from datetime import datetime, timedelta + + from frequenz.channels.timer import Timer + + + async def main() -> None: + async for drift in Timer.periodic(timedelta(seconds=1.0)): + print(f"The timer has triggered at {datetime.now()} with a drift of {drift}") + + + asyncio.run(main()) + ``` + +If, instead, you need a timeout, for example to abort waiting for other receivers after +a certain amount of time, you can use +a [`timeout()`][frequenz.channels.timer.Timer.timeout] timer. + +Example: Timeout + ```python + import asyncio + from datetime import timedelta + + from frequenz.channels import Anycast, select, selected_from + from frequenz.channels.timer import Timer + + + async def main() -> None: + channel = Anycast[int](name="data-channel") + data_receiver = channel.new_receiver() + + timer = Timer.timeout(timedelta(seconds=1.0)) + + async for selected in select(data_receiver, timer): + if selected_from(selected, data_receiver): + print(f"Received data: {selected.value}") + timer.reset() + elif selected_from(selected, timer): + drift = selected.value + print(f"No data received for {timer.interval + drift} seconds, giving up") + break + + + asyncio.run(main()) + ``` + + This timer will *rearm* itself automatically after it was triggered, so it will trigger + again after the selected interval, no matter what the current drift was. + +Tip: + It is extremely important to understand how timers behave when they are + delayed, we recommned emphatically to read about [missed ticks and + drifting](#missed-ticks-and-drifting) before using timers in production. + +# Missed Ticks And Drifting + +A [`Timer`][frequenz.channels.timer.Timer] can be used to send a messages at regular +time intervals, but there is one fundamental issue with timers in the [asyncio][] world: +the event loop could give control to another task at any time, and that task can take +a long time to finish, making the time it takes the next timer message to be received +longer than the desired interval. + +Because of this, it is very important for users to be able to understand and control +how timers behave when they are delayed. Timers will handle missed ticks according to +a *missing tick policy*. + +The following built-in policies are available: + +* [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift]: + {{docstring_summary("frequenz.channels.timer.SkipMissedAndDrift")}} +* [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync]: + {{docstring_summary("frequenz.channels.timer.SkipMissedAndResync")}} +* [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed]: + {{docstring_summary("frequenz.channels.timer.TriggerAllMissed")}} """ from __future__ import annotations @@ -38,9 +114,34 @@ def _to_microseconds(time: float | timedelta) -> int: class MissedTickPolicy(abc.ABC): """A policy to handle timer missed ticks. - This is only relevant if the timer is not ready to trigger when it should - (an interval passed) which can happen if the event loop is busy processing - other tasks. + To implement a custom policy you need to subclass + [`MissedTickPolicy`][frequenz.channels.timer.MissedTickPolicy] and implement the + [`calculate_next_tick_time`][frequenz.channels.timer.MissedTickPolicy.calculate_next_tick_time] + method. + + Example: + This policy will just wait one more second than the original interval if a + tick is missed: + + ```python + class WaitOneMoreSecond(MissedTickPolicy): + def calculate_next_tick_time( + self, *, interval: int, scheduled_tick_time: int, now: int + ) -> int: + return scheduled_tick_time + interval + 1_000_000 + + + async def main() -> None: + timer = Timer( + interval=timedelta(seconds=1), + missed_tick_policy=WaitOneMoreSecond(), + ) + + async for drift in timer: + print(f"The timer has triggered with a drift of {drift}") + + asyncio.run(main()) + ``` """ @abc.abstractmethod @@ -76,16 +177,36 @@ def __repr__(self) -> str: class TriggerAllMissed(MissedTickPolicy): """A policy that triggers all the missed ticks immediately until it catches up. + The [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] policy will + trigger all missed ticks immediately until it catches up with the current time. + This means that if the timer is delayed for any reason, when it finally gets some + time to run, it will trigger all the missed ticks in a burst. The drift is not + accumulated and the next tick will be scheduled according to the original start + time. + Example: - Assume a timer with interval 1 second, the tick `T0` happens exactly - at time 0, the second tick, `T1`, happens at time 1.2 (0.2 seconds - late), so it triggers immediately. The third tick, `T2`, happens at - time 2.3 (0.3 seconds late), so it also triggers immediately. The - fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also - triggers immediately as well as the fifth tick, `T4`, which was also - already delayed (by 0.3 seconds), so it catches up. The sixth tick, - `T5`, happens at 5.1 (0.1 seconds late), so it triggers immediately - again. The seventh tick, `T6`, happens at 6.0, right on time. + This example represents a timer with interval 1 second. + + 1. The first tick, `T0` happens exactly at time 0. + + 2. The second tick, `T1`, happens at time 1.2 (0.2 seconds late), so it triggers + immediately. But it re-syncs, so the next tick is still expected at + 2 seconds. This re-sync happens on every tick, so all ticks are expected at + multiples of 1 second, not matter how delayed they were. + + 3. The third tick, `T2`, happens at time 2.3 (0.3 seconds late), so it also + triggers immediately. + + 4. The fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also + triggers immediately. + + 5. The fifth tick, `T4`, which was also already delayed (by 0.3 seconds), + triggers immediately too, resulting in a small *catch-up* burst. + + 6. The sixth tick, `T5`, happens at 5.1 (0.1 seconds late), so it triggers + immediately again. + + 7. The seventh tick, `T6`, happens at 6.0, right on time.
```bob @@ -122,21 +243,34 @@ def calculate_next_tick_time( class SkipMissedAndResync(MissedTickPolicy): """A policy that drops all the missed ticks, triggers immediately and resyncs. - If ticks are missed, the timer will trigger immediately returning the drift - and it will schedule to trigger again on the next multiple of `interval`, - effectively skipping any missed ticks, but resyncing with the original start - time. + If ticks are missed, the + [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] policy will + make the [`Timer`][frequenz.channels.timer.Timer] trigger immediately and it will + schedule to trigger again on the next multiple of the + [interval][frequenz.channels.timer.Timer.interval], effectively skipping any missed + ticks, but re-syncing with the original start time. Example: - Assume a timer with interval 1 second, the tick `T0` happens exactly - at time 0, the second tick, `T1`, happens at time 1.2 (0.2 seconds - late), so it triggers immediately. The third tick, `T2`, happens at - time 2.3 (0.3 seconds late), so it also triggers immediately. The - fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also - triggers immediately but the fifth tick, `T4`, which was also - already delayed (by 0.3 seconds) is skipped. The sixth tick, - `T5`, happens at 5.1 (0.1 seconds late), so it triggers immediately - again. The seventh tick, `T6`, happens at 6.0, right on time. + This example represents a timer with interval 1 second. + + 1. The first tick `T0` happens exactly at time 0. + + 2. The second tick, `T1`, happens at time 1.2 (0.2 seconds late), so it triggers + immediately. But it re-syncs, so the next tick is still expected at + 2 seconds. This re-sync happens on every tick, so all ticks are expected at + multiples of 1 second, not matter how delayed they were. + + 3. The third tick, `T2`, happens at time 2.3 (0.3 seconds late), so it also + triggers immediately. + + 4. The fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also + triggers immediately, but there was also a tick expected at 4 seconds, `T4`, + which is skipped according to this policy to avoid bursts of ticks. + + 6. The sixth tick, `T5`, happens at 5.1 (0.1 seconds late), so it triggers + immediately again. + + 7. The seventh tick, `T6`, happens at 6.0, right on time.
```bob @@ -176,26 +310,36 @@ def calculate_next_tick_time( class SkipMissedAndDrift(MissedTickPolicy): """A policy that drops all the missed ticks, triggers immediately and resets. - This will behave effectively as if the timer was `reset()` at the time it - had triggered last, so the start time will change (and the drift will be - accumulated each time a tick is delayed, but only the relative drift will - be returned on each tick). + The [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy will + behave effectively as if the timer was + [reset][frequenz.channels.timer.Timer.reset] every time it is triggered. This means + the start time will change and the drift will be accumulated each time a tick is + delayed. Only the relative drift will be returned on each tick. - The reset happens only if the delay is larger than `delay_tolerance`, so - it is possible to ignore small delays and not drift in those cases. + The reset happens only if the delay is larger than the + [delay tolerance][frequenz.channels.timer.SkipMissedAndDrift.delay_tolerance], so it + is possible to ignore small delays and not drift in those cases. Example: - Assume a timer with interval 1 second and `delay_tolerance=0.1`, the - first tick, `T0`, happens exactly at time 0, the second tick, `T1`, - happens at time 1.2 (0.2 seconds late), so the timer triggers - immediately but drifts a bit. The next tick, `T2.2`, happens at 2.3 seconds - (0.1 seconds late), so it also triggers immediately but it doesn't - drift because the delay is under the `delay_tolerance`. The next tick, - `T3.2`, triggers at 4.3 seconds (1.1 seconds late), so it also triggers - immediately but the timer drifts by 1.1 seconds and the tick `T4.2` is - skipped (not triggered). The next tick, `T5.3`, triggers at 5.3 seconds - so is right on time (no drift) and the same happens for tick `T6.3`, - which triggers at 6.3 seconds. + This example represents a timer with interval 1 second and delay tolerance of + 0.1 seconds. + + 1. The first tick, `T0`, happens exactly at time 0. + + 2. The second tick, `T1.2`, happens at time 1.2 (0.2 seconds late), so the timer + triggers immediately but drifts a bit (0.2 seconds), so the next tick is + expected at 2.2 seconds. + + 3. The third tick, `T2.2`, happens at 2.3 seconds (0.1 seconds late), so it also + triggers immediately but **it doesn't drift** because the delay is **under + the `delay_tolerance`**. The next tick is expected at 3.2 seconds. + + 4. The fourth tick, `T4.2`, triggers at 4.3 seconds (1.1 seconds late), so it + also triggers immediately but the timer has drifted by 1.1 seconds, so a + potential tick `T3.2` is skipped (not triggered). + + 5. The fifth tick, `T5.3`, triggers at 5.3 seconds so it is right on time (no + drift) and the same happens for tick `T6.3`, which triggers at 6.3 seconds.
```bob @@ -282,116 +426,60 @@ def __repr__(self) -> str: class Timer(Receiver[timedelta]): - """A timer receiver that triggers every `interval` time. + """A receiver that sends a message regularly. + + [`Timer`][frequenz.channels.timer.Timer]s are started by default after they are + created. This can be disabled by using `auto_start=False` option when creating + them. In this case, the timer will not be started until + [`reset()`][frequenz.channels.timer.Timer.reset] is called. Receiving from the timer + (either using [`receive()`][frequenz.channels.timer.Timer.receive] or using the + async iterator interface) will also start the timer at that point. + + Timers need to be created in a context where + a [`asyncio`][] loop is already running. If no + [`loop`][frequenz.channels.timer.Timer.loop] is specified, the current running loop + is used. + + Timers can be stopped by calling [`stop()`][frequenz.channels.timer.Timer.stop]. + A stopped timer will raise + a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] or stop the async + iteration as usual. - The timer has microseconds resolution, so the - [`interval`][frequenz.channels.timer.Timer.interval] must be at least - 1 microsecond. + Once a timer is explicitly stopped, it can only be started again by explicitly + calling [`reset()`][frequenz.channels.timer.Timer.reset] (trying to receive from it + or using the async iterator interface will keep failing). - The message it produces is a [`timedelta`][datetime.timedelta] containing the drift - of the timer, i.e. the difference between when the timer should have triggered and - the time when it actually triggered. + Timer messages are [`timedelta`][datetime.timedelta]s containing the drift of the + timer, i.e. the difference between when the timer should have triggered and the time + when it actually triggered. This drift will likely never be `0`, because if there is a task that is running when it should trigger, the timer will be delayed. In this case the drift will be positive. A negative drift should be technically impossible, - as the timer uses [`asyncio`][asyncio]s loop monotonic clock. + as the timer uses [`asyncio`][]s loop monotonic clock. + + Warning: + Even when the [`asyncio`][] loop's monotonic clock is a [`float`][], timers use + `int`s to represent time internally. The internal time is tracked in + microseconds, so the timer resolution is 1 microsecond + ([`interval`][frequenz.channels.timer.Timer.interval] must be at least + 1 microsecond). + + This is to avoid floating point errors when performing calculations with time, + which can lead to issues that are very hard to reproduce and debug. If the timer is delayed too much, then it will behave according to the [`missed_tick_policy`][frequenz.channels.timer.Timer.missed_tick_policy]. Missing ticks might or might not trigger a message and the drift could be accumulated or not depending on the chosen policy. - These are the currently built-in available policies: - - * [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] - * [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] - * [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] - For the most common cases, a specialized constructor is provided: - * [`periodic()`][frequenz.channels.timer.Timer.periodic] (uses the - [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] or - [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] policy) - * [`timeout()`][frequenz.channels.timer.Timer.timeout] (uses the - [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy) - - The timer accepts an optional [`loop`][frequenz.channels.timer.Timer.loop], which - will be used to track the time. If `loop` is `None`, then the running loop will be - used (if there is no running loop most calls will raise - a [`RuntimeError`][RuntimeError]). - - Starting the timer can be delayed if necessary by using `auto_start=False` - (for example until we have a running loop). A call to - [`reset()`][frequenz.channels.timer.Timer.reset], - [`ready()`][frequenz.channels.timer.Timer.ready], - [`receive()`][frequenz.channels.timer.Timer.receive] or the async iterator interface - to await for a new message will start the timer. - - Example: Periodic timer example - ```python - async for drift in Timer.periodic(timedelta(seconds=1.0)): - print(f"The timer has triggered {drift=}") - ``` - - But you can also use a [`select`][frequenz.channels.select] to combine - it with other receivers, and even start it (semi) manually: - - ```python - from frequenz.channels import Broadcast, select, selected_from - - timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) - chan = Broadcast[int](name="input-chan") - battery_data = chan.new_receiver() - - timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) - # Do some other initialization, the timer will start automatically if - # a message is awaited (or manually via `reset()`). - async for selected in select(battery_data, timer): - if selected_from(selected, battery_data): - if selected.was_closed(): - print("battery channel closed") - continue - battery_soc = selected.value - elif selected_from(selected, timer): - # Print some regular battery data - print(f"Battery is charged at {battery_soc}%") - ``` - - Example: Timeout example - ```python - from frequenz.channels import Broadcast, select, selected_from - - def process_data(data: int): - print(f"Processing data: {data}") - - def do_heavy_processing(data: int): - print(f"Heavy processing data: {data}") - - timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) - chan1 = Broadcast[int](name="input-chan-1") - chan2 = Broadcast[int](name="input-chan-2") - battery_data = chan1.new_receiver() - heavy_process = chan2.new_receiver() - async for selected in select(battery_data, heavy_process, timer): - if selected_from(selected, battery_data): - if selected.was_closed(): - print("battery channel closed") - continue - process_data(selected.value) - timer.reset() - elif selected_from(selected, heavy_process): - if selected.was_closed(): - print("processing channel closed") - continue - do_heavy_processing(selected.value) - elif selected_from(selected, timer): - print("No data received in time") - ``` + * [`periodic()`][frequenz.channels.timer.Timer.periodic]: + {{docstring_summary("frequenz.channels.timer.Timer.periodic")}} - In this case `do_heavy_processing` might take 2 seconds, and we don't - want our timeout timer to trigger for the missed ticks, and want the - next tick to be relative to the time timer was last triggered. + * [`timeout()`][frequenz.channels.timer.Timer.timeout]: + {{docstring_summary("frequenz.channels.timer.Timer.timeout")}} """ def __init__( # pylint: disable=too-many-arguments @@ -506,10 +594,43 @@ def timeout( # noqa: DOC502 ) -> Timer: """Create a timer useful for tracking timeouts. - This is basically a shortcut to create a timer with - `SkipMissedAndDrift(delay_tolerance=timedelta(0))` as the missed tick policy. + A [timeout][frequenz.channels.timer.Timer.timeout] is + a [`Timer`][frequenz.channels.timer.Timer] that + [resets][frequenz.channels.timer.Timer.reset] automatically after it triggers, + so it will trigger again after the selected interval, no matter what the current + drift was. This means timeout timers will accumulate drift. - See the class documentation for details. + Tip: + Timeouts are a shortcut to create + a [`Timer`][frequenz.channels.timer.Timer] with the + [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy. + + Example: Timeout example + ```python + import asyncio + from datetime import timedelta + + from frequenz.channels import Anycast, select, selected_from + from frequenz.channels.timer import Timer + + + async def main() -> None: + channel = Anycast[int](name="data-channel") + data_receiver = channel.new_receiver() + + timer = Timer.timeout(timedelta(seconds=1.0)) + + async for selected in select(data_receiver, timer): + if selected_from(selected, data_receiver): + print(f"Received data: {selected.value}") + elif selected_from(selected, timer): + drift = selected.value + print(f"No data received for {timer.interval + drift} seconds, giving up") + break + + + asyncio.run(main()) + ``` Args: delay: The time until the timer ticks. Must be at least @@ -557,11 +678,40 @@ def periodic( # noqa: DOC502 pylint: disable=too-many-arguments ) -> Timer: """Create a periodic timer. - This is basically a shortcut to create a timer with either - `TriggerAllMissed()` or `SkipMissedAndResync()` as the missed tick policy - (depending on `skip_missed_ticks`). + A [periodic timer][frequenz.channels.timer.Timer.periodic] is + a [`Timer`][frequenz.channels.timer.Timer] that tries as hard as possible to + trigger at regular intervals. This means that if the timer is delayed for any + reason, it will trigger immediately and then try to catch up with the original + schedule. - See the class documentation for details. + Optionally, a periodic timer can be configured to skip missed ticks and re-sync + with the original schedule (`skip_missed_ticks` argument). This could be useful + if you want the timer is as periodic as possible but if there are big delays you + don't end up with big bursts. + + Tip: + Periodic timers are a shortcut to create + a [`Timer`][frequenz.channels.timer.Timer] with either the + [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] policy (when + `skip_missed_ticks` is `False`) or + [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] + otherwise. + + Example: + ```python + import asyncio + from datetime import datetime, timedelta + + from frequenz.channels.timer import Timer + + + async def main() -> None: + async for drift in Timer.periodic(timedelta(seconds=1.0)): + print(f"The timer has triggered at {datetime.now()} with a drift of {drift}") + + + asyncio.run(main()) + ``` Args: period: The time between timer ticks. Must be at least From 48906acb16a8cad1a7e431780d5f319c52fc5219 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 24 Nov 2023 11:44:36 +0100 Subject: [PATCH 17/18] README: Make admonitions more readable in GitHub Admonitions are not properly rendered by GitHub, so they don't look great there, but they should be readable. Without leaving an empty line they are rendered as a single paragraph, which is very messy. Since the extra indentation is interpreted as a code block, we also need to treat examples differently, as otherwise the markdown is rendered verbatim, missing syntax highlighting and other features. To overcome this issue we use simple headings for examples in the README and then include them in the documentation as admnonitions. Becuase of this we need to have more fine-grained include blocks in the README, which is also not great, but it's better than the alternative. Signed-off-by: Leandro Lucarella --- README.md | 349 +++++++++++++++++---------------- docs/index.md | 26 ++- docs/user-guide/quick-start.md | 26 ++- 3 files changed, 228 insertions(+), 173 deletions(-) diff --git a/README.md b/README.md index 680ed09a..8ec5f93f 100644 --- a/README.md +++ b/README.md @@ -43,11 +43,12 @@ The following platforms are officially supported (tested): ## Quick Start - - ### Installing + + !!! Tip inline end + For more details please read the [Installation Guide](docs/user-guide/installation.md). @@ -57,182 +58,192 @@ Assuming a [supported](#supported-platforms) working Python environment: python3 -m pip install frequenz-channels ``` + + ### Examples -!!! Example "Hello World" - ```python - import asyncio - - from frequenz.channels import Anycast - - - async def main() -> None: - hello_channel = Anycast[str](name="hello-world-channel") - sender = hello_channel.new_sender() - receiver = hello_channel.new_receiver() - - await sender.send("Hello World!") - msg = await receiver.receive() - print(msg) - - - asyncio.run(main()) - ``` - -!!! Example "Showcase" - This is a comprehensive example that shows most of the main features of the - library: - - ```python - import asyncio - from dataclasses import dataclass - from datetime import timedelta - from enum import Enum, auto - from typing import assert_never - - from frequenz.channels import ( - Anycast, - Broadcast, - Receiver, - Sender, - merge, - select, - selected_from, - ) - from frequenz.channels.timer import Timer - - - class Command(Enum): - PING = auto() - STOP_SENDER = auto() - - - class ReplyCommand(Enum): - PONG = auto() - - - @dataclass(frozen=True) - class Reply: - reply: ReplyCommand - source: str - - - async def send( - sender: Sender[str], - control_command: Receiver[Command], - control_reply: Sender[Reply], - ) -> None: - """Send a counter value every second, until a stop command is received.""" - print(f"{sender}: Starting") - timer = Timer.periodic(timedelta(seconds=1.0)) - counter = 0 - async for selected in select(timer, control_command): - if selected_from(selected, timer): - print(f"{sender}: Sending {counter}") - await sender.send(f"{sender}: {counter}") - counter += 1 - elif selected_from(selected, control_command): - print(f"{sender}: Received command: {selected.value.name}") - match selected.value: - case Command.STOP_SENDER: - print(f"{sender}: Stopping") - break - case Command.PING: - print(f"{sender}: Ping received, reply with pong") - await control_reply.send(Reply(ReplyCommand.PONG, str(sender))) - case _ as unknown: - assert_never(unknown) - print(f"{sender}: Finished") - - - async def receive( - receivers: list[Receiver[str]], - control_command: Receiver[Command], - control_reply: Sender[Reply], - ) -> None: - """Receive data from multiple channels, until no more data is received for 2 seconds.""" - print("receive: Starting") - timer = Timer.timeout(timedelta(seconds=2.0)) - print(f"{timer=}") - merged = merge(*receivers) - async for selected in select(merged, timer, control_command): - if selected_from(selected, merged): - message = selected.value - print(f"receive: Received {message=}") - timer.reset() - print(f"{timer=}") - elif selected_from(selected, control_command): - print(f"receive: received command: {selected.value.name}") - match selected.value: - case Command.PING: - print("receive: Ping received, reply with pong") - await control_reply.send(Reply(ReplyCommand.PONG, "receive")) - case Command.STOP_SENDER: - pass # Ignore - case _ as unknown: - assert_never(unknown) - elif selected_from(selected, timer): - drift = selected.value - print( - f"receive: No data received for {timer.interval + drift} seconds, " - "giving up" - ) - break - print("receive: Finished") - - - async def main() -> None: - data_channel_1 = Anycast[str](name="data-channel-1") - data_channel_2 = Anycast[str](name="data-channel-2") - command_channel = Broadcast[Command](name="control-channel") # (1)! - reply_channel = Anycast[Reply](name="reply-channel") - - async with asyncio.TaskGroup() as tasks: - tasks.create_task( - send( - data_channel_1.new_sender(), - command_channel.new_receiver(), - reply_channel.new_sender(), - ), - name="send-channel-1", - ) - tasks.create_task( - send( - data_channel_2.new_sender(), - command_channel.new_receiver(), - reply_channel.new_sender(), - ), - name="send-channel-2", - ) - tasks.create_task( - receive( - [data_channel_1.new_receiver(), data_channel_2.new_receiver()], - command_channel.new_receiver(), - reply_channel.new_sender(), - ), - name="receive", - ) +#### Hello World - control_sender = command_channel.new_sender() - reply_receiver = reply_channel.new_receiver() + - # Send a ping command to all tasks and wait for the replies - await control_sender.send(Command.PING) - print(f"main: {await reply_receiver.receive()}") - print(f"main: {await reply_receiver.receive()}") - print(f"main: {await reply_receiver.receive()}") +```python +import asyncio - await asyncio.sleep(5.0) +from frequenz.channels import Anycast - # Stop senders, after 2 seconds not receiving any data, - # the receiver will stop too - await control_sender.send(Command.STOP_SENDER) +async def main() -> None: + hello_channel = Anycast[str](name="hello-world-channel") + sender = hello_channel.new_sender() + receiver = hello_channel.new_receiver() - asyncio.run(main()) - ``` + await sender.send("Hello World!") + msg = await receiver.receive() + print(msg) + + +asyncio.run(main()) +``` + + + +#### Showcase + + + +This is a comprehensive example that shows most of the main features of the +library: + +```python +import asyncio +from dataclasses import dataclass +from datetime import timedelta +from enum import Enum, auto +from typing import assert_never + +from frequenz.channels import ( + Anycast, + Broadcast, + Receiver, + Sender, + merge, + select, + selected_from, +) +from frequenz.channels.timer import Timer + + +class Command(Enum): + PING = auto() + STOP_SENDER = auto() + + +class ReplyCommand(Enum): + PONG = auto() + + +@dataclass(frozen=True) +class Reply: + reply: ReplyCommand + source: str + + +async def send( + sender: Sender[str], + control_command: Receiver[Command], + control_reply: Sender[Reply], +) -> None: + """Send a counter value every second, until a stop command is received.""" + print(f"{sender}: Starting") + timer = Timer.periodic(timedelta(seconds=1.0)) + counter = 0 + async for selected in select(timer, control_command): + if selected_from(selected, timer): + print(f"{sender}: Sending {counter}") + await sender.send(f"{sender}: {counter}") + counter += 1 + elif selected_from(selected, control_command): + print(f"{sender}: Received command: {selected.value.name}") + match selected.value: + case Command.STOP_SENDER: + print(f"{sender}: Stopping") + break + case Command.PING: + print(f"{sender}: Ping received, reply with pong") + await control_reply.send(Reply(ReplyCommand.PONG, str(sender))) + case _ as unknown: + assert_never(unknown) + print(f"{sender}: Finished") + + +async def receive( + receivers: list[Receiver[str]], + control_command: Receiver[Command], + control_reply: Sender[Reply], +) -> None: + """Receive data from multiple channels, until no more data is received for 2 seconds.""" + print("receive: Starting") + timer = Timer.timeout(timedelta(seconds=2.0)) + print(f"{timer=}") + merged = merge(*receivers) + async for selected in select(merged, timer, control_command): + if selected_from(selected, merged): + message = selected.value + print(f"receive: Received {message=}") + timer.reset() + print(f"{timer=}") + elif selected_from(selected, control_command): + print(f"receive: received command: {selected.value.name}") + match selected.value: + case Command.PING: + print("receive: Ping received, reply with pong") + await control_reply.send(Reply(ReplyCommand.PONG, "receive")) + case Command.STOP_SENDER: + pass # Ignore + case _ as unknown: + assert_never(unknown) + elif selected_from(selected, timer): + drift = selected.value + print( + f"receive: No data received for {timer.interval + drift} seconds, " + "giving up" + ) + break + print("receive: Finished") + + +async def main() -> None: + data_channel_1 = Anycast[str](name="data-channel-1") + data_channel_2 = Anycast[str](name="data-channel-2") + command_channel = Broadcast[Command](name="control-channel") # (1)! + reply_channel = Anycast[Reply](name="reply-channel") + + async with asyncio.TaskGroup() as tasks: + tasks.create_task( + send( + data_channel_1.new_sender(), + command_channel.new_receiver(), + reply_channel.new_sender(), + ), + name="send-channel-1", + ) + tasks.create_task( + send( + data_channel_2.new_sender(), + command_channel.new_receiver(), + reply_channel.new_sender(), + ), + name="send-channel-2", + ) + tasks.create_task( + receive( + [data_channel_1.new_receiver(), data_channel_2.new_receiver()], + command_channel.new_receiver(), + reply_channel.new_sender(), + ), + name="receive", + ) + + control_sender = command_channel.new_sender() + reply_receiver = reply_channel.new_receiver() + + # Send a ping command to all tasks and wait for the replies + await control_sender.send(Command.PING) + print(f"main: {await reply_receiver.receive()}") + print(f"main: {await reply_receiver.receive()}") + print(f"main: {await reply_receiver.receive()}") + + await asyncio.sleep(5.0) + + # Stop senders, after 2 seconds not receiving any data, + # the receiver will stop too + await control_sender.send(Command.STOP_SENDER) + + +asyncio.run(main()) +``` - + ## Documentation diff --git a/docs/index.md b/docs/index.md index 10bcbed6..cfe85510 100644 --- a/docs/index.md +++ b/docs/index.md @@ -18,8 +18,30 @@ ## Quick Start +### Installing + {% include-markdown "../README.md" - start="" - end="" + start="" + end="" %} + +### Examples + +!!! Example "Hello World" + + {% + include-markdown "../README.md" + preserve-includer-indent=true + start="" + end="" + %} + +!!! Example "Showcase" + + {% + include-markdown "../README.md" + preserve-includer-indent=true + start="" + end="" + %} diff --git a/docs/user-guide/quick-start.md b/docs/user-guide/quick-start.md index 8b951952..95f7bb0a 100644 --- a/docs/user-guide/quick-start.md +++ b/docs/user-guide/quick-start.md @@ -1,7 +1,29 @@ # Quick Start +## Installing + {% include-markdown "../../README.md" - start="" - end="" + start="" + end="" %} + +## Examples + +!!! Example "Hello World" + + {% + include-markdown "../../README.md" + preserve-includer-indent=true + start="" + end="" + %} + +!!! Example "Showcase" + + {% + include-markdown "../../README.md" + preserve-includer-indent=true + start="" + end="" + %} From 80a2e78ed1401d0b333f7a50e7fd9a99be89725d Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 24 Nov 2023 11:33:05 +0100 Subject: [PATCH 18/18] Update release notes Signed-off-by: Leandro Lucarella --- RELEASE_NOTES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 3322bcbc..918d46a9 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -101,6 +101,8 @@ ## New Features +* A new User's Guide was added to the documentation and the documentation was greately improved in general. + * A new `merge()` function was added to replace `Merge`. * `Anycast`