@@ -29,6 +29,7 @@ channels](https://tour.golang.org/concurrency/2) but it also borrows ideas from
2929<!-- supported-platforms -->
3030
3131!!! Note inline end
32+
3233 Other platforms are likely to work too, but they are not
3334 automatically tested, so we cannot guarantee that they will
3435 work.
@@ -43,11 +44,12 @@ The following platforms are officially supported (tested):
4344
4445## Quick Start
4546
46- <!-- quick-start -->
47-
4847### Installing
4948
49+ <!-- quick-start-installing -->
50+
5051!!! Tip inline end
52+
5153 For more details please read the [Installation
5254 Guide](docs/user-guide/installation.md).
5355
@@ -57,182 +59,192 @@ Assuming a [supported](#supported-platforms) working Python environment:
5759python3 -m pip install frequenz-channels
5860```
5961
62+ <!-- /quick-start-installing -->
63+
6064### Examples
6165
62- !!! Example "Hello World"
63- ```python
64- import asyncio
65-
66- from frequenz.channels import Anycast
67-
68-
69- async def main() -> None:
70- hello_channel = Anycast[str](name="hello-world-channel")
71- sender = hello_channel.new_sender()
72- receiver = hello_channel.new_receiver()
73-
74- await sender.send("Hello World!")
75- msg = await receiver.receive()
76- print(msg)
77-
78-
79- asyncio.run(main())
80- ```
81-
82- !!! Example "Showcase"
83- This is a comprehensive example that shows most of the main features of the
84- library:
85-
86- ```python
87- import asyncio
88- from dataclasses import dataclass
89- from datetime import timedelta
90- from enum import Enum, auto
91- from typing import assert_never
92-
93- from frequenz.channels import (
94- Anycast,
95- Broadcast,
96- Receiver,
97- Sender,
98- merge,
99- select,
100- selected_from,
101- )
102- from frequenz.channels.timer import Timer
103-
104-
105- class Command(Enum):
106- PING = auto()
107- STOP_SENDER = auto()
108-
109-
110- class ReplyCommand(Enum):
111- PONG = auto()
112-
113-
114- @dataclass(frozen=True)
115- class Reply:
116- reply: ReplyCommand
117- source: str
118-
119-
120- async def send(
121- sender: Sender[str],
122- control_command: Receiver[Command],
123- control_reply: Sender[Reply],
124- ) -> None:
125- """Send a counter value every second, until a stop command is received."""
126- print(f"{sender}: Starting")
127- timer = Timer.periodic(timedelta(seconds=1.0))
128- counter = 0
129- async for selected in select(timer, control_command):
130- if selected_from(selected, timer):
131- print(f"{sender}: Sending {counter}")
132- await sender.send(f"{sender}: {counter}")
133- counter += 1
134- elif selected_from(selected, control_command):
135- print(f"{sender}: Received command: {selected.value.name}")
136- match selected.value:
137- case Command.STOP_SENDER:
138- print(f"{sender}: Stopping")
139- break
140- case Command.PING:
141- print(f"{sender}: Ping received, reply with pong")
142- await control_reply.send(Reply(ReplyCommand.PONG, str(sender)))
143- case _ as unknown:
144- assert_never(unknown)
145- print(f"{sender}: Finished")
146-
147-
148- async def receive(
149- receivers: list[Receiver[str]],
150- control_command: Receiver[Command],
151- control_reply: Sender[Reply],
152- ) -> None:
153- """Receive data from multiple channels, until no more data is received for 2 seconds."""
154- print("receive: Starting")
155- timer = Timer.timeout(timedelta(seconds=2.0))
156- print(f"{timer=}")
157- merged = merge(*receivers)
158- async for selected in select(merged, timer, control_command):
159- if selected_from(selected, merged):
160- message = selected.value
161- print(f"receive: Received {message=}")
162- timer.reset()
163- print(f"{timer=}")
164- elif selected_from(selected, control_command):
165- print(f"receive: received command: {selected.value.name}")
166- match selected.value:
167- case Command.PING:
168- print("receive: Ping received, reply with pong")
169- await control_reply.send(Reply(ReplyCommand.PONG, "receive"))
170- case Command.STOP_SENDER:
171- pass # Ignore
172- case _ as unknown:
173- assert_never(unknown)
174- elif selected_from(selected, timer):
175- drift = selected.value
176- print(
177- f"receive: No data received for {timer.interval + drift} seconds, "
178- "giving up"
179- )
180- break
181- print("receive: Finished")
182-
183-
184- async def main() -> None:
185- data_channel_1 = Anycast[str](name="data-channel-1")
186- data_channel_2 = Anycast[str](name="data-channel-2")
187- command_channel = Broadcast[Command](name="control-channel") # (1)!
188- reply_channel = Anycast[Reply](name="reply-channel")
189-
190- async with asyncio.TaskGroup() as tasks:
191- tasks.create_task(
192- send(
193- data_channel_1.new_sender(),
194- command_channel.new_receiver(),
195- reply_channel.new_sender(),
196- ),
197- name="send-channel-1",
198- )
199- tasks.create_task(
200- send(
201- data_channel_2.new_sender(),
202- command_channel.new_receiver(),
203- reply_channel.new_sender(),
204- ),
205- name="send-channel-2",
206- )
207- tasks.create_task(
208- receive(
209- [data_channel_1.new_receiver(), data_channel_2.new_receiver()],
210- command_channel.new_receiver(),
211- reply_channel.new_sender(),
212- ),
213- name="receive",
214- )
66+ #### Hello World
67+
68+ <!-- quick-start-hello-world -->
21569
216- control_sender = command_channel.new_sender()
217- reply_receiver = reply_channel.new_receiver()
70+ ``` python
71+ import asyncio
21872
219- # Send a ping command to all tasks and wait for the replies
220- await control_sender.send(Command.PING)
221- print(f"main: {await reply_receiver.receive()}")
222- print(f"main: {await reply_receiver.receive()}")
223- print(f"main: {await reply_receiver.receive()}")
73+ from frequenz.channels import Anycast
22474
225- await asyncio.sleep(5.0)
22675
227- # Stop senders, after 2 seconds not receiving any data,
228- # the receiver will stop too
229- await control_sender.send(Command.STOP_SENDER)
76+ async def main () -> None :
77+ hello_channel = Anycast[str ](name = " hello-world-channel" )
78+ sender = hello_channel.new_sender()
79+ receiver = hello_channel.new_receiver()
23080
81+ await sender.send(" Hello World!" )
82+ msg = await receiver.receive()
83+ print (msg)
23184
232- asyncio.run(main())
233- ```
23485
235- <!-- /quick-start -->
86+ asyncio.run(main())
87+ ```
88+
89+ <!-- /quick-start-hello-world -->
90+
91+ #### Showcase
92+
93+ <!-- quick-start-showcase -->
94+
95+ This is a comprehensive example that shows most of the main features of the
96+ library:
97+
98+ ``` python
99+ import asyncio
100+ from dataclasses import dataclass
101+ from datetime import timedelta
102+ from enum import Enum, auto
103+ from typing import assert_never
104+
105+ from frequenz.channels import (
106+ Anycast,
107+ Broadcast,
108+ Receiver,
109+ Sender,
110+ merge,
111+ select,
112+ selected_from,
113+ )
114+ from frequenz.channels.timer import Timer
115+
116+
117+ class Command (Enum ):
118+ PING = auto()
119+ STOP_SENDER = auto()
120+
121+
122+ class ReplyCommand (Enum ):
123+ PONG = auto()
124+
125+
126+ @dataclass (frozen = True )
127+ class Reply :
128+ reply: ReplyCommand
129+ source: str
130+
131+
132+ async def send (
133+ sender : Sender[str ],
134+ control_command : Receiver[Command],
135+ control_reply : Sender[Reply],
136+ ) -> None :
137+ """ Send a counter value every second, until a stop command is received."""
138+ print (f " { sender} : Starting " )
139+ timer = Timer.periodic(timedelta(seconds = 1.0 ))
140+ counter = 0
141+ async for selected in select(timer, control_command):
142+ if selected_from(selected, timer):
143+ print (f " { sender} : Sending { counter} " )
144+ await sender.send(f " { sender} : { counter} " )
145+ counter += 1
146+ elif selected_from(selected, control_command):
147+ print (f " { sender} : Received command: { selected.value.name} " )
148+ match selected.value:
149+ case Command.STOP_SENDER :
150+ print (f " { sender} : Stopping " )
151+ break
152+ case Command.PING :
153+ print (f " { sender} : Ping received, reply with pong " )
154+ await control_reply.send(Reply(ReplyCommand.PONG , str (sender)))
155+ case _ as unknown:
156+ assert_never(unknown)
157+ print (f " { sender} : Finished " )
158+
159+
160+ async def receive (
161+ receivers : list[Receiver[str ]],
162+ control_command : Receiver[Command],
163+ control_reply : Sender[Reply],
164+ ) -> None :
165+ """ Receive data from multiple channels, until no more data is received for 2 seconds."""
166+ print (" receive: Starting" )
167+ timer = Timer.timeout(timedelta(seconds = 2.0 ))
168+ print (f " { timer= } " )
169+ merged = merge(* receivers)
170+ async for selected in select(merged, timer, control_command):
171+ if selected_from(selected, merged):
172+ message = selected.value
173+ print (f " receive: Received { message= } " )
174+ timer.reset()
175+ print (f " { timer= } " )
176+ elif selected_from(selected, control_command):
177+ print (f " receive: received command: { selected.value.name} " )
178+ match selected.value:
179+ case Command.PING :
180+ print (" receive: Ping received, reply with pong" )
181+ await control_reply.send(Reply(ReplyCommand.PONG , " receive" ))
182+ case Command.STOP_SENDER :
183+ pass # Ignore
184+ case _ as unknown:
185+ assert_never(unknown)
186+ elif selected_from(selected, timer):
187+ drift = selected.value
188+ print (
189+ f " receive: No data received for { timer.interval + drift} seconds, "
190+ " giving up"
191+ )
192+ break
193+ print (" receive: Finished" )
194+
195+
196+ async def main () -> None :
197+ data_channel_1 = Anycast[str ](name = " data-channel-1" )
198+ data_channel_2 = Anycast[str ](name = " data-channel-2" )
199+ command_channel = Broadcast[Command](name = " control-channel" ) # (1)!
200+ reply_channel = Anycast[Reply](name = " reply-channel" )
201+
202+ async with asyncio.TaskGroup() as tasks:
203+ tasks.create_task(
204+ send(
205+ data_channel_1.new_sender(),
206+ command_channel.new_receiver(),
207+ reply_channel.new_sender(),
208+ ),
209+ name = " send-channel-1" ,
210+ )
211+ tasks.create_task(
212+ send(
213+ data_channel_2.new_sender(),
214+ command_channel.new_receiver(),
215+ reply_channel.new_sender(),
216+ ),
217+ name = " send-channel-2" ,
218+ )
219+ tasks.create_task(
220+ receive(
221+ [data_channel_1.new_receiver(), data_channel_2.new_receiver()],
222+ command_channel.new_receiver(),
223+ reply_channel.new_sender(),
224+ ),
225+ name = " receive" ,
226+ )
227+
228+ control_sender = command_channel.new_sender()
229+ reply_receiver = reply_channel.new_receiver()
230+
231+ # Send a ping command to all tasks and wait for the replies
232+ await control_sender.send(Command.PING )
233+ print (f " main: { await reply_receiver.receive()} " )
234+ print (f " main: { await reply_receiver.receive()} " )
235+ print (f " main: { await reply_receiver.receive()} " )
236+
237+ await asyncio.sleep(5.0 )
238+
239+ # Stop senders, after 2 seconds not receiving any data,
240+ # the receiver will stop too
241+ await control_sender.send(Command.STOP_SENDER )
242+
243+
244+ asyncio.run(main())
245+ ```
246+
247+ <!-- /quick-start-showcase -->
236248
237249## Documentation
238250
0 commit comments