Skip to content

Commit d1a1c20

Browse files
authored
Support key signing and handle dry_run cleaner (#184)
- **Support key signing** - **Consider dry_run status in dispatch merging**
2 parents 369822a + 7ca9ab0 commit d1a1c20

File tree

6 files changed

+132
-23
lines changed

6 files changed

+132
-23
lines changed

RELEASE_NOTES.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
9+
* The `key` parameter in the `Dispatcher` constructor is now deprecated. Use `auth_key` instead. The `sign_secret` parameter is an additional optional parameter for signing.
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
* `dry_run` status is now considered when merging dispatches. Dispatches with different `dry_run` values will no longer be merged, ensuring that dry-run and operational dispatches are handled by separate actors.
14+
* Two new parameters were added to the `Dispatcher` constructor:
15+
* `sign_secret`: A secret key used for signing messages.
16+
* `auth_key`: An authentication key for the Dispatch API.
1417

1518
## Bug Fixes
1619

17-
* The merge by type class now uses the correct logger path.
18-
* The merge by type was made more robust under heavy load, making sure to use the same `now` for all dispatches that are checked.
19-
* Fix that the merge filter was using an outdated dispatches dict once fetch() ran.
20+
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->

pyproject.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ dependencies = [
4040
# plugins.mkdocstrings.handlers.python.import)
4141
"frequenz-sdk >= 1.0.0-rc2100, < 1.0.0-rc2200",
4242
"frequenz-channels >= 1.6.1, < 2.0.0",
43-
"frequenz-client-dispatch >= 0.11.1, < 0.12.0",
44-
"frequenz-client-common >= 0.3.2, < 0.4.0",
43+
"frequenz-client-dispatch >= 0.11.2, < 0.12.0",
4544
"frequenz-client-base >= 0.11.0, < 0.12.0",
4645
]
4746
dynamic = ["version"]
@@ -54,7 +53,7 @@ email = "[email protected]"
5453
dev-flake8 = [
5554
"flake8 == 7.3.0",
5655
"flake8-docstrings == 1.7.0",
57-
"flake8-pyproject == 1.2.3", # For reading the flake8 config from pyproject.toml
56+
"flake8-pyproject == 1.2.3", # For reading the flake8 config from pyproject.toml
5857
"pydoclint == 0.6.6",
5958
"pydocstyle == 6.3.0",
6059
]

src/frequenz/dispatch/_dispatcher.py

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import asyncio
99
import logging
10+
import warnings
1011
from asyncio import Event
1112
from datetime import timedelta
1213
from typing import Awaitable, Callable, Self
@@ -64,7 +65,7 @@ async def run():
6465
async with Dispatcher(
6566
microgrid_id=microgrid_id,
6667
server_url=url,
67-
key=key
68+
auth_key=key
6869
) as dispatcher:
6970
dispatcher.start_managing(
7071
dispatch_type="DISPATCH_TYPE",
@@ -90,7 +91,7 @@ async def run():
9091
async with Dispatcher(
9192
microgrid_id=microgrid_id,
9293
server_url=url,
93-
key=key
94+
auth_key=key
9495
) as dispatcher:
9596
actor = MagicMock() # replace with your actor
9697
@@ -135,7 +136,7 @@ async def run():
135136
async with Dispatcher(
136137
microgrid_id=microgrid_id,
137138
server_url=url,
138-
key=key,
139+
auth_key=key,
139140
) as dispatcher:
140141
events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE")
141142
@@ -172,7 +173,7 @@ async def run():
172173
async with Dispatcher(
173174
microgrid_id=microgrid_id,
174175
server_url=url,
175-
key=key,
176+
auth_key=key,
176177
) as dispatcher:
177178
# Create a new dispatch
178179
new_dispatch = await dispatcher.client.create(
@@ -205,7 +206,9 @@ def __init__(
205206
*,
206207
microgrid_id: MicrogridId,
207208
server_url: str,
208-
key: str,
209+
key: str | None = None,
210+
auth_key: str | None = None,
211+
sign_secret: str | None = None,
209212
call_timeout: timedelta = timedelta(seconds=60),
210213
stream_timeout: timedelta = timedelta(minutes=5),
211214
):
@@ -214,15 +217,39 @@ def __init__(
214217
Args:
215218
microgrid_id: The microgrid id.
216219
server_url: The URL of the dispatch service.
217-
key: The key to access the service.
220+
key: The key to access the service, deprecated, use `auth_key` instead.
221+
auth_key: The authentication key to access the service.
222+
sign_secret: The secret to sign the requests, optional
218223
call_timeout: The timeout for API calls.
219224
stream_timeout: The timeout for streaming API calls.
225+
226+
Raises:
227+
ValueError: If both or neither `key` and `auth_key` are provided
220228
"""
221229
super().__init__()
222230

231+
if key is not None and auth_key is not None:
232+
raise ValueError(
233+
"Both 'key' and 'auth_key' are provided, please use only 'auth_key'."
234+
)
235+
236+
if key is None and auth_key is None:
237+
raise ValueError(
238+
"'auth_key' must be provided to access the dispatch service."
239+
)
240+
241+
if key is not None:
242+
auth_key = key
243+
warnings.warn(
244+
"'key' is deprecated, use 'auth_key' instead.",
245+
DeprecationWarning,
246+
stacklevel=2,
247+
)
248+
223249
self._client = DispatchApiClient(
224250
server_url=server_url,
225-
key=key,
251+
auth_key=auth_key,
252+
sign_secret=sign_secret,
226253
call_timeout=call_timeout,
227254
stream_timeout=stream_timeout,
228255
)
@@ -306,11 +333,11 @@ async def start_managing(
306333
This also decides how instances are mapped from dispatches to actors:
307334
308335
* [`MergeByType`][frequenz.dispatch.MergeByType] — All dispatches map to
309-
one single instance identified by the dispatch type.
336+
one single instance identified by the dispatch type and dry_run status.
310337
* [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — A
311-
dispatch maps to an instance identified by the dispatch type and target.
312-
So different dispatches with equal type and target will map to the same
313-
instance.
338+
dispatch maps to an instance identified by the dispatch type, dry_run status
339+
and target. So different dispatches with equal type and target will map to
340+
the same instance.
314341
* `None` — No merging, each dispatch maps to a separate instance.
315342
316343
Args:

src/frequenz/dispatch/_merge_strategies.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class MergeByType(MergeStrategy):
3030
@override
3131
def identity(self, dispatch: Dispatch) -> DispatchActorId:
3232
"""Identity function for the merge criteria."""
33-
return DispatchActorId(_hash_positive(dispatch.type))
33+
return DispatchActorId(_hash_positive((dispatch.type, dispatch.dry_run)))
3434

3535
@override
3636
def filter(
@@ -88,4 +88,6 @@ class MergeByTypeTarget(MergeByType):
8888
@override
8989
def identity(self, dispatch: Dispatch) -> DispatchActorId:
9090
"""Identity function for the merge criteria."""
91-
return DispatchActorId(_hash_positive((dispatch.type, tuple(dispatch.target))))
91+
return DispatchActorId(
92+
_hash_positive((dispatch.type, dispatch.dry_run, tuple(dispatch.target)))
93+
)

tests/test_frequenz_dispatch.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,3 +781,78 @@ async def test_at_least_one_running_filter(
781781
await asyncio.sleep(1)
782782
stopped_b = await receiver.receive()
783783
assert not stopped_b.started
784+
785+
786+
@pytest.mark.parametrize(
787+
"merge_strategy",
788+
[
789+
MergeByType(),
790+
MergeByTypeTarget(),
791+
],
792+
)
793+
async def test_dry_run_dispatches_not_merged(
794+
fake_time: time_machine.Coordinates,
795+
generator: DispatchGenerator,
796+
merge_strategy: MergeStrategy,
797+
) -> None:
798+
"""Test that dispatches with different dry_run values are not merged."""
799+
microgrid_id = MicrogridId(randint(1, 100))
800+
client = FakeClient()
801+
service = DispatchScheduler(
802+
microgrid_id=microgrid_id,
803+
client=client,
804+
)
805+
service.start()
806+
807+
receiver = await service.new_running_state_event_receiver(
808+
"TEST_TYPE", merge_strategy=merge_strategy
809+
)
810+
811+
# Create two dispatches with same type and target, but different dry_run
812+
dispatch1 = replace(
813+
generator.generate_dispatch(),
814+
active=True,
815+
duration=timedelta(seconds=10),
816+
target=TargetIds(1, 2),
817+
start_time=_now() + timedelta(seconds=5),
818+
recurrence=RecurrenceRule(),
819+
type="TEST_TYPE",
820+
dry_run=False,
821+
)
822+
dispatch2 = replace(
823+
dispatch1,
824+
dry_run=True,
825+
)
826+
827+
lifecycle_events = service.new_lifecycle_events_receiver("TEST_TYPE")
828+
829+
await client.create(**to_create_params(microgrid_id, dispatch1))
830+
await client.create(**to_create_params(microgrid_id, dispatch2))
831+
832+
# Wait for both to be registered
833+
await lifecycle_events.receive()
834+
await lifecycle_events.receive()
835+
836+
# Move time forward to start both dispatches
837+
fake_time.shift(timedelta(seconds=6))
838+
await asyncio.sleep(1)
839+
840+
started1 = await receiver.receive()
841+
started2 = await receiver.receive()
842+
843+
assert started1.started
844+
assert started2.started
845+
assert started1.dry_run != started2.dry_run
846+
847+
# Move time forward to the end of the dispatches
848+
fake_time.shift(timedelta(seconds=10))
849+
await asyncio.sleep(1)
850+
851+
stopped1 = await receiver.receive()
852+
stopped2 = await receiver.receive()
853+
854+
assert not stopped1.started
855+
assert not stopped2.started
856+
assert stopped1.dry_run != stopped2.dry_run
857+
858+
await service.stop()

tests/test_managing_actor.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,20 +335,25 @@ async def test_manage_abstraction(
335335
class MyFakeClient(FakeClient):
336336
"""Fake client for testing."""
337337

338+
# pylint: disable=too-many-arguments,unused-argument
338339
def __init__(
339340
self,
340341
*,
341342
server_url: str,
342-
key: str,
343+
auth_key: str | None = None,
344+
key: str | None = None,
345+
sign_secret: str | None = None,
343346
call_timeout: timedelta,
344347
stream_timeout: timedelta,
345348
) -> None:
346349
assert server_url
347-
assert key
350+
assert key or auth_key
348351
assert call_timeout
349352
assert stream_timeout
350353
super().__init__()
351354

355+
# pylint: enable=too-many-arguments,unused-argument
356+
352357
mid = MicrogridId(1)
353358

354359
# Patch `Client` class in Dispatcher with MyFakeClient

0 commit comments

Comments
 (0)