Skip to content

Commit b6f4f5f

Browse files
authored
Upgrade channel version to 1.0.0-rc1 (#893)
This version of `frequenz-channels` includes a number of improvements and a fix for a timer bug: https://github.com/frequenz-floss/frequenz-channels-python/releases/tag/v1.0.0-rc.1
2 parents c7a58e1 + 600ab98 commit b6f4f5f

35 files changed

+263
-216
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
- The SDK is now using the microgrid API client from [`frequenz-client-microgrid`](https://github.com/frequenz-floss/frequenz-client-microgrid-python/). You should update your code if you are using the microgrid API client directly.
1010

11+
- The minimum required `frequenz-channels` version is now [`v1.0.0-rc1`](https://github.com/frequenz-floss/frequenz-channels-python/releases/tag/v1.0.0-rc.1).
12+
1113
## New Features
1214

1315
<!-- Here goes the main new features and examples or instructions on how to use them -->

benchmarks/power_distribution/power_distributor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,9 @@ async def run_test( # pylint: disable=too-many-locals
111111
"""
112112
start = timeit.default_timer()
113113

114-
power_request_channel = Broadcast[Request]("power-request")
115-
battery_status_channel = Broadcast[ComponentPoolStatus]("battery-status")
116-
power_result_channel = Broadcast[Result]("power-result")
114+
power_request_channel = Broadcast[Request](name="power-request")
115+
battery_status_channel = Broadcast[ComponentPoolStatus](name="battery-status")
116+
power_result_channel = Broadcast[Result](name="power-result")
117117
async with PowerDistributingActor(
118118
requests_receiver=power_request_channel.new_receiver(),
119119
results_sender=power_result_channel.new_sender(),

benchmarks/timeseries/benchmark_datasourcing.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,13 @@ async def benchmark_data_sourcing(
7777
mock_grid.start_mock_client(enable_mock_client)
7878

7979
request_channel = Broadcast[ComponentMetricRequest](
80-
"DataSourcingActor Request Channel"
80+
name="DataSourcingActor Request Channel"
8181
)
8282

8383
channel_registry = ChannelRegistry(name="Microgrid Channel Registry")
8484
request_receiver = request_channel.new_receiver(
85-
"datasourcing-benchmark", maxsize=(num_ev_chargers * len(COMPONENT_METRIC_IDS))
85+
name="datasourcing-benchmark",
86+
limit=(num_ev_chargers * len(COMPONENT_METRIC_IDS)),
8687
)
8788
request_sender = request_channel.new_sender()
8889

benchmarks/timeseries/periodic_feature_extractor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ async def init_feature_extractor(
3333
) -> collections.abc.AsyncIterator[PeriodicFeatureExtractor]:
3434
"""Initialize the PeriodicFeatureExtractor class."""
3535
# We only need the moving window to initialize the PeriodicFeatureExtractor class.
36-
lm_chan = Broadcast[Sample[Quantity]]("lm_net_power")
36+
lm_chan = Broadcast[Sample[Quantity]](name="lm_net_power")
3737
async with MovingWindow(
3838
timedelta(seconds=1), lm_chan.new_receiver(), timedelta(seconds=1)
3939
) as moving_window:

examples/battery_pool.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,8 @@
77
import asyncio
88
import logging
99
from datetime import timedelta
10-
from typing import Any
1110

12-
from frequenz.channels import Receiver
13-
from frequenz.channels.util import MergeNamed
11+
from frequenz.channels import merge
1412

1513
from frequenz.sdk import microgrid
1614
from frequenz.sdk.actor import ResamplerConfig
@@ -31,17 +29,16 @@ async def main() -> None:
3129
)
3230

3331
battery_pool = microgrid.battery_pool()
34-
receivers: dict[str, Receiver[Any]] = {
35-
"soc": battery_pool.soc.new_receiver(maxsize=1),
36-
"capacity": battery_pool.capacity.new_receiver(maxsize=1),
32+
receivers = [
33+
battery_pool.soc.new_receiver(maxsize=1),
34+
battery_pool.capacity.new_receiver(maxsize=1),
3735
# pylint: disable=protected-access
38-
"power_bounds": battery_pool._system_power_bounds.new_receiver(maxsize=1),
36+
battery_pool._system_power_bounds.new_receiver(maxsize=1),
3937
# pylint: enable=protected-access
40-
}
38+
]
4139

42-
merged_channel = MergeNamed[Any](**receivers)
43-
async for metric_name, metric in merged_channel:
44-
print(f"Received new {metric_name}: {metric}")
40+
async for metric in merge(*receivers):
41+
print(f"Received new metric: {metric}")
4542

4643

4744
asyncio.run(main())

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ dependencies = [
3030
# Make sure to update the mkdocs.yml file when
3131
# changing the version
3232
# (plugins.mkdocstrings.handlers.python.import)
33-
"frequenz-channels == 1.0.0b2",
34-
"frequenz-client-microgrid >= 0.1.2, < 0.2.0",
33+
"frequenz-channels == 1.0.0-rc1",
34+
"frequenz-client-microgrid >= 0.2.0, < 0.3.0",
3535
"google-api-python-client >= 2.71, < 3",
3636
"grpcio >= 1.54.2, < 2",
3737
"grpcio-tools >= 1.54.2, < 2",

src/frequenz/sdk/_internal/_channels.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99

1010
from frequenz.channels import Receiver
1111

12-
T = typing.TypeVar("T")
12+
T_co = typing.TypeVar("T_co", covariant=True)
1313

1414

15-
class ReceiverFetcher(typing.Generic[T], typing.Protocol):
15+
class ReceiverFetcher(typing.Generic[T_co], typing.Protocol):
1616
"""An interface that just exposes a `new_receiver` method."""
1717

1818
@abc.abstractmethod
19-
def new_receiver(self, *, maxsize: int = 50) -> Receiver[T]:
19+
def new_receiver(self, *, maxsize: int = 50) -> Receiver[T_co]:
2020
"""Get a receiver from the channel.
2121
2222
Args:
@@ -31,20 +31,20 @@ class _Sentinel:
3131
"""A sentinel to denote that no value has been received yet."""
3232

3333

34-
class LatestValueCache(typing.Generic[T]):
34+
class LatestValueCache(typing.Generic[T_co]):
3535
"""A cache that stores the latest value in a receiver."""
3636

37-
def __init__(self, receiver: Receiver[T]) -> None:
37+
def __init__(self, receiver: Receiver[T_co]) -> None:
3838
"""Create a new cache.
3939
4040
Args:
4141
receiver: The receiver to cache.
4242
"""
4343
self._receiver = receiver
44-
self._latest_value: T | _Sentinel = _Sentinel()
44+
self._latest_value: T_co | _Sentinel = _Sentinel()
4545
self._task = asyncio.create_task(self._run())
4646

47-
def get(self) -> T:
47+
def get(self) -> T_co:
4848
"""Return the latest value that has been received.
4949
5050
This raises a `ValueError` if no value has been received yet. Use `has_value` to

src/frequenz/sdk/actor/_channel_registry.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ def get_or_create(self, message_type: type[_T], key: str) -> Broadcast[_T]:
9898
message_type,
9999
"".join(traceback.format_stack(limit=10)[:9]),
100100
)
101-
self._channels[key] = _Entry(message_type, Broadcast(f"{self._name}-{key}"))
101+
self._channels[key] = _Entry(
102+
message_type, Broadcast(name=f"{self._name}-{key}")
103+
)
102104

103105
entry = self._channels[key]
104106
if entry.message_type is not message_type:

src/frequenz/sdk/actor/_config_managing.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from typing import Any, assert_never
1111

1212
from frequenz.channels import Sender
13-
from frequenz.channels.util import FileWatcher
13+
from frequenz.channels.file_watcher import EventType, FileWatcher
1414

1515
from ..actor._actor import Actor
1616
from ..config import Config
@@ -34,7 +34,7 @@ def __init__(
3434
self,
3535
config_path: pathlib.Path | str,
3636
output: Sender[Config],
37-
event_types: abc.Set[FileWatcher.EventType] = frozenset(FileWatcher.EventType),
37+
event_types: abc.Set[EventType] = frozenset(EventType),
3838
*,
3939
name: str | None = None,
4040
) -> None:
@@ -99,21 +99,21 @@ async def _run(self) -> None:
9999
continue
100100

101101
match event.type:
102-
case FileWatcher.EventType.CREATE:
102+
case EventType.CREATE:
103103
_logger.info(
104104
"%s: The configuration file %s was created, sending new config...",
105105
self,
106106
self._config_path,
107107
)
108108
await self.send_config()
109-
case FileWatcher.EventType.MODIFY:
109+
case EventType.MODIFY:
110110
_logger.info(
111111
"%s: The configuration file %s was modified, sending update...",
112112
self,
113113
self._config_path,
114114
)
115115
await self.send_config()
116-
case FileWatcher.EventType.DELETE:
116+
case EventType.DELETE:
117117
_logger.info(
118118
"%s: The configuration file %s was deleted, ignoring...",
119119
self,

src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
import typing
1111
from datetime import datetime, timedelta, timezone
1212

13-
from frequenz.channels import Receiver, Sender
14-
from frequenz.channels.util import SkipMissedAndDrift, Timer, select, selected_from
13+
from frequenz.channels import Receiver, Sender, select, selected_from
14+
from frequenz.channels.timer import SkipMissedAndDrift, Timer
1515
from frequenz.client.microgrid import ComponentCategory
1616
from typing_extensions import override
1717

@@ -195,7 +195,7 @@ async def _run(self) -> None:
195195
drop_old_proposals_timer,
196196
):
197197
if selected_from(selected, self._proposals_receiver):
198-
proposal = selected.value
198+
proposal = selected.message
199199
if proposal.component_ids not in self._bound_tracker_tasks:
200200
self._add_bounds_tracker(proposal.component_ids)
201201

@@ -214,7 +214,7 @@ async def _run(self) -> None:
214214
await self._send_reports(proposal.component_ids)
215215

216216
elif selected_from(selected, self._bounds_subscription_receiver):
217-
sub = selected.value
217+
sub = selected.message
218218
component_ids = sub.component_ids
219219
priority = sub.priority
220220

@@ -239,7 +239,7 @@ async def _run(self) -> None:
239239
power_distributing,
240240
)
241241

242-
result = selected.value
242+
result = selected.message
243243
self._distribution_results[frozenset(result.request.component_ids)] = (
244244
result
245245
)

0 commit comments

Comments
 (0)