diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 3600e1d..5cb8ba1 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,7 @@ ## New Features - +* Helper function to calculate energy metrics ## Bug Fixes diff --git a/src/frequenz/client/reporting/_client.py b/src/frequenz/client/reporting/_client.py index c06de0b..2f7270b 100644 --- a/src/frequenz/client/reporting/_client.py +++ b/src/frequenz/client/reporting/_client.py @@ -40,6 +40,11 @@ or a pandas DataFrame. """ +CumulativeEnergy = namedtuple( + "CumulativeEnergy", ["start_time", "end_time", "consumption", "production"] +) +"""Type for cumulative energy consumption and production over a specified time.""" + @dataclass(frozen=True) class ComponentsDataBatch: @@ -252,3 +257,82 @@ def dt2ts(dt: datetime) -> PBTimestamp: except grpcaio.AioRpcError as e: print(f"RPC failed: {e}") return + + +# pylint: disable=too-many-arguments +async def cumulative_energy( + client: ReportingApiClient, + microgrid_id: int, + component_id: int, + start_time: datetime, + end_time: datetime, + use_active_power: bool, + resolution: int | None = None, +) -> CumulativeEnergy: + """ + Calculate the cumulative energy consumption and production over a specified time range. + + Args: + client: The client used to fetch the metric samples from the Reporting API. + microgrid_id: The ID of the microgrid. + component_id: The ID of the component within the microgrid. + start_time: The start date and time for the period. + end_time: The end date and time for the period. + use_active_power: If True, use the 'AC_ACTIVE_POWER' metric. + If False, use the 'AC_ACTIVE_ENERGY' metric. + resolution: The resampling resolution for the data, represented in seconds. + If None, no resampling is applied. + + Returns: + EnergyMetric: A named tuple with start_time, end_time, consumption, and production. + """ + metric = Metric.AC_ACTIVE_POWER if use_active_power else Metric.AC_ACTIVE_ENERGY + + metric_samples = [ + sample + async for sample in client.list_microgrid_components_data( + microgrid_components=[(microgrid_id, [component_id])], + metrics=metric, + start_dt=start_time, + end_dt=end_time, + resolution=resolution, + ) + ] + + values = [ + sample.value + for sample in metric_samples + if start_time <= sample.timestamp <= end_time + ] + + if values: + if use_active_power: + # Convert power to energy if using AC_ACTIVE_POWER + time_interval_hours = ( + resolution or (end_time - start_time).total_seconds() + ) / 3600.0 + consumption = sum( + max(e2 - e1, 0) * time_interval_hours + for e1, e2 in zip(values, values[1:]) + ) + production = sum( + min(e2 - e1, 0) * time_interval_hours + for e1, e2 in zip(values, values[1:]) + ) + else: + # Directly use energy values if using AC_ACTIVE_ENERGY + consumption = sum( + e2 - e1 for e1, e2 in zip(values, values[1:]) if e2 - e1 > 0 + ) + production = sum( + e2 - e1 for e1, e2 in zip(values, values[1:]) if e2 - e1 < 0 + ) + else: + consumption = production = 0.0 + + return CumulativeEnergy( + start_time=start_time, + end_time=end_time, + consumption=consumption, + production=production, + )