diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d471f0e..0b51df5 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -20,5 +20,7 @@ - Published a locale-aware `ColumnMapper` utility that reads the YAML schema so notebooks can seamlessly move between raw API headers, canonical identifiers, and localized display labels. ## Bug Fixes +- `frequenz.lib.notebooks.reporting.utils.helpers.add_energy_flows()` now infers consumption totals from existing data when explicit consumption columns are missing, preventing inconsistent outputs in notebook pipelines that only provide grid and production inputs. +- `frequenz.lib.notebooks.reporting.metrics.consumption()` reindexes optional production/battery inputs and raises a warning when inferred consumption turns negative so sign-convention issues are surfaced immediately. diff --git a/src/frequenz/lib/notebooks/reporting/metrics/reporting_metrics.py b/src/frequenz/lib/notebooks/reporting/metrics/reporting_metrics.py index 9055fa4..4aff87d 100644 --- a/src/frequenz/lib/notebooks/reporting/metrics/reporting_metrics.py +++ b/src/frequenz/lib/notebooks/reporting/metrics/reporting_metrics.py @@ -96,7 +96,11 @@ def production_excess_in_bat( production_excess_series = production_excess( production, consumption, production_is_positive=production_is_positive ) - battery = battery.astype("float64").clip(lower=0) + battery = ( + battery.astype("float64") + .reindex(production_excess_series.index, fill_value=0.0) + .clip(lower=0) + ) return pd.concat([production_excess_series, battery], axis=1).min(axis=1) @@ -199,44 +203,60 @@ def production_self_share( def consumption( - df: pd.DataFrame, production_cols: list[str] | None, grid_cols: list[str] + grid: pd.Series, + production: pd.Series | None = None, + battery: pd.Series | None = None, ) -> pd.Series: - """Infer the consumption column from grid and production data if missing. + """Infer total consumption from grid import, on-site production, and battery power. 
- If a 'consumption' column is not present, it is computed as the total grid import - (sum of all grid columns) minus total production. Safely handles missing or - empty production columns by treating them as zero. + Computes: consumption = grid_import - production - battery, using the raw signed + values (production is usually negative; battery may be positive or negative). Args: - df: Input DataFrame containing grid and optional production columns. - production_cols: List of production column names (e.g., "pv", "chp", "battery" or "ev"). - Can be None or empty if no on-site generation is present. - grid_cols: List of one or more grid column names. + grid: Series of grid import values (e.g., kW or MW). + production: Optional Series of on-site production values. + If None, production is treated as zero. + battery: Optional Series representing battery discharge/charge power. + It is subtracted as-is: positive values (battery charging) decrease + inferred consumption, while negative values (battery discharging) increase + it. If None, the battery contribution is treated as zero. Returns: A Series representing inferred total consumption, named `"consumption"`. Raises: - ValueError: If `grid_cols` is empty. + ValueError: If `grid` is None. + + Warns: + UserWarning: If negative inferred consumption values are detected, + which may indicate times of net export or a sign-convention mismatch. 
""" - if "consumption" in df.columns: - return df["consumption"] + if grid is None: + raise ValueError("`grid` must be provided as a pandas Series.") - if not grid_cols: - raise ValueError("At least one grid column must be specified in grid_cols.") + grid_s = grid.astype("float64").fillna(0) - # Compute total grid import and total production - grid_total = df[grid_cols].sum(axis=1) + # Ensure raw production values are used (usually negative for production) + if production is None: + prod_s = pd.Series(0.0, index=grid_s.index) + else: + prod_s = production.astype("float64").reindex(grid_s.index, fill_value=0.0) - # Handle empty production columns safely - if production_cols: - production_total = df[production_cols].sum(axis=1) + if battery is None: + battery_s = pd.Series(0.0, index=grid_s.index) else: - # No production → production_total = 0 - production_total = pd.Series(0, index=df.index) + battery_s = battery.astype("float64") + battery_s = battery_s.reindex(grid_s.index, fill_value=0.0) - # Compute inferred consumption (Series) - consumption = grid_total - production_total - consumption.name = "consumption" + result = (grid_s - prod_s - battery_s).astype("float64") + result.name = "consumption" - return consumption + if (result < 0).any(): + warnings.warn( + "Negative inferred consumption detected. 
This can occur during net export " + "or due to a sign-convention mismatch between grid and production.", + UserWarning, + stacklevel=2, + ) + + return result diff --git a/src/frequenz/lib/notebooks/reporting/utils/helpers.py b/src/frequenz/lib/notebooks/reporting/utils/helpers.py index dc9df15..9f11115 100644 --- a/src/frequenz/lib/notebooks/reporting/utils/helpers.py +++ b/src/frequenz/lib/notebooks/reporting/utils/helpers.py @@ -23,6 +23,7 @@ import pandas as pd from frequenz.lib.notebooks.reporting.metrics.reporting_metrics import ( + asset_production, grid_feed_in, production_excess, production_excess_in_bat, @@ -43,11 +44,14 @@ def _get_numeric_series(df: pd.DataFrame, col: str | None) -> pd.Series: col: Column name to retrieve. If None or missing, zeros are returned. Returns: - A float64 Series with non-negative values, matching the input index. + A float64 Series aligned to the input index. """ if col is None: - return pd.Series(0.0, index=df.index, dtype="float64") - return df.reindex(columns=[col], fill_value=0)[col].astype("float64").clip(lower=0) + series = pd.Series(0.0, index=df.index, dtype="float64") + else: + raw = df.reindex(columns=[col], fill_value=0)[col] + series = pd.to_numeric(raw, errors="coerce").fillna(0.0).astype("float64") + return series def _sum_cols(df: pd.DataFrame, cols: list[str] | None) -> pd.Series: @@ -67,32 +71,45 @@ def _sum_cols(df: pd.DataFrame, cols: list[str] | None) -> pd.Series: if not cols: return pd.Series(0.0, index=df.index, dtype="float64") - # Safely extract each column as a numeric, non-negative Series, then sum row-wise + # Safely extract each column as a numeric Series then sum row-wise series_list = [_get_numeric_series(df, c) for c in cols] return pd.concat(series_list, axis=1).sum(axis=1).astype("float64") -# pylint: disable=too-many-arguments, too-many-locals +def _column_has_data(df: pd.DataFrame, col: str | None) -> bool: + """Return True when the column exists and has at least one non-zero value.""" + if 
col is None or col not in df.columns: + return False + + series = pd.to_numeric(df[col], errors="coerce").fillna(0.0).astype("float64") + if series.empty or not series.notna().any(): + return False + + return not series.fillna(0).eq(0).all() + + +# pylint: disable=too-many-arguments, too-many-locals, too-many-positional-arguments def add_energy_flows( df: pd.DataFrame, production_cols: list[str] | None = None, consumption_cols: list[str] | None = None, - battery_charge_col: str | None = None, + battery_cols: list[str] | None = None, production_is_positive: bool = False, ) -> pd.DataFrame: """Compute and add derived energy flow metrics to the DataFrame. This function aggregates production and consumption data, derives energy flow relationships such as grid feed-in, battery charging, and self-consumption, - and appends these computed columns to the given DataFrame. + and appends these computed columns to the given DataFrame. Columns that are + specified but missing or contain only null/zero values are ignored. Args: df: Input DataFrame containing production, consumption, and optionally - battery charge data. + battery power data. production_cols: list of column names representing production sources. consumption_cols: list of column names representing consumption sources. - battery_charge_col: optional column name for battery charging power. If None, - battery-related flows are set to zero. + battery_cols: optional column names representing signed battery power. + Positive values indicate charging, negative values indicate discharging. production_is_positive: Whether production values are already positive. If False, `production` is inverted before clipping. 
@@ -106,47 +123,83 @@ def add_energy_flows( """ df_flows = df.copy() - # Total production and consumption (returns pandas series with 0.0 for missing cols) - df_flows["production_total"] = _sum_cols(df_flows, production_cols) - df_flows["consumption_total"] = _sum_cols(df_flows, consumption_cols) + # Keep only the columns that exist and contain at least one non-zero value + resolved_production_cols = [ + col for col in (production_cols or []) if _column_has_data(df_flows, col) + ] + resolved_consumption_cols = [ + col for col in (consumption_cols or []) if _column_has_data(df_flows, col) + ] + resolved_battery_cols = [ + col for col in (battery_cols or []) if _column_has_data(df_flows, col) + ] + + battery_power_series = _sum_cols(df_flows, resolved_battery_cols) + battery_charge_series = ( + battery_power_series.reindex(df_flows.index).fillna(0.0).clip(lower=0.0) + ) + + # Compute total asset production + asset_production_cols: list[str] = [] + for col in resolved_production_cols: + series = _get_numeric_series( + df_flows, + col, + ) + asset_series = asset_production( + series, + production_is_positive=production_is_positive, + ) + asset_col_name = f"{col}_asset_production" + df_flows[asset_col_name] = asset_series + asset_production_cols.append(asset_col_name) + + df_flows["production_total"] = _sum_cols(df_flows, asset_production_cols) - # Surplus vs. consumption + # Compute total consumption + consumption_series_cols: list[str] = [] + for col in resolved_consumption_cols: + df_flows[col] = _get_numeric_series(df_flows, col) + consumption_series_cols.append(col) + + df_flows["consumption_total"] = _sum_cols(df_flows, consumption_series_cols) + + # Surplus vs. 
consumption (production is already positive because of the above cleaning) df_flows["production_excess"] = production_excess( df_flows["production_total"], df_flows["consumption_total"], - production_is_positive=production_is_positive, + production_is_positive=True, ) # Battery charging power (optional) - bat_in = _get_numeric_series(df_flows, battery_charge_col) df_flows["production_excess_in_bat"] = production_excess_in_bat( df_flows["production_total"], df_flows["consumption_total"], - bat_in, - production_is_positive=production_is_positive, + battery=battery_charge_series, + production_is_positive=True, ) # Split excess into battery vs. grid df_flows["grid_feed_in"] = grid_feed_in( df_flows["production_total"], df_flows["consumption_total"], - bat_in, - production_is_positive=production_is_positive, + battery=battery_charge_series, + production_is_positive=True, ) # If no production columns exist, set self-consumption metrics to zero - if production_cols: + if asset_production_cols: # Use total production for self-consumption instead of asset_production # (which may not exist) df_flows["production_self_use"] = production_self_consumption( df_flows["production_total"], df_flows["consumption_total"], - production_is_positive=production_is_positive, + production_is_positive=True, ) df_flows["production_self_share"] = production_self_share( df_flows["production_total"], df_flows["consumption_total"], - production_is_positive=production_is_positive, + production_is_positive=True, ) else: df_flows["production_self_use"] = 0.0