diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 31dcbfc..d6e8765 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -2,43 +2,24 @@ ## Summary -This release introduces a complete Assets API client with CLI support for interacting with Frequenz microgrid assets, including comprehensive error handling and type safety. - -## Upgrading - -**Breaking Changes:** - -- Added new required dependencies: `frequenz-api-assets`, `frequenz-api-common`, `frequenz-client-base`, `grpcio` - -**CLI Support:** -Install with `pip install "frequenz-client-assets[cli]"` for command-line functionality. +This release introduces a Assets API client with CLI support for interacting with Frequenz microgrid assets. It provides comprehensive electrical components functionality including batteries, EV chargers, inverters, and grid connection points, with enhanced type safety and error handling. ## New Features -**Assets API Client:** - -- Complete gRPC client for Frequenz Assets API -- Extends `BaseApiClient` for authentication and connection management -- `get_microgrid_details()` method for retrieving microgrid information +* **Assets API Client**: + * `list_electrical_components()` method for retrieving electrical components in a microgrid -**Command-Line Interface:** +* **Electrical Components Support**: Comprehensive data classes for electrical components + * `ElectricalComponent` with category-specific information for batteries, EV chargers, inverters, grid connection points, and power transformers + * Battery types: Li-ion, Na-ion with proper enum mapping + * EV charger types: AC, DC, Hybrid charging support + * Operational lifetime tracking and metric configuration bounds -- `python -m frequenz.client.assets microgrid ` command -- Environment variable support for API credentials -- JSON output formatting +* **Command-Line Interface**: + * `assets-cli electrical-components ` command -**Type System:** - -- `Microgrid`, `DeliveryArea`, and `Location` data classes -- Protobuf integration with proper type safety - -**Exception Handling:** - -- Custom exception hierarchy (`AssetsApiError`, `NotFoundError`, `AuthenticationError`, `ServiceUnavailableError`) -- JSON serialization support for error responses +* **Type System**: Enhanced data classes with protobuf integration + * `Microgrid`, `DeliveryArea`, `Location`, and comprehensive electrical component types + * Proper enum mapping: `BatteryType`, `EvChargerType`, `InverterType`, `Metric` ## Bug Fixes - -- Improved dependency management with optional dependency groups -- Enhanced gRPC error handling and type safety -- Cleaned up deprecated code diff --git a/pyproject.toml b/pyproject.toml index 97c5a8a..f3d5b8f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ dependencies = [ "frequenz-api-assets @ git+https://github.com/frequenz-floss/frequenz-api-assets.git@v0.x.x", "frequenz-api-common >= 0.8.0, < 1", "frequenz-client-base >= 0.11.0, < 0.12.0", - "frequenz-client-common >= 0.3.2, < 0.4.0", + "frequenz-client-common >= 0.3.6, < 0.4.0", "grpcio >= 1.73.1, < 2", ] dynamic = ["version"] @@ -78,6 +78,7 @@ dev-mypy = [ "mypy == 1.17.1", "grpc-stubs == 1.53.0.6", "types-Markdown == 3.8.0.20250809", + "types-protobuf == 6.30.2.20250516", # For checking the noxfile, docs/ script, and tests "frequenz-client-assets[dev-mkdocs,dev-noxfile,dev-pytest,cli]", ] diff --git a/src/frequenz/client/assets/__init__.py b/src/frequenz/client/assets/__init__.py index d9fc068..cdce568 100644 --- a/src/frequenz/client/assets/__init__.py +++ b/src/frequenz/client/assets/__init__.py @@ -4,5 +4,15 @@ """Assets API client.""" from ._client import AssetsApiClient +from ._delivery_area import DeliveryArea, EnergyMarketCodeType +from ._location import Location +from ._microgrid import Microgrid, MicrogridStatus -__all__ = ["AssetsApiClient"] +__all__ = [ + "AssetsApiClient", + "DeliveryArea", + "EnergyMarketCodeType", + "Microgrid", + "MicrogridStatus", + "Location", +] diff --git a/src/frequenz/client/assets/_client.py b/src/frequenz/client/assets/_client.py index 18e5b4a..e93cfaa 100644 --- a/src/frequenz/client/assets/_client.py +++ b/src/frequenz/client/assets/_client.py @@ -10,36 +10,60 @@ from __future__ import annotations from frequenz.api.assets.v1 import assets_pb2, assets_pb2_grpc +from frequenz.client.base import channel from frequenz.client.base.client import BaseApiClient, call_stub_method -from frequenz.client.assets.types import Microgrid +from frequenz.client.assets.electrical_component._electrical_component import ( + ElectricalComponent, +) +from ._microgrid import Microgrid +from ._microgrid_proto import microgrid_from_proto +from .electrical_component._electrical_component_proto import electrical_component_proto from .exceptions import ClientNotConnected +DEFAULT_GRPC_CALL_TIMEOUT = 60.0 +"""The default timeout for gRPC calls made by this client (in seconds).""" -class AssetsApiClient(BaseApiClient[assets_pb2_grpc.PlatformAssetsStub]): + +class AssetsApiClient( + BaseApiClient[assets_pb2_grpc.PlatformAssetsStub] +): # pylint: disable=too-many-arguments """A client for the Assets API.""" def __init__( self, server_url: str, - auth_key: str | None, - sign_secret: str | None, + *, + auth_key: str | None = None, + sign_secret: str | None = None, + channel_defaults: channel.ChannelOptions = channel.ChannelOptions(), connect: bool = True, ) -> None: """ Initialize the AssetsApiClient. Args: - server_url: The URL of the server to connect to. - auth_key: The API key to use when connecting to the service. - sign_secret: The secret to use when creating message HMAC. - connect: Whether to connect to the server as soon as a client instance is created. + server_url: The location of the microgrid API server in the form of a URL. + The following format is expected: + "grpc://hostname{:`port`}{?ssl=`ssl`}", + where the `port` should be an int between 0 and 65535 (defaulting to + 9090) and `ssl` should be a boolean (defaulting to `true`). + For example: `grpc://localhost:1090?ssl=true`. + auth_key: The authentication key to use for the connection. + sign_secret: The secret to use for signing requests. + channel_defaults: The default options use to create the channel when not + specified in the URL. + connect: Whether to connect to the server as soon as a client instance is + created. If `False`, the client will not connect to the server until + [connect()][frequenz.client.base.client.BaseApiClient.connect] is + called. """ super().__init__( server_url, assets_pb2_grpc.PlatformAssetsStub, connect=connect, + channel_defaults=channel_defaults, auth_key=auth_key, sign_secret=sign_secret, ) @@ -61,7 +85,7 @@ def stub(self) -> assets_pb2_grpc.PlatformAssetsAsyncStub: # use the async stub, so we cast the sync stub to the async stub. return self._stub # type: ignore - async def get_microgrid_details( # noqa: DOC502 (raises ApiClientError indirectly) + async def get_microgrid( # noqa: DOC502 (raises ApiClientError indirectly) self, microgrid_id: int ) -> Microgrid: """ @@ -77,11 +101,40 @@ async def get_microgrid_details( # noqa: DOC502 (raises ApiClientError indirect ApiClientError: If there are any errors communicating with the Assets API, most likely a subclass of [GrpcError][frequenz.client.base.exception.GrpcError]. """ - request = assets_pb2.GetMicrogridRequest(microgrid_id=microgrid_id) response = await call_stub_method( self, - lambda: self.stub.GetMicrogrid(request), + lambda: self.stub.GetMicrogrid( + assets_pb2.GetMicrogridRequest(microgrid_id=microgrid_id), + timeout=DEFAULT_GRPC_CALL_TIMEOUT, + ), method_name="GetMicrogrid", ) - return Microgrid.from_protobuf(response.microgrid) + return microgrid_from_proto(response.microgrid) + + async def list_microgrid_electrical_components( + self, microgrid_id: int + ) -> list[ElectricalComponent]: + """ + Get the electrical components of a microgrid. + + Args: + microgrid_id: The ID of the microgrid to get the electrical components of. + + Returns: + The electrical components of the microgrid. + """ + response = await call_stub_method( + self, + lambda: self.stub.ListMicrogridElectricalComponents( + assets_pb2.ListMicrogridElectricalComponentsRequest( + microgrid_id=microgrid_id, + ), + timeout=DEFAULT_GRPC_CALL_TIMEOUT, + ), + method_name="ListMicrogridElectricalComponents", + ) + + return [ + electrical_component_proto(component) for component in response.components + ] diff --git a/src/frequenz/client/assets/_delivery_area.py b/src/frequenz/client/assets/_delivery_area.py new file mode 100644 index 0000000..64994b2 --- /dev/null +++ b/src/frequenz/client/assets/_delivery_area.py @@ -0,0 +1,89 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Delivery area information for the energy market.""" + +import enum +from dataclasses import dataclass + +from frequenz.api.common.v1alpha8.grid import delivery_area_pb2 + + +@enum.unique +class EnergyMarketCodeType(enum.Enum): + """The identification code types used in the energy market. + + CodeType specifies the type of identification code used for uniquely + identifying various entities such as delivery areas, market participants, + and grid components within the energy market. + + This enumeration aims to + offer compatibility across different jurisdictional standards. + + Note: Understanding Code Types + Different regions or countries may have their own standards for uniquely + identifying various entities within the energy market. For example, in + Europe, the Energy Identification Code (EIC) is commonly used for this + purpose. + + Note: Extensibility + New code types can be added to this enum to accommodate additional regional + standards, enhancing the API's adaptability. + + Danger: Validation Required + The chosen code type should correspond correctly with the `code` field in + the relevant message objects, such as `DeliveryArea` or `Counterparty`. + Failure to match the code type with the correct code could lead to + processing errors. + """ + + UNSPECIFIED = delivery_area_pb2.ENERGY_MARKET_CODE_TYPE_UNSPECIFIED + """Unspecified type. This value is a placeholder and should not be used.""" + + EUROPE_EIC = delivery_area_pb2.ENERGY_MARKET_CODE_TYPE_EUROPE_EIC + """European Energy Identification Code Standard.""" + + US_NERC = delivery_area_pb2.ENERGY_MARKET_CODE_TYPE_US_NERC + """North American Electric Reliability Corporation identifiers.""" + + +@dataclass(frozen=True, kw_only=True) +class DeliveryArea: + """A geographical or administrative region where electricity deliveries occur. + + DeliveryArea represents the geographical or administrative region, usually defined + and maintained by a Transmission System Operator (TSO), where electricity deliveries + for a contract occur. + + The concept is important to energy trading as it delineates the agreed-upon delivery + location. Delivery areas can have different codes based on the jurisdiction in + which they operate. + + Note: Jurisdictional Differences + This is typically represented by specific codes according to local jurisdiction. + + In Europe, this is represented by an + [EIC](https://en.wikipedia.org/wiki/Energy_Identification_Code) (Energy + Identification Code). [List of + EICs](https://www.entsoe.eu/data/energy-identification-codes-eic/eic-approved-codes/). + """ + + code: str | None + """The code representing the unique identifier for the delivery area.""" + + code_type: EnergyMarketCodeType | int + """Type of code used for identifying the delivery area itself. + + This code could be extended in the future, in case an unknown code type is + encountered, a plain integer value is used to represent it. + """ + + def __str__(self) -> str: + """Return a human-readable string representation of this instance.""" + code = self.code or "" + code_type = ( + f"type={self.code_type}" + if isinstance(self.code_type, int) + else self.code_type.name + ) + return f"{code}[{code_type}]" diff --git a/src/frequenz/client/assets/_delivery_area_proto.py b/src/frequenz/client/assets/_delivery_area_proto.py new file mode 100644 index 0000000..08ea77a --- /dev/null +++ b/src/frequenz/client/assets/_delivery_area_proto.py @@ -0,0 +1,44 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Loading of DeliveryArea objects from protobuf messages.""" + +import logging + +from frequenz.api.common.v1alpha8.grid import delivery_area_pb2 +from frequenz.client.common import enum_proto + +from ._delivery_area import DeliveryArea, EnergyMarketCodeType + +_logger = logging.getLogger(__name__) + + +def delivery_area_from_proto(message: delivery_area_pb2.DeliveryArea) -> DeliveryArea: + """Convert a protobuf delivery area message to a delivery area object. + + Args: + message: The protobuf message to convert. + + Returns: + The resulting delivery area object. + """ + issues: list[str] = [] + + code = message.code or None + if code is None: + issues.append("code is empty") + + code_type = enum_proto.enum_from_proto(message.code_type, EnergyMarketCodeType) + if code_type is EnergyMarketCodeType.UNSPECIFIED: + issues.append("code_type is unspecified") + elif isinstance(code_type, int): + issues.append("code_type is unrecognized") + + if issues: + _logger.warning( + "Found issues in delivery area: %s | Protobuf message:\n%s", + ", ".join(issues), + message, + ) + + return DeliveryArea(code=code, code_type=code_type) diff --git a/src/frequenz/client/assets/_lifetime.py b/src/frequenz/client/assets/_lifetime.py new file mode 100644 index 0000000..c2bd707 --- /dev/null +++ b/src/frequenz/client/assets/_lifetime.py @@ -0,0 +1,54 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Lifetime of a microgrid asset.""" + + +from dataclasses import dataclass +from datetime import datetime, timezone + + +@dataclass(frozen=True, kw_only=True) +class Lifetime: + """An active operational period of a microgrid asset. + + Warning: + The [`end`][frequenz.client.microgrid.Lifetime.end] timestamp indicates that the + asset has been permanently removed from the system. + """ + + start: datetime | None = None + """The moment when the asset became operationally active. + + If `None`, the asset is considered to be active in any past moment previous to the + [`end`][frequenz.client.microgrid.Lifetime.end]. + """ + + end: datetime | None = None + """The moment when the asset's operational activity ceased. + + If `None`, the asset is considered to be active with no plans to be deactivated. + """ + + def __post_init__(self) -> None: + """Validate this lifetime.""" + if self.start is not None and self.end is not None and self.start > self.end: + raise ValueError( + f"Start ({self.start}) must be before or equal to end ({self.end})" + ) + + def is_operational_at(self, timestamp: datetime) -> bool: + """Check whether this lifetime is active at a specific timestamp.""" + # Handle start time - it's not active if start is in the future + if self.start is not None and self.start > timestamp: + return False + # Handle end time - active up to and including end time + if self.end is not None: + return self.end >= timestamp + # self.end is None, and either self.start is None or self.start <= timestamp, + # so it is active at this timestamp + return True + + def is_operational_now(self) -> bool: + """Whether this lifetime is currently active.""" + return self.is_operational_at(datetime.now(timezone.utc)) diff --git a/src/frequenz/client/assets/_lifetime_proto.py b/src/frequenz/client/assets/_lifetime_proto.py new file mode 100644 index 0000000..22abf69 --- /dev/null +++ b/src/frequenz/client/assets/_lifetime_proto.py @@ -0,0 +1,26 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Loading of Lifetime objects from protobuf messages.""" + +from frequenz.api.common.v1alpha8.microgrid import lifetime_pb2 +from frequenz.client.base.conversion import to_datetime + +from ._lifetime import Lifetime + + +def lifetime_from_proto( + message: lifetime_pb2.Lifetime, +) -> Lifetime: + """Create a [`Lifetime`][frequenz.client.microgrid.Lifetime] from a protobuf message.""" + start = ( + to_datetime(message.start_timestamp) + if message.HasField("start_timestamp") + else None + ) + end = ( + to_datetime(message.end_timestamp) + if message.HasField("end_timestamp") + else None + ) + return Lifetime(start=start, end=end) diff --git a/src/frequenz/client/assets/_location.py b/src/frequenz/client/assets/_location.py new file mode 100644 index 0000000..fea238e --- /dev/null +++ b/src/frequenz/client/assets/_location.py @@ -0,0 +1,31 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Location information for a microgrid.""" + + +from dataclasses import dataclass + + +@dataclass(frozen=True, kw_only=True) +class Location: + """A location of a microgrid.""" + + latitude: float | None + """The latitude of the microgrid in degree.""" + + longitude: float | None + """The longitude of the microgrid in degree.""" + + country_code: str | None + """The country code of the microgrid in ISO 3166-1 Alpha 2 format.""" + + def __str__(self) -> str: + """Return the short string representation of this instance.""" + country = self.country_code or "" + lat = f"{self.latitude:.2f}" if self.latitude is not None else "?" + lon = f"{self.longitude:.2f}" if self.longitude is not None else "?" + coordinates = "" + if self.latitude is not None or self.longitude is not None: + coordinates = f":({lat}, {lon})" + return f"{country}{coordinates}" diff --git a/src/frequenz/client/assets/_location_proto.py b/src/frequenz/client/assets/_location_proto.py new file mode 100644 index 0000000..ee20787 --- /dev/null +++ b/src/frequenz/client/assets/_location_proto.py @@ -0,0 +1,47 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Loading of Location objects from protobuf messages.""" + +import logging + +from frequenz.api.common.v1alpha8.types import location_pb2 + +from ._location import Location + +_logger = logging.getLogger(__name__) + + +def location_from_proto(message: location_pb2.Location) -> Location: + """Convert a protobuf location message to a location object. + + Args: + message: The protobuf message to convert. + + Returns: + The resulting location object. + """ + issues: list[str] = [] + + latitude: float | None = message.latitude if -90 <= message.latitude <= 90 else None + if latitude is None: + issues.append("latitude out of range [-90, 90]") + + longitude: float | None = ( + message.longitude if -180 <= message.longitude <= 180 else None + ) + if longitude is None: + issues.append("longitude out of range [-180, 180]") + + country_code = message.country_code or None + if country_code is None: + issues.append("country code is empty") + + if issues: + _logger.warning( + "Found issues in location: %s | Protobuf message:\n%s", + ", ".join(issues), + message, + ) + + return Location(latitude=latitude, longitude=longitude, country_code=country_code) diff --git a/src/frequenz/client/assets/_microgrid.py b/src/frequenz/client/assets/_microgrid.py new file mode 100644 index 0000000..2acf3c2 --- /dev/null +++ b/src/frequenz/client/assets/_microgrid.py @@ -0,0 +1,87 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Definition of a microgrid.""" + +import datetime +import enum +import logging +from dataclasses import dataclass +from functools import cached_property + +from frequenz.api.common.v1alpha8.microgrid import microgrid_pb2 +from frequenz.client.common.microgrid import EnterpriseId, MicrogridId + +from ._delivery_area import DeliveryArea +from ._location import Location + +_logger = logging.getLogger(__name__) + + +@enum.unique +class MicrogridStatus(enum.Enum): + """The possible statuses for a microgrid.""" + + UNSPECIFIED = microgrid_pb2.MICROGRID_STATUS_UNSPECIFIED + """The status is unspecified. This should not be used.""" + + ACTIVE = microgrid_pb2.MICROGRID_STATUS_ACTIVE + """The microgrid is active.""" + + INACTIVE = microgrid_pb2.MICROGRID_STATUS_INACTIVE + """The microgrid is inactive.""" + + +@dataclass(frozen=True, kw_only=True) +class Microgrid: + """A localized grouping of electricity generation, energy storage, and loads. + + A microgrid is a localized grouping of electricity generation, energy storage, and + loads that normally operates connected to a traditional centralized grid. + + Each microgrid has a unique identifier and is associated with an enterprise account. + + A key feature is that it has a physical location and is situated in a delivery area. + + Note: Key Concepts + - Physical Location: Geographical coordinates specify the exact physical + location of the microgrid. + - Delivery Area: Each microgrid is part of a broader delivery area, which is + crucial for energy trading and compliance. + """ + + id: MicrogridId + """The unique identifier of the microgrid.""" + + enterprise_id: EnterpriseId + """The unique identifier linking this microgrid to its parent enterprise account.""" + + name: str | None + """Name of the microgrid.""" + + delivery_area: DeliveryArea | None + """The delivery area where the microgrid is located, as identified by a specific code.""" + + location: Location | None + """Physical location of the microgrid, in geographical co-ordinates.""" + + status: MicrogridStatus | int + """The current status of the microgrid.""" + + create_timestamp: datetime.datetime + """The UTC timestamp indicating when the microgrid was initially created.""" + + @cached_property + def is_active(self) -> bool: + """Whether the microgrid is active.""" + if self.status is MicrogridStatus.UNSPECIFIED: + # Because this is a cached property, the warning will only be logged once. + _logger.warning( + "Microgrid %s has an unspecified status. Assuming it is active.", self + ) + return self.status in (MicrogridStatus.ACTIVE, MicrogridStatus.UNSPECIFIED) + + def __str__(self) -> str: + """Return the ID of this microgrid as a string.""" + name = f":{self.name}" if self.name else "" + return f"{self.id}{name}" diff --git a/src/frequenz/client/assets/_microgrid_json.py b/src/frequenz/client/assets/_microgrid_json.py new file mode 100644 index 0000000..de5a048 --- /dev/null +++ b/src/frequenz/client/assets/_microgrid_json.py @@ -0,0 +1,15 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""JSON encoder for Microgrid objects.""" + +import json +from dataclasses import asdict + +from ._microgrid import Microgrid +from ._utils import AssetsJSONEncoder + + +def microgrid_to_json(microgrid: Microgrid) -> str: + """Convert a Microgrid object to a JSON string.""" + return json.dumps(asdict(microgrid), cls=AssetsJSONEncoder, indent=2) diff --git a/src/frequenz/client/assets/_microgrid_proto.py b/src/frequenz/client/assets/_microgrid_proto.py new file mode 100644 index 0000000..7e59682 --- /dev/null +++ b/src/frequenz/client/assets/_microgrid_proto.py @@ -0,0 +1,79 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Loading of MicrogridInfo objects from protobuf messages.""" + + +import logging + +from frequenz.api.common.v1alpha8.microgrid import microgrid_pb2 +from frequenz.client.base import conversion +from frequenz.client.common import enum_proto +from frequenz.client.common.microgrid import EnterpriseId, MicrogridId + +from ._delivery_area import DeliveryArea +from ._delivery_area_proto import delivery_area_from_proto +from ._location import Location +from ._location_proto import location_from_proto +from ._microgrid import Microgrid, MicrogridStatus + +_logger = logging.getLogger(__name__) + + +def microgrid_from_proto(message: microgrid_pb2.Microgrid) -> Microgrid: + """Convert a protobuf microgrid message to a microgrid object. + + Args: + message: The protobuf message to convert. + + Returns: + The resulting microgrid object. + """ + major_issues: list[str] = [] + minor_issues: list[str] = [] + + delivery_area: DeliveryArea | None = None + if message.HasField("delivery_area"): + delivery_area = delivery_area_from_proto(message.delivery_area) + else: + major_issues.append("delivery_area is missing") + + location: Location | None = None + if message.HasField("location"): + location = location_from_proto(message.location) + else: + major_issues.append("location is missing") + + name = message.name or None + if name is None: + minor_issues.append("name is empty") + + status = enum_proto.enum_from_proto(message.status, MicrogridStatus) + if status is MicrogridStatus.UNSPECIFIED: + major_issues.append("status is unspecified") + elif isinstance(status, int): + major_issues.append("status is unrecognized") + + if major_issues: + _logger.warning( + "Found issues in microgrid: %s | Protobuf message:\n%s", + ", ".join(major_issues), + message, + ) + + if minor_issues: + _logger.debug( + "Found minor issues in microgrid: %s | Protobuf message:\n%s", + ", ".join(minor_issues), + message, + ) + + return Microgrid( + id=MicrogridId(message.id), + enterprise_id=EnterpriseId(message.enterprise_id), + name=message.name or None, + delivery_area=delivery_area, + location=location, + status=status, + create_timestamp=conversion.to_datetime(message.create_timestamp), + ) diff --git a/src/frequenz/client/assets/_utils.py b/src/frequenz/client/assets/_utils.py new file mode 100644 index 0000000..72d0dfd --- /dev/null +++ b/src/frequenz/client/assets/_utils.py @@ -0,0 +1,63 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Utility functions for the Assets API client.""" + +import enum +import json +from datetime import datetime, timezone +from typing import Any + +from frequenz.core.id import BaseId + + +class AssetsJSONEncoder(json.JSONEncoder): + """Custom JSON encoder for assets-related objects. + + Uses encode() method to pre-process objects and handle: + - Dictionary keys that are enums → converts to enum values (recursively processed) + - List/tuple/set/frozenset items → recursively processes nested structures + - Sets/frozensets are converted to lists for JSON compatibility + + Uses default() method to handle individual objects: + - BaseId objects → integers + - Enum objects → their values + - Datetime objects → UTC ISO format strings + """ + + def default(self, o: Any) -> Any: + """Convert supported objects to JSON-serializable format.""" + if isinstance(o, BaseId): + return int(o) + + if isinstance(o, enum.Enum): + return o.value + + if isinstance(o, datetime): + if o.tzinfo is None: + o = o.replace(tzinfo=timezone.utc) + else: + o = o.astimezone(timezone.utc) + return o.isoformat() + + return super().default(o) + + def _encode_containers_recursively(self, o: Any) -> Any: + """Recursively process objects to convert enum keys to their values.""" + if isinstance(o, dict): + return { + ( + self._encode_containers_recursively(key.value) + if isinstance(key, enum.Enum) + else self._encode_containers_recursively(key) + ): self._encode_containers_recursively(value) + for key, value in o.items() + } + if isinstance(o, (list, tuple, set, frozenset)): + items = [self._encode_containers_recursively(item) for item in o] + return items if isinstance(o, (list, tuple)) else items + return o + + def encode(self, o: Any) -> str: + """Encode the given object to a JSON string, handling enum keys.""" + return super().encode(self._encode_containers_recursively(o)) diff --git a/src/frequenz/client/assets/cli/__init__.py b/src/frequenz/client/assets/cli/__init__.py new file mode 100644 index 0000000..a81658c --- /dev/null +++ b/src/frequenz/client/assets/cli/__init__.py @@ -0,0 +1 @@ +"""CLI for the Assets API.""" diff --git a/src/frequenz/client/assets/cli/__main__.py b/src/frequenz/client/assets/cli/__main__.py index fd7ffc4..c887b64 100644 --- a/src/frequenz/client/assets/cli/__main__.py +++ b/src/frequenz/client/assets/cli/__main__.py @@ -34,33 +34,13 @@ import asyncio import json -from dataclasses import asdict import asyncclick as click from frequenz.client.assets._client import AssetsApiClient from frequenz.client.assets.exceptions import ApiClientError -from frequenz.client.assets.types import Microgrid - -def print_microgrid_details(microgrid: Microgrid) -> None: - """ - Print microgrid details to console in JSON format. - - This function converts the Microgrid instance to a dictionary and - outputs it as formatted JSON to the console. The output is designed - to be machine-readable and can be piped to tools like jq for further - processing. - - Args: - microgrid: The Microgrid instance to print to console. - """ - microgrid_dict = asdict(microgrid) - microgrid_dict["id"] = int(microgrid.id) - microgrid_dict["enterprise_id"] = int(microgrid.enterprise_id) - microgrid_dict["create_time"] = microgrid.create_time.isoformat() - - click.echo(json.dumps(microgrid_dict, indent=2)) +from ._utils import print_electrical_components, print_microgrid_details @click.group(invoke_without_command=True) @@ -161,7 +141,7 @@ async def get_microgrid( """ try: client = ctx.obj["client"] - microgrid_details = await client.get_microgrid_details(microgrid_id) + microgrid_details = await client.get_microgrid(microgrid_id) print_microgrid_details(microgrid_details) except ApiClientError as e: error_dict = { @@ -174,6 +154,53 @@ async def get_microgrid( raise click.Abort() +@cli.command("electrical-components") +@click.pass_context +@click.argument("microgrid-id", required=True, type=int) +async def list_microgrid_electrical_components( + ctx: click.Context, + microgrid_id: int, +) -> None: + """ + Get and display electrical components by microgrid ID. + + This command fetches detailed information about all electrical components + in a specific microgrid from the Assets API and displays it in JSON format. + The output can be piped to other tools for further processing. + + Args: + ctx: Click context object containing the initialized API client. + microgrid_id: The unique identifier of the microgrid to retrieve. + + Raises: + click.Abort: If there is an error printing the electrical components. + + Example: + ```bash + # Get details for microgrid with ID 123 + assets-cli electrical-components 123 + + # Pipe output to jq for filtering + assets-cli electrical-components 123 | jq ".id" + ``` + """ + try: + client = ctx.obj["client"] + electrical_components = await client.list_microgrid_electrical_components( + microgrid_id + ) + print_electrical_components(electrical_components) + except ApiClientError as e: + error_dict = { + "error_type": type(e).__name__, + "server_url": e.server_url, + "operation": e.operation, + "description": e.description, + } + click.echo(json.dumps(error_dict, indent=2)) + raise click.Abort() + + def main() -> None: """ Initialize and run the CLI application. diff --git a/src/frequenz/client/assets/cli/_utils.py b/src/frequenz/client/assets/cli/_utils.py new file mode 100644 index 0000000..a99e0b8 --- /dev/null +++ b/src/frequenz/client/assets/cli/_utils.py @@ -0,0 +1,44 @@ +"""Utility functions for the CLI.""" + +import asyncclick as click + +from frequenz.client.assets._microgrid import Microgrid +from frequenz.client.assets._microgrid_json import microgrid_to_json +from frequenz.client.assets.electrical_component._electrical_component import ( + ElectricalComponent, +) +from frequenz.client.assets.electrical_component._electrical_component_json import ( + electrical_components_to_json, +) + + +def print_microgrid_details(microgrid: Microgrid) -> None: + """ + Print microgrid details to console in JSON format using custom encoder. + + This function converts the Microgrid instance to JSON using a custom + encoder and outputs it as formatted JSON to the console. The output is + designed to be machine-readable and can be piped to tools like jq for + further processing. + + Args: + microgrid: The Microgrid instance to print to console. + """ + click.echo(microgrid_to_json(microgrid)) + + +def print_electrical_components( + electrical_components: list[ElectricalComponent], +) -> None: + """ + Print electrical components to console in JSON format using custom encoder. + + This function converts the ElectricalComponent instances to JSON using a custom + encoder and outputs it as formatted JSON to the console. The output is + designed to be machine-readable and can be piped to tools like jq for + further processing. + + Args: + electrical_components: The list of ElectricalComponent instances to print to console. + """ + click.echo(electrical_components_to_json(electrical_components)) diff --git a/src/frequenz/client/assets/electrical_component/__init__.py b/src/frequenz/client/assets/electrical_component/__init__.py new file mode 100644 index 0000000..5e6cbff --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/__init__.py @@ -0,0 +1,90 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Electrical component types.""" + +from ._battery import ( + BatteryType, + LiIonBattery, + NaIonBattery, + UnrecognizedBattery, + UnspecifiedBattery, +) +from ._breaker import Breaker +from ._capacitor_bank import CapacitorBank +from ._category import ElectricalComponentCategory +from ._chp import Chp +from ._converter import Converter +from ._crypto_miner import CryptoMiner +from ._electrical_component import ElectricalComponent +from ._electrolyzer import Electrolyzer +from ._ev_charger import ( + AcEvCharger, + DcEvCharger, + EvChargerType, + HybridEvCharger, + UnrecognizedEvCharger, + UnspecifiedEvCharger, +) +from ._grid_connection_point import GridConnectionPoint +from ._hvac import Hvac +from ._inverter import ( + BatteryInverter, + HybridInverter, + InverterType, + PvInverter, + UnrecognizedInverter, + UnspecifiedInverter, +) +from ._meter import Meter +from ._plc import Plc +from ._power_transformer import PowerTransformer +from ._precharger import Precharger +from ._problematic import ( + MismatchedCategoryComponent, + UnrecognizedComponent, + UnspecifiedComponent, +) +from ._static_transfer_switch import StaticTransferSwitch +from ._uninterruptible_power_supply import UninterruptiblePowerSupply +from ._wind_turbine import WindTurbine + +__all__ = [ + "Chp", + "CryptoMiner", + "BatteryType", + "LiIonBattery", + "NaIonBattery", + "UnrecognizedBattery", + "UnspecifiedBattery", + "Breaker", + "Converter", + "CapacitorBank", + "ElectricalComponentCategory", + "ElectricalComponent", + "Electrolyzer", + "AcEvCharger", + "DcEvCharger", + "EvChargerType", + "HybridEvCharger", + "UnrecognizedEvCharger", + "UnspecifiedEvCharger", + "GridConnectionPoint", + "Hvac", + "BatteryInverter", + "HybridInverter", + "InverterType", + "PvInverter", + "UnrecognizedInverter", + "UnspecifiedInverter", + "Meter", + "Plc", + "PowerTransformer", + "Precharger", + "MismatchedCategoryComponent", + "UnrecognizedComponent", + "UnspecifiedComponent", + "StaticTransferSwitch", + "UninterruptiblePowerSupply", + "WindTurbine", +] diff --git a/src/frequenz/client/assets/electrical_component/_battery.py b/src/frequenz/client/assets/electrical_component/_battery.py new file mode 100644 index 0000000..4bc56db --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_battery.py @@ -0,0 +1,133 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Battery component.""" + +import dataclasses +import enum +from typing import Any, Literal, Self, TypeAlias + +from frequenz.api.common.v1alpha8.microgrid.electrical_components import ( + electrical_components_pb2, +) + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@enum.unique +class BatteryType(enum.Enum): + """The known types of batteries.""" + + UNSPECIFIED = electrical_components_pb2.BATTERY_TYPE_UNSPECIFIED + """The battery type is unspecified.""" + + LI_ION = electrical_components_pb2.BATTERY_TYPE_LI_ION + """Lithium-ion (Li-ion) battery.""" + + NA_ION = electrical_components_pb2.BATTERY_TYPE_NA_ION + """Sodium-ion (Na-ion) battery.""" + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class Battery(ElectricalComponent): + """An abstract battery component.""" + + category: Literal[ElectricalComponentCategory.BATTERY] = ( + ElectricalComponentCategory.BATTERY + ) + """The category of this component. + + Note: + This should not be used normally, you should test if a component + [`isinstance`][] of a concrete component class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about a new category yet (i.e. for use with + [`UnrecognizedComponent`][frequenz.client.assets.electrical_component.UnrecognizedComponent]) + and in case some low level code needs to know the category of a component. + """ + + type: BatteryType | int + """The type of this battery. + + Note: + This should not be used normally, you should test if a battery + [`isinstance`][] of a concrete battery class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new battery type yet (i.e. for use with + [`UnrecognizedBattery`][frequenz.client.assets.electrical_component.UnrecognizedBattery]). + """ + + # pylint: disable-next=unused-argument + def __new__(cls, *args: Any, **kwargs: Any) -> Self: + """Prevent instantiation of this class.""" + if cls is Battery: + raise TypeError(f"Cannot instantiate {cls.__name__} directly") + return super().__new__(cls) + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class UnspecifiedBattery(Battery): + """A battery of a unspecified type.""" + + type: Literal[BatteryType.UNSPECIFIED] = BatteryType.UNSPECIFIED + """The type of this battery. + + Note: + This should not be used normally, you should test if a battery + [`isinstance`][] of a concrete battery class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new battery type yet (i.e. for use with + [`UnrecognizedBattery`][frequenz.client.assets.electrical_component.UnrecognizedBattery]). + """ + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class LiIonBattery(Battery): + """A Li-ion battery.""" + + type: Literal[BatteryType.LI_ION] = BatteryType.LI_ION + """The type of this battery. + + Note: + This should not be used normally, you should test if a battery + [`isinstance`][] of a concrete battery class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new battery type yet (i.e. for use with + [`UnrecognizedBattery`][frequenz.client.assets.electrical_component.UnrecognizedBattery]). + """ + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class NaIonBattery(Battery): + """A Na-ion battery.""" + + type: Literal[BatteryType.NA_ION] = BatteryType.NA_ION + """The type of this battery. + + Note: + This should not be used normally, you should test if a battery + [`isinstance`][] of a concrete battery class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new battery type yet (i.e. for use with + [`UnrecognizedBattery`][frequenz.client.assets.electrical_component.UnrecognizedBattery]). + """ + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class UnrecognizedBattery(Battery): + """A battery of an unrecognized type.""" + + type: int + """The unrecognized type of this battery.""" + + +BatteryTypes: TypeAlias = ( + LiIonBattery | NaIonBattery | UnrecognizedBattery | UnspecifiedBattery +) +"""All possible battery types.""" diff --git a/src/frequenz/client/assets/electrical_component/_breaker.py b/src/frequenz/client/assets/electrical_component/_breaker.py new file mode 100644 index 0000000..9a60376 --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_breaker.py @@ -0,0 +1,20 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Converter component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class Breaker(ElectricalComponent): + """A breaker component.""" + + category: Literal[ElectricalComponentCategory.BREAKER] = ( + ElectricalComponentCategory.BREAKER + ) + """The category of this component.""" diff --git a/src/frequenz/client/assets/electrical_component/_capacitor_bank.py b/src/frequenz/client/assets/electrical_component/_capacitor_bank.py new file mode 100644 index 0000000..6bb39ca --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_capacitor_bank.py @@ -0,0 +1,20 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""CapacitorBank component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class CapacitorBank(ElectricalComponent): + """A capacitor bank component.""" + + category: Literal[ElectricalComponentCategory.CAPACITOR_BANK] = ( + ElectricalComponentCategory.CAPACITOR_BANK + ) + """The category of this component.""" diff --git a/src/frequenz/client/assets/electrical_component/_category.py b/src/frequenz/client/assets/electrical_component/_category.py new file mode 100644 index 0000000..9928041 --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_category.py @@ -0,0 +1,88 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""The component categories that can be used in a microgrid.""" + +import enum + +from frequenz.api.common.v1alpha8.microgrid.electrical_components import ( + electrical_components_pb2, +) + + +@enum.unique +class ElectricalComponentCategory(enum.Enum): + """The known categories of components that can be present in a microgrid.""" + + UNSPECIFIED = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_UNSPECIFIED + """The component category is unspecified, probably due to an error in the message.""" + + GRID = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_GRID_CONNECTION_POINT + """The point where the local microgrid is connected to the grid.""" + + METER = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_METER + """A meter, for measuring electrical metrics, e.g., current, voltage, etc.""" + + INVERTER = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_INVERTER + """An electricity generator, with batteries or solar energy.""" + + BREAKER = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_BREAKER + """A breaker, used to interrupt the flow of electricity.""" + + CONVERTER = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_CONVERTER + """A DC-DC converter.""" + + BATTERY = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_BATTERY + """A storage system for electrical energy, used by inverters.""" + + EV_CHARGER = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_EV_CHARGER + """A station for charging electrical vehicles.""" + + CRYPTO_MINER = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_CRYPTO_MINER + """A crypto miner.""" + + ELECTROLYZER = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_ELECTROLYZER + """An electrolyzer for converting water into hydrogen and oxygen.""" + + CHP = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_CHP + """A heat and power combustion plant (CHP stands for combined heat and power).""" + + PRECHARGER = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_PRECHARGER + """A precharge module. + + Precharging involves gradually ramping up the DC voltage to prevent any + potential damage to sensitive electrical components like capacitors. + + While many inverters and batteries come equipped with in-built precharging + mechanisms, some may lack this feature. In such cases, we need to use + external precharging modules. + """ + + HVAC = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_HVAC + """A Heating, Ventilation, and Air Conditioning (HVAC) system.""" + + POWER_TRANSFORMER = ( + electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_POWER_TRANSFORMER + ) + """A power transformer.""" + + PLC = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_PLC + """An industrial controller or PLC (Programmable Logic Controller).""" + + STATIC_TRANSFER_SWITCH = ( + electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_STATIC_TRANSFER_SWITCH + ) + """A static transfer switch (STS).""" + + UNINTERRUPTIBLE_POWER_SUPPLY = ( + electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_UNINTERRUPTIBLE_POWER_SUPPLY + ) + """An uninterruptible power supply (UPS).""" + + CAPACITOR_BANK = ( + electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_CAPACITOR_BANK + ) + """A capacitor bank for power factor correction.""" + + WIND_TURBINE = electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_WIND_TURBINE + """A wind turbine.""" diff --git a/src/frequenz/client/assets/electrical_component/_chp.py b/src/frequenz/client/assets/electrical_component/_chp.py new file mode 100644 index 0000000..bed7d6f --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_chp.py @@ -0,0 +1,18 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""CHP component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class Chp(ElectricalComponent): + """A combined heat and power (CHP) component.""" + + category: Literal[ElectricalComponentCategory.CHP] = ElectricalComponentCategory.CHP + """The category of this component.""" diff --git a/src/frequenz/client/assets/electrical_component/_converter.py b/src/frequenz/client/assets/electrical_component/_converter.py new file mode 100644 index 0000000..fc04e87 --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_converter.py @@ -0,0 +1,20 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Converter component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class Converter(ElectricalComponent): + """An AC-DC converter component.""" + + category: Literal[ElectricalComponentCategory.CONVERTER] = ( + ElectricalComponentCategory.CONVERTER + ) + """The category of this component.""" diff --git a/src/frequenz/client/assets/electrical_component/_crypto_miner.py b/src/frequenz/client/assets/electrical_component/_crypto_miner.py new file mode 100644 index 0000000..5747890 --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_crypto_miner.py @@ -0,0 +1,20 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Crypto miner component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class CryptoMiner(ElectricalComponent): + """A crypto miner component.""" + + category: Literal[ElectricalComponentCategory.CRYPTO_MINER] = ( + ElectricalComponentCategory.CRYPTO_MINER + ) + """The category of this component.""" diff --git a/src/frequenz/client/assets/electrical_component/_electrical_component.py b/src/frequenz/client/assets/electrical_component/_electrical_component.py new file mode 100644 index 0000000..45fbd55 --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_electrical_component.py @@ -0,0 +1,78 @@ +"""Electrical component types.""" + +import dataclasses +from collections.abc import Mapping +from dataclasses import dataclass +from typing import Any, Self + +from frequenz.client.common.microgrid import MicrogridId +from frequenz.client.common.microgrid.electrical_components import ElectricalComponentId + +from frequenz.client.assets.electrical_component._category import ( + ElectricalComponentCategory, +) +from frequenz.client.assets.metrics._bounds import Bounds +from frequenz.client.assets.metrics._metric import Metric + +from .._lifetime import Lifetime + + +@dataclass(frozen=True, kw_only=True) +class ElectricalComponent: # pylint: disable=too-many-instance-attributes + """A wrapper class for the protobuf ElectricalComponent message. + + An electrical component is a physical device that can be used to generate or consume + electricity. + """ + + id: ElectricalComponentId + """Unique identifier for the electrical component.""" + + microgrid_id: MicrogridId + """Unique identifier for the microgrid that the electrical component belongs to.""" + + name: str | None = None + """Human-readable name for the electrical component.""" + + category: ElectricalComponentCategory | int + """The component category. E.g., Inverter, Battery, etc.""" + + manufacturer: str | None = None + """The manufacturer of the electrical component.""" + + model_name: str | None = None + """The model name of the electrical component.""" + + operational_lifetime: Lifetime = dataclasses.field(default_factory=Lifetime) + """The operational lifetime of the electrical component.""" + + rated_bounds: Mapping[Metric | int, Bounds] = dataclasses.field( + default_factory=dict, + # dict is not hashable, so we don't use this field to calculate the hash. This + # shouldn't be a problem since it is very unlikely that two components with all + # other attributes being equal would have different category specific metadata, + # so hash collisions should be still very unlikely. + hash=False, + ) + """List of rated bounds present for the component identified by Metric.""" + + def __new__(cls, *_: Any, **__: Any) -> Self: + """Prevent instantiation of this class.""" + if cls is ElectricalComponent: + raise TypeError(f"Cannot instantiate {cls.__name__} directly") + return super().__new__(cls) + + @property + def identity(self) -> tuple[ElectricalComponentId, MicrogridId]: + """The identity of this component. + + This uses the electrical component ID and microgrid ID to identify a component + without considering the other attributes, so even if a component state + changed, the identity remains the same. + """ + return (self.id, self.microgrid_id) + + def __str__(self) -> str: + """Return the ID of this electrical component as a string.""" + name = f":{self.name}" if self.name else "" + return f"{self.id}{name}" diff --git a/src/frequenz/client/assets/electrical_component/_electrical_component_json.py b/src/frequenz/client/assets/electrical_component/_electrical_component_json.py new file mode 100644 index 0000000..1a8b2c9 --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_electrical_component_json.py @@ -0,0 +1,21 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""JSON encoder for ElectricalComponent objects.""" + +import json +from dataclasses import asdict + +from .._utils import AssetsJSONEncoder +from ._electrical_component import ElectricalComponent + + +def electrical_components_to_json( + electrical_components: list[ElectricalComponent], +) -> str: + """Convert a list of ElectricalComponent objects to a JSON string.""" + return json.dumps( + [asdict(component) for component in electrical_components], + cls=AssetsJSONEncoder, + indent=2, + ) diff --git a/src/frequenz/client/assets/electrical_component/_electrical_component_proto.py b/src/frequenz/client/assets/electrical_component/_electrical_component_proto.py new file mode 100644 index 0000000..c7468e7 --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_electrical_component_proto.py @@ -0,0 +1,507 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Electrical component proto.""" + +import logging +from collections.abc import Sequence +from typing import NamedTuple, assert_never + +from frequenz.api.common.v1alpha8.microgrid.electrical_components import ( + electrical_components_pb2, +) +from frequenz.client.common import enum_proto +from frequenz.client.common.microgrid import MicrogridId +from frequenz.client.common.microgrid.electrical_components import ElectricalComponentId + +from .._lifetime import Lifetime +from .._lifetime_proto import lifetime_from_proto +from ..metrics._bounds import Bounds +from ..metrics._bounds_proto import bounds_from_proto +from ..metrics._metric import Metric +from ._battery import ( + BatteryType, + LiIonBattery, + NaIonBattery, + UnrecognizedBattery, + UnspecifiedBattery, +) +from ._breaker import Breaker +from ._capacitor_bank import CapacitorBank +from ._category import ElectricalComponentCategory +from ._chp import Chp +from ._converter import Converter +from ._crypto_miner import CryptoMiner +from ._electrolyzer import Electrolyzer +from ._ev_charger import ( + AcEvCharger, + DcEvCharger, + EvChargerType, + HybridEvCharger, + UnrecognizedEvCharger, + UnspecifiedEvCharger, +) +from ._grid_connection_point import GridConnectionPoint +from ._hvac import Hvac +from ._inverter import ( + BatteryInverter, + HybridInverter, + InverterType, + PvInverter, + UnrecognizedInverter, + UnspecifiedInverter, +) +from ._meter import Meter +from ._plc import Plc +from ._power_transformer import PowerTransformer +from ._precharger import Precharger +from ._problematic import UnrecognizedComponent, UnspecifiedComponent +from ._static_transfer_switch import StaticTransferSwitch +from ._types import ElectricalComponentType +from ._uninterruptible_power_supply import UninterruptiblePowerSupply +from ._wind_turbine import WindTurbine + +_logger = logging.getLogger(__name__) + + +def electrical_component_proto( + message: electrical_components_pb2.ElectricalComponent, +) -> ElectricalComponentType: + """Convert a protobuf message to a `Component` instance. + + Args: + message: The protobuf message. + + Returns: + The resulting `ElectricalComponent` instance. + """ + major_issues: list[str] = [] + minor_issues: list[str] = [] + + component = electrical_component_from_proto_with_issues( + message, + major_issues=major_issues, + minor_issues=minor_issues, + ) + + if major_issues: + _logger.warning( + "Found issues in component: %s | Protobuf message:\n%s", + ", ".join(major_issues), + message, + ) + if minor_issues: + _logger.debug( + "Found minor issues in component: %s | Protobuf message:\n%s", + ", ".join(minor_issues), + message, + ) + + return component + + +class ElectricalComponentBaseData(NamedTuple): + """Base data for a component, extracted from a protobuf message.""" + + component_id: ElectricalComponentId + microgrid_id: MicrogridId + name: str | None + manufacturer: str | None + model_name: str | None + category: ElectricalComponentCategory | int + lifetime: Lifetime + rated_bounds: dict[Metric | int, Bounds] + + +def component_base_from_proto_with_issues( + message: electrical_components_pb2.ElectricalComponent, + *, + major_issues: list[str], + minor_issues: list[str], +) -> ElectricalComponentBaseData: + """Extract base data from a protobuf message and collect issues. + + Args: + message: The protobuf message. + major_issues: A list to append major issues to. + minor_issues: A list to append minor issues to. + + Returns: + A `ElectricalComponentBaseData` named tuple containing the extracted data. + """ + component_id = ElectricalComponentId(message.id) + microgrid_id = MicrogridId(message.microgrid_id) + + name = message.name or None + if name is None: + minor_issues.append("name is empty") + + manufacturer = message.manufacturer or None + if manufacturer is None: + minor_issues.append("manufacturer is empty") + + model_name = message.model_name or None + if model_name is None: + minor_issues.append("model_name is empty") + + lifetime = _get_operational_lifetime_from_proto( + message, major_issues=major_issues, minor_issues=minor_issues + ) + + rated_bounds = _metric_config_bounds_from_proto( + message.metric_config_bounds, + major_issues=major_issues, + minor_issues=minor_issues, + ) + + category = enum_proto.enum_from_proto(message.category, ElectricalComponentCategory) + if category is ElectricalComponentCategory.UNSPECIFIED: + major_issues.append("category is unspecified") + elif isinstance(category, int): + major_issues.append(f"category {category} is unrecognized") + + return ElectricalComponentBaseData( + component_id, + microgrid_id, + name, + manufacturer, + model_name, + category, + lifetime, + rated_bounds, + ) + + +def electrical_component_from_proto_with_issues( + message: electrical_components_pb2.ElectricalComponent, + *, + major_issues: list[str], + minor_issues: list[str], +) -> ElectricalComponentType: + """Convert a protobuf message to a `ElectricalComponent` instance and collect issues. + + Args: + message: The protobuf message. + major_issues: A list to append major issues to. + minor_issues: A list to append minor issues to. + + Returns: + The resulting `ElectricalComponent` instance. + """ + base_data = component_base_from_proto_with_issues( + message, major_issues=major_issues, minor_issues=minor_issues + ) + + match base_data.category: + case int(): + return UnrecognizedComponent( + id=base_data.component_id, + microgrid_id=base_data.microgrid_id, + name=base_data.name, + manufacturer=base_data.manufacturer, + model_name=base_data.model_name, + category=base_data.category, + operational_lifetime=base_data.lifetime, + rated_bounds=base_data.rated_bounds, + ) + case ( + ElectricalComponentCategory.UNSPECIFIED + | ElectricalComponentCategory.CHP + | ElectricalComponentCategory.CONVERTER + | ElectricalComponentCategory.CRYPTO_MINER + | ElectricalComponentCategory.ELECTROLYZER + | ElectricalComponentCategory.HVAC + | ElectricalComponentCategory.METER + | ElectricalComponentCategory.PRECHARGER + | ElectricalComponentCategory.BREAKER + | ElectricalComponentCategory.PLC + | ElectricalComponentCategory.STATIC_TRANSFER_SWITCH + | ElectricalComponentCategory.UNINTERRUPTIBLE_POWER_SUPPLY + | ElectricalComponentCategory.CAPACITOR_BANK + | ElectricalComponentCategory.WIND_TURBINE + ): + return _trivial_category_to_class(base_data.category)( + id=base_data.component_id, + microgrid_id=base_data.microgrid_id, + name=base_data.name, + manufacturer=base_data.manufacturer, + model_name=base_data.model_name, + operational_lifetime=base_data.lifetime, + rated_bounds=base_data.rated_bounds, + ) + case ElectricalComponentCategory.BATTERY: + battery_enum_to_class: dict[ + BatteryType, type[UnspecifiedBattery | LiIonBattery | NaIonBattery] + ] = { + BatteryType.UNSPECIFIED: UnspecifiedBattery, + BatteryType.LI_ION: LiIonBattery, + BatteryType.NA_ION: NaIonBattery, + } + battery_type = enum_proto.enum_from_proto( + message.category_specific_info.battery.type, BatteryType + ) + match battery_type: + case BatteryType.UNSPECIFIED | BatteryType.LI_ION | BatteryType.NA_ION: + if battery_type is BatteryType.UNSPECIFIED: + major_issues.append("battery type is unspecified") + return battery_enum_to_class[battery_type]( + id=base_data.component_id, + microgrid_id=base_data.microgrid_id, + name=base_data.name, + manufacturer=base_data.manufacturer, + model_name=base_data.model_name, + operational_lifetime=base_data.lifetime, + rated_bounds=base_data.rated_bounds, + ) + case int(): + major_issues.append(f"battery type {battery_type} is unrecognized") + return UnrecognizedBattery( + id=base_data.component_id, + microgrid_id=base_data.microgrid_id, + name=base_data.name, + manufacturer=base_data.manufacturer, + model_name=base_data.model_name, + operational_lifetime=base_data.lifetime, + type=battery_type, + rated_bounds=base_data.rated_bounds, + ) + case unexpected_battery_type: + assert_never(unexpected_battery_type) + case ElectricalComponentCategory.EV_CHARGER: + ev_charger_enum_to_class: dict[ + EvChargerType, + type[ + UnspecifiedEvCharger | AcEvCharger | DcEvCharger | HybridEvCharger + ], + ] = { + EvChargerType.UNSPECIFIED: UnspecifiedEvCharger, + EvChargerType.AC: AcEvCharger, + EvChargerType.DC: DcEvCharger, + EvChargerType.HYBRID: HybridEvCharger, + } + ev_charger_type = enum_proto.enum_from_proto( + message.category_specific_info.ev_charger.type, EvChargerType + ) + match ev_charger_type: + case ( + EvChargerType.UNSPECIFIED + | EvChargerType.AC + | EvChargerType.DC + | EvChargerType.HYBRID + ): + if ev_charger_type is EvChargerType.UNSPECIFIED: + major_issues.append("ev_charger type is unspecified") + return ev_charger_enum_to_class[ev_charger_type]( + id=base_data.component_id, + microgrid_id=base_data.microgrid_id, + name=base_data.name, + manufacturer=base_data.manufacturer, + model_name=base_data.model_name, + operational_lifetime=base_data.lifetime, + rated_bounds=base_data.rated_bounds, + ) + case int(): + major_issues.append( + f"ev_charger type {ev_charger_type} is unrecognized" + ) + return UnrecognizedEvCharger( + id=base_data.component_id, + microgrid_id=base_data.microgrid_id, + name=base_data.name, + manufacturer=base_data.manufacturer, + model_name=base_data.model_name, + operational_lifetime=base_data.lifetime, + type=ev_charger_type, + rated_bounds=base_data.rated_bounds, + ) + case unexpected_ev_charger_type: + assert_never(unexpected_ev_charger_type) + case ElectricalComponentCategory.INVERTER: + inverter_enum_to_class: dict[ + InverterType, + type[ + UnspecifiedInverter | BatteryInverter | PvInverter | HybridInverter + ], + ] = { + InverterType.UNSPECIFIED: UnspecifiedInverter, + InverterType.BATTERY: BatteryInverter, + InverterType.PV: PvInverter, + InverterType.HYBRID: HybridInverter, + } + inverter_type = enum_proto.enum_from_proto( + message.category_specific_info.inverter.type, InverterType + ) + match inverter_type: + case ( + InverterType.UNSPECIFIED + | InverterType.BATTERY + | InverterType.PV + | InverterType.HYBRID + ): + if inverter_type is InverterType.UNSPECIFIED: + major_issues.append("inverter type is unspecified") + return inverter_enum_to_class[inverter_type]( + id=base_data.component_id, + microgrid_id=base_data.microgrid_id, + name=base_data.name, + manufacturer=base_data.manufacturer, + model_name=base_data.model_name, + operational_lifetime=base_data.lifetime, + rated_bounds=base_data.rated_bounds, + ) + case int(): + major_issues.append( + f"inverter type {inverter_type} is unrecognized" + ) + return UnrecognizedInverter( + id=base_data.component_id, + microgrid_id=base_data.microgrid_id, + name=base_data.name, + manufacturer=base_data.manufacturer, + model_name=base_data.model_name, + operational_lifetime=base_data.lifetime, + rated_bounds=base_data.rated_bounds, + type=inverter_type, + ) + case unexpected_inverter_type: + assert_never(unexpected_inverter_type) + case ElectricalComponentCategory.GRID: + rated_fuse_current = ( + message.category_specific_info.grid_connection_point.rated_fuse_current + ) + # No need to check for negatives because the protobuf type is uint32. + return GridConnectionPoint( + id=base_data.component_id, + microgrid_id=base_data.microgrid_id, + name=base_data.name, + manufacturer=base_data.manufacturer, + model_name=base_data.model_name, + operational_lifetime=base_data.lifetime, + rated_bounds=base_data.rated_bounds, + rated_fuse_current=rated_fuse_current, + ) + case ElectricalComponentCategory.POWER_TRANSFORMER: + return PowerTransformer( + id=base_data.component_id, + microgrid_id=base_data.microgrid_id, + name=base_data.name, + manufacturer=base_data.manufacturer, + model_name=base_data.model_name, + operational_lifetime=base_data.lifetime, + rated_bounds=base_data.rated_bounds, + primary_power=message.category_specific_info.power_transformer.primary, + secondary_power=message.category_specific_info.power_transformer.secondary, + ) + case unexpected_category: + assert_never(unexpected_category) + + +def _trivial_category_to_class( + category: ElectricalComponentCategory, +) -> type[ + UnspecifiedComponent + | Chp + | Converter + | CryptoMiner + | Electrolyzer + | Hvac + | Meter + | Precharger + | Breaker + | Plc + | StaticTransferSwitch + | UninterruptiblePowerSupply + | CapacitorBank + | WindTurbine +]: + """Return the class corresponding to a trivial component category.""" + return { + ElectricalComponentCategory.UNSPECIFIED: UnspecifiedComponent, + ElectricalComponentCategory.CHP: Chp, + ElectricalComponentCategory.CONVERTER: Converter, + ElectricalComponentCategory.CRYPTO_MINER: CryptoMiner, + ElectricalComponentCategory.ELECTROLYZER: Electrolyzer, + ElectricalComponentCategory.HVAC: Hvac, + ElectricalComponentCategory.METER: Meter, + ElectricalComponentCategory.PRECHARGER: Precharger, + ElectricalComponentCategory.BREAKER: Breaker, + ElectricalComponentCategory.PLC: Plc, + ElectricalComponentCategory.STATIC_TRANSFER_SWITCH: StaticTransferSwitch, + ElectricalComponentCategory.UNINTERRUPTIBLE_POWER_SUPPLY: UninterruptiblePowerSupply, + ElectricalComponentCategory.CAPACITOR_BANK: CapacitorBank, + ElectricalComponentCategory.WIND_TURBINE: WindTurbine, + }[category] + + +def _metric_config_bounds_from_proto( + message: Sequence[electrical_components_pb2.MetricConfigBounds], + *, + major_issues: list[str], + minor_issues: list[str], # pylint: disable=unused-argument +) -> dict[Metric | int, Bounds]: + """Convert a `MetricConfigBounds` message to a dictionary of `Metric` to `Bounds`. + + Args: + message: The `MetricConfigBounds` message. + major_issues: A list to append major issues to. + minor_issues: A list to append minor issues to. + + Returns: + The resulting dictionary of `Metric` to `Bounds`. + """ + bounds: dict[Metric | int, Bounds] = {} + for metric_bound in message: + metric = enum_proto.enum_from_proto(metric_bound.metric, Metric) + match metric: + case Metric.UNSPECIFIED: + major_issues.append("metric_config_bounds has an UNSPECIFIED metric") + case int(): + minor_issues.append( + f"metric_config_bounds has an unrecognized metric {metric}" + ) + + if not metric_bound.HasField("config_bounds"): + major_issues.append( + f"metric_config_bounds for {metric} is present but missing " + "`config_bounds`, considering it unbounded", + ) + continue + + try: + bound = bounds_from_proto(metric_bound.config_bounds) + except ValueError as exc: + major_issues.append( + f"metric_config_bounds for {metric} is invalid ({exc}), considering " + "it as missing (i.e. unbouded)", + ) + continue + if metric in bounds: + major_issues.append( + f"metric_config_bounds for {metric} is duplicated in the message" + f"using the last one ({bound})", + ) + bounds[metric] = bound + + return bounds + + +def _get_operational_lifetime_from_proto( + message: electrical_components_pb2.ElectricalComponent, + *, + major_issues: list[str], + minor_issues: list[str], +) -> Lifetime: + """Get the operational lifetime from a protobuf message.""" + if message.HasField("operational_lifetime"): + try: + return lifetime_from_proto(message.operational_lifetime) + except ValueError as exc: + major_issues.append( + f"invalid operational lifetime ({exc}), considering it as missing " + "(i.e. always operational)", + ) + else: + minor_issues.append( + "missing operational lifetime, considering it always operational", + ) + return Lifetime() diff --git a/src/frequenz/client/assets/electrical_component/_electrolyzer.py b/src/frequenz/client/assets/electrical_component/_electrolyzer.py new file mode 100644 index 0000000..15e3585 --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_electrolyzer.py @@ -0,0 +1,20 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Electrolyzer component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class Electrolyzer(ElectricalComponent): + """An electrolyzer component.""" + + category: Literal[ElectricalComponentCategory.ELECTROLYZER] = ( + ElectricalComponentCategory.ELECTROLYZER + ) + """The category of this component.""" diff --git a/src/frequenz/client/assets/electrical_component/_ev_charger.py b/src/frequenz/client/assets/electrical_component/_ev_charger.py new file mode 100644 index 0000000..99639a0 --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_ev_charger.py @@ -0,0 +1,157 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Electric vehicle (EV) charger component.""" + +import dataclasses +import enum +from typing import Any, Literal, Self, TypeAlias + +from frequenz.api.common.v1alpha8.microgrid.electrical_components import ( + electrical_components_pb2, +) + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@enum.unique +class EvChargerType(enum.Enum): + """The known types of electric vehicle (EV) chargers.""" + + UNSPECIFIED = electrical_components_pb2.EV_CHARGER_TYPE_UNSPECIFIED + """The type of the EV charger is unspecified.""" + + AC = electrical_components_pb2.EV_CHARGER_TYPE_AC + """The EV charging station supports AC charging only.""" + + DC = electrical_components_pb2.EV_CHARGER_TYPE_DC + """The EV charging station supports DC charging only.""" + + HYBRID = electrical_components_pb2.EV_CHARGER_TYPE_HYBRID + """The EV charging station supports both AC and DC.""" + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class EvCharger(ElectricalComponent): + """An abstract EV charger component.""" + + category: Literal[ElectricalComponentCategory.EV_CHARGER] = ( + ElectricalComponentCategory.EV_CHARGER + ) + """The category of this component. + + Note: + This should not be used normally, you should test if a component + [`isinstance`][] of a concrete EV charger class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about a new category yet (i.e. for use with + [`UnrecognizedComponent`][frequenz.client.assets.electrical_component.UnrecognizedComponent]) + and in case some low level code needs to know the category of a component. + """ + + type: EvChargerType | int + """The type of this EV charger. + + Note: + This should not be used normally, you should test if a EV charger + [`isinstance`][] of a concrete component class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new EV charger type yet (i.e. for use with + [`UnrecognizedEvCharger`][frequenz.client.assets.electrical_component.UnrecognizedEvCharger]). + """ + + # pylint: disable-next=unused-argument + def __new__(cls, *args: Any, **kwargs: Any) -> Self: + """Prevent instantiation of this class.""" + if cls is EvCharger: + raise TypeError(f"Cannot instantiate {cls.__name__} directly") + return super().__new__(cls) + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class UnspecifiedEvCharger(EvCharger): + """An EV charger of an unspecified type.""" + + type: Literal[EvChargerType.UNSPECIFIED] = EvChargerType.UNSPECIFIED + """The type of this EV charger. + + Note: + This should not be used normally, you should test if a EV charger + [`isinstance`][] of a concrete component class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new EV charger type yet (i.e. for use with + [`UnrecognizedEvCharger`][frequenz.client.assets.electrical_component.UnrecognizedEvCharger]). + """ + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class AcEvCharger(EvCharger): + """An EV charger that supports AC charging only.""" + + type: Literal[EvChargerType.AC] = EvChargerType.AC + """The type of this EV charger. + + Note: + This should not be used normally, you should test if a EV charger + [`isinstance`][] of a concrete component class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new EV charger type yet (i.e. for use with + [`UnrecognizedEvCharger`][frequenz.client.assets.electrical_component.UnrecognizedEvCharger]). + """ + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class DcEvCharger(EvCharger): + """An EV charger that supports DC charging only.""" + + type: Literal[EvChargerType.DC] = EvChargerType.DC + """The type of this EV charger. + + Note: + This should not be used normally, you should test if a EV charger + [`isinstance`][] of a concrete component class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new EV charger type yet (i.e. for use with + [`UnrecognizedEvCharger`][frequenz.client.assets.electrical_component.UnrecognizedEvCharger]). + """ + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class HybridEvCharger(EvCharger): + """An EV charger that supports both AC and DC charging.""" + + type: Literal[EvChargerType.HYBRID] = EvChargerType.HYBRID + """The type of this EV charger. + + Note: + This should not be used normally, you should test if a EV charger + [`isinstance`][] of a concrete component class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new EV charger type yet (i.e. for use with + [`UnrecognizedEvCharger`][frequenz.client.assets.electrical_component.UnrecognizedEvCharger]). + """ + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class UnrecognizedEvCharger(EvCharger): + """An EV charger of an unrecognized type.""" + + type: int + """The unrecognized type of this EV charger.""" + + +EvChargerTypes: TypeAlias = ( + UnspecifiedEvCharger + | AcEvCharger + | DcEvCharger + | HybridEvCharger + | UnrecognizedEvCharger +) +"""All possible EV charger types.""" diff --git a/src/frequenz/client/assets/electrical_component/_grid_connection_point.py b/src/frequenz/client/assets/electrical_component/_grid_connection_point.py new file mode 100644 index 0000000..a940c78 --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_grid_connection_point.py @@ -0,0 +1,61 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Grid connection point component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class GridConnectionPoint(ElectricalComponent): + """A point where a microgrid connects to the grid. + + The terms "Grid Connection Point" and "Point of Common Coupling" (PCC) are + commonly used in the context. + + While both terms describe a connection point to the grid, the + `GridConnectionPoint` is specifically the physical connection point of the + generation facility to the grid, often concerned with the technical and + ownership aspects of the connection. + + In contrast, the PCC is is more specific in terms of electrical engineering. + It refers to the point where a customer's local electrical system (such as a + microgrid) connects to the utility distribution grid in such a way that it + can affect other customers’ systems connected to the same network. It is the + point where the grid and customer's electrical systems interface and where + issues like power quality and supply regulations are assessed. + + The term `GridConnectionPoint` is used to make it clear that what is referred + to here is the physical connection point of the local facility to the grid. + Note that this may also be the PCC in some cases. + """ + + category: Literal[ElectricalComponentCategory.GRID] = ( + ElectricalComponentCategory.GRID + ) + """The category of this component.""" + + rated_fuse_current: int + """The maximum amount of electrical current that can flow through this connection, in amperes. + + The rated maximum amount of current the fuse at the grid connection point is + designed to safely carry under normal operating conditions. + + This limit applies to currents both flowing in or out of each of the 3 + phases individually. + + In other words, a current `i`A at one of the phases of the grid connection + point must comply with the following constraint: + `-rated_fuse_current <= i <= rated_fuse_current` + """ + + def __post_init__(self) -> None: + """Validate the fuse's rated current.""" + if self.rated_fuse_current < 0: + raise ValueError( + f"rated_fuse_current must be a positive integer, not {self.rated_fuse_current}" + ) diff --git a/src/frequenz/client/assets/electrical_component/_hvac.py b/src/frequenz/client/assets/electrical_component/_hvac.py new file mode 100644 index 0000000..552876f --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_hvac.py @@ -0,0 +1,20 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""HVAC component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class Hvac(ElectricalComponent): + """A heating, ventilation, and air conditioning (HVAC) component.""" + + category: Literal[ElectricalComponentCategory.HVAC] = ( + ElectricalComponentCategory.HVAC + ) + """The category of this component.""" diff --git a/src/frequenz/client/assets/electrical_component/_inverter.py b/src/frequenz/client/assets/electrical_component/_inverter.py new file mode 100644 index 0000000..3306d0b --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_inverter.py @@ -0,0 +1,157 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Inverter component.""" + +import dataclasses +import enum +from typing import Any, Literal, Self, TypeAlias + +from frequenz.api.common.v1alpha8.microgrid.electrical_components import ( + electrical_components_pb2, +) + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@enum.unique +class InverterType(enum.Enum): + """The known types of inverters.""" + + UNSPECIFIED = electrical_components_pb2.INVERTER_TYPE_UNSPECIFIED + """The type of the inverter is unspecified.""" + + BATTERY = electrical_components_pb2.INVERTER_TYPE_BATTERY + """The inverter is a battery inverter.""" + + PV = electrical_components_pb2.INVERTER_TYPE_PV + """The inverter is a solar inverter.""" + + HYBRID = electrical_components_pb2.INVERTER_TYPE_HYBRID + """The inverter is a hybrid inverter.""" + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class Inverter(ElectricalComponent): + """An abstract inverter component.""" + + category: Literal[ElectricalComponentCategory.INVERTER] = ( + ElectricalComponentCategory.INVERTER + ) + """The category of this component. + + Note: + This should not be used normally, you should test if a component + [`isinstance`][] of a concrete component class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about a new category yet (i.e. for use with + [`UnrecognizedComponent`][frequenz.client.assets.electrical_component.UnrecognizedComponent]) + and in case some low level code needs to know the category of a component. + """ + + type: InverterType | int + """The type of this inverter. + + Note: + This should not be used normally, you should test if a inverter + [`isinstance`][] of a concrete inverter class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new inverter type yet (i.e. for use with + [`UnrecognizedInverter`][frequenz.client.assets.electrical_component.UnrecognizedInverter]). + """ + + # pylint: disable-next=unused-argument + def __new__(cls, *args: Any, **kwargs: Any) -> Self: + """Prevent instantiation of this class.""" + if cls is Inverter: + raise TypeError(f"Cannot instantiate {cls.__name__} directly") + return super().__new__(cls) + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class UnspecifiedInverter(Inverter): + """An inverter of an unspecified type.""" + + type: Literal[InverterType.UNSPECIFIED] = InverterType.UNSPECIFIED + """The type of this inverter. + + Note: + This should not be used normally, you should test if a inverter + [`isinstance`][] of a concrete inverter class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new inverter type yet (i.e. for use with + [`UnrecognizedInverter`][frequenz.client.assets.electrical_component.UnrecognizedInverter]). + """ + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class BatteryInverter(Inverter): + """A battery inverter.""" + + type: Literal[InverterType.BATTERY] = InverterType.BATTERY + """The type of this inverter. + + Note: + This should not be used normally, you should test if a inverter + [`isinstance`][] of a concrete inverter class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new inverter type yet (i.e. for use with + [`UnrecognizedInverter`][frequenz.client.assets.electrical_component.UnrecognizedInverter]). + """ + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class PvInverter(Inverter): + """A PV inverter.""" + + type: Literal[InverterType.PV] = InverterType.PV + """The type of this inverter. + + Note: + This should not be used normally, you should test if a inverter + [`isinstance`][] of a concrete inverter class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new inverter type yet (i.e. for use with + [`UnrecognizedInverter`][frequenz.client.assets.electrical_component.UnrecognizedInverter]). + """ + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class HybridInverter(Inverter): + """A hybrid inverter.""" + + type: Literal[InverterType.HYBRID] = InverterType.HYBRID + """The type of this inverter. + + Note: + This should not be used normally, you should test if a inverter + [`isinstance`][] of a concrete inverter class instead. + + It is only provided for using with a newer version of the API where the client + doesn't know about the new inverter type yet (i.e. for use with + [`UnrecognizedInverter`][frequenz.client.assets.electrical_component.UnrecognizedInverter]). + """ + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class UnrecognizedInverter(Inverter): + """An inverter component.""" + + type: int + """The unrecognized type of this inverter.""" + + +InverterTypes: TypeAlias = ( + UnspecifiedInverter + | BatteryInverter + | PvInverter + | HybridInverter + | UnrecognizedInverter +) +"""All possible inverter types.""" diff --git a/src/frequenz/client/assets/electrical_component/_meter.py b/src/frequenz/client/assets/electrical_component/_meter.py new file mode 100644 index 0000000..85a9c20 --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_meter.py @@ -0,0 +1,20 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Meter component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class Meter(ElectricalComponent): + """A measuring meter component.""" + + category: Literal[ElectricalComponentCategory.METER] = ( + ElectricalComponentCategory.METER + ) + """The category of this component.""" diff --git a/src/frequenz/client/assets/electrical_component/_plc.py b/src/frequenz/client/assets/electrical_component/_plc.py new file mode 100644 index 0000000..b1875cc --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_plc.py @@ -0,0 +1,18 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""PLC component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class Plc(ElectricalComponent): + """A PLC component.""" + + category: Literal[ElectricalComponentCategory.PLC] = ElectricalComponentCategory.PLC + """The category of this component.""" diff --git a/src/frequenz/client/assets/electrical_component/_power_transformer.py b/src/frequenz/client/assets/electrical_component/_power_transformer.py new file mode 100644 index 0000000..5c9e69f --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_power_transformer.py @@ -0,0 +1,32 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Power transformer component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class PowerTransformer(ElectricalComponent): + """A power transformer designed for the bulk transfer of electrical energy. + + Power transformers are essential components in electrical power systems that + transfer electrical energy between different voltage levels. Their primary + function is to "step-up" or "step-down" voltage levels for efficient + transmission and distribution of power across the electrical grid. + """ + + category: Literal[ElectricalComponentCategory.POWER_TRANSFORMER] = ( + ElectricalComponentCategory.POWER_TRANSFORMER + ) + """The category of this component.""" + + primary_power: float + """The primary voltage of the power transformer.""" + + secondary_power: float + """The secondary voltage of the power transformer.""" diff --git a/src/frequenz/client/assets/electrical_component/_precharger.py b/src/frequenz/client/assets/electrical_component/_precharger.py new file mode 100644 index 0000000..694be8b --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_precharger.py @@ -0,0 +1,20 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Precharger component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class Precharger(ElectricalComponent): + """A precharger component.""" + + category: Literal[ElectricalComponentCategory.PRECHARGER] = ( + ElectricalComponentCategory.PRECHARGER + ) + """The category of this component.""" diff --git a/src/frequenz/client/assets/electrical_component/_problematic.py b/src/frequenz/client/assets/electrical_component/_problematic.py new file mode 100644 index 0000000..42291a8 --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_problematic.py @@ -0,0 +1,52 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Unknown component.""" + +import dataclasses +from typing import Any, Literal, Self + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class ProblematicComponent(ElectricalComponent): + """An abstract component with a problem.""" + + # pylint: disable-next=unused-argument + def __new__(cls, *args: Any, **kwargs: Any) -> Self: + """Prevent instantiation of this class.""" + if cls is ProblematicComponent: + raise TypeError(f"Cannot instantiate {cls.__name__} directly") + return super().__new__(cls) + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class UnspecifiedComponent(ProblematicComponent): + """A component of unspecified type.""" + + category: Literal[ElectricalComponentCategory.UNSPECIFIED] = ( + ElectricalComponentCategory.UNSPECIFIED + ) + """The category of this component.""" + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class UnrecognizedComponent(ProblematicComponent): + """A component of an unrecognized type.""" + + category: int + """The category of this component.""" + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class MismatchedCategoryComponent(ProblematicComponent): + """A component with a mismatch in the category. + + This component declared a category but carries category specific metadata that + doesn't match the declared category. + """ + + category: ElectricalComponentCategory | int + """The category of this component.""" diff --git a/src/frequenz/client/assets/electrical_component/_static_transfer_switch.py b/src/frequenz/client/assets/electrical_component/_static_transfer_switch.py new file mode 100644 index 0000000..b0c850f --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_static_transfer_switch.py @@ -0,0 +1,20 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""StaticTransferSwitch component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class StaticTransferSwitch(ElectricalComponent): + """A static transfer switch component.""" + + category: Literal[ElectricalComponentCategory.STATIC_TRANSFER_SWITCH] = ( + ElectricalComponentCategory.STATIC_TRANSFER_SWITCH + ) + """The category of this component.""" diff --git a/src/frequenz/client/assets/electrical_component/_types.py b/src/frequenz/client/assets/electrical_component/_types.py new file mode 100644 index 0000000..175161f --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_types.py @@ -0,0 +1,73 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Electrical component types.""" + +from typing import TypeAlias + +from ._battery import BatteryTypes, UnrecognizedBattery, UnspecifiedBattery +from ._breaker import Breaker +from ._capacitor_bank import CapacitorBank +from ._chp import Chp +from ._converter import Converter +from ._crypto_miner import CryptoMiner +from ._electrolyzer import Electrolyzer +from ._ev_charger import EvChargerTypes, UnrecognizedEvCharger, UnspecifiedEvCharger +from ._grid_connection_point import GridConnectionPoint +from ._hvac import Hvac +from ._inverter import InverterTypes, UnrecognizedInverter, UnspecifiedInverter +from ._meter import Meter +from ._plc import Plc +from ._power_transformer import PowerTransformer +from ._precharger import Precharger +from ._problematic import ( + MismatchedCategoryComponent, + UnrecognizedComponent, + UnspecifiedComponent, +) +from ._static_transfer_switch import StaticTransferSwitch +from ._uninterruptible_power_supply import UninterruptiblePowerSupply +from ._wind_turbine import WindTurbine + +UnspecifiedComponentTypes: TypeAlias = ( + UnspecifiedBattery + | UnspecifiedComponent + | UnspecifiedEvCharger + | UnspecifiedInverter +) +"""All unspecified component types.""" + +UnrecognizedComponentTypes: TypeAlias = ( + UnrecognizedBattery + | UnrecognizedComponent + | UnrecognizedEvCharger + | UnrecognizedInverter +) + +ProblematicComponentTypes: TypeAlias = ( + MismatchedCategoryComponent | UnrecognizedComponentTypes | UnspecifiedComponentTypes +) +"""All possible component types that has a problem.""" + +ElectricalComponentType: TypeAlias = ( + BatteryTypes + | Chp + | Converter + | CryptoMiner + | Electrolyzer + | Hvac + | Meter + | Precharger + | Breaker + | Plc + | StaticTransferSwitch + | UninterruptiblePowerSupply + | CapacitorBank + | WindTurbine + | InverterTypes + | PowerTransformer + | EvChargerTypes + | GridConnectionPoint + | ProblematicComponentTypes +) +"""The type of the electrical component.""" diff --git a/src/frequenz/client/assets/electrical_component/_uninterruptible_power_supply.py b/src/frequenz/client/assets/electrical_component/_uninterruptible_power_supply.py new file mode 100644 index 0000000..7e381f9 --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_uninterruptible_power_supply.py @@ -0,0 +1,20 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""UninterruptiblePowerSupply component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class UninterruptiblePowerSupply(ElectricalComponent): + """An uninterruptible power supply component.""" + + category: Literal[ElectricalComponentCategory.UNINTERRUPTIBLE_POWER_SUPPLY] = ( + ElectricalComponentCategory.UNINTERRUPTIBLE_POWER_SUPPLY + ) + """The category of this component.""" diff --git a/src/frequenz/client/assets/electrical_component/_wind_turbine.py b/src/frequenz/client/assets/electrical_component/_wind_turbine.py new file mode 100644 index 0000000..3c37e6b --- /dev/null +++ b/src/frequenz/client/assets/electrical_component/_wind_turbine.py @@ -0,0 +1,20 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Wind Turbine component.""" + +import dataclasses +from typing import Literal + +from ._category import ElectricalComponentCategory +from ._electrical_component import ElectricalComponent + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class WindTurbine(ElectricalComponent): + """A wind turbine component.""" + + category: Literal[ElectricalComponentCategory.WIND_TURBINE] = ( + ElectricalComponentCategory.WIND_TURBINE + ) + """The category of this component.""" diff --git a/src/frequenz/client/assets/exceptions.py b/src/frequenz/client/assets/exceptions.py index dcb61a1..9036862 100644 --- a/src/frequenz/client/assets/exceptions.py +++ b/src/frequenz/client/assets/exceptions.py @@ -47,4 +47,5 @@ "ServiceUnavailable", "UnknownError", "UnrecognizedGrpcStatus", + "PermissionDenied", ] diff --git a/src/frequenz/client/assets/metrics/__init__.py b/src/frequenz/client/assets/metrics/__init__.py new file mode 100644 index 0000000..bdf487d --- /dev/null +++ b/src/frequenz/client/assets/metrics/__init__.py @@ -0,0 +1,12 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Metrics for the electrical component.""" + +from ._bounds import Bounds +from ._metric import Metric + +__all__ = [ + "Metric", + "Bounds", +] diff --git a/src/frequenz/client/assets/metrics/_bounds.py b/src/frequenz/client/assets/metrics/_bounds.py new file mode 100644 index 0000000..4080dc9 --- /dev/null +++ b/src/frequenz/client/assets/metrics/_bounds.py @@ -0,0 +1,44 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Bounds for the metrics.""" + +from dataclasses import dataclass + + +@dataclass(frozen=True, kw_only=True) +class Bounds: + """A set of lower and upper bounds for any metric. + + The lower bound must be less than or equal to the upper bound. + + The units of the bounds are always the same as the related metric. + """ + + lower: float | None = None + """The lower bound. + + If `None`, there is no lower bound. + """ + + upper: float | None = None + """The upper bound. + + If `None`, there is no upper bound. + """ + + def __post_init__(self) -> None: + """Validate these bounds.""" + if self.lower is None: + return + if self.upper is None: + return + if self.lower > self.upper: + raise ValueError( + f"Lower bound ({self.lower}) must be less than or equal to upper " + f"bound ({self.upper})" + ) + + def __str__(self) -> str: + """Return a string representation of these bounds.""" + return f"[{self.lower}, {self.upper}]" diff --git a/src/frequenz/client/assets/metrics/_bounds_proto.py b/src/frequenz/client/assets/metrics/_bounds_proto.py new file mode 100644 index 0000000..a2b546f --- /dev/null +++ b/src/frequenz/client/assets/metrics/_bounds_proto.py @@ -0,0 +1,17 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Loading of Bounds objects from protobuf messages.""" + + +from frequenz.api.common.v1alpha8.metrics import bounds_pb2 + +from ._bounds import Bounds + + +def bounds_from_proto(message: bounds_pb2.Bounds) -> Bounds: + """Create a `Bounds` from a protobuf message.""" + return Bounds( + lower=message.lower if message.HasField("lower") else None, + upper=message.upper if message.HasField("upper") else None, + ) diff --git a/src/frequenz/client/assets/metrics/_metric.py b/src/frequenz/client/assets/metrics/_metric.py new file mode 100644 index 0000000..8f8d418 --- /dev/null +++ b/src/frequenz/client/assets/metrics/_metric.py @@ -0,0 +1,261 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Metrics for the electrical component.""" + +import enum + +from frequenz.api.common.v1alpha8.metrics import metrics_pb2 + + +@enum.unique +class Metric(enum.Enum): + """List of supported metrics. + + Note: AC energy metrics information + - This energy metric is reported directly from the component, and not a + result of aggregations in our systems. If a component does not have this + metric, this field cannot be populated. + + - Components that provide energy metrics reset this metric from time to + time. This behaviour is specific to each component model. E.g., some + components reset it on UTC 00:00:00. + + - This energy metric does not specify the start time of the accumulation + period,and therefore can be inconsistent. + """ + + UNSPECIFIED = metrics_pb2.METRIC_UNSPECIFIED + """The metric is unspecified (this should not be used).""" + + DC_VOLTAGE = metrics_pb2.METRIC_DC_VOLTAGE + """The direct current voltage.""" + + DC_CURRENT = metrics_pb2.METRIC_DC_CURRENT + """The direct current current.""" + + DC_POWER = metrics_pb2.METRIC_DC_POWER + """The direct current power.""" + + AC_FREQUENCY = metrics_pb2.METRIC_AC_FREQUENCY + """The alternating current frequency.""" + + AC_VOLTAGE = metrics_pb2.METRIC_AC_VOLTAGE + """The alternating current electric potential difference.""" + + AC_VOLTAGE_PHASE_1_N = metrics_pb2.METRIC_AC_VOLTAGE_PHASE_1_N + """The alternating current electric potential difference between phase 1 and neutral.""" + + AC_VOLTAGE_PHASE_2_N = metrics_pb2.METRIC_AC_VOLTAGE_PHASE_2_N + """The alternating current electric potential difference between phase 2 and neutral.""" + + AC_VOLTAGE_PHASE_3_N = metrics_pb2.METRIC_AC_VOLTAGE_PHASE_3_N + """The alternating current electric potential difference between phase 3 and neutral.""" + + AC_VOLTAGE_PHASE_1_PHASE_2 = metrics_pb2.METRIC_AC_VOLTAGE_PHASE_1_PHASE_2 + """The alternating current electric potential difference between phase 1 and phase 2.""" + + AC_VOLTAGE_PHASE_2_PHASE_3 = metrics_pb2.METRIC_AC_VOLTAGE_PHASE_2_PHASE_3 + """The alternating current electric potential difference between phase 2 and phase 3.""" + + AC_VOLTAGE_PHASE_3_PHASE_1 = metrics_pb2.METRIC_AC_VOLTAGE_PHASE_3_PHASE_1 + """The alternating current electric potential difference between phase 3 and phase 1.""" + + AC_CURRENT = metrics_pb2.METRIC_AC_CURRENT + """The alternating current current.""" + + AC_CURRENT_PHASE_1 = metrics_pb2.METRIC_AC_CURRENT_PHASE_1 + """The alternating current current in phase 1.""" + + AC_CURRENT_PHASE_2 = metrics_pb2.METRIC_AC_CURRENT_PHASE_2 + """The alternating current current in phase 2.""" + + AC_CURRENT_PHASE_3 = metrics_pb2.METRIC_AC_CURRENT_PHASE_3 + """The alternating current current in phase 3.""" + + AC_POWER_APPARENT = metrics_pb2.METRIC_AC_POWER_APPARENT + """The alternating current apparent power.""" + + AC_POWER_APPARENT_PHASE_1 = metrics_pb2.METRIC_AC_POWER_APPARENT_PHASE_1 + """The alternating current apparent power in phase 1.""" + + AC_POWER_APPARENT_PHASE_2 = metrics_pb2.METRIC_AC_POWER_APPARENT_PHASE_2 + """The alternating current apparent power in phase 2.""" + + AC_POWER_APPARENT_PHASE_3 = metrics_pb2.METRIC_AC_POWER_APPARENT_PHASE_3 + """The alternating current apparent power in phase 3.""" + + AC_POWER_ACTIVE = metrics_pb2.METRIC_AC_POWER_ACTIVE + """The alternating current active power.""" + + AC_POWER_ACTIVE_PHASE_1 = metrics_pb2.METRIC_AC_POWER_ACTIVE_PHASE_1 + """The alternating current active power in phase 1.""" + + AC_POWER_ACTIVE_PHASE_2 = metrics_pb2.METRIC_AC_POWER_ACTIVE_PHASE_2 + """The alternating current active power in phase 2.""" + + AC_POWER_ACTIVE_PHASE_3 = metrics_pb2.METRIC_AC_POWER_ACTIVE_PHASE_3 + """The alternating current active power in phase 3.""" + + AC_POWER_REACTIVE = metrics_pb2.METRIC_AC_POWER_REACTIVE + """The alternating current reactive power.""" + + AC_POWER_REACTIVE_PHASE_1 = metrics_pb2.METRIC_AC_POWER_REACTIVE_PHASE_1 + """The alternating current reactive power in phase 1.""" + + AC_POWER_REACTIVE_PHASE_2 = metrics_pb2.METRIC_AC_POWER_REACTIVE_PHASE_2 + """The alternating current reactive power in phase 2.""" + + AC_POWER_REACTIVE_PHASE_3 = metrics_pb2.METRIC_AC_POWER_REACTIVE_PHASE_3 + """The alternating current reactive power in phase 3.""" + + AC_POWER_FACTOR = metrics_pb2.METRIC_AC_POWER_FACTOR + """The alternating current power factor.""" + + AC_POWER_FACTOR_PHASE_1 = metrics_pb2.METRIC_AC_POWER_FACTOR_PHASE_1 + """The alternating current power factor in phase 1.""" + + AC_POWER_FACTOR_PHASE_2 = metrics_pb2.METRIC_AC_POWER_FACTOR_PHASE_2 + """The alternating current power factor in phase 2.""" + + AC_POWER_FACTOR_PHASE_3 = metrics_pb2.METRIC_AC_POWER_FACTOR_PHASE_3 + """The alternating current power factor in phase 3.""" + + AC_ENERGY_APPARENT = metrics_pb2.METRIC_AC_ENERGY_APPARENT + """The alternating current apparent energy.""" + + AC_ENERGY_APPARENT_PHASE_1 = metrics_pb2.METRIC_AC_ENERGY_APPARENT_PHASE_1 + """The alternating current apparent energy in phase 1.""" + + AC_ENERGY_APPARENT_PHASE_2 = metrics_pb2.METRIC_AC_ENERGY_APPARENT_PHASE_2 + """The alternating current apparent energy in phase 2.""" + + AC_ENERGY_APPARENT_PHASE_3 = metrics_pb2.METRIC_AC_ENERGY_APPARENT_PHASE_3 + """The alternating current apparent energy in phase 3.""" + + AC_ENERGY_ACTIVE = metrics_pb2.METRIC_AC_ENERGY_ACTIVE + """The alternating current active energy.""" + + AC_ENERGY_ACTIVE_PHASE_1 = metrics_pb2.METRIC_AC_ENERGY_ACTIVE_PHASE_1 + """The alternating current active energy in phase 1.""" + + AC_ENERGY_ACTIVE_PHASE_2 = metrics_pb2.METRIC_AC_ENERGY_ACTIVE_PHASE_2 + """The alternating current active energy in phase 2.""" + + AC_ENERGY_ACTIVE_PHASE_3 = metrics_pb2.METRIC_AC_ENERGY_ACTIVE_PHASE_3 + """The alternating current active energy in phase 3.""" + + AC_ENERGY_ACTIVE_CONSUMED = metrics_pb2.METRIC_AC_ENERGY_ACTIVE_CONSUMED + """The alternating current active energy consumed.""" + + AC_ENERGY_ACTIVE_CONSUMED_PHASE_1 = ( + metrics_pb2.METRIC_AC_ENERGY_ACTIVE_CONSUMED_PHASE_1 + ) + """The alternating current active energy consumed in phase 1.""" + + AC_ENERGY_ACTIVE_CONSUMED_PHASE_2 = ( + metrics_pb2.METRIC_AC_ENERGY_ACTIVE_CONSUMED_PHASE_2 + ) + """The alternating current active energy consumed in phase 2.""" + + AC_ENERGY_ACTIVE_CONSUMED_PHASE_3 = ( + metrics_pb2.METRIC_AC_ENERGY_ACTIVE_CONSUMED_PHASE_3 + ) + """The alternating current active energy consumed in phase 3.""" + + AC_ENERGY_ACTIVE_DELIVERED = metrics_pb2.METRIC_AC_ENERGY_ACTIVE_DELIVERED + """The alternating current active energy delivered.""" + + AC_ENERGY_ACTIVE_DELIVERED_PHASE_1 = ( + metrics_pb2.METRIC_AC_ENERGY_ACTIVE_DELIVERED_PHASE_1 + ) + """The alternating current active energy delivered in phase 1.""" + + AC_ENERGY_ACTIVE_DELIVERED_PHASE_2 = ( + metrics_pb2.METRIC_AC_ENERGY_ACTIVE_DELIVERED_PHASE_2 + ) + """The alternating current active energy delivered in phase 2.""" + + AC_ENERGY_ACTIVE_DELIVERED_PHASE_3 = ( + metrics_pb2.METRIC_AC_ENERGY_ACTIVE_DELIVERED_PHASE_3 + ) + """The alternating current active energy delivered in phase 3.""" + + AC_ENERGY_REACTIVE = metrics_pb2.METRIC_AC_ENERGY_REACTIVE + """The alternating current reactive energy.""" + + AC_ENERGY_REACTIVE_PHASE_1 = metrics_pb2.METRIC_AC_ENERGY_REACTIVE_PHASE_1 + """The alternating current reactive energy in phase 1.""" + + AC_ENERGY_REACTIVE_PHASE_2 = metrics_pb2.METRIC_AC_ENERGY_REACTIVE_PHASE_2 + """The alternating current reactive energy in phase 2.""" + + AC_ENERGY_REACTIVE_PHASE_3 = metrics_pb2.METRIC_AC_ENERGY_REACTIVE_PHASE_3 + """The alternating current reactive energy in phase 3.""" + + AC_TOTAL_HARMONIC_DISTORTION_CURRENT = ( + metrics_pb2.METRIC_AC_TOTAL_HARMONIC_DISTORTION_CURRENT + ) + """The alternating current total harmonic distortion current.""" + + AC_TOTAL_HARMONIC_DISTORTION_CURRENT_PHASE_1 = ( + metrics_pb2.METRIC_AC_TOTAL_HARMONIC_DISTORTION_CURRENT_PHASE_1 + ) + """The alternating current total harmonic distortion current in phase 1.""" + + AC_TOTAL_HARMONIC_DISTORTION_CURRENT_PHASE_2 = ( + metrics_pb2.METRIC_AC_TOTAL_HARMONIC_DISTORTION_CURRENT_PHASE_2 + ) + """The alternating current total harmonic distortion current in phase 2.""" + + AC_TOTAL_HARMONIC_DISTORTION_CURRENT_PHASE_3 = ( + metrics_pb2.METRIC_AC_TOTAL_HARMONIC_DISTORTION_CURRENT_PHASE_3 + ) + """The alternating current total harmonic distortion current in phase 3.""" + + BATTERY_CAPACITY = metrics_pb2.METRIC_BATTERY_CAPACITY + """The capacity of the battery.""" + + BATTERY_SOC_PCT = metrics_pb2.METRIC_BATTERY_SOC_PCT + """The state of charge of the battery as a percentage.""" + + BATTERY_TEMPERATURE = metrics_pb2.METRIC_BATTERY_TEMPERATURE + """The temperature of the battery.""" + + INVERTER_TEMPERATURE = metrics_pb2.METRIC_INVERTER_TEMPERATURE + """The temperature of the inverter.""" + + INVERTER_TEMPERATURE_CABINET = metrics_pb2.METRIC_INVERTER_TEMPERATURE_CABINET + """The temperature of the inverter cabinet.""" + + INVERTER_TEMPERATURE_HEATSINK = metrics_pb2.METRIC_INVERTER_TEMPERATURE_HEATSINK + """The temperature of the inverter heatsink.""" + + INVERTER_TEMPERATURE_TRANSFORMER = ( + metrics_pb2.METRIC_INVERTER_TEMPERATURE_TRANSFORMER + ) + """The temperature of the inverter transformer.""" + + EV_CHARGER_TEMPERATURE = metrics_pb2.METRIC_EV_CHARGER_TEMPERATURE + """The temperature of the EV charger.""" + + SENSOR_WIND_SPEED = metrics_pb2.METRIC_SENSOR_WIND_SPEED + """The speed of the wind measured.""" + + SENSOR_WIND_DIRECTION = metrics_pb2.METRIC_SENSOR_WIND_DIRECTION + """The direction of the wind measured.""" + + SENSOR_TEMPERATURE = metrics_pb2.METRIC_SENSOR_TEMPERATURE + """The temperature measured.""" + + SENSOR_RELATIVE_HUMIDITY = metrics_pb2.METRIC_SENSOR_RELATIVE_HUMIDITY + """The relative humidity measured.""" + + SENSOR_DEW_POINT = metrics_pb2.METRIC_SENSOR_DEW_POINT + """The dew point measured.""" + + SENSOR_AIR_PRESSURE = metrics_pb2.METRIC_SENSOR_AIR_PRESSURE + """The air pressure measured.""" + + SENSOR_IRRADIANCE = metrics_pb2.METRIC_SENSOR_IRRADIANCE + """The irradiance measured.""" diff --git a/src/frequenz/client/assets/types.py b/src/frequenz/client/assets/types.py deleted file mode 100644 index df16feb..0000000 --- a/src/frequenz/client/assets/types.py +++ /dev/null @@ -1,136 +0,0 @@ -# License: MIT -# Copyright © 2025 Frequenz Energy-as-a-Service GmbH - -"""Type definitions for the Assets API client. - -This module contains the core data types used by the Assets API client, -including data classes for representing assets, microgrids, and related entities. -""" - -from __future__ import annotations - -from dataclasses import dataclass -from datetime import datetime, timezone - -from frequenz.api.common.v1alpha8.grid.delivery_area_pb2 import ( - DeliveryArea as PBDeliveryArea, -) -from frequenz.api.common.v1alpha8.microgrid.microgrid_pb2 import ( - Microgrid as PBMicrogrid, -) -from frequenz.api.common.v1alpha8.types.location_pb2 import Location as PBLocation -from frequenz.client.common.microgrid import EnterpriseId, MicrogridId - - -@dataclass(frozen=True) -class DeliveryArea: - """A wrapper class for the protobuf DeliveryArea message. - - A delivery area is a geographical area that is served by a microgrid. - """ - - code: str - code_type: str - - @staticmethod - def from_protobuf(pb: PBDeliveryArea) -> "DeliveryArea": - """ - Create a DeliveryArea instance from a protobuf message. - - Args: - pb: The protobuf DeliveryArea message. - - Returns: - A new DeliveryArea instance populated with data from the protobuf message. - """ - return DeliveryArea( - code=pb.code, - code_type=str(pb.code_type), - ) - - -@dataclass(frozen=True) -class Location: - """A wrapper class for the protobuf Location message. - - A location is a geographical location that is served by a microgrid. - """ - - latitude: float - longitude: float - country_code: str | None - - @staticmethod - def from_protobuf(pb: PBLocation) -> "Location": - """Create a Location instance from a protobuf message. - - Args: - pb: The protobuf Location message. - - Returns: - A new Location instance populated with data from the protobuf message. - """ - return Location( - latitude=pb.latitude, - longitude=pb.longitude, - country_code=pb.country_code, - ) - - -@dataclass(frozen=True) -class Microgrid: - """A wrapper class for the protobuf Microgrid message. - - A microgrid is a localized group of electricity sources and loads that normally - operates connected to and synchronous with the traditional wide area electrical - grid (macrogrid), but is able to disconnect from the grid and operate autonomously. - """ - - id: MicrogridId - """Unique identifier for the microgrid.""" - - enterprise_id: EnterpriseId - """ID of the enterprise that owns this microgrid.""" - - name: str - """Human-readable name for the microgrid.""" - - delivery_area: DeliveryArea | None - """Delivery area served by the microgrid, if applicable.""" - - location: Location | None - """Geographical location of the microgrid, if applicable.""" - - status: int - """Current operational status of the microgrid, represented as an integer.""" - - create_time: datetime - """Timestamp when the microgrid was created, in UTC.""" - - @staticmethod - def from_protobuf(pb: PBMicrogrid) -> "Microgrid": - """Create a Microgrid instance from a protobuf message. - - Args: - pb: The protobuf Microgrid message. - - Returns: - A new Microgrid instance populated with data from the protobuf message. - """ - delivery_area: DeliveryArea | None = None - if pb.HasField("delivery_area"): - delivery_area = DeliveryArea.from_protobuf(pb.delivery_area) - - location: Location | None = None - if pb.HasField("location"): - location = Location.from_protobuf(pb.location) - - return Microgrid( - id=MicrogridId(pb.id), - enterprise_id=EnterpriseId(pb.enterprise_id), - name=pb.name, - delivery_area=delivery_area, - location=location, - status=pb.status, - create_time=pb.create_timestamp.ToDatetime().replace(tzinfo=timezone.utc), - ) diff --git a/tests/__init__.py b/tests/__init__.py index c5849fa..0c7dcb3 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,4 +1,4 @@ # License: MIT # Copyright © 2025 Frequenz Energy-as-a-Service GmbH -"""Tests package.""" +"""Tests for the Assets API Client.""" diff --git a/tests/client_test_cases/get_microgrid/defaults_case.py b/tests/client_test_cases/get_microgrid/defaults_case.py new file mode 100644 index 0000000..f30ae7d --- /dev/null +++ b/tests/client_test_cases/get_microgrid/defaults_case.py @@ -0,0 +1,40 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Test data for successful microgrid retrieval.""" + +from datetime import datetime, timezone +from typing import Any +from unittest.mock import AsyncMock + +from frequenz.api.assets.v1 import assets_pb2 +from frequenz.api.common.v1alpha8.microgrid import microgrid_pb2 +from frequenz.client.common.microgrid import EnterpriseId, MicrogridId + +from frequenz.client.assets import Microgrid, MicrogridStatus + + +def assert_stub_method_call(stub_method: AsyncMock) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.GetMicrogridRequest(microgrid_id=1), timeout=60.0 + ) + + +client_args = (1,) +create_timestamp = datetime(2023, 1, 1, tzinfo=timezone.utc) +grpc_response = assets_pb2.GetMicrogridResponse(microgrid=microgrid_pb2.Microgrid()) + + +def assert_client_result(result: Any) -> None: + """Assert that the client result matches the expected Microgrid.""" + assert result == Microgrid( + id=MicrogridId(0), + enterprise_id=EnterpriseId(0), + name=None, + status=MicrogridStatus.UNSPECIFIED, + location=None, + delivery_area=None, + create_timestamp=datetime(1970, 1, 1, 0, 0, tzinfo=timezone.utc), + ) + assert result.is_active diff --git a/tests/client_test_cases/get_microgrid/error_case.py b/tests/client_test_cases/get_microgrid/error_case.py new file mode 100644 index 0000000..e74e16d --- /dev/null +++ b/tests/client_test_cases/get_microgrid/error_case.py @@ -0,0 +1,29 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Test data for microgrid info retrieval with error.""" + +from typing import Any + +from frequenz.api.assets.v1 import assets_pb2 +from grpc import StatusCode + +from frequenz.client.assets.exceptions import PermissionDenied +from tests.util import make_grpc_error + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.GetMicrogridRequest(microgrid_id=1), timeout=60.0 + ) + + +client_args = (1,) +grpc_response = make_grpc_error(StatusCode.PERMISSION_DENIED) + + +def assert_client_exception(exception: Exception) -> None: + """Assert that the client exception matches the expected error.""" + assert isinstance(exception, PermissionDenied) + assert exception.grpc_error == grpc_response diff --git a/tests/client_test_cases/get_microgrid/full_case.py b/tests/client_test_cases/get_microgrid/full_case.py new file mode 100644 index 0000000..f71e779 --- /dev/null +++ b/tests/client_test_cases/get_microgrid/full_case.py @@ -0,0 +1,70 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Test data for successful microgrid info retrieval.""" + +from datetime import datetime, timezone +from typing import Any +from unittest.mock import AsyncMock + +import pytest +from frequenz.api.assets.v1 import assets_pb2 +from frequenz.api.common.v1alpha8.grid import delivery_area_pb2 +from frequenz.api.common.v1alpha8.microgrid import microgrid_pb2 +from frequenz.api.common.v1alpha8.types import location_pb2 +from frequenz.client.base.conversion import to_timestamp +from frequenz.client.common.microgrid import EnterpriseId, MicrogridId + +from frequenz.client.assets import ( + DeliveryArea, + EnergyMarketCodeType, + Location, + Microgrid, + MicrogridStatus, +) + + +def assert_stub_method_call(stub_method: AsyncMock) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.GetMicrogridRequest(microgrid_id=1234), timeout=60.0 + ) + + +client_args = (1234,) +create_timestamp = datetime(2023, 1, 1, tzinfo=timezone.utc) +grpc_response = assets_pb2.GetMicrogridResponse( + microgrid=microgrid_pb2.Microgrid( + id=1234, + enterprise_id=5678, + name="Test Microgrid", + delivery_area=delivery_area_pb2.DeliveryArea( + code="Test Delivery Area", + code_type=delivery_area_pb2.EnergyMarketCodeType.ENERGY_MARKET_CODE_TYPE_EUROPE_EIC, + ), + location=location_pb2.Location( + latitude=37.7749, longitude=-122.4194, country_code="DE" + ), + status=microgrid_pb2.MICROGRID_STATUS_INACTIVE, + create_timestamp=to_timestamp(create_timestamp), + ) +) + + +def assert_client_result(result: Any) -> None: + """Assert that the client result matches the expected Microgrid.""" + assert result == Microgrid( + id=MicrogridId(1234), + enterprise_id=EnterpriseId(5678), + name="Test Microgrid", + delivery_area=DeliveryArea( + code="Test Delivery Area", code_type=EnergyMarketCodeType.EUROPE_EIC + ), + location=Location( + latitude=pytest.approx(37.7749), # type: ignore[arg-type] + longitude=pytest.approx(-122.4194), # type: ignore[arg-type] + country_code="DE", + ), + status=MicrogridStatus.INACTIVE, + create_timestamp=datetime(2023, 1, 1, tzinfo=timezone.utc), + ) diff --git a/tests/client_test_cases/list_microgrid_electrical_components/empty_case.py b/tests/client_test_cases/list_microgrid_electrical_components/empty_case.py new file mode 100644 index 0000000..069d3fe --- /dev/null +++ b/tests/client_test_cases/list_microgrid_electrical_components/empty_case.py @@ -0,0 +1,24 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Empty case for list electrical components.""" + +from typing import Any + +from frequenz.api.assets.v1 import assets_pb2 + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListMicrogridElectricalComponentsRequest(microgrid_id=1234), + timeout=60.0, + ) + + +client_args = (1234,) +grpc_response = assets_pb2.ListMicrogridElectricalComponentsResponse(components=[]) + + +def assert_client_result(result: Any) -> None: # noqa: D103 + assert not list(result) diff --git a/tests/client_test_cases/list_microgrid_electrical_components/error_case.py b/tests/client_test_cases/list_microgrid_electrical_components/error_case.py new file mode 100644 index 0000000..55875de --- /dev/null +++ b/tests/client_test_cases/list_microgrid_electrical_components/error_case.py @@ -0,0 +1,30 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Test list_electrical_components with error.""" + +from typing import Any + +from frequenz.api.assets.v1 import assets_pb2 +from grpc import StatusCode + +from frequenz.client.assets.exceptions import PermissionDenied +from tests.util import make_grpc_error + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListMicrogridElectricalComponentsRequest(microgrid_id=1234), + timeout=60.0, + ) + + +client_args = (1234,) +grpc_response = make_grpc_error(StatusCode.PERMISSION_DENIED) + + +def assert_client_exception(exception: Exception) -> None: + """Assert that the client exception matches the expected error.""" + assert isinstance(exception, PermissionDenied) + assert exception.grpc_error == grpc_response diff --git a/tests/conftest.py b/tests/conftest.py deleted file mode 100644 index a2fddf2..0000000 --- a/tests/conftest.py +++ /dev/null @@ -1,72 +0,0 @@ -# License: MIT -# Copyright © 2025 Frequenz Energy-as-a-Service GmbH - -"""Global test configuration and fixtures.""" - -from collections.abc import Iterator -from dataclasses import dataclass -from unittest.mock import AsyncMock - -import pytest - -from frequenz.client.assets import AssetsApiClient - -from .helpers.factories import ( - create_delivery_area_mock, - create_location_mock, - create_microgrid_mock, - create_response_mock, -) -from .helpers.test_data import MockData - - -@dataclass -class ClientSetup: # pylint: disable=too-many-instance-attributes - """Parameters for setting up the client.""" - - client: AssetsApiClient - mock_stub: AsyncMock - microgrid_id: int - enterprise_id: int - test_name: str - mock_microgrid: AsyncMock - mock_delivery_area: AsyncMock - mock_location: AsyncMock - mock_response: AsyncMock - - -@pytest.fixture -def client_setup() -> Iterator[ClientSetup]: - """ - Generate a setup parameters for the client. - - This fixture is used to set up the client and the mock stub. - """ - client = AssetsApiClient( - server_url=MockData.server_url, - auth_key=MockData.auth_key, - sign_secret=MockData.sign_secret, - connect=False, - ) - - mock_stub = AsyncMock() - client._stub = mock_stub # pylint: disable=protected-access - client._channel = AsyncMock() # pylint: disable=protected-access - - # Create all mocks using factories - mock_microgrid = create_microgrid_mock() - mock_delivery_area = create_delivery_area_mock() - mock_location = create_location_mock() - mock_response = create_response_mock() - - yield ClientSetup( - client=client, - mock_stub=mock_stub, - microgrid_id=MockData.microgrid_id, - enterprise_id=MockData.enterprise_id, - test_name=MockData.microgrid_name, - mock_microgrid=mock_microgrid, - mock_delivery_area=mock_delivery_area, - mock_location=mock_location, - mock_response=mock_response, - ) diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py deleted file mode 100644 index 4bd55a9..0000000 --- a/tests/helpers/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -# License: MIT -# Copyright © 2025 Frequenz Energy-as-a-Service GmbH - -"""Test helpers package.""" diff --git a/tests/helpers/assertions.py b/tests/helpers/assertions.py deleted file mode 100644 index cf04ce6..0000000 --- a/tests/helpers/assertions.py +++ /dev/null @@ -1,49 +0,0 @@ -# License: MIT -# Copyright © 2025 Frequenz Energy-as-a-Service GmbH - -"""Helper functions for common assertions.""" - -from frequenz.client.assets.types import Microgrid - -from .test_data import MockData - - -def assert_microgrid_basic_fields( - result: Microgrid, - expected_id: int = MockData.microgrid_id, - expected_enterprise_id: int = MockData.enterprise_id, - expected_name: str = MockData.microgrid_name, -) -> None: - """Assert basic microgrid fields are correct.""" - assert str(result.id) == f"MID{expected_id}" - assert str(result.enterprise_id) == f"EID{expected_enterprise_id}" - assert result.name == expected_name - - -def assert_delivery_area_present( - result: Microgrid, expected_code: str = MockData.delivery_area_code -) -> None: - """Assert delivery area is present and correct.""" - assert result.delivery_area is not None - assert result.delivery_area.code == expected_code - - -def assert_delivery_area_absent(result: Microgrid) -> None: - """Assert delivery area is not present.""" - assert result.delivery_area is None - - -def assert_location_present( - result: Microgrid, - expected_country: str = MockData.country_code, - expected_latitude: float = MockData.latitude, -) -> None: - """Assert location is present and correct.""" - assert result.location is not None - assert result.location.country_code == expected_country - assert result.location.latitude == expected_latitude - - -def assert_location_absent(result: Microgrid) -> None: - """Assert location is not present.""" - assert result.location is None diff --git a/tests/helpers/factories.py b/tests/helpers/factories.py deleted file mode 100644 index 6c7e856..0000000 --- a/tests/helpers/factories.py +++ /dev/null @@ -1,68 +0,0 @@ -# License: MIT -# Copyright © 2025 Frequenz Energy-as-a-Service GmbH - -"""Factory functions for creating test objects.""" - -from typing import Any -from unittest.mock import AsyncMock, MagicMock - -from frequenz.client.assets.types import DeliveryArea, Location, Microgrid - -from .test_data import MockData - - -def create_microgrid_mock( - microgrid_id: int = MockData.microgrid_id, - enterprise_id: int = MockData.enterprise_id, - name: str = MockData.microgrid_name, - status: int = MockData.microgrid_status, - **overrides: Any, -) -> AsyncMock: - """Create microgrid mocks with defaults.""" - mock = AsyncMock(name="microgrid_mock", spec=Microgrid) - mock.id = microgrid_id - mock.enterprise_id = enterprise_id - mock.name = name - mock.status = status - # Configure create_timestamp mock to return a datetime directly (not async) - mock_timestamp = MagicMock() - mock_timestamp.ToDatetime.return_value = MockData.create_date - mock.create_timestamp = mock_timestamp - - # Configure HasField to be synchronous (not async) - mock.HasField = MagicMock(return_value=False) - - # Apply any overrides - for key, value in overrides.items(): - setattr(mock, key, value) - - return mock - - -def create_delivery_area_mock( - code: str = MockData.delivery_area_code, - code_type: str = MockData.delivery_area_type, -) -> AsyncMock: - """Create delivery area mocks.""" - mock = AsyncMock(name="delivery_area_mock", spec=DeliveryArea) - mock.code = code - mock.code_type = code_type - return mock - - -def create_location_mock( - latitude: float = MockData.latitude, - longitude: float = MockData.longitude, - country_code: str = MockData.country_code, -) -> AsyncMock: - """Create location mocks.""" - mock = AsyncMock(name="location_mock", spec=Location) - mock.latitude = latitude - mock.longitude = longitude - mock.country_code = country_code - return mock - - -def create_response_mock() -> AsyncMock: - """Create response mocks.""" - return AsyncMock() diff --git a/tests/helpers/test_data.py b/tests/helpers/test_data.py deleted file mode 100644 index 50ee951..0000000 --- a/tests/helpers/test_data.py +++ /dev/null @@ -1,33 +0,0 @@ -# License: MIT -# Copyright © 2025 Frequenz Energy-as-a-Service GmbH - -"""Centralized test data and constants.""" - -from dataclasses import dataclass -from datetime import datetime, timezone - - -@dataclass -class MockData: # pylint: disable=too-many-instance-attributes - """Central place for all test data constants.""" - - # Microgrid data - microgrid_id: int = 123 - enterprise_id: int = 456 - microgrid_name: str = "Test Microgrid" - microgrid_status: int = 1 - create_date: datetime = datetime(2023, 1, 1, tzinfo=timezone.utc) - - # Delivery area data - delivery_area_code: str = "TEST_AREA" - delivery_area_type: str = "TEST_TYPE" - - # Location data - latitude: float = 52.5200 - longitude: float = 13.4050 - country_code: str = "DE" - - # Server config - server_url: str = "grpc://test.example.com:443" - auth_key: str = "test-key" - sign_secret: str = "test-secret" diff --git a/tests/test_client.py b/tests/test_client.py index f9dbdc1..a7fcd66 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,177 +1,52 @@ # License: MIT # Copyright © 2025 Frequenz Energy-as-a-Service GmbH -"""Tests for the frequenz.client package.""" +"""Test for the Assets API client.""" -from unittest.mock import AsyncMock +from __future__ import annotations -import pytest -from grpc import StatusCode -from grpc.aio import AioRpcError - -from frequenz.client.assets.exceptions import EntityNotFound - -from .conftest import ClientSetup -from .helpers.assertions import ( - assert_delivery_area_absent, - assert_delivery_area_present, - assert_location_absent, - assert_location_present, - assert_microgrid_basic_fields, -) +from collections.abc import AsyncIterator +from pathlib import Path +import pytest +from frequenz.api.assets.v1 import assets_pb2_grpc -def setup_microgrid_with_fields( - mock_microgrid: AsyncMock, - has_delivery_area: bool = False, - has_location: bool = False, - delivery_area: AsyncMock | None = None, - location: AsyncMock | None = None, -) -> None: - """Set up microgrid mock with optional fields.""" - - def has_field_side_effect(field_name: str) -> bool: - if field_name == "delivery_area": - return has_delivery_area - if field_name == "location": - return has_location - return False - - mock_microgrid.HasField.side_effect = has_field_side_effect +from frequenz.client.assets import AssetsApiClient - if has_delivery_area and delivery_area: - mock_microgrid.delivery_area = delivery_area +from .util import ApiClientTestCaseSpec, get_test_specs, patch_client_class - if has_location and location: - mock_microgrid.location = location +TESTS_DIR = Path(__file__).parent / "client_test_cases" -def create_microgrid_response( - mock_microgrid: AsyncMock, - mock_response: AsyncMock, -) -> AsyncMock: - """Create a complete response with microgrid.""" - mock_response.microgrid = mock_microgrid - return mock_response +@pytest.fixture +async def client() -> AsyncIterator[AssetsApiClient]: + """Fixture that provides a AssetsApiClient with a mock gRPC stub and channel.""" + with patch_client_class(AssetsApiClient, assets_pb2_grpc.PlatformAssetsStub): + client = AssetsApiClient("grpc://localhost:1234") + async with client: + yield client @pytest.mark.parametrize( - "has_delivery_area,has_location", - [ - (False, False), - (True, False), - (False, True), - (True, True), - ], - ids=[ - "without optional fields", - "with delivery area only", - "with location only", - "with all optional fields", - ], + "spec", + get_test_specs("get_microgrid", tests_dir=TESTS_DIR), + ids=str, ) -async def test_get_microgrid_details_optional_fields( - client_setup: ClientSetup, - has_delivery_area: bool, - has_location: bool, +async def test_get_microgrid( + client: AssetsApiClient, spec: ApiClientTestCaseSpec ) -> None: - """Test get_microgrid_details with different combinations of optional fields.""" - client, mock_stub = client_setup.client, client_setup.mock_stub + """Test get_microgrid method.""" + await spec.test_unary_unary_call(client, "GetMicrogrid") - # Setup - setup_microgrid_with_fields( - client_setup.mock_microgrid, - has_delivery_area=has_delivery_area, - has_location=has_location, - delivery_area=client_setup.mock_delivery_area if has_delivery_area else None, - location=client_setup.mock_location if has_location else None, - ) - response = create_microgrid_response( - client_setup.mock_microgrid, client_setup.mock_response - ) - mock_stub.GetMicrogrid.return_value = response - - # Execute - result = await client.get_microgrid_details(client_setup.microgrid_id) - - # Assert basic fields always present - assert_microgrid_basic_fields(result) - - # Assert optional fields based on parameters - if has_delivery_area: - assert_delivery_area_present(result) - else: - assert_delivery_area_absent(result) - - if has_location: - assert_location_present(result) - else: - assert_location_absent(result) - - # Verify stub was called correctly - mock_stub.GetMicrogrid.assert_called_once() - - -async def test_get_microgrid_details_basic_functionality( - client_setup: ClientSetup, -) -> None: - """Test basic successful microgrid details retrieval functionality.""" - client, mock_stub = client_setup.client, client_setup.mock_stub - # Setup basic response without optional fields - setup_microgrid_with_fields( - client_setup.mock_microgrid, - has_delivery_area=False, - has_location=False, - ) - response = create_microgrid_response( - client_setup.mock_microgrid, client_setup.mock_response - ) - mock_stub.GetMicrogrid.return_value = response - - # Execute - result = await client.get_microgrid_details(client_setup.microgrid_id) - - # Assert - assert_microgrid_basic_fields(result) - assert_delivery_area_absent(result) - assert_location_absent(result) - - # Verify stub was called correctly - mock_stub.GetMicrogrid.assert_called_once() - - -async def test_get_microgrid_details_not_found( - client_setup: ClientSetup, +@pytest.mark.asyncio +@pytest.mark.parametrize( + "spec", + get_test_specs("list_microgrid_electrical_components", tests_dir=TESTS_DIR), + ids=str, +) +async def test_list_microgrid_electrical_components( + client: AssetsApiClient, spec: ApiClientTestCaseSpec ) -> None: - """Test get_microgrid_details when microgrid is not found.""" - client, mock_stub = client_setup.client, client_setup.mock_stub - - # Setup mock to raise NOT_FOUND error - class MockAioRpcError(AioRpcError): # pylint: disable=too-few-public-methods - """Mock AioRpcError for testing NOT_FOUND scenarios.""" - - def __init__(self) -> None: - # Don't call super().__init__() # pylint: disable=super-init-not-called - self._debug_error_string = "Mock debug error string" - self._code = StatusCode.NOT_FOUND - - def code(self) -> StatusCode: - return self._code - - def details(self) -> str: - return "Microgrid not found" - - mock_stub.GetMicrogrid.side_effect = MockAioRpcError() - - # Execute and assert exception is raised - with pytest.raises(EntityNotFound) as exc_info: - await client.get_microgrid_details(client_setup.microgrid_id) - - # Assert exception details - grpc_error = exc_info.value.grpc_error - assert grpc_error.code() == StatusCode.NOT_FOUND - assert grpc_error.details() == "Microgrid not found" - - # Verify stub was called correctly - mock_stub.GetMicrogrid.assert_called_once() + """Test list_microgrid_electrical_components method.""" + await spec.test_unary_unary_call(client, "ListMicrogridElectricalComponents") diff --git a/tests/util.py b/tests/util.py new file mode 100644 index 0000000..b9057d4 --- /dev/null +++ b/tests/util.py @@ -0,0 +1,733 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +# pylint: disable=too-many-lines + +r"""Utilities for testing client implementations. + +Based on: +https://github.com/frequenz-floss/frequenz-client-microgrid-python/blob/v0.17.x/tests/util.py +""" + +from __future__ import annotations + +import asyncio +import functools +import gc +import importlib +import inspect +import itertools +import logging +import sys +from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Iterable +from contextlib import AsyncExitStack, ContextDecorator, aclosing +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Generic, Protocol, TypeVar, get_args, get_origin +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from frequenz.client.base.client import BaseApiClient +from grpc import StatusCode +from grpc.aio import AioRpcError, Channel, Metadata + +_logger = logging.getLogger(__name__) + +StubT = TypeVar("StubT") +"""Type variable for the gRPC stub type.""" + +ClientT = TypeVar("ClientT", bound=BaseApiClient[Any]) +"""Type variable for the client type.""" + + +@dataclass(frozen=True, kw_only=True) +class ApiClientTestCase: + """A single test case for a gRPC client method.""" + + client_args: tuple[Any, ...] + """The positional arguments to use when calling the client method being tested.""" + + client_kwargs: dict[str, Any] + """The keyword arguments to use when calling the client method being tested.""" + + assert_stub_method_call: Callable[[Any], None] + """The assertion function to validate the gRPC request done by the client. + + The assertion function takes the actual gRPC request that was done, and should + make assertions on it to validate that it matches the expected request. + """ + + grpc_response: Any + """The response or exception to use to mock the gRPC call. + + If this is an exception, it will be raised when the gRPC call is made. + If this is a value, it will be returned as the response. + """ + + assert_client_result: ( + Callable[[Any], None] | Callable[[Any], Awaitable[None]] | None + ) = None + """The assertion function to validate the result returned by the client. + + The assertion function takes the actual result returned by the client method, + and it should make assertions on it to validate that it matches the expected + result. + + This is only used if the gRPC call does not raise an exception. + """ + + assert_client_exception: Callable[[Exception], None] | None = None + """The assertion function to validate the exception raised by the client. + + The assertion function takes the actual exception raised by the client method, + and it should make assertions on it to validate that it matches the expected + exception. + + This is only used if the gRPC call raises an exception. + """ + + def __post_init__(self) -> None: + """Post-initialization checks for the TestCase class.""" + if self.assert_client_result is None and self.assert_client_exception is None: + raise ValueError( + "Either assert_client_result or assert_client_exception must be provided." + ) + if ( + self.assert_client_result is not None + and self.assert_client_exception is not None + ): + raise ValueError( + "Only one of assert_client_result or assert_client_exception must be provided." + ) + + +@dataclass(frozen=True, kw_only=True) +class ApiClientTestCaseSpec: + """A specification for a test case. + + This is used to load the test case data from a file and run the test. + """ + + name: str + """The name of the test case.""" + + client_method_name: str + """The name of the gRPC client method being tested.""" + + path: Path + """The absolute path to the test case file.""" + + relative_path: Path + """The test case file path relative to current working directory.""" + + def __str__(self) -> str: + """Return a string representation of the test case specification.""" + return self.name + + def load_test_module(self) -> Any: + """Return the loaded test case module from the test case file.""" + module_name = self.path.stem + if module_name in sys.modules: + raise ValueError( + f"The module name for test case {self.name} is already in use" + ) + + # Register the module name with pytest to allow for better error reporting + # when the test case fails. + pytest.register_assert_rewrite(module_name) + + # We load the module as a top-level module to avoid requiring adding + # `__init__.py` files to the test directories. We make sure to unload + # the module (and other modules that might have been loaded by the test + # case) after the test case is run to avoid polluting the module namespace. + original_modules = sys.modules.copy() + original_sys_path = sys.path.copy() + sys.path.insert(0, str(self.path.parent)) + try: + module = importlib.import_module(module_name) + except ImportError as exc: + raise ImportError( + f"Test case {self.name} could not be imported from {self.relative_path}, " + f"make sure the file exists and is a valid Python module: {exc}" + ) from exc + finally: + sys.path = original_sys_path + sys.modules = original_modules + importlib.invalidate_caches() + gc.collect() + + return module + + def load_test_case(self) -> ApiClientTestCase: + """Return the loaded test case from the test case file.""" + module = self.load_test_module() + + required_attrs = ["assert_stub_method_call", "grpc_response"] + if missing_attrs := [ + attr for attr in required_attrs if not hasattr(module, attr) + ]: + raise AttributeError( + f"Test case file {self.relative_path} is missing required attributes: " + + ", ".join(missing_attrs) + ) + + try: + test_case = ApiClientTestCase( + client_args=getattr(module, "client_args", ()), + client_kwargs=getattr(module, "client_kwargs", {}), + assert_stub_method_call=module.assert_stub_method_call, + grpc_response=module.grpc_response, + assert_client_result=getattr(module, "assert_client_result", None), + assert_client_exception=getattr( + module, "assert_client_exception", None + ), + ) + except ValueError as exc: + raise ValueError( + f"Test case file {self.relative_path} is invalid: {exc}" + ) from exc + + return test_case + + async def test_call( + self, + *, + client: ClientProtocol, + stub_method_name: str, + call_client_method: Callable[ + [ClientProtocol, str, ApiClientTestCase, AsyncExitStack], + Awaitable[tuple[MagicMock, Any, Exception | None]], + ], + exit_stack: AsyncExitStack, + ) -> None: + """Run a test for a unary-unary gRPC call.""" + _logger.debug( + "Running test case %r for `%s()` (%s)", + self.name, + self.client_method_name, + stub_method_name, + ) + test_case = self.load_test_case() + _logger.debug("Loaded test case %r from %s", self.name, self.relative_path) + client_should_raise = test_case.assert_client_exception is not None + + # Call the client method and collect the result/exception + stub_method_mock, client_result, client_raised_exception = ( + await call_client_method(client, stub_method_name, test_case, exit_stack) + ) + + if client_raised_exception is not None: + if not client_should_raise: + # Expected a result, but got an exception. Test premise failed. + # We raise an AssertionError here to indicate that the test case + # failed, but we chain it to the original exception to keep the + # original traceback. + # We need to check this before running the assert_stub_method_call() because + # if an exception was raised, the stub method might not have been + # called at all. + _logger.debug( + "Raising AssertionError because the client raised an unexpected exception: %r", + client_raised_exception, + ) + raise AssertionError( + f"{self.relative_path}: The client call to method {self.client_method_name}() " + f"raised an exception {client_raised_exception!r}, but a result was expected " + "(the test case provided a assert_client_result() function and not a " + "assert_client_exception() function)" + ) from client_raised_exception + + _logger.debug( + "The client raised an expected exception, calling `assert_client_exception(%r)`", + client_raised_exception, + ) + # Expected an exception, and got one, so run the user's + # assertion function on the exception before we validate the + # gRPC call, because if the wrong exception was raised, the stub + # method might not have been called at all. + # We also chain the exception to the original exception to keep the + # original traceback for a better debugging experience. + assert test_case.assert_client_exception is not None + try: + test_case.assert_client_exception(client_raised_exception) + except AssertionError as err: + raise err from client_raised_exception + + # Validate the gRPC stub call was made correctly + # This will report any failed assertions as a test FAIL, and any other + # unexpected exception as a test ERROR, always pointing to the exact + # location where the issue originated. + test_case.assert_stub_method_call(stub_method_mock) + + if client_raised_exception is None: + if client_should_raise: + # Expected an exception, but got a result. Test premise failed. + pytest.fail( + f"{self.relative_path}: The client call to method " + f"{self.client_method_name}() didn't raise the expected exception " + f"{test_case.grpc_response!r}, instead it returned {client_result!r}", + pytrace=False, + ) + + # Expected a result, and got one, so run the user's assertion + # function on the result. + elif test_case.assert_client_result is None: + pytest.fail( + f"{self.relative_path}: The client method " + f"{self.client_method_name}() returned a result, but an " + "exception was expected (the test case provided a " + "assert_client_exception() function and not a " + "assert_client_result() function)", + pytrace=False, + ) + + if inspect.iscoroutinefunction(test_case.assert_client_result): + _logger.debug("Awaiting `assert_client_result(%r)`", client_result) + async with asyncio.timeout(60): + await test_case.assert_client_result(client_result) + else: + _logger.debug("Calling `assert_client_result(%r)`", client_result) + test_case.assert_client_result(client_result) + + async def test_unary_unary_call( + self, + client: ClientProtocol, + stub_method_name: str, + ) -> None: + """Run a test for a unary-unary gRPC call.""" + async with AsyncExitStack() as exit_stack: + await self.test_call( + client=client, + stub_method_name=stub_method_name, + call_client_method=self.call_unary_method, + exit_stack=exit_stack, + ) + + async def test_unary_stream_call( + self, + client: ClientProtocol, + stub_method_name: str, + ) -> None: + """Run a test for a unary-stream gRPC call.""" + async with AsyncExitStack() as exit_stack: + await self.test_call( + client=client, + stub_method_name=stub_method_name, + call_client_method=self.call_stream_method, + exit_stack=exit_stack, + ) + + async def call_unary_method( + self, + client: ClientProtocol, + stub_method_name: str, + test_case: ApiClientTestCase, + _: AsyncExitStack, + ) -> tuple[AsyncMock, Any, Exception | None]: + """Call a unary method on the client.""" + _logger.debug("Preparing stub gRPC unary call `%s()`", stub_method_name) + # Prepare the mock for the gRPC stub method + stub_method_mock = AsyncMock(name=stub_method_name) + if isinstance(test_case.grpc_response, Exception): + stub_method_mock.side_effect = test_case.grpc_response + else: + stub_method_mock.return_value = test_case.grpc_response + _logger.debug( + "Patching %s.%s with %s", client.stub, stub_method_name, stub_method_mock + ) + setattr(client.stub, stub_method_name, stub_method_mock) + + # Call the client method and collect the result/exception + client_method = getattr(client, self.client_method_name) + # We use a separate variable for the result if it is an exception to be able + # to support weird cases where the method actually returns an exception + # instead of raising it. + client_result: Any = None + client_raised_exception: Exception | None = None + try: + _logger.debug( + "Calling client method `%s(*%r, **%r)`", + self.client_method_name, + test_case.client_args, + test_case.client_kwargs, + ) + client_result = await client_method( + *test_case.client_args, **test_case.client_kwargs + ) + _logger.debug("Client method result: %r", client_result) + except Exception as err: # pylint: disable=broad-exception-caught + _logger.debug("Client method raised an exception: %r", err) + client_raised_exception = err + + return (stub_method_mock, client_result, client_raised_exception) + + async def call_stream_method( + self, + client: ClientProtocol, + stub_method_name: str, + test_case: ApiClientTestCase, + exit_stack: AsyncExitStack, + ) -> tuple[MagicMock, Any, Exception | None]: + """Call a stream method on the client.""" + _logger.debug("Preparing stub gRPC stream call `%s()`", stub_method_name) + stub_method_mock = MagicMock(name=stub_method_name) + + if isinstance(test_case.grpc_response, Exception): + _logger.debug( + "`grpc_response` is an exception, setting as side_effect: %r", + test_case.grpc_response, + ) + stub_method_mock.side_effect = test_case.grpc_response + else: + + def create_response_wrapper(*_: Any, **__: Any) -> AsyncIterator[Any]: + """Create a response wrapper for the gRPC response.""" + wrapper = _IterableResponseWrapper(test_case.grpc_response) + exit_stack.push_async_exit(aclosing(wrapper)) + return wrapper + + stub_method_mock.side_effect = create_response_wrapper + _logger.debug( + "Patching %s.%s with %s", client.stub, stub_method_name, stub_method_mock + ) + setattr(client.stub, stub_method_name, stub_method_mock) + + # Call the client method and collect the result/exception + client_method = getattr(client, self.client_method_name) + # We use a separate variable for the result if it is an exception to be able + # to support weird cases where the method actually returns an exception + # instead of raising it. + client_result: Any = None + client_raised_exception: Exception | None = None + try: + _logger.debug( + "Calling client method `%s(*%r, **%r)`", + self.client_method_name, + test_case.client_args, + test_case.client_kwargs, + ) + client_result = client_method( + *test_case.client_args, **test_case.client_kwargs + ) + _logger.debug("Client method result: %r", client_result) + except Exception as err: # pylint: disable=broad-exception-caught + _logger.debug("Client method raised an exception: %r", err) + client_raised_exception = err + + # Yield control to allow the gRPC streamer to start running + await asyncio.sleep(0) + + return (stub_method_mock, client_result, client_raised_exception) + + +def get_test_specs( + client_method_name: str, + *, + tests_dir: str | Path, + suffixes: Iterable[str] = ("_case",), +) -> Iterable[ApiClientTestCaseSpec]: + """Get all test names for a specific stub call. + + Args: + client_method_name: The name of the client method being tested. + tests_dir: The directory where the test cases are located (inside the + `client_method_name` sub-directory). + suffixes: The file suffixes to look for. + + Returns: + A iterable of test case specs. + + Raises: + ValueError: If the test directory does not exist or is not a directory, + the `test_cases_subdir` is not a relative path, or if no test files + are found in the test directory. + """ + tests_dir = Path(tests_dir) + if not tests_dir.is_absolute(): + raise ValueError(f"{tests_dir} must be an absolute path") + + test_dir = tests_dir / client_method_name + if not test_dir.exists(): + raise ValueError(f"Tests directory {test_dir} does not exist") + if not test_dir.is_dir(): + raise ValueError(f"Tests directory {test_dir} is not a directory") + + specs = list( + itertools.chain( + ( + ApiClientTestCaseSpec( + name=p.stem[: -len(suffix)], + client_method_name=client_method_name, + path=p.resolve(), + relative_path=p.relative_to(Path.cwd()), + ) + for suffix in suffixes + for p in test_dir.glob(f"*{suffix}.py") + ) + ) + ) + if not specs: + globs = [f"*{suffix}.py" for suffix in suffixes] + raise ValueError( + f"No test files found in {test_dir} matching {', '.join(globs)}" + ) + + return specs + + +class ClientProtocol(Protocol): + """Protocol for client objects with a stub property.""" + + @property + def stub(self) -> Any: + """Return the gRPC stub.""" + ... # pylint: disable=unnecessary-ellipsis + + +def make_grpc_error( + code: StatusCode, + *, + initial_metadata: Metadata = Metadata(), + trailing_metadata: Metadata = Metadata(), + details: str | None = None, + debug_error_string: str | None = None, +) -> AioRpcError: + """Create a gRPC error for testing purposes.""" + return AioRpcError( + code=code, + initial_metadata=initial_metadata, + trailing_metadata=trailing_metadata, + details=details, + debug_error_string=debug_error_string, + ) + + +# generic_cls uses Any because it doesn't really take a `type` (which might be +# what looks more intuitive), technically is a `typing._GenericAlias`, but this +# is not a public API and we don't want to depend on it. There is also +# `types.GenericAlias` but this one is only used for built-in generics, like +# `list[int]`, so we can't use it either. +@functools.lru_cache(maxsize=1024) +def is_subclass_of_generic(cls: type[Any], generic_cls: Any) -> bool: + """Return whether `cls` is a subclass of a parameterized generic `generic_cls`. + + Check at runtime whether `cls` is a subclass of a parameterized generic + `generic_cls`., e.g. `is_subclass_generic(DerivedInt, GenericBase[int])`. + + Args: + cls: The class to check. + generic_cls: The parameterized generic type to check against. + + Returns: + True if `cls` is a subclass of `generic_cls`, False otherwise. + + Raises: + TypeError: If `generic_cls` is not a parameterized generic type. + """ + # Check if 'generic_cls' is actually a parameterized generic type + # (like list[int], GenericBase[str], etc.). + # get_origin returns None for non-generics or non-parameterized generics. + origin = get_origin(generic_cls) + if origin is None: + raise TypeError(f"generic_cls {generic_cls!r} must be a parameterized generic") + + # First check the raw generic relationship (e.g., is DerivedInt a subclass + # of GenericBase?). + if not issubclass(cls, origin): + return False + + # Inspect __orig_bases__ throughout the MRO (Method Resolution Order). + # This handles inheritance chains correctly (sub-sub classes). + # We iterate through getmro(cls) to check not just direct parents, but all + # ancestors. + for base in inspect.getmro(cls): + # __orig_bases__ stores the base classes *as they were written*, + # including type parameters. Might not exist on all classes (like 'object'). + # getattr avoids an AttributeError if __orig_bases__ is missing. + # Python3.12 provides types.get_original_bases(cls) to get __orig_bases__, + # this can be updated when we drop support for older versions. + for orig_base in getattr(base, "__orig_bases__", ()): + # Check if the origin of this specific original base matches our + # target origin AND if the arguments match our target arguments. + # get_args returns a tuple, so this correctly handles multi-generic + # bases by comparing tuples element-wise (e.g., (str, int) == (str, + # int)). + if get_origin(orig_base) is origin and get_args(orig_base) == get_args( + generic_cls + ): + return True + + return False + + +class patch_client_class( # pylint: disable=invalid-name + ContextDecorator, Generic[ClientT, StubT] +): + """Patches the client class for testing. + + This avoids the class to really connect anywhere, and creates a mock + channel and stub instead. + + It can be used as a context manager or decorator. + + Example: Usage as a context manager + + ```python + @patch_client_class(SomeApiClient, SomeApiStub) + def test_some_function(client_class: SomeApiClient): + client = client_class(...) + client.stub.some_method.return_value = ... + # Your test code here + ``` + + Example: Usage as a decorator + ```python + def test_some_function(): + with patch_client_class(SomeApiClient, SomeApiStub) as client_class: + client = client_class(...) + client.stub.some_method.return_value = ... + # Your test code here + ``` + """ + + def __init__(self, client_class: type[ClientT], stub_class: type[StubT]) -> None: + """Context manager that patches the client for testing. + + Args: + client_class: The client class to patch. + stub_class: The stub class to patch. + """ + # We need the type ignores here because: + # 1. mypy doesn't consider types hashable (needed for the + # is_subclass_of_generic cache), but they are, based on their memory + # address, which is enough for us. + # 2. mypy expect classes, TypeVar or other type expressions, but we are + # using a *regular variable* here. In general this is wrong, and + # can't be properly type checked, but it does what it should at + # runtime. + assert is_subclass_of_generic( + client_class, BaseApiClient[stub_class] # type: ignore[valid-type] + ) + self._client_class: type[ClientT] = client_class + self._patched_client_class = patch.object( + client_class, "connect", autospec=True, side_effect=self._fake_connect + ) + + def __enter__(self) -> type[ClientT]: + """Enter the context manager.""" + self._patched_client_class.__enter__() + return self._client_class + + def __exit__(self, *args: Any, **kwargs: Any) -> None: + """Exit the context manager.""" + self._patched_client_class.__exit__(*args, **kwargs) + + def _fake_connect( + self, + client: ClientT, + server_url: str | None = None, + auth_key: str | None = None, # pylint: disable=unused-argument + sign_secret: str | None = None, # pylint: disable=unused-argument + ) -> None: + """Fake connect method that does nothing.""" + # pylint: disable=protected-access + if server_url is not None and server_url != client._server_url: # URL changed + client._server_url = server_url + elif client.is_connected: + return + client._channel = MagicMock(name="_channel", spec=Channel) + # We don't spec the stub because we would need the `AsyncStub` for that, + # but it only exists for type hints, so it can't be used at runtime. + client._stub = MagicMock(name="_stub") + # pylint: enable=protected-access + + +async def _iter_to_async_iter(it: Iterable[Any]) -> AsyncIterator[Any]: + """Return an async iterator from an iterable.""" + for item in it: + yield item + + +class _IterableResponseWrapper(AsyncIterator[Any]): + """Wrap a response to make it an async iterator. + + Supports + """ + + def __init__(self, response: Any) -> None: + """Initialize the wrapper with the response.""" + self._response = response + self._iter_is_async = False + self._iter_is_generator = False + self._iter: Any + + if inspect.isasyncgenfunction(response): + _logger.debug( + "`grpc_response` is an async generator function: %r", response + ) + self._iter_is_async = True + self._iter_is_generator = True + self._iter = response() + elif inspect.isgeneratorfunction(response): + _logger.debug("`grpc_response` is a generator function: %r", response) + self._iter_is_generator = True + self._iter = response() + elif inspect.isasyncgen(response): + _logger.debug("`grpc_response` is an async generator: %r", response) + self._iter_is_async = True + self._iter_is_generator = True + self._iter = response + elif inspect.isgenerator(response): + _logger.debug("`grpc_response` is a generator: %r", response) + self._iter_is_generator = True + self._iter = response + elif isinstance(response, AsyncIterable): + _logger.debug("`grpc_response` is an async iterable: %r", response) + self._iter_is_async = True + self._iter = aiter(response) + # We check for str and bytes here because they are iterable, but it + # would be very unlikely that users want to use them as iterator. + # If they do, they can just use grpc_response = iter([...]) to explicitly + # create an iterator from it. + elif isinstance(response, (str, bytes)): + _logger.debug( + "`grpc_response` is a string or bytes, wrapping in a list as an iterator: %r", + response, + ) + self._iter = iter([response]) + elif isinstance(response, Iterable): + _logger.debug("`grpc_response` is an iterable: %r", response) + self._iter = iter(response) + else: + _logger.debug( + "`grpc_response` is not iterable, wrapping in a list as an iterator: %r", + response, + ) + self._iter = iter([response]) + + def __aiter__(self) -> _IterableResponseWrapper: + """Return the iterator.""" + return self + + async def __anext__(self) -> Any: + """Return the next item from the iterator.""" + if self._iter_is_async: + _logger.debug("`grpc_response` is async, awaiting next item") + return await anext(self._iter) + + try: + _logger.debug("`grpc_response` is sync, getting next item without await") + return next(self._iter) + except StopIteration as exc: + raise StopAsyncIteration from exc + + async def aclose(self) -> None: + """Close the iterator.""" + if self._iter_is_generator: + if self._iter_is_async: + _logger.debug( + "`grpc_response` is async generator, awaiting for `aclose()`" + ) + await self._iter.aclose() + else: + _logger.debug("`grpc_response` is generator, calling `close()`") + self._iter.close()