Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ classifiers = [
]
requires-python = ">= 3.11, < 4"
dependencies = [
"frequenz-api-microgrid >= 0.17.2, < 0.18.0",
"frequenz-api-common >= 0.8.0, < 1.0.0",
"frequenz-api-microgrid >= 0.18.0, < 0.19.0",
"frequenz-channels >= 1.6.1, < 2.0.0",
"frequenz-client-base >= 0.10.0, < 0.12.0",
"frequenz-client-common >= 0.3.2, < 0.4.0",
"frequenz-client-common >= 0.3.6, < 0.4.0",
"frequenz-core >= 1.3.0, < 2.0.0",
"grpcio >= 1.72.1, < 2",
"protobuf >= 6.31.1, < 7",
"typing-extensions >= 4.13.0, < 5",
Expand Down Expand Up @@ -129,6 +131,10 @@ check-yield-types = false
arg-type-hints-in-docstring = false
arg-type-hints-in-signature = true
allow-init-docstring = true
per-file-ignores = [
# Ignore the max-line-length (E501) because enum values are just too long
"src/frequenz/client/microgrid/metrics/_metric.py:E501",
]

[tool.pylint.similarities]
ignore-comments = ['yes']
Expand Down
194 changes: 114 additions & 80 deletions src/frequenz/client/microgrid/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
from datetime import datetime, timedelta
from typing import Any, assert_never

from frequenz.api.common.v1.metrics import bounds_pb2, metric_sample_pb2
from frequenz.api.common.v1.microgrid.components import components_pb2
from frequenz.api.microgrid.v1 import microgrid_pb2, microgrid_pb2_grpc
from frequenz.api.common.v1alpha8.metrics import bounds_pb2, metrics_pb2
from frequenz.api.common.v1alpha8.microgrid.electrical_components import (
electrical_components_pb2,
)
from frequenz.api.microgrid.v1alpha18 import microgrid_pb2, microgrid_pb2_grpc
from frequenz.channels import Receiver
from frequenz.client.base import channel, client, conversion, retry, streaming
from frequenz.client.base.exception import ApiClientError
from frequenz.client.common.microgrid.components import ComponentId
from google.protobuf.empty_pb2 import Empty
from grpc.aio import AioRpcError
from typing_extensions import override

from ._exception import ClientNotConnected
Expand Down Expand Up @@ -90,7 +94,8 @@ def __init__(
self._component_data_broadcasters: dict[
str,
streaming.GrpcStreamBroadcaster[
microgrid_pb2.ReceiveComponentDataStreamResponse, ComponentDataSamples
microgrid_pb2.ReceiveElectricalComponentTelemetryStreamResponse,
ComponentDataSamples,
],
] = {}
self._sensor_data_broadcasters: dict[
Expand Down Expand Up @@ -165,10 +170,7 @@ async def get_microgrid_info( # noqa: DOC502 (raises ApiClientError indirectly)
"""
response = await client.call_stub_method(
self,
lambda: self.stub.GetMicrogridMetadata(
Empty(),
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
),
lambda: self.stub.GetMicrogrid(Empty(), timeout=DEFAULT_GRPC_CALL_TIMEOUT),
method_name="GetMicrogridMetadata",
)

Expand Down Expand Up @@ -217,17 +219,19 @@ async def list_components( # noqa: DOC502 (raises ApiClientError indirectly)
"""
response = await client.call_stub_method(
self,
lambda: self.stub.ListComponents(
microgrid_pb2.ListComponentsRequest(
component_ids=map(_get_component_id, components),
categories=map(_get_category_value, categories),
lambda: self.stub.ListElectricalComponents(
microgrid_pb2.ListElectricalComponentsRequest(
electrical_component_ids=map(_get_component_id, components),
electrical_component_categories=map(
_get_category_value, categories
),
),
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
),
method_name="ListComponents",
)

return map(component_from_proto, response.components)
return map(component_from_proto, response.electrical_components)

async def list_connections( # noqa: DOC502 (raises ApiClientError indirectly)
self,
Expand Down Expand Up @@ -271,10 +275,12 @@ async def list_connections( # noqa: DOC502 (raises ApiClientError indirectly)
"""
response = await client.call_stub_method(
self,
lambda: self.stub.ListConnections(
microgrid_pb2.ListConnectionsRequest(
starts=map(_get_component_id, sources),
ends=map(_get_component_id, destinations),
lambda: self.stub.ListElectricalComponentConnections(
microgrid_pb2.ListElectricalComponentConnectionsRequest(
source_electrical_component_ids=map(_get_component_id, sources),
destination_electrical_component_ids=map(
_get_component_id, destinations
),
),
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
),
Expand All @@ -283,11 +289,19 @@ async def list_connections( # noqa: DOC502 (raises ApiClientError indirectly)

return (
conn
for conn in map(component_connection_from_proto, response.connections)
for conn in map(
component_connection_from_proto,
response.electrical_component_connections,
)
if conn is not None
)

async def set_component_power_active( # noqa: DOC502 (raises ApiClientError indirectly)
# pylint: disable-next=fixme
# TODO: Unifi set_component_power_active and set_component_power_reactive, or at
# least use a common implementation.
# Return an iterator or receiver with the streamed responses instead of
# returning just the first one
async def set_component_power_active( # noqa: DOC503
self,
component: ComponentId | Component,
power: float,
Expand Down Expand Up @@ -341,25 +355,37 @@ async def set_component_power_active( # noqa: DOC502 (raises ApiClientError ind
if validate_arguments:
_validate_set_power_args(power=power, request_lifetime=lifetime_seconds)

response = await client.call_stub_method(
self,
lambda: self.stub.SetComponentPowerActive(
microgrid_pb2.SetComponentPowerActiveRequest(
component_id=_get_component_id(component),
power=power,
request_lifetime=lifetime_seconds,
),
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
),
method_name="SetComponentPowerActive",
)
method_name = "SetElectricalComponentPower"
if not self.is_connected:
raise ClientNotConnected(server_url=self.server_url, operation=method_name)

try:
response = await anext(
aiter(
self.stub.SetElectricalComponentPower(
microgrid_pb2.SetElectricalComponentPowerRequest(
electrical_component_id=_get_component_id(component),
power_type=microgrid_pb2.POWER_TYPE_ACTIVE,
power=power,
request_lifetime=lifetime_seconds,
),
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
)
)
)
except AioRpcError as grpc_error:
raise ApiClientError.from_grpc_error(
server_url=self.server_url,
operation=method_name,
grpc_error=grpc_error,
) from grpc_error

if response.HasField("valid_until"):
return conversion.to_datetime(response.valid_until)
if response.HasField("valid_until_time"):
return conversion.to_datetime(response.valid_until_time)

return None

async def set_component_power_reactive( # noqa: DOC502 (raises ApiClientError indirectly)
async def set_component_power_reactive( # noqa: DOC503
self,
component: ComponentId | Component,
power: float,
Expand Down Expand Up @@ -419,21 +445,33 @@ async def set_component_power_reactive( # noqa: DOC502 (raises ApiClientError i
if validate_arguments:
_validate_set_power_args(power=power, request_lifetime=lifetime_seconds)

response = await client.call_stub_method(
self,
lambda: self.stub.SetComponentPowerReactive(
microgrid_pb2.SetComponentPowerReactiveRequest(
component_id=_get_component_id(component),
power=power,
request_lifetime=lifetime_seconds,
),
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
),
method_name="SetComponentPowerReactive",
)
method_name = "SetElectricalComponentPower"
if not self.is_connected:
raise ClientNotConnected(server_url=self.server_url, operation=method_name)

if response.HasField("valid_until"):
return conversion.to_datetime(response.valid_until)
try:
response = await anext(
aiter(
self.stub.SetElectricalComponentPower(
microgrid_pb2.SetElectricalComponentPowerRequest(
electrical_component_id=_get_component_id(component),
power_type=microgrid_pb2.POWER_TYPE_REACTIVE,
power=power,
request_lifetime=lifetime_seconds,
),
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
)
)
)
except AioRpcError as grpc_error:
raise ApiClientError.from_grpc_error(
server_url=self.server_url,
operation=method_name,
grpc_error=grpc_error,
) from grpc_error

if response.HasField("valid_until_time"):
return conversion.to_datetime(response.valid_until_time)

return None

Expand Down Expand Up @@ -506,14 +544,11 @@ async def add_component_bounds( # noqa: DOC502 (Raises ApiClientError indirectl
most likely a subclass of
[GrpcError][frequenz.client.microgrid.GrpcError].
"""
extra_args = {}
if validity is not None:
extra_args["validity_duration"] = validity.value
response = await client.call_stub_method(
self,
lambda: self.stub.AddComponentBounds(
microgrid_pb2.AddComponentBoundsRequest(
component_id=_get_component_id(component),
lambda: self.stub.AugmentElectricalComponentBounds(
microgrid_pb2.AugmentElectricalComponentBoundsRequest(
electrical_component_id=_get_component_id(component),
target_metric=_get_metric_value(target),
bounds=(
bounds_pb2.Bounds(
Expand All @@ -522,15 +557,15 @@ async def add_component_bounds( # noqa: DOC502 (Raises ApiClientError indirectl
)
for bound in bounds
),
**extra_args,
request_lifetime=validity.value if validity else None,
),
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
),
method_name="AddComponentBounds",
)

if response.HasField("ts"):
return conversion.to_datetime(response.ts)
if response.HasField("valid_until_time"):
return conversion.to_datetime(response.valid_until_time)

return None

Expand Down Expand Up @@ -578,48 +613,43 @@ def receive_component_data_samples_stream(
stream_name = f"microgrid-client-{client_id}-component-data-{key}"
# Alias to avoid too long lines linter errors
# pylint: disable-next=invalid-name
Request = microgrid_pb2.ReceiveComponentDataStreamRequest
Request = microgrid_pb2.ReceiveElectricalComponentTelemetryStreamRequest
broadcaster = streaming.GrpcStreamBroadcaster(
stream_name,
lambda: aiter(
self.stub.ReceiveComponentDataStream(
self.stub.ReceiveElectricalComponentTelemetryStream(
Request(
component_id=_get_component_id(component),
filter=Request.ComponentDataStreamFilter(
electrical_component_id=_get_component_id(component),
filter=Request.ComponentTelemetryStreamFilter(
metrics=metrics_set
),
),
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
)
),
lambda msg: component_data_samples_from_proto(msg.data),
lambda msg: component_data_samples_from_proto(msg.telemetry),
retry_strategy=self._retry_strategy,
)
self._component_data_broadcasters[key] = broadcaster
return broadcaster.new_receiver(maxsize=buffer_size)


# pylint: disable-next=fixme
# TODO: Remove this enum, now AugmentElectricalComponentBounds takes a simple timeout as
# an int.
class Validity(enum.Enum):
"""The duration for which a given list of bounds will stay in effect."""

FIVE_SECONDS = (
microgrid_pb2.ComponentBoundsValidityDuration.COMPONENT_BOUNDS_VALIDITY_DURATION_5_SECONDS
)
FIVE_SECONDS = 5
"""The bounds will stay in effect for 5 seconds."""

ONE_MINUTE = (
microgrid_pb2.ComponentBoundsValidityDuration.COMPONENT_BOUNDS_VALIDITY_DURATION_1_MINUTE
)
ONE_MINUTE = 60
"""The bounds will stay in effect for 1 minute."""

FIVE_MINUTES = (
microgrid_pb2.ComponentBoundsValidityDuration.COMPONENT_BOUNDS_VALIDITY_DURATION_5_MINUTES
)
FIVE_MINUTES = 60 * 5
"""The bounds will stay in effect for 5 minutes."""

FIFTEEN_MINUTES = (
microgrid_pb2.ComponentBoundsValidityDuration.COMPONENT_BOUNDS_VALIDITY_DURATION_15_MINUTES
)
FIFTEEN_MINUTES = 60 * 15
"""The bounds will stay in effect for 15 minutes."""


Expand All @@ -634,26 +664,30 @@ def _get_component_id(component: ComponentId | Component) -> int:
assert_never(unexpected)


def _get_metric_value(metric: Metric | int) -> metric_sample_pb2.Metric.ValueType:
def _get_metric_value(metric: Metric | int) -> metrics_pb2.Metric.ValueType:
"""Get the metric ID from a metric or metric ID."""
match metric:
case Metric():
return metric_sample_pb2.Metric.ValueType(metric.value)
return metrics_pb2.Metric.ValueType(metric.value)
case int():
return metric_sample_pb2.Metric.ValueType(metric)
return metrics_pb2.Metric.ValueType(metric)
case unexpected:
assert_never(unexpected)


def _get_category_value(
category: ComponentCategory | int,
) -> components_pb2.ComponentCategory.ValueType:
) -> electrical_components_pb2.ElectricalComponentCategory.ValueType:
"""Get the category value from a component or component category."""
match category:
case ComponentCategory():
return components_pb2.ComponentCategory.ValueType(category.value)
return electrical_components_pb2.ElectricalComponentCategory.ValueType(
category.value
)
case int():
return components_pb2.ComponentCategory.ValueType(category)
return electrical_components_pb2.ElectricalComponentCategory.ValueType(
category
)
case unexpected:
assert_never(unexpected)

Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/client/microgrid/_delivery_area.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import enum
from dataclasses import dataclass

from frequenz.api.common.v1.grid import delivery_area_pb2
from frequenz.api.common.v1alpha8.grid import delivery_area_pb2


@enum.unique
Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/client/microgrid/_delivery_area_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import logging

from frequenz.api.common.v1.grid import delivery_area_pb2
from frequenz.api.common.v1alpha8.grid import delivery_area_pb2

from ._delivery_area import DeliveryArea, EnergyMarketCodeType
from ._util import enum_from_proto
Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/client/microgrid/_lifetime_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

"""Loading of Lifetime objects from protobuf messages."""

from frequenz.api.common.v1.microgrid import lifetime_pb2
from frequenz.api.common.v1alpha8.microgrid import lifetime_pb2
from frequenz.client.base.conversion import to_datetime

from ._lifetime import Lifetime
Expand Down
Loading
Loading