Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* Update BaseApiClient to get the http2 keepalive feature.
* Some Methods from the high-level API have been moved to this repo: The dispatch class now offers: `until`, `started`, `next_run` and `next_run_after`.
* Add `start_immediately` support to the `create` method. You can now specify "NOW" as the start time to trigger immediate dispatch. Note: While the dispatch CLI previously allowed this by converting "NOW" to a timestamp client-side before sending it to the server, this functionality is now supported directly on the server side!

## Bug Fixes

Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ classifiers = [
requires-python = ">= 3.11, < 4"
dependencies = [
"typing-extensions >= 4.6.1, < 5",
"frequenz-api-dispatch >= 0.15.1, < 0.16",
"frequenz-api-dispatch == 1.0.0-rc1",
"frequenz-client-base >= 0.7.0, < 0.8.0",
"frequenz-client-common >= 0.1.0, < 0.3.0",
"grpcio >= 1.66.1, < 2",
Expand Down Expand Up @@ -87,7 +87,7 @@ dev-pylint = [
"pylint == 3.3.1",
# For checking the noxfile, docs/ script, and tests
"frequenz-client-dispatch[cli,dev-mkdocs,dev-noxfile,dev-pytest]",
"frequenz-api-dispatch >= 0.15.1, < 0.16",
"frequenz-api-dispatch == 1.0.0-rc1",
]
dev-pytest = [
"pytest == 8.3.3",
Expand Down
21 changes: 11 additions & 10 deletions src/frequenz/client/dispatch/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
FuzzyIntRange,
FuzzyTimeDelta,
JsonDictParamType,
SelectorParamType,
TargetComponentParamType,
)
from ._client import Client

Expand Down Expand Up @@ -79,7 +79,7 @@ async def cli(ctx: click.Context, url: str, key: str) -> None:
@cli.command("list")
@click.pass_context
@click.argument("microgrid-id", required=True, type=int)
@click.option("--selector", "-s", type=SelectorParamType(), multiple=True)
@click.option("--target", "-t", type=TargetComponentParamType(), multiple=True)
@click.option("--start-from", type=FuzzyDateTime())
@click.option("--start-to", type=FuzzyDateTime())
@click.option("--end-from", type=FuzzyDateTime())
Expand All @@ -92,11 +92,12 @@ async def list_(ctx: click.Context, /, **filters: Any) -> None:

Lists all dispatches for MICROGRID_ID that match the given filters.

The selector option can be given multiple times.
The target option can be given multiple times.
"""
if "selector" in filters:
selector = filters.pop("selector")
filters["component_selectors"] = selector
if "target" in filters:
target = filters.pop("target")
# Name of the parameter in client.list()
filters["target_components"] = target

num_dispatches = 0
async for page in ctx.obj["client"].list(**filters):
Expand Down Expand Up @@ -186,7 +187,7 @@ def validate_reccurance(ctx: click.Context, param: click.Parameter, value: Any)
["--interval"],
type=int,
help="Interval of the dispatch, based on frequency",
default=0,
default=1,
),
click.Option(
["--count"],
Expand Down Expand Up @@ -241,7 +242,7 @@ def validate_reccurance(ctx: click.Context, param: click.Parameter, value: Any)
required=True,
type=str,
)
@click.argument("selector", required=True, type=SelectorParamType())
@click.argument("target", required=True, type=TargetComponentParamType())
@click.argument("start-time", required=True, type=FuzzyDateTime())
@click.argument("duration", required=False, type=FuzzyTimeDelta())
@click.option("--active", "-a", type=bool, default=True)
Expand All @@ -260,7 +261,7 @@ async def create(
Creates a new dispatch for MICROGRID_ID of type TYPE running for DURATION seconds
starting at START_TIME.

SELECTOR is a comma-separated list of either component categories or component IDs.
TARGET is a comma-separated list of either component categories or component IDs.
Possible component categories: "BATTERY, GRID, METER, INVERTER, EV_CHARGER, CHP".
"""
# Remove keys with `None` value
Expand All @@ -286,7 +287,7 @@ async def create(
@click.option("--start-time", type=FuzzyDateTime())
@click.option("--duration", type=FuzzyTimeDelta())
@click.option("--no-duration", is_flag=True)
@click.option("--selector", type=SelectorParamType())
@click.option("--target", type=TargetComponentParamType())
@click.option("--active", type=bool)
@click.option(
"--payload", "-p", type=JsonDictParamType(), help="JSON payload for the dispatch"
Expand Down
17 changes: 10 additions & 7 deletions src/frequenz/client/dispatch/_cli_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import json
from datetime import datetime, timedelta, timezone
from typing import Any, cast
from typing import Any, Literal, cast

import asyncclick as click
import parsedatetime # type: ignore
Expand All @@ -32,12 +32,15 @@ def __init__(self) -> None:

def convert(
self, value: Any, param: click.Parameter | None, ctx: click.Context | None
) -> datetime:
"""Convert the value to a datetime object."""
) -> datetime | Literal["NOW"] | None:
"""Convert the value to a datetime object or the string "NOW"."""
if isinstance(value, datetime):
return value

try:
if value.upper() == "NOW":
return "NOW"

parsed_dt, parse_status = self.cal.parseDT(value, tzinfo=self.local_tz)
if parse_status == 0:
self.fail(f"Invalid time expression: {value}", param, ctx)
Expand Down Expand Up @@ -130,10 +133,10 @@ def convert(
self.fail(f"Invalid integer range: {value}", param, ctx)


class SelectorParamType(click.ParamType):
"""Click parameter type for selectors."""
class TargetComponentParamType(click.ParamType):
"""Click parameter type for targets."""

name = "selector"
name = "target"

def convert(
self, value: Any, param: click.Parameter | None, ctx: click.Context | None
Expand All @@ -154,7 +157,7 @@ def convert(
values = value.split(",")

if len(values) == 0:
self.fail("Empty selector list", param, ctx)
self.fail("Empty target list", param, ctx)

error: Exception | None = None
# Attempt to parse component ids
Expand Down
42 changes: 23 additions & 19 deletions src/frequenz/client/dispatch/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from datetime import datetime, timedelta
from importlib.resources import files
from pathlib import Path
from typing import Any, AsyncIterator, Awaitable, Iterator, cast
from typing import Any, AsyncIterator, Awaitable, Iterator, Literal, cast

# pylint: disable=no-name-in-module
from frequenz.api.common.v1.pagination.pagination_params_pb2 import PaginationParams
Expand Down Expand Up @@ -42,10 +42,10 @@
from ._internal_types import DispatchCreateRequest
from .recurrence import RecurrenceRule
from .types import (
ComponentSelector,
Dispatch,
DispatchEvent,
_component_selector_to_protobuf,
TargetComponents,
_target_components_to_protobuf,
)

# pylint: enable=no-name-in-module
Expand Down Expand Up @@ -110,7 +110,7 @@ async def list(
self,
microgrid_id: int,
*,
component_selectors: Iterator[ComponentSelector] = iter(()),
target_components: Iterator[TargetComponents] = iter(()),
start_from: datetime | None = None,
start_to: datetime | None = None,
end_from: datetime | None = None,
Expand All @@ -136,7 +136,7 @@ async def list(

Args:
microgrid_id: The microgrid_id to list dispatches for.
component_selectors: optional, list of component ids or categories to filter by.
target_components: optional, list of component ids or categories to filter by.
start_from: optional, filter by start_time >= start_from.
start_to: optional, filter by start_time < start_to.
end_from: optional, filter by end_time >= end_from.
Expand Down Expand Up @@ -166,9 +166,9 @@ def to_interval(
# Setup parameters
start_time_interval = to_interval(start_from, start_to)
end_time_interval = to_interval(end_from, end_to)
selectors = list(map(_component_selector_to_protobuf, component_selectors))
targets = list(map(_target_components_to_protobuf, target_components))
filters = DispatchFilter(
selectors=selectors,
targets=targets,
start_time_interval=start_time_interval,
end_time_interval=end_time_interval,
is_active=active,
Expand Down Expand Up @@ -254,9 +254,9 @@ async def create( # pylint: disable=too-many-positional-arguments
self,
microgrid_id: int,
type: str, # pylint: disable=redefined-builtin
start_time: datetime,
start_time: datetime | Literal["NOW"],
duration: timedelta | None,
selector: ComponentSelector,
target: TargetComponents,
*,
active: bool = True,
dry_run: bool = False,
Expand All @@ -268,10 +268,10 @@ async def create( # pylint: disable=too-many-positional-arguments
Args:
microgrid_id: The microgrid_id to create the dispatch for.
type: User defined string to identify the dispatch type.
start_time: The start time of the dispatch.
start_time: The start time of the dispatch. Can be "NOW" for immediate start.
duration: The duration of the dispatch. Can be `None` for infinite
or no-duration dispatches (e.g. switching a component on).
selector: The component selector for the dispatch.
target: The component target for the dispatch.
active: The active status of the dispatch.
dry_run: The dry_run status of the dispatch.
payload: The payload of the dispatch.
Expand All @@ -283,19 +283,23 @@ async def create( # pylint: disable=too-many-positional-arguments
Raises:
ValueError: If start_time is in the past.
"""
if start_time <= datetime.now(tz=start_time.tzinfo):
raise ValueError("start_time must not be in the past")
if isinstance(start_time, datetime):
if start_time <= datetime.now(tz=start_time.tzinfo):
raise ValueError("start_time must not be in the past")

# Raise if it's not UTC
if start_time.tzinfo is None or start_time.tzinfo.utcoffset(start_time) is None:
raise ValueError("start_time must be timezone aware")
# Raise if it's not UTC
if (
start_time.tzinfo is None
or start_time.tzinfo.utcoffset(start_time) is None
):
raise ValueError("start_time must be timezone aware")

request = DispatchCreateRequest(
microgrid_id=microgrid_id,
type=type,
start_time=start_time,
duration=duration,
selector=selector,
target=target,
active=active,
dry_run=dry_run,
payload=payload or {},
Expand Down Expand Up @@ -353,8 +357,8 @@ async def update(
msg.update.ClearField("duration")
else:
msg.update.duration = round(val.total_seconds())
case "selector":
msg.update.selector.CopyFrom(_component_selector_to_protobuf(val))
case "target":
msg.update.target.CopyFrom(_target_components_to_protobuf(val))
case "is_active":
msg.update.is_active = val
case "payload":
Expand Down
35 changes: 20 additions & 15 deletions src/frequenz/client/dispatch/_internal_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any
from typing import Any, Literal

# pylint: disable=no-name-in-module
from frequenz.api.dispatch.v1.dispatch_pb2 import (
Expand All @@ -15,14 +15,15 @@
from frequenz.api.dispatch.v1.dispatch_pb2 import DispatchData
from google.protobuf.json_format import MessageToDict
from google.protobuf.struct_pb2 import Struct
from google.protobuf.timestamp_pb2 import Timestamp

from frequenz.client.base.conversion import to_datetime, to_timestamp

from .recurrence import RecurrenceRule
from .types import (
ComponentSelector,
_component_selector_from_protobuf,
_component_selector_to_protobuf,
TargetComponents,
_target_components_from_protobuf,
_target_components_to_protobuf,
)

# pylint: enable=no-name-in-module
Expand All @@ -41,7 +42,7 @@ class DispatchCreateRequest:

This is understood and processed by downstream applications."""

start_time: datetime
start_time: datetime | Literal["NOW"]
"""The start time of the dispatch in UTC."""

duration: timedelta | None
Expand All @@ -51,8 +52,8 @@ class DispatchCreateRequest:
like a command to turn on a component.
"""

selector: ComponentSelector
"""The component selector specifying which components the dispatch targets."""
target: TargetComponents
"""The target components of the dispatch."""

active: bool
"""Indicates whether the dispatch is active and eligible for processing."""
Expand All @@ -69,7 +70,6 @@ class DispatchCreateRequest:

recurrence: RecurrenceRule | None
"""The recurrence rule for the dispatch.

Defining any repeating patterns or schedules."""

@classmethod
Expand All @@ -93,13 +93,13 @@ def from_protobuf(
return DispatchCreateRequest(
microgrid_id=pb_object.microgrid_id,
type=pb_object.dispatch_data.type,
start_time=rounded_start_time(
to_datetime(pb_object.dispatch_data.start_time)
start_time=(
"NOW"
if pb_object.start_immediately
else rounded_start_time(to_datetime(pb_object.dispatch_data.start_time))
),
duration=duration,
selector=_component_selector_from_protobuf(
pb_object.dispatch_data.selector
),
target=_target_components_from_protobuf(pb_object.dispatch_data.target),
active=pb_object.dispatch_data.is_active,
dry_run=pb_object.dispatch_data.is_dry_run,
payload=MessageToDict(pb_object.dispatch_data.payload),
Expand All @@ -119,16 +119,21 @@ def to_protobuf(self) -> PBDispatchCreateRequest:
microgrid_id=self.microgrid_id,
dispatch_data=DispatchData(
type=self.type,
start_time=to_timestamp(self.start_time),
start_time=(
to_timestamp(self.start_time)
if isinstance(self.start_time, datetime)
else Timestamp()
),
duration=(
round(self.duration.total_seconds()) if self.duration else None
),
selector=_component_selector_to_protobuf(self.selector),
target=_target_components_to_protobuf(self.target),
is_active=self.active,
is_dry_run=self.dry_run,
payload=payload,
recurrence=self.recurrence.to_protobuf() if self.recurrence else None,
),
start_immediately=self.start_time == "NOW",
)


Expand Down
9 changes: 6 additions & 3 deletions src/frequenz/client/dispatch/test/_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ def _filter_dispatch(
"""Filter a dispatch based on the request."""
if request.HasField("filter"):
_filter = request.filter
for selector in _filter.selectors:
if selector != dispatch.selector:
for target in _filter.targets:
if target != dispatch.target:
return False
if _filter.HasField("start_time_interval"):
if start_from := _filter.start_time_interval.__dict__["from"]:
Expand Down Expand Up @@ -272,7 +272,7 @@ async def UpdateMicrogridDispatch(
getattr(request.update, split_path[0]),
)
# Fields that need to be copied
case "start_time" | "selector" | "payload":
case "start_time" | "target" | "payload":
getattr(pb_dispatch.data, split_path[0]).CopyFrom(
getattr(request.update, split_path[0])
)
Expand Down Expand Up @@ -397,6 +397,9 @@ def _dispatch_from_request(
params = _request.__dict__
params.pop("microgrid_id")

if _request.start_time == "NOW":
params["start_time"] = datetime.now(tz=timezone.utc)

return Dispatch(
id=_id,
create_time=create_time,
Expand Down
Loading
Loading