Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 201 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ channels](https://tour.golang.org/concurrency/2) but it also borrows ideas from

<!-- supported-platforms -->

!!! Note inline end

Newer Python versions and other operating systems and architectures might
work too, but they are not automatically tested, so we cannot guarantee it.

The following platforms are officially supported (tested):

- **Python:** 3.11
Expand All @@ -38,30 +43,214 @@ The following platforms are officially supported (tested):

## Quick Start

We assume you are on a system with Python available. If that is not the case,
please [download and install Python](https://www.python.org/downloads/) first.
### Installing

To install Frequenz Channels, you probably want to create a new virtual
environment first. For example, if you use a `sh` compatible shell, you can do
this:
<!-- quick-start-installing -->

```sh
python3 -m venv .venv
. .venv/bin/activate
```
!!! Tip inline end

For more details please read the [Installation
Guide](docs/user-guide/installation.md).

Then, just install using `pip`:
Assuming a [supported](#supported-platforms) working Python environment:

```sh
python3 -m pip install frequenz-channels
```

<!-- /quick-start-installing -->

### Examples

#### Hello World

<!-- quick-start-hello-world -->

```python
import asyncio

from frequenz.channels import Anycast


async def main() -> None:
hello_channel = Anycast[str](name="hello-world-channel")
sender = hello_channel.new_sender()
receiver = hello_channel.new_receiver()

await sender.send("Hello World!")
msg = await receiver.receive()
print(msg)


asyncio.run(main())
```

<!-- /quick-start-hello-world -->

#### Showcase

<!-- quick-start-showcase -->

This is a comprehensive example that shows most of the main features of the
library:

```python
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum, auto
from typing import assert_never

from frequenz.channels import (
Anycast,
Broadcast,
Receiver,
Sender,
merge,
select,
selected_from,
)
from frequenz.channels.timer import Timer


class Command(Enum):
PING = auto()
STOP_SENDER = auto()


class ReplyCommand(Enum):
PONG = auto()


@dataclass(frozen=True)
class Reply:
reply: ReplyCommand
source: str


async def send(
sender: Sender[str],
control_command: Receiver[Command],
control_reply: Sender[Reply],
) -> None:
"""Send a counter value every second, until a stop command is received."""
print(f"{sender}: Starting")
timer = Timer.periodic(timedelta(seconds=1.0))
counter = 0
async for selected in select(timer, control_command):
if selected_from(selected, timer):
print(f"{sender}: Sending {counter}")
await sender.send(f"{sender}: {counter}")
counter += 1
elif selected_from(selected, control_command):
print(f"{sender}: Received command: {selected.value.name}")
match selected.value:
case Command.STOP_SENDER:
print(f"{sender}: Stopping")
break
case Command.PING:
print(f"{sender}: Ping received, reply with pong")
await control_reply.send(Reply(ReplyCommand.PONG, str(sender)))
case _ as unknown:
assert_never(unknown)
print(f"{sender}: Finished")


async def receive(
receivers: list[Receiver[str]],
control_command: Receiver[Command],
control_reply: Sender[Reply],
) -> None:
"""Receive data from multiple channels, until no more data is received for 2 seconds."""
print("receive: Starting")
timer = Timer.timeout(timedelta(seconds=2.0))
print(f"{timer=}")
merged = merge(*receivers)
async for selected in select(merged, timer, control_command):
if selected_from(selected, merged):
message = selected.value
print(f"receive: Received {message=}")
timer.reset()
print(f"{timer=}")
elif selected_from(selected, control_command):
print(f"receive: received command: {selected.value.name}")
match selected.value:
case Command.PING:
print("receive: Ping received, reply with pong")
await control_reply.send(Reply(ReplyCommand.PONG, "receive"))
case Command.STOP_SENDER:
pass # Ignore
case _ as unknown:
assert_never(unknown)
elif selected_from(selected, timer):
drift = selected.value
print(
f"receive: No data received for {timer.interval + drift} seconds, "
"giving up"
)
break
print("receive: Finished")


async def main() -> None:
data_channel_1 = Anycast[str](name="data-channel-1")
data_channel_2 = Anycast[str](name="data-channel-2")
command_channel = Broadcast[Command](name="control-channel") # (1)!
reply_channel = Anycast[Reply](name="reply-channel")

async with asyncio.TaskGroup() as tasks:
tasks.create_task(
send(
data_channel_1.new_sender(),
command_channel.new_receiver(),
reply_channel.new_sender(),
),
name="send-channel-1",
)
tasks.create_task(
send(
data_channel_2.new_sender(),
command_channel.new_receiver(),
reply_channel.new_sender(),
),
name="send-channel-2",
)
tasks.create_task(
receive(
[data_channel_1.new_receiver(), data_channel_2.new_receiver()],
command_channel.new_receiver(),
reply_channel.new_sender(),
),
name="receive",
)

control_sender = command_channel.new_sender()
reply_receiver = reply_channel.new_receiver()

# Send a ping command to all tasks and wait for the replies
await control_sender.send(Command.PING)
print(f"main: {await reply_receiver.receive()}")
print(f"main: {await reply_receiver.receive()}")
print(f"main: {await reply_receiver.receive()}")

await asyncio.sleep(5.0)

# Stop senders, after 2 seconds not receiving any data,
# the receiver will stop too
await control_sender.send(Command.STOP_SENDER)


asyncio.run(main())
```

<!-- /quick-start-showcase -->

## Documentation

For more information, please visit the [documentation
For more information, please read the [documentation
website](https://frequenz-floss.github.io/frequenz-channels-python/).

## Contributing

If you want to know how to build this project and contribute to it, please
check out the [Contributing Guide](CONTRIBUTING.md).
check out the [Contributing Guide](docs/CONTRIBUTING.md).
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@

## New Features

* A new User's Guide was added to the documentation and the documentation was greately improved in general.

* A new `merge()` function was added to replace `Merge`.

* `Anycast`
Expand Down
1 change: 1 addition & 0 deletions docs/SUMMARY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* [Home](index.md)
* [User Guide](user-guide/)
* [API Reference](reference/)
* [Contributing](CONTRIBUTING.md)
78 changes: 22 additions & 56 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,66 +16,32 @@
end="<!-- /supported-platforms -->"
%}

## Installation
## Quick Start

First, you need to make sure you have Python installed (at least version 3.11):
### Installing

```console
$ python3 --version
Python 3.11.4
```

!!! note

These instructions assume you are using a [POSIX compatible
`sh`](https://pubs.opengroup.org/onlinepubs/9699919799/utilities/sh.html)
shell.

If that command doesn't print a version newer than 3.11.0, you'll need to
[download and install Python](https://www.python.org/downloads/) first.

To install Frequenz Channels, you probably want to create a new virtual
environment first:

```sh
mkdir my-channels-project
cd my-channels-project
python3 -m venv .venv
. .venv/bin/activate
```

!!! tip

Using [`direnv`](https://direnv.net/) can greatly simplify this process as
it automates the creation, activation, and deactivation of the virtual
environment. The first time you enable `direnv`, the virtual environment
will be created, and each time you enter or leave a subdirectory, it will be
activated and deactivated, respectively.

```sh
sudo apt install direnv # if you use Debian/Ubuntu
mkdir my-channels-project
cd my-channels-project
echo "layout python python3" > .envrc
direnv allow
```
{%
include-markdown "../README.md"
start="<!-- quick-start-installing -->"
end="<!-- /quick-start-installing -->"
%}

This will create the virtual environment and activate it automatically for you.
### Examples

Now you can install Frequenz Channels by using `pip` (if you don't have `pip` installed
you can follow [the official instructions](https://pip.pypa.io/en/stable/installation/)):
!!! Example "Hello World"

```sh
python3 -m pip install frequenz-channels
```
{%
include-markdown "../README.md"
preserve-includer-indent=true
start="<!-- quick-start-hello-world -->"
end="<!-- /quick-start-hello-world -->"
%}

To verify that the installation worked, you can invoke the Python interpreter and
import the `frequenz.channels` module:
!!! Example "Showcase"

```console
$ python3
Python 3.11.4 (main, Jun 7 2023, 10:13:09) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import frequenz.channels
>>>
```
{%
include-markdown "../README.md"
preserve-includer-indent=true
start="<!-- quick-start-showcase -->"
end="<!-- /quick-start-showcase -->"
%}
7 changes: 7 additions & 0 deletions docs/user-guide/SUMMARY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
* [Quick Start](quick-start.md)
* [Installation](installation.md)
* [Channels](channels/)
* [Sending](sending.md)
* [Receiving](receiving/)
* [Error Handling](error-handling.md)
* [Utilities](utilities/)
10 changes: 10 additions & 0 deletions docs/user-guide/channels/anycast.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Anycast

::: frequenz.channels._anycast.Anycast
options:
inherited_members: []
members: []
show_bases: false
show_root_heading: false
show_root_toc_entry: false
show_source: false
10 changes: 10 additions & 0 deletions docs/user-guide/channels/broadcast.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Broadcast

::: frequenz.channels._broadcast.Broadcast
options:
inherited_members: []
members: []
show_bases: false
show_root_heading: false
show_root_toc_entry: false
show_source: false
Loading