@@ -28,6 +28,11 @@ channels](https://tour.golang.org/concurrency/2) but it also borrows ideas from
2828
2929<!-- supported-platforms -->
3030
31+ !!! Note inline end
32+
33+ Newer Python versions and other operating systems and architectures might
34+ work too, but they are not automatically tested, so we cannot guarantee it.
35+
3136The following platforms are officially supported (tested):
3237
3338- ** Python:** 3.11
@@ -38,30 +43,214 @@ The following platforms are officially supported (tested):
3843
3944## Quick Start
4045
41- We assume you are on a system with Python available. If that is not the case,
42- please [ download and install Python] ( https://www.python.org/downloads/ ) first.
46+ ### Installing
4347
44- To install Frequenz Channels, you probably want to create a new virtual
45- environment first. For example, if you use a ` sh ` compatible shell, you can do
46- this:
48+ <!-- quick-start-installing -->
4749
48- ``` sh
49- python3 -m venv .venv
50- . .venv/bin/activate
51- ```
50+ !!! Tip inline end
51+
52+ For more details please read the [Installation
53+ Guide](docs/user-guide/installation.md).
5254
53- Then, just install using ` pip ` :
55+ Assuming a [ supported ] ( #supported-platforms ) working Python environment :
5456
5557``` sh
5658python3 -m pip install frequenz-channels
5759```
5860
61+ <!-- /quick-start-installing -->
62+
63+ ### Examples
64+
65+ #### Hello World
66+
67+ <!-- quick-start-hello-world -->
68+
69+ ``` python
70+ import asyncio
71+
72+ from frequenz.channels import Anycast
73+
74+
75+ async def main () -> None :
76+ hello_channel = Anycast[str ](name = " hello-world-channel" )
77+ sender = hello_channel.new_sender()
78+ receiver = hello_channel.new_receiver()
79+
80+ await sender.send(" Hello World!" )
81+ msg = await receiver.receive()
82+ print (msg)
83+
84+
85+ asyncio.run(main())
86+ ```
87+
88+ <!-- /quick-start-hello-world -->
89+
90+ #### Showcase
91+
92+ <!-- quick-start-showcase -->
93+
94+ This is a comprehensive example that shows most of the main features of the
95+ library:
96+
97+ ``` python
98+ import asyncio
99+ from dataclasses import dataclass
100+ from datetime import timedelta
101+ from enum import Enum, auto
102+ from typing import assert_never
103+
104+ from frequenz.channels import (
105+ Anycast,
106+ Broadcast,
107+ Receiver,
108+ Sender,
109+ merge,
110+ select,
111+ selected_from,
112+ )
113+ from frequenz.channels.timer import Timer
114+
115+
116+ class Command (Enum ):
117+ PING = auto()
118+ STOP_SENDER = auto()
119+
120+
121+ class ReplyCommand (Enum ):
122+ PONG = auto()
123+
124+
125+ @dataclass (frozen = True )
126+ class Reply :
127+ reply: ReplyCommand
128+ source: str
129+
130+
131+ async def send (
132+ sender : Sender[str ],
133+ control_command : Receiver[Command],
134+ control_reply : Sender[Reply],
135+ ) -> None :
136+ """ Send a counter value every second, until a stop command is received."""
137+ print (f " { sender} : Starting " )
138+ timer = Timer.periodic(timedelta(seconds = 1.0 ))
139+ counter = 0
140+ async for selected in select(timer, control_command):
141+ if selected_from(selected, timer):
142+ print (f " { sender} : Sending { counter} " )
143+ await sender.send(f " { sender} : { counter} " )
144+ counter += 1
145+ elif selected_from(selected, control_command):
146+ print (f " { sender} : Received command: { selected.value.name} " )
147+ match selected.value:
148+ case Command.STOP_SENDER :
149+ print (f " { sender} : Stopping " )
150+ break
151+ case Command.PING :
152+ print (f " { sender} : Ping received, reply with pong " )
153+ await control_reply.send(Reply(ReplyCommand.PONG , str (sender)))
154+ case _ as unknown:
155+ assert_never(unknown)
156+ print (f " { sender} : Finished " )
157+
158+
159+ async def receive (
160+ receivers : list[Receiver[str ]],
161+ control_command : Receiver[Command],
162+ control_reply : Sender[Reply],
163+ ) -> None :
164+ """ Receive data from multiple channels, until no more data is received for 2 seconds."""
165+ print (" receive: Starting" )
166+ timer = Timer.timeout(timedelta(seconds = 2.0 ))
167+ print (f " { timer= } " )
168+ merged = merge(* receivers)
169+ async for selected in select(merged, timer, control_command):
170+ if selected_from(selected, merged):
171+ message = selected.value
172+ print (f " receive: Received { message= } " )
173+ timer.reset()
174+ print (f " { timer= } " )
175+ elif selected_from(selected, control_command):
176+ print (f " receive: received command: { selected.value.name} " )
177+ match selected.value:
178+ case Command.PING :
179+ print (" receive: Ping received, reply with pong" )
180+ await control_reply.send(Reply(ReplyCommand.PONG , " receive" ))
181+ case Command.STOP_SENDER :
182+ pass # Ignore
183+ case _ as unknown:
184+ assert_never(unknown)
185+ elif selected_from(selected, timer):
186+ drift = selected.value
187+ print (
188+ f " receive: No data received for { timer.interval + drift} seconds, "
189+ " giving up"
190+ )
191+ break
192+ print (" receive: Finished" )
193+
194+
195+ async def main () -> None :
196+ data_channel_1 = Anycast[str ](name = " data-channel-1" )
197+ data_channel_2 = Anycast[str ](name = " data-channel-2" )
198+ command_channel = Broadcast[Command](name = " control-channel" ) # (1)!
199+ reply_channel = Anycast[Reply](name = " reply-channel" )
200+
201+ async with asyncio.TaskGroup() as tasks:
202+ tasks.create_task(
203+ send(
204+ data_channel_1.new_sender(),
205+ command_channel.new_receiver(),
206+ reply_channel.new_sender(),
207+ ),
208+ name = " send-channel-1" ,
209+ )
210+ tasks.create_task(
211+ send(
212+ data_channel_2.new_sender(),
213+ command_channel.new_receiver(),
214+ reply_channel.new_sender(),
215+ ),
216+ name = " send-channel-2" ,
217+ )
218+ tasks.create_task(
219+ receive(
220+ [data_channel_1.new_receiver(), data_channel_2.new_receiver()],
221+ command_channel.new_receiver(),
222+ reply_channel.new_sender(),
223+ ),
224+ name = " receive" ,
225+ )
226+
227+ control_sender = command_channel.new_sender()
228+ reply_receiver = reply_channel.new_receiver()
229+
230+ # Send a ping command to all tasks and wait for the replies
231+ await control_sender.send(Command.PING )
232+ print (f " main: { await reply_receiver.receive()} " )
233+ print (f " main: { await reply_receiver.receive()} " )
234+ print (f " main: { await reply_receiver.receive()} " )
235+
236+ await asyncio.sleep(5.0 )
237+
238+ # Stop senders, after 2 seconds not receiving any data,
239+ # the receiver will stop too
240+ await control_sender.send(Command.STOP_SENDER )
241+
242+
243+ asyncio.run(main())
244+ ```
245+
246+ <!-- /quick-start-showcase -->
247+
59248## Documentation
60249
61- For more information, please visit the [ documentation
250+ For more information, please read the [ documentation
62251website] ( https://frequenz-floss.github.io/frequenz-channels-python/ ) .
63252
64253## Contributing
65254
66255If you want to know how to build this project and contribute to it, please
67- check out the [ Contributing Guide] ( CONTRIBUTING.md ) .
256+ check out the [ Contributing Guide] ( docs/ CONTRIBUTING.md) .
0 commit comments