diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 8c48b285..b4d54191 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -3,6 +3,7 @@ ## New Features * Update BaseApiClient to get the http2 keepalive feature. +* Some Methods from the high-level API have been moved to this repo: The dispatch class now offers: `until`, `started`, `next_run` and `next_run_after`. ## Bug Fixes diff --git a/pyproject.toml b/pyproject.toml index 82f35d7a..b15aa488 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,6 +80,7 @@ dev-mypy = [ "frequenz-client-dispatch[cli,dev-mkdocs,dev-noxfile,dev-pytest]", "grpc-stubs == 1.53.0.5", "types-protobuf == 5.28.3.20241030", + "types-python-dateutil == 2.9.0.20241003", ] dev-noxfile = ["nox == 2024.10.9", "frequenz-repo-config[lib] == 0.10.0"] dev-pylint = [ @@ -94,6 +95,7 @@ dev-pytest = [ "pytest-mock == 3.14.0", "pytest-asyncio == 0.24.0", "async-solipsism == 0.7", + "time-machine == 2.15.0", "hypothesis == 6.116.0", "frequenz-client-dispatch[cli]", ] diff --git a/src/frequenz/client/dispatch/__main__.py b/src/frequenz/client/dispatch/__main__.py index 8b69526c..58671539 100644 --- a/src/frequenz/client/dispatch/__main__.py +++ b/src/frequenz/client/dispatch/__main__.py @@ -16,7 +16,7 @@ from prompt_toolkit.patch_stdout import patch_stdout from prompt_toolkit.shortcuts import CompleteStyle -from frequenz.client.dispatch.types import ( +from frequenz.client.dispatch.recurrence import ( EndCriteria, Frequency, RecurrenceRule, diff --git a/src/frequenz/client/dispatch/_client.py b/src/frequenz/client/dispatch/_client.py index 7126786c..bb86b859 100644 --- a/src/frequenz/client/dispatch/_client.py +++ b/src/frequenz/client/dispatch/_client.py @@ -40,12 +40,12 @@ from frequenz.client.base.streaming import GrpcStreamBroadcaster from ._internal_types import DispatchCreateRequest +from .recurrence import RecurrenceRule from .types import ( ComponentSelector, Dispatch, DispatchEvent, - RecurrenceRule, - component_selector_to_protobuf, + _component_selector_to_protobuf, ) # pylint: enable=no-name-in-module @@ -166,7 +166,7 @@ def to_interval( # Setup parameters start_time_interval = to_interval(start_from, start_to) end_time_interval = to_interval(end_from, end_to) - selectors = list(map(component_selector_to_protobuf, component_selectors)) + selectors = list(map(_component_selector_to_protobuf, component_selectors)) filters = DispatchFilter( selectors=selectors, start_time_interval=start_time_interval, @@ -354,7 +354,7 @@ async def update( else: msg.update.duration = round(val.total_seconds()) case "selector": - msg.update.selector.CopyFrom(component_selector_to_protobuf(val)) + msg.update.selector.CopyFrom(_component_selector_to_protobuf(val)) case "is_active": msg.update.is_active = val case "payload": diff --git a/src/frequenz/client/dispatch/_internal_types.py b/src/frequenz/client/dispatch/_internal_types.py index c3831a8b..4922df06 100644 --- a/src/frequenz/client/dispatch/_internal_types.py +++ b/src/frequenz/client/dispatch/_internal_types.py @@ -18,11 +18,11 @@ from frequenz.client.base.conversion import to_datetime, to_timestamp +from .recurrence import RecurrenceRule from .types import ( ComponentSelector, - RecurrenceRule, - component_selector_from_protobuf, - component_selector_to_protobuf, + _component_selector_from_protobuf, + _component_selector_to_protobuf, ) # pylint: enable=no-name-in-module @@ -97,7 +97,9 @@ def from_protobuf( to_datetime(pb_object.dispatch_data.start_time) ), duration=duration, - selector=component_selector_from_protobuf(pb_object.dispatch_data.selector), + selector=_component_selector_from_protobuf( + pb_object.dispatch_data.selector + ), active=pb_object.dispatch_data.is_active, dry_run=pb_object.dispatch_data.is_dry_run, payload=MessageToDict(pb_object.dispatch_data.payload), @@ -121,7 +123,7 @@ def to_protobuf(self) -> PBDispatchCreateRequest: duration=( round(self.duration.total_seconds()) if self.duration else None ), - selector=component_selector_to_protobuf(self.selector), + selector=_component_selector_to_protobuf(self.selector), is_active=self.active, is_dry_run=self.dry_run, payload=payload, diff --git a/src/frequenz/client/dispatch/recurrence.py b/src/frequenz/client/dispatch/recurrence.py new file mode 100644 index 00000000..8460669a --- /dev/null +++ b/src/frequenz/client/dispatch/recurrence.py @@ -0,0 +1,223 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Types for recurrence rules.""" + +from dataclasses import dataclass, field +from datetime import datetime +from enum import IntEnum + +from dateutil import rrule + +# pylint: disable=no-name-in-module +from frequenz.api.dispatch.v1.dispatch_pb2 import RecurrenceRule as PBRecurrenceRule + +from frequenz.client.base.conversion import to_datetime, to_timestamp + +# pylint: enable=no-name-in-module + + +class Weekday(IntEnum): + """Enum representing the day of the week.""" + + UNSPECIFIED = PBRecurrenceRule.WEEKDAY_UNSPECIFIED + MONDAY = PBRecurrenceRule.WEEKDAY_MONDAY + TUESDAY = PBRecurrenceRule.WEEKDAY_TUESDAY + WEDNESDAY = PBRecurrenceRule.WEEKDAY_WEDNESDAY + THURSDAY = PBRecurrenceRule.WEEKDAY_THURSDAY + FRIDAY = PBRecurrenceRule.WEEKDAY_FRIDAY + SATURDAY = PBRecurrenceRule.WEEKDAY_SATURDAY + SUNDAY = PBRecurrenceRule.WEEKDAY_SUNDAY + + +class Frequency(IntEnum): + """Enum representing the frequency of the recurrence.""" + + UNSPECIFIED = PBRecurrenceRule.FREQUENCY_UNSPECIFIED + MINUTELY = PBRecurrenceRule.FREQUENCY_MINUTELY + HOURLY = PBRecurrenceRule.FREQUENCY_HOURLY + DAILY = PBRecurrenceRule.FREQUENCY_DAILY + WEEKLY = PBRecurrenceRule.FREQUENCY_WEEKLY + MONTHLY = PBRecurrenceRule.FREQUENCY_MONTHLY + YEARLY = PBRecurrenceRule.FREQUENCY_YEARLY + + +_RRULE_FREQ_MAP = { + Frequency.MINUTELY: rrule.MINUTELY, + Frequency.HOURLY: rrule.HOURLY, + Frequency.DAILY: rrule.DAILY, + Frequency.WEEKLY: rrule.WEEKLY, + Frequency.MONTHLY: rrule.MONTHLY, +} +"""To map from our Frequency enum to the dateutil library enum.""" + +_RRULE_WEEKDAY_MAP = { + Weekday.MONDAY: rrule.MO, + Weekday.TUESDAY: rrule.TU, + Weekday.WEDNESDAY: rrule.WE, + Weekday.THURSDAY: rrule.TH, + Weekday.FRIDAY: rrule.FR, + Weekday.SATURDAY: rrule.SA, + Weekday.SUNDAY: rrule.SU, +} +"""To map from our Weekday enum to the dateutil library enum.""" + + +@dataclass(kw_only=True) +class EndCriteria: + """Controls when a recurring dispatch should end.""" + + count: int | None = None + """The number of times this dispatch should recur.""" + until: datetime | None = None + """The end time of this dispatch in UTC.""" + + @classmethod + def from_protobuf(cls, pb_criteria: PBRecurrenceRule.EndCriteria) -> "EndCriteria": + """Convert a protobuf end criteria to an end criteria. + + Args: + pb_criteria: The protobuf end criteria to convert. + + Returns: + The converted end criteria. + """ + instance = cls() + + match pb_criteria.WhichOneof("count_or_until"): + case "count": + instance.count = pb_criteria.count + case "until": + instance.until = to_datetime(pb_criteria.until) + return instance + + def to_protobuf(self) -> PBRecurrenceRule.EndCriteria: + """Convert an end criteria to a protobuf end criteria. + + Returns: + The converted protobuf end criteria. + """ + pb_criteria = PBRecurrenceRule.EndCriteria() + + if self.count is not None: + pb_criteria.count = self.count + elif self.until is not None: + pb_criteria.until.CopyFrom(to_timestamp(self.until)) + + return pb_criteria + + +# pylint: disable=too-many-instance-attributes +@dataclass(kw_only=True) +class RecurrenceRule: + """Ruleset governing when and how a dispatch should re-occur. + + Attributes follow the iCalendar specification (RFC5545) for recurrence rules. + """ + + frequency: Frequency = Frequency.UNSPECIFIED + """The frequency specifier of this recurring dispatch.""" + + interval: int = 0 + """How often this dispatch should recur, based on the frequency.""" + + end_criteria: EndCriteria | None = None + """When this dispatch should end. + + Can recur a fixed number of times or until a given timestamp.""" + + byminutes: list[int] = field(default_factory=list) + """On which minute(s) of the hour the event occurs.""" + + byhours: list[int] = field(default_factory=list) + """On which hour(s) of the day the event occurs.""" + + byweekdays: list[Weekday] = field(default_factory=list) + """On which day(s) of the week the event occurs.""" + + bymonthdays: list[int] = field(default_factory=list) + """On which day(s) of the month the event occurs.""" + + bymonths: list[int] = field(default_factory=list) + """On which month(s) of the year the event occurs.""" + + @classmethod + def from_protobuf(cls, pb_rule: PBRecurrenceRule) -> "RecurrenceRule": + """Convert a protobuf recurrence rule to a recurrence rule. + + Args: + pb_rule: The protobuf recurrence rule to convert. + + Returns: + The converted recurrence rule. + """ + return RecurrenceRule( + frequency=Frequency(pb_rule.freq), + interval=pb_rule.interval, + end_criteria=( + EndCriteria.from_protobuf(pb_rule.end_criteria) + if pb_rule.HasField("end_criteria") + else None + ), + byminutes=list(pb_rule.byminutes), + byhours=list(pb_rule.byhours), + byweekdays=[Weekday(day) for day in pb_rule.byweekdays], + bymonthdays=list(pb_rule.bymonthdays), + bymonths=list(pb_rule.bymonths), + ) + + def to_protobuf(self) -> PBRecurrenceRule: + """Convert a recurrence rule to a protobuf recurrence rule. + + Returns: + The converted protobuf recurrence rule. + """ + pb_rule = PBRecurrenceRule() + + pb_rule.freq = self.frequency.value + pb_rule.interval = self.interval + if self.end_criteria is not None: + pb_rule.end_criteria.CopyFrom(self.end_criteria.to_protobuf()) + pb_rule.byminutes.extend(self.byminutes) + pb_rule.byhours.extend(self.byhours) + pb_rule.byweekdays.extend([day.value for day in self.byweekdays]) + pb_rule.bymonthdays.extend(self.bymonthdays) + pb_rule.bymonths.extend(self.bymonths) + + return pb_rule + + def _as_rrule(self, start_time: datetime) -> rrule.rrule: + """Prepare the rrule object. + + Args: + start_time: The start time of the dispatch. + + Returns: + The rrule object. + + Raises: + ValueError: If the interval is 0. + """ + if self.interval == 0: + raise ValueError("Interval must be greater than 0") + + count, until = (None, None) + if end := self.end_criteria: + count = end.count + until = end.until + + rrule_obj = rrule.rrule( + freq=_RRULE_FREQ_MAP[self.frequency], + dtstart=start_time, + count=count, + until=until, + byminute=self.byminutes or None, + byhour=self.byhours or None, + byweekday=[_RRULE_WEEKDAY_MAP[weekday] for weekday in self.byweekdays] + or None, + bymonthday=self.bymonthdays or None, + bymonth=self.bymonths or None, + interval=self.interval, + ) + + return rrule_obj diff --git a/src/frequenz/client/dispatch/test/generator.py b/src/frequenz/client/dispatch/test/generator.py index 68862bf6..636990a1 100644 --- a/src/frequenz/client/dispatch/test/generator.py +++ b/src/frequenz/client/dispatch/test/generator.py @@ -9,7 +9,8 @@ from frequenz.client.common.microgrid.components import ComponentCategory from .._internal_types import rounded_start_time -from ..types import Dispatch, EndCriteria, Frequency, RecurrenceRule, Weekday +from ..recurrence import EndCriteria, Frequency, RecurrenceRule, Weekday +from ..types import Dispatch class DispatchGenerator: diff --git a/src/frequenz/client/dispatch/types.py b/src/frequenz/client/dispatch/types.py index e19976c2..95a084a4 100644 --- a/src/frequenz/client/dispatch/types.py +++ b/src/frequenz/client/dispatch/types.py @@ -4,8 +4,8 @@ """Type wrappers for the generated protobuf messages.""" -from dataclasses import dataclass, field -from datetime import datetime, timedelta +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone from enum import IntEnum from typing import Any, cast @@ -14,9 +14,11 @@ ComponentSelector as PBComponentSelector, ) from frequenz.api.dispatch.v1.dispatch_pb2 import Dispatch as PBDispatch -from frequenz.api.dispatch.v1.dispatch_pb2 import DispatchData, DispatchMetadata -from frequenz.api.dispatch.v1.dispatch_pb2 import RecurrenceRule as PBRecurrenceRule -from frequenz.api.dispatch.v1.dispatch_pb2 import StreamMicrogridDispatchesResponse +from frequenz.api.dispatch.v1.dispatch_pb2 import ( + DispatchData, + DispatchMetadata, + StreamMicrogridDispatchesResponse, +) from google.protobuf.json_format import MessageToDict from google.protobuf.struct_pb2 import Struct @@ -25,6 +27,8 @@ # pylint: enable=no-name-in-module from frequenz.client.common.microgrid.components import ComponentCategory +from .recurrence import Frequency, RecurrenceRule, Weekday + ComponentSelector = list[int] | list[ComponentCategory] """A component selector specifying which components a dispatch targets. @@ -32,7 +36,7 @@ """ -def component_selector_from_protobuf( +def _component_selector_from_protobuf( pb_selector: PBComponentSelector, ) -> ComponentSelector: """Convert a protobuf component selector to a component selector. @@ -62,7 +66,7 @@ def component_selector_from_protobuf( raise ValueError("Invalid component selector") -def component_selector_to_protobuf( +def _component_selector_to_protobuf( selector: ComponentSelector, ) -> PBComponentSelector: """Convert a component selector to a protobuf component selector. @@ -94,155 +98,6 @@ def component_selector_to_protobuf( return pb_selector -class Weekday(IntEnum): - """Enum representing the day of the week.""" - - UNSPECIFIED = PBRecurrenceRule.WEEKDAY_UNSPECIFIED - MONDAY = PBRecurrenceRule.WEEKDAY_MONDAY - TUESDAY = PBRecurrenceRule.WEEKDAY_TUESDAY - WEDNESDAY = PBRecurrenceRule.WEEKDAY_WEDNESDAY - THURSDAY = PBRecurrenceRule.WEEKDAY_THURSDAY - FRIDAY = PBRecurrenceRule.WEEKDAY_FRIDAY - SATURDAY = PBRecurrenceRule.WEEKDAY_SATURDAY - SUNDAY = PBRecurrenceRule.WEEKDAY_SUNDAY - - -class Frequency(IntEnum): - """Enum representing the frequency of the recurrence.""" - - UNSPECIFIED = PBRecurrenceRule.FREQUENCY_UNSPECIFIED - MINUTELY = PBRecurrenceRule.FREQUENCY_MINUTELY - HOURLY = PBRecurrenceRule.FREQUENCY_HOURLY - DAILY = PBRecurrenceRule.FREQUENCY_DAILY - WEEKLY = PBRecurrenceRule.FREQUENCY_WEEKLY - MONTHLY = PBRecurrenceRule.FREQUENCY_MONTHLY - YEARLY = PBRecurrenceRule.FREQUENCY_YEARLY - - -@dataclass(kw_only=True) -class EndCriteria: - """Controls when a recurring dispatch should end.""" - - count: int | None = None - """The number of times this dispatch should recur.""" - until: datetime | None = None - """The end time of this dispatch in UTC.""" - - @classmethod - def from_protobuf(cls, pb_criteria: PBRecurrenceRule.EndCriteria) -> "EndCriteria": - """Convert a protobuf end criteria to an end criteria. - - Args: - pb_criteria: The protobuf end criteria to convert. - - Returns: - The converted end criteria. - """ - instance = cls() - - match pb_criteria.WhichOneof("count_or_until"): - case "count": - instance.count = pb_criteria.count - case "until": - instance.until = to_datetime(pb_criteria.until) - return instance - - def to_protobuf(self) -> PBRecurrenceRule.EndCriteria: - """Convert an end criteria to a protobuf end criteria. - - Returns: - The converted protobuf end criteria. - """ - pb_criteria = PBRecurrenceRule.EndCriteria() - - if self.count is not None: - pb_criteria.count = self.count - elif self.until is not None: - pb_criteria.until.CopyFrom(to_timestamp(self.until)) - - return pb_criteria - - -# pylint: disable=too-many-instance-attributes -@dataclass(kw_only=True) -class RecurrenceRule: - """Ruleset governing when and how a dispatch should re-occur. - - Attributes follow the iCalendar specification (RFC5545) for recurrence rules. - """ - - frequency: Frequency = Frequency.UNSPECIFIED - """The frequency specifier of this recurring dispatch.""" - - interval: int = 0 - """How often this dispatch should recur, based on the frequency.""" - - end_criteria: EndCriteria | None = None - """When this dispatch should end. - - Can recur a fixed number of times or until a given timestamp.""" - - byminutes: list[int] = field(default_factory=list) - """On which minute(s) of the hour the event occurs.""" - - byhours: list[int] = field(default_factory=list) - """On which hour(s) of the day the event occurs.""" - - byweekdays: list[Weekday] = field(default_factory=list) - """On which day(s) of the week the event occurs.""" - - bymonthdays: list[int] = field(default_factory=list) - """On which day(s) of the month the event occurs.""" - - bymonths: list[int] = field(default_factory=list) - """On which month(s) of the year the event occurs.""" - - @classmethod - def from_protobuf(cls, pb_rule: PBRecurrenceRule) -> "RecurrenceRule": - """Convert a protobuf recurrence rule to a recurrence rule. - - Args: - pb_rule: The protobuf recurrence rule to convert. - - Returns: - The converted recurrence rule. - """ - return RecurrenceRule( - frequency=Frequency(pb_rule.freq), - interval=pb_rule.interval, - end_criteria=( - EndCriteria.from_protobuf(pb_rule.end_criteria) - if pb_rule.HasField("end_criteria") - else None - ), - byminutes=list(pb_rule.byminutes), - byhours=list(pb_rule.byhours), - byweekdays=[Weekday(day) for day in pb_rule.byweekdays], - bymonthdays=list(pb_rule.bymonthdays), - bymonths=list(pb_rule.bymonths), - ) - - def to_protobuf(self) -> PBRecurrenceRule: - """Convert a recurrence rule to a protobuf recurrence rule. - - Returns: - The converted protobuf recurrence rule. - """ - pb_rule = PBRecurrenceRule() - - pb_rule.freq = self.frequency.value - pb_rule.interval = self.interval - if self.end_criteria is not None: - pb_rule.end_criteria.CopyFrom(self.end_criteria.to_protobuf()) - pb_rule.byminutes.extend(self.byminutes) - pb_rule.byhours.extend(self.byhours) - pb_rule.byweekdays.extend([day.value for day in self.byweekdays]) - pb_rule.bymonthdays.extend(self.bymonthdays) - pb_rule.bymonths.extend(self.bymonths) - - return pb_rule - - @dataclass(frozen=True, kw_only=True) class TimeIntervalFilter: """Filter for a time interval.""" @@ -261,7 +116,7 @@ class TimeIntervalFilter: @dataclass(kw_only=True, frozen=True) -class Dispatch: +class Dispatch: # pylint: disable=too-many-instance-attributes """Represents a dispatch operation within a microgrid system.""" id: int @@ -305,6 +160,123 @@ class Dispatch: update_time: datetime """The last update time of the dispatch in UTC. Set when a dispatch is modified.""" + @property + def started(self) -> bool: + """Check if the dispatch has started. + + A dispatch is considered started if the current time is after the start + time but before the end time. + + Recurring dispatches are considered started if the current time is after + the start time of the last occurrence but before the end time of the + last occurrence. + """ + if not self.active: + return False + + now = datetime.now(tz=timezone.utc) + + if now < self.start_time: + return False + + # A dispatch without duration is always running, once it started + if self.duration is None: + return True + + if until := self._until(now): + return now < until + + return False + + @property + def until(self) -> datetime | None: + """Time when the dispatch should end. + + Returns the time that a running dispatch should end. + If the dispatch is not running, None is returned. + + Returns: + The time when the dispatch should end or None if the dispatch is not running. + """ + if not self.active: + return None + + now = datetime.now(tz=timezone.utc) + return self._until(now) + + @property + def next_run(self) -> datetime | None: + """Calculate the next run of a dispatch. + + Returns: + The next run of the dispatch or None if the dispatch is finished. + """ + return self.next_run_after(datetime.now(tz=timezone.utc)) + + def next_run_after(self, after: datetime) -> datetime | None: + """Calculate the next run of a dispatch. + + Args: + after: The time to calculate the next run from. + + Returns: + The next run of the dispatch or None if the dispatch is finished. + """ + if ( + not self.recurrence.frequency + or self.recurrence.frequency == Frequency.UNSPECIFIED + or self.duration is None # Infinite duration + ): + if after > self.start_time: + return None + return self.start_time + + # Make sure no weekday is UNSPECIFIED + if Weekday.UNSPECIFIED in self.recurrence.byweekdays: + return None + + # No type information for rrule, so we need to cast + return cast( + datetime | None, + self.recurrence._as_rrule( # pylint: disable=protected-access + self.start_time + ).after(after, inc=True), + ) + + def _until(self, now: datetime) -> datetime | None: + """Calculate the time when the dispatch should end. + + If no previous run is found, None is returned. + + Args: + now: The current time. + + Returns: + The time when the dispatch should end or None if the dispatch is not running. + + Raises: + ValueError: If the dispatch has no duration. + """ + if self.duration is None: + raise ValueError("_until: Dispatch has no duration") + + if ( + not self.recurrence.frequency + or self.recurrence.frequency == Frequency.UNSPECIFIED + ): + return self.start_time + self.duration + + latest_past_start: datetime | None = ( + self.recurrence._as_rrule( # pylint: disable=protected-access + self.start_time + ).before(now, inc=True) + ) + + if not latest_past_start: + return None + + return latest_past_start + self.duration + @classmethod def from_protobuf(cls, pb_object: PBDispatch) -> "Dispatch": """Convert a protobuf dispatch to a dispatch. @@ -326,7 +298,7 @@ def from_protobuf(cls, pb_object: PBDispatch) -> "Dispatch": if pb_object.data.duration else None ), - selector=component_selector_from_protobuf(pb_object.data.selector), + selector=_component_selector_from_protobuf(pb_object.data.selector), active=pb_object.data.is_active, dry_run=pb_object.data.is_dry_run, payload=MessageToDict(pb_object.data.payload), @@ -354,7 +326,7 @@ def to_protobuf(self) -> PBDispatch: duration=( round(self.duration.total_seconds()) if self.duration else None ), - selector=component_selector_to_protobuf(self.selector), + selector=_component_selector_to_protobuf(self.selector), is_active=self.active, is_dry_run=self.dry_run, payload=payload, diff --git a/tests/test_dispatch_cli.py b/tests/test_cli.py similarity index 99% rename from tests/test_dispatch_cli.py rename to tests/test_cli.py index 92e67b32..3fdbcfc6 100644 --- a/tests/test_dispatch_cli.py +++ b/tests/test_cli.py @@ -14,14 +14,14 @@ from frequenz.client.common.microgrid.components import ComponentCategory from frequenz.client.dispatch.__main__ import cli -from frequenz.client.dispatch.test.client import ALL_KEY, FakeClient -from frequenz.client.dispatch.types import ( - Dispatch, +from frequenz.client.dispatch.recurrence import ( EndCriteria, Frequency, RecurrenceRule, Weekday, ) +from frequenz.client.dispatch.test.client import ALL_KEY, FakeClient +from frequenz.client.dispatch.types import Dispatch TEST_NOW = datetime(2023, 1, 1, 0, 0, 0, tzinfo=timezone.utc) """Arbitrary time used as NOW for testing.""" diff --git a/tests/test_dispatch_client.py b/tests/test_client.py similarity index 100% rename from tests/test_dispatch_client.py rename to tests/test_client.py diff --git a/tests/test_dispatch.py b/tests/test_dispatch.py new file mode 100644 index 00000000..5b3af0b8 --- /dev/null +++ b/tests/test_dispatch.py @@ -0,0 +1,257 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Tests for the Dispatch class methods using pytest parametrization.""" + +from dataclasses import replace +from datetime import datetime, timedelta, timezone + +import pytest +import time_machine + +from frequenz.client.common.microgrid.components import ComponentCategory +from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule, Weekday +from frequenz.client.dispatch.types import Dispatch + +# Define a fixed current time for testing to avoid issues with datetime.now() +CURRENT_TIME = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + + +@pytest.fixture +def dispatch_base() -> Dispatch: + """Fixture to create a base Dispatch instance.""" + return Dispatch( + id=1, + type="TypeA", + start_time=CURRENT_TIME, + duration=timedelta(minutes=20), + selector=[ComponentCategory.BATTERY], + active=True, + dry_run=False, + payload={}, + recurrence=RecurrenceRule(), + create_time=CURRENT_TIME - timedelta(hours=1), + update_time=CURRENT_TIME - timedelta(minutes=30), + ) + + +@time_machine.travel(CURRENT_TIME) +@pytest.mark.parametrize( + "active, start_time_offset, duration, expected_state", + [ + # Dispatch is inactive + ( + False, + timedelta(minutes=-10), + timedelta(minutes=20), + False, + ), + # Current time is before the start time + ( + True, + timedelta(minutes=10), + timedelta(minutes=20), + False, + ), + # Dispatch with infinite duration + ( + True, + timedelta(minutes=-10), + None, + True, + ), + # Dispatch is currently running + ( + True, + timedelta(minutes=-10), + timedelta(minutes=20), + True, + ), + # Dispatch duration has passed + ( + True, + timedelta(minutes=-30), + timedelta(minutes=20), + False, + ), + ], +) +# pylint: disable=too-many-arguments,too-many-positional-arguments +def test_dispatch_running( + dispatch_base: Dispatch, + active: bool, + start_time_offset: timedelta, + duration: timedelta | None, + expected_state: bool, +) -> None: + """Test the running method of the Dispatch class.""" + dispatch = replace( + dispatch_base, + start_time=CURRENT_TIME + start_time_offset, + duration=duration, + active=active, + ) + + assert dispatch.started == expected_state + + +@time_machine.travel(CURRENT_TIME) +@pytest.mark.parametrize( + "active, duration, start_time_offset, expected_until_offset", + [ + # Dispatch is inactive + (False, timedelta(minutes=20), timedelta(minutes=-10), None), + # Dispatch with infinite duration (no duration) + (True, None, timedelta(minutes=-10), None), + # Current time is before the start time + (True, timedelta(minutes=20), timedelta(minutes=10), timedelta(minutes=30)), + # Dispatch is currently running + ( + True, + timedelta(minutes=20), + timedelta(minutes=-10), + timedelta(minutes=10), + ), + ], +) +def test_dispatch_until( + dispatch_base: Dispatch, + active: bool, + duration: timedelta | None, + start_time_offset: timedelta, + expected_until_offset: timedelta | None, +) -> None: + """Test the until property of the Dispatch class.""" + start_time = CURRENT_TIME + start_time_offset + dispatch = replace( + dispatch_base, + active=active, + duration=duration, + start_time=start_time, + ) + + if duration is None: + with pytest.raises(ValueError): + _ = dispatch.until + return + + expected_until = ( + CURRENT_TIME + expected_until_offset + if expected_until_offset is not None + else None + ) + + assert dispatch.until == expected_until + + +@time_machine.travel(CURRENT_TIME) +@pytest.mark.parametrize( + "recurrence, duration, start_time_offset, expected_next_run_offset", + [ + # No recurrence and start time in the past + (RecurrenceRule(), timedelta(minutes=20), timedelta(minutes=-10), None), + # No recurrence and start time in the future + ( + RecurrenceRule(), + timedelta(minutes=20), + timedelta(minutes=10), + timedelta(minutes=10), + ), + # Daily recurrence + ( + RecurrenceRule(frequency=Frequency.DAILY, interval=1), + timedelta(minutes=20), + timedelta(minutes=-10), + timedelta(days=1, minutes=-10), + ), + # Weekly recurrence on Monday + ( + RecurrenceRule( + frequency=Frequency.WEEKLY, byweekdays=[Weekday.MONDAY], interval=1 + ), + timedelta(minutes=20), + timedelta(minutes=-10), + None, # We'll compute expected_next_run inside the test + ), + ], +) +def test_dispatch_next_run( + dispatch_base: Dispatch, + recurrence: RecurrenceRule, + duration: timedelta | None, + start_time_offset: timedelta, + expected_next_run_offset: timedelta | None, +) -> None: + """Test the next_run property of the Dispatch class.""" + start_time = CURRENT_TIME + start_time_offset + dispatch = replace( + dispatch_base, + start_time=start_time, + duration=duration, + recurrence=recurrence, + ) + + if recurrence.frequency == Frequency.WEEKLY: + # Compute the next run based on the recurrence rule + expected_next_run = recurrence._as_rrule( # pylint: disable=protected-access + start_time + ).after(CURRENT_TIME, inc=False) + elif expected_next_run_offset is not None: + expected_next_run = CURRENT_TIME + expected_next_run_offset + else: + expected_next_run = None + + assert dispatch.next_run == expected_next_run + + +@time_machine.travel(CURRENT_TIME) +@pytest.mark.parametrize( + "after_offset, recurrence, duration, expected_next_run_after_offset", + [ + # No recurrence + (timedelta(minutes=10), RecurrenceRule(), timedelta(minutes=20), None), + # Weekly recurrence, after current time + ( + timedelta(days=2), + RecurrenceRule( + frequency=Frequency.WEEKLY, byweekdays=[Weekday.MONDAY], interval=1 + ), + timedelta(minutes=20), + None, # We'll compute expected_next_run_after inside the test + ), + # Daily recurrence + ( + timedelta(minutes=10), + RecurrenceRule(frequency=Frequency.DAILY, interval=1), + timedelta(minutes=20), + timedelta(days=1), + ), + ], +) +def test_dispatch_next_run_after( + dispatch_base: Dispatch, + after_offset: timedelta, + recurrence: RecurrenceRule, + duration: timedelta | None, + expected_next_run_after_offset: timedelta | None, +) -> None: + """Test the next_run_after method of the Dispatch class.""" + after = CURRENT_TIME + after_offset + dispatch = replace( + dispatch_base, + recurrence=recurrence, + duration=duration, + ) + + if recurrence.frequency == Frequency.WEEKLY: + expected_next_run_after = ( + recurrence._as_rrule( # pylint: disable=protected-access + dispatch.start_time + ).after(after, inc=True) + ) + elif expected_next_run_after_offset is not None: + expected_next_run_after = CURRENT_TIME + expected_next_run_after_offset + else: + expected_next_run_after = None + + assert dispatch.next_run_after(after) == expected_next_run_after diff --git a/tests/test_dispatch_types.py b/tests/test_proto.py similarity index 94% rename from tests/test_dispatch_types.py rename to tests/test_proto.py index 1da99f0e..d6a33599 100644 --- a/tests/test_dispatch_types.py +++ b/tests/test_proto.py @@ -7,14 +7,16 @@ from frequenz.client.common.microgrid.components import ComponentCategory from frequenz.client.dispatch._internal_types import DispatchCreateRequest -from frequenz.client.dispatch.types import ( - Dispatch, +from frequenz.client.dispatch.recurrence import ( EndCriteria, Frequency, RecurrenceRule, Weekday, - component_selector_from_protobuf, - component_selector_to_protobuf, +) +from frequenz.client.dispatch.types import ( + Dispatch, + _component_selector_from_protobuf, + _component_selector_to_protobuf, ) @@ -28,8 +30,8 @@ def test_component_selector() -> None: [ComponentCategory.METER], [ComponentCategory.EV_CHARGER, ComponentCategory.BATTERY], ): - protobuf = component_selector_to_protobuf(selector) - assert component_selector_from_protobuf(protobuf) == selector + protobuf = _component_selector_to_protobuf(selector) + assert _component_selector_from_protobuf(protobuf) == selector def test_end_criteria() -> None: