|
13 | 13 | import pytest |
14 | 14 | import time_machine |
15 | 15 | from frequenz.channels import Receiver |
| 16 | +from frequenz.channels.timer import SkipMissedAndResync, Timer |
16 | 17 | from frequenz.client.common.microgrid import MicrogridId |
17 | 18 | from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule |
18 | 19 | from frequenz.client.dispatch.test.client import FakeClient, to_create_params |
19 | 20 | from frequenz.client.dispatch.test.generator import DispatchGenerator |
20 | 21 | from frequenz.client.dispatch.types import Dispatch as BaseDispatch |
21 | | -from frequenz.client.dispatch.types import TargetIds |
| 22 | +from frequenz.client.dispatch.types import DispatchId, TargetIds |
22 | 23 | from pytest import fixture |
23 | 24 |
|
24 | 25 | from frequenz.dispatch import ( |
@@ -692,6 +693,86 @@ async def test_multiple_dispatches_sequential_intervals_merge( |
692 | 693 | assert not stopped.started |
693 | 694 |
|
694 | 695 |
|
| 696 | +async def test_sequential_overlapping_dispatches_between_fetch( |
| 697 | + fake_time: time_machine.Coordinates, |
| 698 | + generator: DispatchGenerator, |
| 699 | +) -> None: |
| 700 | + """Test that sequential overlapping dispatches are handled correctly.""" |
| 701 | + microgrid_id = MicrogridId(randint(1, 100)) |
| 702 | + client = FakeClient() |
| 703 | + service = DispatchScheduler(microgrid_id=microgrid_id, client=client) |
| 704 | + service.start() |
| 705 | + |
| 706 | + receiver = await service.new_running_state_event_receiver( |
| 707 | + "TEST_TYPE", merge_strategy=MergeByType() |
| 708 | + ) |
| 709 | + |
| 710 | + # Create two overlapping dispatches |
| 711 | + dispatch1 = replace( |
| 712 | + generator.generate_dispatch(), |
| 713 | + active=True, |
| 714 | + duration=timedelta(seconds=10), |
| 715 | + target=TargetIds(1, 2), |
| 716 | + start_time=_now() + timedelta(seconds=5), |
| 717 | + recurrence=RecurrenceRule(), |
| 718 | + type="TEST_TYPE", |
| 719 | + ) |
| 720 | + dispatch2 = replace( |
| 721 | + generator.generate_dispatch(), |
| 722 | + active=True, |
| 723 | + duration=timedelta(seconds=10), |
| 724 | + target=TargetIds(3, 4), |
| 725 | + start_time=_now() + timedelta(seconds=8), # overlaps with dispatch1 |
| 726 | + recurrence=RecurrenceRule(), |
| 727 | + type="TEST_TYPE", |
| 728 | + ) |
| 729 | + await client.create(**to_create_params(microgrid_id, dispatch1)) |
| 730 | + |
| 731 | + timer = Timer(timedelta(seconds=100), SkipMissedAndResync(), auto_start=False) |
| 732 | + await service._fetch(timer) # pylint: disable=protected-access |
| 733 | + |
| 734 | + await client.create(**to_create_params(microgrid_id, dispatch2)) |
| 735 | + |
| 736 | + # Move time forward to start first |
| 737 | + fake_time.shift(timedelta(seconds=6)) |
| 738 | + await asyncio.sleep(1) |
| 739 | + import logging |
| 740 | + |
| 741 | + logging.debug("We see: %s", service._dispatches) |
| 742 | + |
| 743 | + started1 = await receiver.receive() |
| 744 | + assert started1.id == DispatchId(1) |
| 745 | + |
| 746 | + # Move time to second dispatch |
| 747 | + fake_time.shift(timedelta(seconds=6)) |
| 748 | + await asyncio.sleep(1) |
| 749 | + |
| 750 | + started2 = await receiver.receive() |
| 751 | + assert started2.id == DispatchId(2) |
| 752 | + assert started2.started |
| 753 | + assert started1.started |
| 754 | + |
| 755 | + # Now we move to when the first one ended |
| 756 | + fake_time.shift(timedelta(seconds=5)) |
| 757 | + await asyncio.sleep(1) |
| 758 | + |
| 759 | + with pytest.raises(asyncio.TimeoutError): |
| 760 | + logging.debug("Wait for now starts %s", _now()) |
| 761 | + started3 = await receiver.receive() |
| 762 | + assert started3.id != started2.id, "Received unexpected event" |
| 763 | + |
| 764 | + assert not started1.started |
| 765 | + assert started2.started |
| 766 | + await asyncio.sleep(1) |
| 767 | + |
| 768 | + # Next we move to when all dispatches should have stopped |
| 769 | + fake_time.shift(timedelta(seconds=4)) |
| 770 | + started4 = await receiver.receive() |
| 771 | + |
| 772 | + # We only expect a message for dispatch2, dispatch1 should never send a stop |
| 773 | + assert started4.id == DispatchId(2) |
| 774 | + |
| 775 | + |
695 | 776 | @pytest.mark.parametrize("merge_strategy", [MergeByType(), MergeByTypeTarget()]) |
696 | 777 | async def test_at_least_one_running_filter( |
697 | 778 | fake_time: time_machine.Coordinates, |
|
0 commit comments