@@ -43,11 +43,12 @@ The following platforms are officially supported (tested):
4343
4444## Quick Start
4545
46- <!-- quick-start -->
47-
4846### Installing
4947
48+ <!-- quick-start-installing -->
49+
5050!!! Tip inline end
51+
5152 For more details please read the [Installation
5253 Guide](docs/user-guide/installation.md).
5354
@@ -57,182 +58,192 @@ Assuming a [supported](#supported-platforms) working Python environment:
5758python3 -m pip install frequenz-channels
5859```
5960
61+ <!-- /quick-start-installing -->
62+
6063### Examples
6164
62- !!! Example "Hello World"
63- ```python
64- import asyncio
65-
66- from frequenz.channels import Anycast
67-
68-
69- async def main() -> None:
70- hello_channel = Anycast[str](name="hello-world-channel")
71- sender = hello_channel.new_sender()
72- receiver = hello_channel.new_receiver()
73-
74- await sender.send("Hello World!")
75- msg = await receiver.receive()
76- print(msg)
77-
78-
79- asyncio.run(main())
80- ```
81-
82- !!! Example "Showcase"
83- This is a comprehensive example that shows most of the main features of the
84- library:
85-
86- ```python
87- import asyncio
88- from dataclasses import dataclass
89- from datetime import timedelta
90- from enum import Enum, auto
91- from typing import assert_never
92-
93- from frequenz.channels import (
94- Anycast,
95- Broadcast,
96- Receiver,
97- Sender,
98- merge,
99- select,
100- selected_from,
101- )
102- from frequenz.channels.timer import Timer
103-
104-
105- class Command(Enum):
106- PING = auto()
107- STOP_SENDER = auto()
108-
109-
110- class ReplyCommand(Enum):
111- PONG = auto()
112-
113-
114- @dataclass(frozen=True)
115- class Reply:
116- reply: ReplyCommand
117- source: str
118-
119-
120- async def send(
121- sender: Sender[str],
122- control_command: Receiver[Command],
123- control_reply: Sender[Reply],
124- ) -> None:
125- """Send a counter value every second, until a stop command is received."""
126- print(f"{sender}: Starting")
127- timer = Timer.periodic(timedelta(seconds=1.0))
128- counter = 0
129- async for selected in select(timer, control_command):
130- if selected_from(selected, timer):
131- print(f"{sender}: Sending {counter}")
132- await sender.send(f"{sender}: {counter}")
133- counter += 1
134- elif selected_from(selected, control_command):
135- print(f"{sender}: Received command: {selected.value.name}")
136- match selected.value:
137- case Command.STOP_SENDER:
138- print(f"{sender}: Stopping")
139- break
140- case Command.PING:
141- print(f"{sender}: Ping received, reply with pong")
142- await control_reply.send(Reply(ReplyCommand.PONG, str(sender)))
143- case _ as unknown:
144- assert_never(unknown)
145- print(f"{sender}: Finished")
146-
147-
148- async def receive(
149- receivers: list[Receiver[str]],
150- control_command: Receiver[Command],
151- control_reply: Sender[Reply],
152- ) -> None:
153- """Receive data from multiple channels, until no more data is received for 2 seconds."""
154- print("receive: Starting")
155- timer = Timer.timeout(timedelta(seconds=2.0))
156- print(f"{timer=}")
157- merged = merge(*receivers)
158- async for selected in select(merged, timer, control_command):
159- if selected_from(selected, merged):
160- message = selected.value
161- print(f"receive: Received {message=}")
162- timer.reset()
163- print(f"{timer=}")
164- elif selected_from(selected, control_command):
165- print(f"receive: received command: {selected.value.name}")
166- match selected.value:
167- case Command.PING:
168- print("receive: Ping received, reply with pong")
169- await control_reply.send(Reply(ReplyCommand.PONG, "receive"))
170- case Command.STOP_SENDER:
171- pass # Ignore
172- case _ as unknown:
173- assert_never(unknown)
174- elif selected_from(selected, timer):
175- drift = selected.value
176- print(
177- f"receive: No data received for {timer.interval + drift} seconds, "
178- "giving up"
179- )
180- break
181- print("receive: Finished")
182-
183-
184- async def main() -> None:
185- data_channel_1 = Anycast[str](name="data-channel-1")
186- data_channel_2 = Anycast[str](name="data-channel-2")
187- command_channel = Broadcast[Command](name="control-channel") # (1)!
188- reply_channel = Anycast[Reply](name="reply-channel")
189-
190- async with asyncio.TaskGroup() as tasks:
191- tasks.create_task(
192- send(
193- data_channel_1.new_sender(),
194- command_channel.new_receiver(),
195- reply_channel.new_sender(),
196- ),
197- name="send-channel-1",
198- )
199- tasks.create_task(
200- send(
201- data_channel_2.new_sender(),
202- command_channel.new_receiver(),
203- reply_channel.new_sender(),
204- ),
205- name="send-channel-2",
206- )
207- tasks.create_task(
208- receive(
209- [data_channel_1.new_receiver(), data_channel_2.new_receiver()],
210- command_channel.new_receiver(),
211- reply_channel.new_sender(),
212- ),
213- name="receive",
214- )
65+ #### Hello World
21566
216- control_sender = command_channel.new_sender()
217- reply_receiver = reply_channel.new_receiver()
67+ <!-- quick-start-hello-world -->
21868
219- # Send a ping command to all tasks and wait for the replies
220- await control_sender.send(Command.PING)
221- print(f"main: {await reply_receiver.receive()}")
222- print(f"main: {await reply_receiver.receive()}")
223- print(f"main: {await reply_receiver.receive()}")
69+ ``` python
70+ import asyncio
22471
225- await asyncio.sleep(5.0)
72+ from frequenz.channels import Anycast
22673
227- # Stop senders, after 2 seconds not receiving any data,
228- # the receiver will stop too
229- await control_sender.send(Command.STOP_SENDER)
23074
75+ async def main () -> None :
76+ hello_channel = Anycast[str ](name = " hello-world-channel" )
77+ sender = hello_channel.new_sender()
78+ receiver = hello_channel.new_receiver()
23179
232- asyncio.run(main())
233- ```
80+ await sender.send(" Hello World!" )
81+ msg = await receiver.receive()
82+ print (msg)
83+
84+
85+ asyncio.run(main())
86+ ```
87+
88+ <!-- /quick-start-hello-world -->
89+
90+ #### Showcase
91+
92+ <!-- quick-start-showcase -->
93+
94+ This is a comprehensive example that shows most of the main features of the
95+ library:
96+
97+ ``` python
98+ import asyncio
99+ from dataclasses import dataclass
100+ from datetime import timedelta
101+ from enum import Enum, auto
102+ from typing import assert_never
103+
104+ from frequenz.channels import (
105+ Anycast,
106+ Broadcast,
107+ Receiver,
108+ Sender,
109+ merge,
110+ select,
111+ selected_from,
112+ )
113+ from frequenz.channels.timer import Timer
114+
115+
116+ class Command (Enum ):
117+ PING = auto()
118+ STOP_SENDER = auto()
119+
120+
121+ class ReplyCommand (Enum ):
122+ PONG = auto()
123+
124+
125+ @dataclass (frozen = True )
126+ class Reply :
127+ reply: ReplyCommand
128+ source: str
129+
130+
131+ async def send (
132+ sender : Sender[str ],
133+ control_command : Receiver[Command],
134+ control_reply : Sender[Reply],
135+ ) -> None :
136+ """ Send a counter value every second, until a stop command is received."""
137+ print (f " { sender} : Starting " )
138+ timer = Timer.periodic(timedelta(seconds = 1.0 ))
139+ counter = 0
140+ async for selected in select(timer, control_command):
141+ if selected_from(selected, timer):
142+ print (f " { sender} : Sending { counter} " )
143+ await sender.send(f " { sender} : { counter} " )
144+ counter += 1
145+ elif selected_from(selected, control_command):
146+ print (f " { sender} : Received command: { selected.value.name} " )
147+ match selected.value:
148+ case Command.STOP_SENDER :
149+ print (f " { sender} : Stopping " )
150+ break
151+ case Command.PING :
152+ print (f " { sender} : Ping received, reply with pong " )
153+ await control_reply.send(Reply(ReplyCommand.PONG , str (sender)))
154+ case _ as unknown:
155+ assert_never(unknown)
156+ print (f " { sender} : Finished " )
157+
158+
159+ async def receive (
160+ receivers : list[Receiver[str ]],
161+ control_command : Receiver[Command],
162+ control_reply : Sender[Reply],
163+ ) -> None :
164+ """ Receive data from multiple channels, until no more data is received for 2 seconds."""
165+ print (" receive: Starting" )
166+ timer = Timer.timeout(timedelta(seconds = 2.0 ))
167+ print (f " { timer= } " )
168+ merged = merge(* receivers)
169+ async for selected in select(merged, timer, control_command):
170+ if selected_from(selected, merged):
171+ message = selected.value
172+ print (f " receive: Received { message= } " )
173+ timer.reset()
174+ print (f " { timer= } " )
175+ elif selected_from(selected, control_command):
176+ print (f " receive: received command: { selected.value.name} " )
177+ match selected.value:
178+ case Command.PING :
179+ print (" receive: Ping received, reply with pong" )
180+ await control_reply.send(Reply(ReplyCommand.PONG , " receive" ))
181+ case Command.STOP_SENDER :
182+ pass # Ignore
183+ case _ as unknown:
184+ assert_never(unknown)
185+ elif selected_from(selected, timer):
186+ drift = selected.value
187+ print (
188+ f " receive: No data received for { timer.interval + drift} seconds, "
189+ " giving up"
190+ )
191+ break
192+ print (" receive: Finished" )
193+
194+
195+ async def main () -> None :
196+ data_channel_1 = Anycast[str ](name = " data-channel-1" )
197+ data_channel_2 = Anycast[str ](name = " data-channel-2" )
198+ command_channel = Broadcast[Command](name = " control-channel" ) # (1)!
199+ reply_channel = Anycast[Reply](name = " reply-channel" )
200+
201+ async with asyncio.TaskGroup() as tasks:
202+ tasks.create_task(
203+ send(
204+ data_channel_1.new_sender(),
205+ command_channel.new_receiver(),
206+ reply_channel.new_sender(),
207+ ),
208+ name = " send-channel-1" ,
209+ )
210+ tasks.create_task(
211+ send(
212+ data_channel_2.new_sender(),
213+ command_channel.new_receiver(),
214+ reply_channel.new_sender(),
215+ ),
216+ name = " send-channel-2" ,
217+ )
218+ tasks.create_task(
219+ receive(
220+ [data_channel_1.new_receiver(), data_channel_2.new_receiver()],
221+ command_channel.new_receiver(),
222+ reply_channel.new_sender(),
223+ ),
224+ name = " receive" ,
225+ )
226+
227+ control_sender = command_channel.new_sender()
228+ reply_receiver = reply_channel.new_receiver()
229+
230+ # Send a ping command to all tasks and wait for the replies
231+ await control_sender.send(Command.PING )
232+ print (f " main: { await reply_receiver.receive()} " )
233+ print (f " main: { await reply_receiver.receive()} " )
234+ print (f " main: { await reply_receiver.receive()} " )
235+
236+ await asyncio.sleep(5.0 )
237+
238+ # Stop senders, after 2 seconds not receiving any data,
239+ # the receiver will stop too
240+ await control_sender.send(Command.STOP_SENDER )
241+
242+
243+ asyncio.run(main())
244+ ```
234245
235- <!-- /quick-start -->
246+ <!-- /quick-start-showcase -->
236247
237248## Documentation
238249
0 commit comments