Skip to content

Commit 7ca9ab0

Browse files
committed
Consider dry_run status in dispatch merging
Dispatches with different `dry_run` values are now handled by separate actor instances. This change modifies the `MergeByType` and `MergeByTypeTarget` merge strategies to include the `dry_run` status in the identity function. Previously, two dispatches with the same type and target, but different `dry_run` statuses, would be merged into a single actor instance. This could lead to unexpected behavior, as dry-run dispatches are intended for testing and should not affect production actors. With this change, dispatches are only merged if they have the same type, target (for `MergeByTypeTarget`), and `dry_run` status. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 594c05d commit 7ca9ab0

File tree

4 files changed

+84
-6
lines changed

4 files changed

+84
-6
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
## New Features
1212

13+
* `dry_run` status is now considered when merging dispatches. Dispatches with different `dry_run` values will no longer be merged, ensuring that dry-run and operational dispatches are handled by separate actors.
1314
* Two new parameters were added to the `Dispatcher` constructor:
1415
* `sign_secret`: A secret key used for signing messages.
1516
* `auth_key`: An authentication key for the Dispatch API.

src/frequenz/dispatch/_dispatcher.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -333,11 +333,11 @@ async def start_managing(
333333
This also decides how instances are mapped from dispatches to actors:
334334
335335
* [`MergeByType`][frequenz.dispatch.MergeByType] — All dispatches map to
336-
one single instance identified by the dispatch type.
336+
one single instance identified by the dispatch type and dry_run status.
337337
* [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — A
338-
dispatch maps to an instance identified by the dispatch type and target.
339-
So different dispatches with equal type and target will map to the same
340-
instance.
338+
dispatch maps to an instance identified by the dispatch type, dry_run status
339+
and target. So different dispatches with equal type and target will map to
340+
the same instance.
341341
* `None` — No merging, each dispatch maps to a separate instance.
342342
343343
Args:

src/frequenz/dispatch/_merge_strategies.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class MergeByType(MergeStrategy):
3030
@override
3131
def identity(self, dispatch: Dispatch) -> DispatchActorId:
3232
"""Identity function for the merge criteria."""
33-
return DispatchActorId(_hash_positive(dispatch.type))
33+
return DispatchActorId(_hash_positive((dispatch.type, dispatch.dry_run)))
3434

3535
@override
3636
def filter(
@@ -88,4 +88,6 @@ class MergeByTypeTarget(MergeByType):
8888
@override
8989
def identity(self, dispatch: Dispatch) -> DispatchActorId:
9090
"""Identity function for the merge criteria."""
91-
return DispatchActorId(_hash_positive((dispatch.type, tuple(dispatch.target))))
91+
return DispatchActorId(
92+
_hash_positive((dispatch.type, dispatch.dry_run, tuple(dispatch.target)))
93+
)

tests/test_frequenz_dispatch.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,3 +781,78 @@ async def test_at_least_one_running_filter(
781781
await asyncio.sleep(1)
782782
stopped_b = await receiver.receive()
783783
assert not stopped_b.started
784+
785+
786+
@pytest.mark.parametrize(
787+
"merge_strategy",
788+
[
789+
MergeByType(),
790+
MergeByTypeTarget(),
791+
],
792+
)
793+
async def test_dry_run_dispatches_not_merged(
794+
fake_time: time_machine.Coordinates,
795+
generator: DispatchGenerator,
796+
merge_strategy: MergeStrategy,
797+
) -> None:
798+
"""Test that dispatches with different dry_run values are not merged."""
799+
microgrid_id = MicrogridId(randint(1, 100))
800+
client = FakeClient()
801+
service = DispatchScheduler(
802+
microgrid_id=microgrid_id,
803+
client=client,
804+
)
805+
service.start()
806+
807+
receiver = await service.new_running_state_event_receiver(
808+
"TEST_TYPE", merge_strategy=merge_strategy
809+
)
810+
811+
# Create two dispatches with same type and target, but different dry_run
812+
dispatch1 = replace(
813+
generator.generate_dispatch(),
814+
active=True,
815+
duration=timedelta(seconds=10),
816+
target=TargetIds(1, 2),
817+
start_time=_now() + timedelta(seconds=5),
818+
recurrence=RecurrenceRule(),
819+
type="TEST_TYPE",
820+
dry_run=False,
821+
)
822+
dispatch2 = replace(
823+
dispatch1,
824+
dry_run=True,
825+
)
826+
827+
lifecycle_events = service.new_lifecycle_events_receiver("TEST_TYPE")
828+
829+
await client.create(**to_create_params(microgrid_id, dispatch1))
830+
await client.create(**to_create_params(microgrid_id, dispatch2))
831+
832+
# Wait for both to be registered
833+
await lifecycle_events.receive()
834+
await lifecycle_events.receive()
835+
836+
# Move time forward to start both dispatches
837+
fake_time.shift(timedelta(seconds=6))
838+
await asyncio.sleep(1)
839+
840+
started1 = await receiver.receive()
841+
started2 = await receiver.receive()
842+
843+
assert started1.started
844+
assert started2.started
845+
assert started1.dry_run != started2.dry_run
846+
847+
# Move time forward to the end of the dispatches
848+
fake_time.shift(timedelta(seconds=10))
849+
await asyncio.sleep(1)
850+
851+
stopped1 = await receiver.receive()
852+
stopped2 = await receiver.receive()
853+
854+
assert not stopped1.started
855+
assert not stopped2.started
856+
assert stopped1.dry_run != stopped2.dry_run
857+
858+
await service.stop()

0 commit comments

Comments
 (0)