Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/frequenz/client/microgrid/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
from .component._category import ComponentCategory
from .component._component import Component
from .component._component_proto import component_from_proto
from .component._connection import ComponentConnection
from .component._connection_proto import component_connection_from_proto
from .component._types import ComponentTypes
from .metrics._bounds import Bounds
from .metrics._metric import Metric
Expand Down Expand Up @@ -221,6 +223,66 @@ async def list_components( # noqa: DOC502 (raises ApiClientError indirectly)

return map(component_from_proto, component_list.components)

async def list_connections( # noqa: DOC502 (raises ApiClientError indirectly)
self,
*,
sources: Iterable[ComponentId | Component] = (),
destinations: Iterable[ComponentId | Component] = (),
) -> Iterable[ComponentConnection]:
"""Fetch all the connections present in the local microgrid.

Electrical components are a part of a microgrid's electrical infrastructure
are can be connected to each other to form an electrical circuit, which can
then be represented as a graph.

The direction of a connection is always away from the grid endpoint, i.e.
aligned with the direction of positive current according to the passive sign
convention: https://en.wikipedia.org/wiki/Passive_sign_convention

The request may be filtered by `source`/`destination` component(s) of individual
connections. If provided, the `sources` and `destinations` filters have an
`AND` relationship between each other, meaning that they are applied serially,
but an `OR` relationship with other elements in the same list.

Example:
If `sources = {1, 2, 3}`, and `destinations = {4,
5, 6}`, then the result should have all the connections where:

* Each `source` component ID is either `1`, `2`, OR `3`; **AND**
* Each `destination` component ID is either `4`, `5`, OR `6`.

Args:
sources: The component from which the connections originate.
destinations: The component at which the connections terminate.

Returns:
Iterator whose elements are all the connections in the local microgrid.

Raises:
ApiClientError: If the are any errors communicating with the Microgrid API,
most likely a subclass of
[GrpcError][frequenz.client.microgrid.GrpcError].
"""
connection_list = await client.call_stub_method(
self,
lambda: self.stub.ListConnections(
microgrid_pb2.ListConnectionsRequest(
starts=map(_get_component_id, sources),
ends=map(_get_component_id, destinations),
),
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
),
method_name="ListConnections",
)

return (
conn
for conn in map(
component_connection_from_proto, connection_list.connections
)
if conn is not None
)

async def set_component_power_active( # noqa: DOC502 (raises ApiClientError indirectly)
self,
component: ComponentId | Component,
Expand Down
2 changes: 2 additions & 0 deletions src/frequenz/client/microgrid/component/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ._category import ComponentCategory
from ._chp import Chp
from ._component import Component
from ._connection import ComponentConnection
from ._converter import Converter
from ._crypto_miner import CryptoMiner
from ._electrolyzer import Electrolyzer
Expand Down Expand Up @@ -67,6 +68,7 @@
"Chp",
"Component",
"ComponentCategory",
"ComponentConnection",
"ComponentStatus",
"ComponentTypes",
"Converter",
Expand Down
72 changes: 72 additions & 0 deletions src/frequenz/client/microgrid/component/_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Component connection."""

import dataclasses
from datetime import datetime, timezone

from frequenz.client.common.microgrid.components import ComponentId

from .._lifetime import Lifetime


@dataclasses.dataclass(frozen=True, kw_only=True)
class ComponentConnection:
"""A single electrical link between two components within a microgrid.

A component connection represents the physical wiring as viewed from the grid
connection point, if one exists, or from the islanding point, in case of an islanded
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is "islanding point" a fixed term we use? "island point" would seem more natural to me

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the grid connection point, if one exists, or from the islanding point, in case of an islanded microgrids

Also, I wonder if it would be easier to read if we describe this only once, but we repeat this exact phrase at least two more times..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like, the other times we could at least say "grid connection/islanding point" instead of the whole phrase. But maybe there is also a more neutral term

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All this should go to the common API, it is all taken from there. I'm not sure how are we going to cope with keeping these in sync, but I guess this is only an issue until we reach 1.0.0, then changes should be very minimal. Maybe when 1.0.0 is released we need to go through all the docs and check they are consistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one I will pass, as it diverges too much from the source, I'd rather fix it in the common API first to agree in what's the best wording first with @tiyash-basu-frequenz.

microgrids.

Note: Physical Representation
This object is not about data flow but rather about the physical
electrical connections between components. Therefore, the IDs for the
source and destination components correspond to the actual setup within
the microgrid.

Note: Direction
The direction of the connection follows the flow of current away from the
grid connection point, or in case of islands, away from the islanding
point. This direction is aligned with positive current according to the
[Passive Sign Convention]
(https://en.wikipedia.org/wiki/Passive_sign_convention).

Note: Historical Data
The timestamps of when a connection was created and terminated allow for
tracking the changes over time to a microgrid, providing insights into
when and how the microgrid infrastructure has been modified.
"""

source: ComponentId
"""The unique identifier of the component where the connection originates.

This is aligned with the direction of current flow away from the grid connection
point, or in case of islands, away from the islanding point.
"""

destination: ComponentId
"""The unique ID of the component where the connection terminates.

This is the component towards which the current flows.
"""

operational_lifetime: Lifetime = dataclasses.field(default_factory=Lifetime)
"""The operational lifetime of the connection."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats an operational Lifetime?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is still a bit uncertain, sort of. For the microgrid API, as long as we don't add a way to get historical data for the component graph, this should probably always be empty. I think it is mostly for historical data to signal what was the timespan this component was actually part of the component graph. Not sure if this relates to the operational mode (like if it was disabled if it counts as operational or not).

This still probably needs quite a bit of though and polishing, I don't think we'll solve it for the time being, but I could add some warning that this field utility is still a bit uncertain. I considered also not parsing this field from the protobuf message and skip it completely here, that would be another option, but we already merged some code that has operational_lifetime in it, so it would mean more work to remove it now.

@tiyash-basu-frequenz any recommendations or anything to add?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I'm also leaving this one at it is until there is a clear path on how to move forward.


def __post_init__(self) -> None:
"""Ensure that the source and destination components are different."""
if self.source == self.destination:
raise ValueError("Source and destination components must be different")

def is_operational_at(self, timestamp: datetime) -> bool:
"""Check whether this connection is operational at a specific timestamp."""
return self.operational_lifetime.is_operational_at(timestamp)

def is_operational_now(self) -> bool:
"""Whether this connection is currently operational."""
return self.is_operational_at(datetime.now(timezone.utc))

def __str__(self) -> str:
"""Return a human-readable string representation of this instance."""
return f"{self.source}->{self.destination}"
104 changes: 104 additions & 0 deletions src/frequenz/client/microgrid/component/_connection_proto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Loading of ComponentConnection objects from protobuf messages."""

import logging

from frequenz.api.common.v1.microgrid.components import components_pb2
from frequenz.client.common.microgrid.components import ComponentId

from .._lifetime import Lifetime
from .._lifetime_proto import lifetime_from_proto
from ._connection import ComponentConnection

_logger = logging.getLogger(__name__)


def component_connection_from_proto(
message: components_pb2.ComponentConnection,
) -> ComponentConnection | None:
"""Create a `ComponentConnection` from a protobuf message."""
major_issues: list[str] = []
minor_issues: list[str] = []

connection = component_connection_from_proto_with_issues(
message, major_issues=major_issues, minor_issues=minor_issues
)

if major_issues:
_logger.warning(
"Found issues in component connection: %s | Protobuf message:\n%s",
", ".join(major_issues),
message,
)
if minor_issues:
_logger.debug(
"Found minor issues in component connection: %s | Protobuf message:\n%s",
", ".join(minor_issues),
message,
)

return connection


def component_connection_from_proto_with_issues(
message: components_pb2.ComponentConnection,
*,
major_issues: list[str],
minor_issues: list[str],
) -> ComponentConnection | None:
"""Create a `ComponentConnection` from a protobuf message collecting issues.

This function is useful when you want to collect issues during the parsing
of multiple connections, rather than logging them immediately.

Args:
message: The protobuf message to parse.
major_issues: A list to collect major issues found during parsing.
minor_issues: A list to collect minor issues found during parsing.

Returns:
A `ComponentConnection` object created from the protobuf message, or
`None` if the protobuf message is completely invalid and a
`ComponentConnection` cannot be created.
"""
source_component_id = ComponentId(message.source_component_id)
destination_component_id = ComponentId(message.destination_component_id)
if source_component_id == destination_component_id:
major_issues.append(
f"connection ignored: source and destination are the same ({source_component_id})",
)
return None

lifetime = _get_operational_lifetime_from_proto(
message, major_issues=major_issues, minor_issues=minor_issues
)

return ComponentConnection(
source=source_component_id,
destination=destination_component_id,
operational_lifetime=lifetime,
)


def _get_operational_lifetime_from_proto(
message: components_pb2.ComponentConnection,
*,
major_issues: list[str],
minor_issues: list[str],
) -> Lifetime:
"""Get the operational lifetime from a protobuf message."""
if message.HasField("operational_lifetime"):
try:
return lifetime_from_proto(message.operational_lifetime)
except ValueError as exc:
major_issues.append(
f"invalid operational lifetime ({exc}), considering it as missing "
"(i.e. always operational)",
)
else:
minor_issues.append(
"missing operational lifetime, considering it always operational",
)
return Lifetime()
25 changes: 25 additions & 0 deletions tests/client_test_cases/list_connections/empty_case.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Test list_connections with no connections: result should be empty."""

from typing import Any

from frequenz.api.microgrid.v1 import microgrid_pb2

# No client_args or client_kwargs needed for this call


def assert_stub_method_call(stub_method: Any) -> None:
"""Assert that the gRPC request matches the expected request."""
stub_method.assert_called_once_with(
microgrid_pb2.ListConnectionsRequest(starts=[], ends=[]), timeout=60.0
)


grpc_response = microgrid_pb2.ListConnectionsResponse(connections=[])


def assert_client_result(result: Any) -> None: # noqa: D103
"""Assert that the client result is an empty list."""
assert not list(result)
30 changes: 30 additions & 0 deletions tests/client_test_cases/list_connections/error_case.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Test list_connections with error."""

from typing import Any

from frequenz.api.microgrid.v1 import microgrid_pb2
from grpc import StatusCode

from frequenz.client.microgrid import PermissionDenied
from tests.util import make_grpc_error

# No client_args or client_kwargs needed for this call


def assert_stub_method_call(stub_method: Any) -> None:
"""Assert that the gRPC request matches the expected request."""
stub_method.assert_called_once_with(
microgrid_pb2.ListConnectionsRequest(starts=[], ends=[]), timeout=60.0
)


grpc_response = make_grpc_error(StatusCode.PERMISSION_DENIED)


def assert_client_exception(exception: Exception) -> None:
"""Assert that the client exception matches the expected error."""
assert isinstance(exception, PermissionDenied)
assert exception.grpc_error == grpc_response
43 changes: 43 additions & 0 deletions tests/client_test_cases/list_connections/mixed_filters_case.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Test list_connections with mixed ComponentId and Component objects."""

from datetime import datetime, timezone
from typing import Any

from frequenz.api.microgrid.v1 import microgrid_pb2
from frequenz.client.common.microgrid import MicrogridId
from frequenz.client.common.microgrid.components import ComponentId

from frequenz.client.microgrid.component import (
GridConnectionPoint,
Meter,
)

# Mix ComponentId and Component objects
grid_component = GridConnectionPoint(
id=ComponentId(1), microgrid_id=MicrogridId(1), rated_fuse_current=10_000
)
meter_component = Meter(id=ComponentId(4), microgrid_id=MicrogridId(1))

client_kwargs = {
"sources": [grid_component, ComponentId(2)],
"destinations": [ComponentId(3), meter_component],
}


def assert_stub_method_call(stub_method: Any) -> None:
"""Assert that the gRPC request matches the expected request."""
stub_method.assert_called_once_with(
microgrid_pb2.ListConnectionsRequest(starts=[1, 2], ends=[3, 4]), timeout=60.0
)


lifetime_start = datetime(2023, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
grpc_response = microgrid_pb2.ListConnectionsResponse(connections=[])


def assert_client_result(actual_result: Any) -> None:
"""Assert that the client result matches the expected connections list."""
assert list(actual_result) == []
Loading
Loading