Skip to content

Commit 178427a

Browse files
committed
Update the SDK codebase to work with the latest channel version
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent dc8c560 commit 178427a

31 files changed

+167
-165
lines changed

benchmarks/data_ingestion/benchmark_microgrid_data.py

Lines changed: 40 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
from frequenz.api.microgrid.inverter_pb2 import Inverter
1818
from frequenz.api.microgrid.meter_pb2 import Data as PbMeterData
1919
from frequenz.api.microgrid.meter_pb2 import Meter
20-
from frequenz.channels import Broadcast, MergeNamed, Receiver, Select, Sender
20+
from frequenz.channels import Broadcast, Receiver, Sender
21+
from frequenz.channels.util import MergeNamed, Select
2122
from google.protobuf.timestamp_pb2 import Timestamp # pylint:disable=no-name-in-module
2223

2324
import frequenz.sdk.microgrid.graph as gr
@@ -237,15 +238,15 @@ async def meter_data(
237238
data.
238239
"""
239240
if component_id in self._component_streams:
240-
return self._component_streams[component_id].get_receiver()
241+
return self._component_streams[component_id].new_receiver()
241242
chan: Broadcast[MeterData] = Broadcast(f"{component_id=}")
242243
component_category, meter_connection = self.categories_from_id(component_id)
243244
if component_category == ComponentCategory.METER:
244245
if meter_connection is None:
245-
asyncio.create_task(market_data_sender(component_id, chan.get_sender()))
246+
asyncio.create_task(market_data_sender(component_id, chan.new_sender()))
246247
if meter_connection == ComponentCategory.PV_ARRAY:
247-
asyncio.create_task(pv_data_sender(component_id, chan.get_sender()))
248-
return chan.get_receiver()
248+
asyncio.create_task(pv_data_sender(component_id, chan.new_sender()))
249+
return chan.new_receiver()
249250

250251
async def battery_data(
251252
self,
@@ -261,10 +262,10 @@ async def battery_data(
261262
battery data.
262263
"""
263264
if component_id in self._component_streams:
264-
return self._component_streams[component_id].get_receiver()
265+
return self._component_streams[component_id].new_receiver()
265266
chan: Broadcast[BatteryData] = Broadcast(f"{component_id=}")
266-
asyncio.create_task(battery_data_sender(component_id, chan.get_sender()))
267-
return chan.get_receiver()
267+
asyncio.create_task(battery_data_sender(component_id, chan.new_sender()))
268+
return chan.new_receiver()
268269

269270
async def inverter_data(
270271
self,
@@ -280,10 +281,10 @@ async def inverter_data(
280281
inverter data.
281282
"""
282283
if component_id in self._component_streams:
283-
return self._component_streams[component_id].get_receiver()
284+
return self._component_streams[component_id].new_receiver()
284285
chan: Broadcast[InverterData] = Broadcast(f"{component_id=}")
285-
asyncio.create_task(inverter_data_sender(component_id, chan.get_sender()))
286-
return chan.get_receiver()
286+
asyncio.create_task(inverter_data_sender(component_id, chan.new_sender()))
287+
return chan.new_receiver()
287288

288289

289290
# pylint: disable=too-many-locals
@@ -339,20 +340,20 @@ async def benchmark_multiple_batteries(n_bat: int = 10, n_msg: int = 100) -> Non
339340
microgrid_client,
340341
component_graph,
341342
{
342-
"client_load": client_load_chan.get_sender(),
343-
"grid_load": grid_load_chan.get_sender(),
344-
"pv_prod": pv_prod_chan.get_sender(),
345-
"ev_chargers_consumption": ev_chargers_consumption_chan.get_sender(),
346-
"batteries_remaining_energy": batteries_remaining_energy_chan.get_sender(),
347-
"batteries_active_power": batteries_active_power_chan.get_sender(),
348-
"batteries_active_power_bounds": batteries_active_power_bounds_chan.get_sender(),
343+
"client_load": client_load_chan.new_sender(),
344+
"grid_load": grid_load_chan.new_sender(),
345+
"pv_prod": pv_prod_chan.new_sender(),
346+
"ev_chargers_consumption": ev_chargers_consumption_chan.new_sender(),
347+
"batteries_remaining_energy": batteries_remaining_energy_chan.new_sender(),
348+
"batteries_active_power": batteries_active_power_chan.new_sender(),
349+
"batteries_active_power_bounds": batteries_active_power_bounds_chan.new_sender(),
349350
},
350351
formula_calculator,
351352
)
352353
select = Select(
353-
batteries_remaining_energy=batteries_remaining_energy_chan.get_receiver(),
354-
batteries_active_power=batteries_active_power_chan.get_receiver(),
355-
batteries_active_power_bounds=batteries_active_power_bounds_chan.get_receiver(),
354+
batteries_remaining_energy=batteries_remaining_energy_chan.new_receiver(),
355+
batteries_active_power=batteries_active_power_chan.new_receiver(),
356+
batteries_active_power_bounds=batteries_active_power_bounds_chan.new_receiver(),
356357
)
357358
start = time.time()
358359
while await select.ready():
@@ -431,21 +432,21 @@ async def benchmark_multiple_meters(n_meter: int = 5, n_msg: int = 100) -> None:
431432
microgrid_client,
432433
component_graph,
433434
{
434-
"client_load": client_load_chan.get_sender(),
435-
"grid_load": grid_load_chan.get_sender(),
436-
"pv_prod": pv_prod_chan.get_sender(),
437-
"ev_chargers_consumption": ev_chargers_consumption_chan.get_sender(),
438-
"batteries_remaining_energy": batteries_remaining_energy_chan.get_sender(),
439-
"batteries_active_power": batteries_active_power_chan.get_sender(),
440-
"batteries_active_power_bounds": batteries_active_power_bounds_chan.get_sender(),
435+
"client_load": client_load_chan.new_sender(),
436+
"grid_load": grid_load_chan.new_sender(),
437+
"pv_prod": pv_prod_chan.new_sender(),
438+
"ev_chargers_consumption": ev_chargers_consumption_chan.new_sender(),
439+
"batteries_remaining_energy": batteries_remaining_energy_chan.new_sender(),
440+
"batteries_active_power": batteries_active_power_chan.new_sender(),
441+
"batteries_active_power_bounds": batteries_active_power_bounds_chan.new_sender(),
441442
},
442443
formula_calculator,
443444
)
444445
n_client_load_msg = 0
445446
n_pv_prod_msg = 0
446447
select = Select(
447-
client_load=client_load_chan.get_receiver(),
448-
pv_prod=pv_prod_chan.get_receiver(),
448+
client_load=client_load_chan.new_receiver(),
449+
pv_prod=pv_prod_chan.new_receiver(),
449450
)
450451
start = time.time()
451452
while await select.ready():
@@ -529,19 +530,19 @@ async def benchmark_multiple_formulas(n_formula: int = 100, n_msg: int = 100) ->
529530
for index in range(n_formula)
530531
}
531532
senders: Dict[str, Sender[TimeSeriesEntry[Any]]] = {
532-
"client_load": client_load_chan.get_sender(),
533-
"grid_load": grid_load_chan.get_sender(),
534-
"pv_prod": pv_prod_chan.get_sender(),
535-
"ev_chargers_consumption": ev_chargers_consumption_chan.get_sender(),
536-
"batteries_remaining_energy": batteries_remaining_energy_chan.get_sender(),
537-
"batteries_active_power": batteries_active_power_chan.get_sender(),
538-
"batteries_active_power_bounds": batteries_active_power_bounds_chan.get_sender(),
533+
"client_load": client_load_chan.new_sender(),
534+
"grid_load": grid_load_chan.new_sender(),
535+
"pv_prod": pv_prod_chan.new_sender(),
536+
"ev_chargers_consumption": ev_chargers_consumption_chan.new_sender(),
537+
"batteries_remaining_energy": batteries_remaining_energy_chan.new_sender(),
538+
"batteries_active_power": batteries_active_power_chan.new_sender(),
539+
"batteries_active_power_bounds": batteries_active_power_bounds_chan.new_sender(),
539540
}
540541
for index in range(n_formula):
541542
locals()["f" + str(index) + "_chan"] = Broadcast[TimeSeriesEntry[Any]](
542543
"f" + str(index)
543544
)
544-
senders["f" + str(index)] = locals()["f" + str(index) + "_chan"].get_sender()
545+
senders["f" + str(index)] = locals()["f" + str(index) + "_chan"].new_sender()
545546
formula_calculator = FormulaCalculator(
546547
component_graph, additional_formulas=microgrid_formulas
547548
)
@@ -550,7 +551,7 @@ async def benchmark_multiple_formulas(n_formula: int = 100, n_msg: int = 100) ->
550551
)
551552
recv = {}
552553
for index in range(n_formula):
553-
recv["f" + str(index)] = locals()["f" + str(index) + "_chan"].get_receiver()
554+
recv["f" + str(index)] = locals()["f" + str(index) + "_chan"].new_receiver()
554555

555556
recv_merged = MergeNamed(**recv)
556557
n_formula_msg = {"f" + str(fi): 0 for fi in range(n_formula)}

benchmarks/power_distribution/power_distributor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from typing import Any, Coroutine, Dict, List, Set # pylint: disable=unused-import
1212

1313
import grpc.aio as grpcaio
14-
from frequenz.channels import Bidirectional, BidirectionalHandle
14+
from frequenz.channels import Bidirectional
1515

1616
from frequenz.sdk.microgrid.client import MicrogridApiClient, MicrogridGrpcClient
1717
from frequenz.sdk.microgrid.component import Component, ComponentCategory
@@ -27,7 +27,7 @@ class User:
2727
"""User definition."""
2828

2929
user_id: str
30-
channel: BidirectionalHandle[Request, Result]
30+
channel: Bidirectional.Handle[Request, Result]
3131

3232

3333
async def run_user(user: User, batteries: Set[int], request_num: int) -> List[Result]:

examples/sdk_resampling_example.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55

66
import asyncio
77

8-
from frequenz.channels import Broadcast, MergeNamed
8+
from frequenz.channels import Broadcast
9+
from frequenz.channels.util import MergeNamed
910

1011
from frequenz.sdk.actor import ChannelRegistry
1112
from frequenz.sdk.actor.data_sourcing import DataSourcingActor
@@ -30,14 +31,14 @@ async def run() -> None:
3031
data_source_request_channel = Broadcast[ComponentMetricRequest](
3132
"Data Source Request Channel"
3233
)
33-
data_source_request_sender = data_source_request_channel.get_sender()
34-
data_source_request_receiver = data_source_request_channel.get_receiver()
34+
data_source_request_sender = data_source_request_channel.new_sender()
35+
data_source_request_receiver = data_source_request_channel.new_receiver()
3536

3637
resampling_actor_request_channel = Broadcast[ComponentMetricRequest](
3738
"Resampling Actor Request Channel"
3839
)
39-
resampling_actor_request_sender = resampling_actor_request_channel.get_sender()
40-
resampling_actor_request_receiver = resampling_actor_request_channel.get_receiver()
40+
resampling_actor_request_sender = resampling_actor_request_channel.new_sender()
41+
resampling_actor_request_receiver = resampling_actor_request_channel.new_receiver()
4142

4243
# Instantiate a data sourcing actor
4344
_data_sourcing_actor = DataSourcingActor(
@@ -81,7 +82,7 @@ async def run() -> None:
8182
# Store sample receivers for each subscription
8283
sample_receiver = MergeNamed(
8384
**{
84-
channel_name: channel_registry.get_receiver(channel_name)
85+
channel_name: channel_registry.new_receiver(channel_name)
8586
for channel_name in map(
8687
lambda req: req.get_channel_name(), subscription_requests
8788
)

examples/sdk_usage_example.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,7 @@
1616
from queue import Queue
1717
from typing import Any, List, Optional, Set
1818

19-
from frequenz.channels import (
20-
Bidirectional,
21-
BidirectionalHandle,
22-
Broadcast,
23-
Receiver,
24-
Sender,
25-
)
19+
from frequenz.channels import Bidirectional, Broadcast, Receiver, Sender
2620

2721
from frequenz.sdk.actor import actor
2822
from frequenz.sdk.data_handling import TimeSeriesEntry
@@ -48,7 +42,7 @@ class DecisionMakingActor:
4842
def __init__( # pylint: disable=too-many-arguments
4943
self,
5044
power_channel: Receiver[List[float]],
51-
power_distributor_handle: BidirectionalHandle[Request, Result],
45+
power_distributor_handle: Bidirectional.Handle[Request, Result],
5246
batteries: Set[int],
5347
) -> None:
5448
"""Create actor instance.
@@ -177,7 +171,7 @@ async def run() -> None:
177171
# microgrid_client=microgrid_api.microgrid_api, # in v0.8.0
178172
component_graph=api.component_graph,
179173
outputs={
180-
key: channel.get_sender()
174+
key: channel.new_sender()
181175
for key, channel in microgrid_data_channels.items()
182176
},
183177
formula_calculator=formula_calculator,
@@ -212,18 +206,18 @@ async def run() -> None:
212206
)
213207

214208
service_actor = DecisionMakingActor(
215-
power_channel=request_channel.get_receiver(),
209+
power_channel=request_channel.new_receiver(),
216210
power_distributor_handle=power_distributor_channels[
217211
sending_actor_id
218212
].client_handle,
219213
batteries={battery.component_id for battery in batteries},
220214
)
221215

222216
client_actor = DataCollectingActor(
223-
request_channel=request_channel.get_sender(),
217+
request_channel=request_channel.new_sender(),
224218
active_power_data=microgrid_data_channels[
225219
"batteries_active_power"
226-
].get_receiver(name="DecisionMakingActor"),
220+
].new_receiver(name="DecisionMakingActor"),
227221
)
228222

229223
# pylint: disable=no-member

src/frequenz/sdk/actor/channel_registry.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def __init__(self, *, name: str) -> None:
2424
self._name = name
2525
self._channels: Dict[str, Broadcast[Any]] = {}
2626

27-
def get_sender(self, key: str) -> Sender[Any]:
27+
def new_sender(self, key: str) -> Sender[Any]:
2828
"""Get a sender to a dynamically created channel with the given key.
2929
3030
Args:
@@ -35,9 +35,9 @@ def get_sender(self, key: str) -> Sender[Any]:
3535
"""
3636
if key not in self._channels:
3737
self._channels[key] = Broadcast(f"{self._name}-{key}")
38-
return self._channels[key].get_sender()
38+
return self._channels[key].new_sender()
3939

40-
def get_receiver(self, key: str) -> Receiver[Any]:
40+
def new_receiver(self, key: str) -> Receiver[Any]:
4141
"""Get a receiver to a dynamically created channel with the given key.
4242
4343
Args:
@@ -48,4 +48,4 @@ def get_receiver(self, key: str) -> Receiver[Any]:
4848
"""
4949
if key not in self._channels:
5050
self._channels[key] = Broadcast(f"{self._name}-{key}")
51-
return self._channels[key].get_receiver()
51+
return self._channels[key].new_receiver()

src/frequenz/sdk/actor/data_sourcing/microgrid_api_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ def _get_metric_senders(
275275
(
276276
self._get_data_extraction_method(category, metric),
277277
[
278-
self._registry.get_sender(request.get_channel_name())
278+
self._registry.new_sender(request.get_channel_name())
279279
for request in reqlist
280280
],
281281
)

src/frequenz/sdk/actor/decorator.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,13 @@ async def run(self) -> None:
106106
107107
echo_actor = EchoActor(
108108
"EchoActor",
109-
recv1=input_chan_1.get_receiver(),
110-
recv2=input_chan_2.get_receiver(),
111-
output=echo_chan.get_sender(),
109+
recv1=input_chan_1.new_receiver(),
110+
recv2=input_chan_2.new_receiver(),
111+
output=echo_chan.new_sender(),
112112
)
113-
echo_rx = echo_chan.get_receiver()
113+
echo_rx = echo_chan.new_receiver()
114114
115-
await input_chan_2.get_sender().send(True)
115+
await input_chan_2.new_sender().send(True)
116116
msg = await echo_rx.receive()
117117
```
118118
@@ -157,18 +157,18 @@ async def run(self) -> None:
157157
a2_chan: Broadcast[bool] = Broadcast["A2 stream"]
158158
a1 = Actor1(
159159
name="ActorOne",
160-
recv=input_chan.get_receiver(),
161-
output=a1_chan.get_sender(),
160+
recv=input_chan.new_receiver(),
161+
output=a1_chan.new_sender(),
162162
)
163163
a2 = Actor2(
164164
name="ActorTwo",
165-
recv=a1_chan.get_receiver(),
166-
output=a2_chan.get_sender(),
165+
recv=a1_chan.new_receiver(),
166+
output=a2_chan.new_sender(),
167167
)
168168
169-
a2_rx = a2_chan.get_receiver()
169+
a2_rx = a2_chan.new_receiver()
170170
171-
await input_chan.get_sender().send(True)
171+
await input_chan.new_sender().send(True)
172172
msg = await a2_rx.receive()
173173
```
174174

0 commit comments

Comments
 (0)