Skip to content

Commit 2bf7768

Browse files
committed
Big refactoring to include "static" (non-interpolated) trajectories
1 parent 0a04ab9 commit 2bf7768

File tree

5 files changed

+462
-418
lines changed

5 files changed

+462
-418
lines changed

climada/trajectories/impact_calc_strat.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727

2828
from climada.engine.impact import Impact
2929
from climada.engine.impact_calc import ImpactCalc
30+
from climada.entity.exposures.base import Exposures
31+
from climada.entity.impact_funcs.impact_func_set import ImpactFuncSet
32+
from climada.hazard.base import Hazard
3033
from climada.trajectories.snapshot import Snapshot
3134

3235

@@ -36,9 +39,9 @@ class ImpactComputationStrategy(ABC):
3639
@abstractmethod
3740
def compute_impacts(
3841
self,
39-
snapshot0: Snapshot,
40-
snapshot1: Snapshot,
41-
future: tuple[int, int, int],
42+
exp: Exposures,
43+
haz: Hazard,
44+
vul: ImpactFuncSet,
4245
risk_transf_attach: float | None = None,
4346
risk_transf_cover: float | None = None,
4447
calc_residual: bool = True,
@@ -51,28 +54,25 @@ class ImpactCalcComputation(ImpactComputationStrategy):
5154

5255
def compute_impacts(
5356
self,
54-
snapshot0: Snapshot,
55-
snapshot1: Snapshot,
56-
future: tuple[int, int, int],
57+
exp: Exposures,
58+
haz: Hazard,
59+
vul: ImpactFuncSet,
5760
risk_transf_attach: float | None = None,
5861
risk_transf_cover: float | None = None,
5962
calc_residual: bool = False,
6063
):
61-
impact = self.compute_impacts_pre_transfer(snapshot0, snapshot1, future)
64+
impact = self.compute_impacts_pre_transfer(exp, haz, vul)
6265
self._apply_risk_transfer(
6366
impact, risk_transf_attach, risk_transf_cover, calc_residual
6467
)
6568
return impact
6669

6770
def compute_impacts_pre_transfer(
6871
self,
69-
snapshot0: Snapshot,
70-
snapshot1: Snapshot,
71-
future: tuple[int, int, int],
72+
exp: Exposures,
73+
haz: Hazard,
74+
vul: ImpactFuncSet,
7275
) -> Impact:
73-
exp = snapshot1.exposure if future[0] else snapshot0.exposure
74-
haz = snapshot1.hazard if future[1] else snapshot0.hazard
75-
vul = snapshot1.impfset if future[2] else snapshot0.impfset
7676
return ImpactCalc(exposures=exp, impfset=vul, hazard=haz).impact()
7777

7878
def _apply_risk_transfer(

climada/trajectories/risk_trajectory.py

Lines changed: 31 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
1717
---
1818
19-
This file implements risk trajectory objects, to allow a better evaluation
20-
of risk in between two points in time (snapshots).
19+
This file implements interpolated risk trajectory objects, to allow a better evaluation
20+
of risk in between points in time (snapshots).
2121
2222
"""
2323

@@ -35,20 +35,18 @@
3535
from climada.trajectories.interpolation import InterpolationStrategyBase
3636
from climada.trajectories.riskperiod import (
3737
AllLinearStrategy,
38-
CalcRiskPeriod,
38+
CalcRiskMetricsPeriod,
3939
ImpactCalcComputation,
4040
ImpactComputationStrategy,
4141
)
4242
from climada.trajectories.snapshot import Snapshot
43+
from climada.trajectories.trajectory import RiskTrajectory
4344
from climada.util import log_level
4445

4546
LOGGER = logging.getLogger(__name__)
4647

47-
POSSIBLE_METRICS = ["eai", "aai", "return_periods", "risk_components", "aai_per_group"]
48-
DEFAULT_RP = [50, 100, 500]
4948

50-
51-
class RiskTrajectory:
49+
class InterpolatedRiskTrajectory(RiskTrajectory):
5250
"""Calculates risk trajectories over a series of snapshots.
5351
5452
This class computes risk metrics over a series of snapshots,
@@ -59,6 +57,15 @@ class RiskTrajectory:
5957
_grouper = ["measure", "metric"]
6058
"""Results dataframe grouper"""
6159

60+
POSSIBLE_METRICS = [
61+
"eai",
62+
"aai",
63+
"return_periods",
64+
"risk_components",
65+
"aai_per_group",
66+
]
67+
DEFAULT_RP = [50, 100, 500]
68+
6269
def __init__(
6370
self,
6471
snapshots_list: list[Snapshot],
@@ -69,78 +76,31 @@ def __init__(
6976
interpolation_strategy: InterpolationStrategyBase | None = None,
7077
impact_computation_strategy: ImpactComputationStrategy | None = None,
7178
):
72-
self._reset_metrics()
79+
super().__init__(
80+
snapshots_list,
81+
all_groups_name=all_groups_name,
82+
risk_disc=risk_disc,
83+
impact_computation_strategy=impact_computation_strategy,
84+
)
7385
self._risk_period_up_to_date: bool = False
74-
self._snapshots = snapshots_list
75-
self._all_groups_name = all_groups_name
76-
self._default_rp = DEFAULT_RP
7786
self.start_date = min([snapshot.date for snapshot in snapshots_list])
7887
self.end_date = max([snapshot.date for snapshot in snapshots_list])
7988
self._time_resolution = time_resolution
80-
self._risk_disc = risk_disc
8189
self._interpolation_strategy = interpolation_strategy or AllLinearStrategy()
82-
self._impact_computation_strategy = (
83-
impact_computation_strategy or ImpactCalcComputation()
84-
)
8590
self._risk_periods_calculators = None
8691

87-
def _reset_metrics(self):
88-
for metric in POSSIBLE_METRICS:
89-
setattr(self, "_" + metric + "_metrics", None)
90-
91-
self._all_risk_metrics = None
92-
93-
@property
94-
def default_rp(self) -> list[int]:
95-
"""The default return period values to use when computing risk period metrics.
96-
97-
Notes
98-
-----
99-
100-
Changing its value resets the corresponding metric.
101-
"""
102-
return self._default_rp
103-
104-
@default_rp.setter
105-
def default_rp(self, value):
106-
if not isinstance(value, list):
107-
raise ValueError("Return periods need to be a list of int.")
108-
if any(not isinstance(i, int) for i in value):
109-
raise ValueError("Return periods need to be a list of int.")
110-
self._return_periods_metrics = None
111-
self._all_risk_metrics = None
112-
self._default_rp = value
113-
11492
@property
115-
def risk_disc(self) -> DiscRates | None:
116-
"""The discount rate applied to compute net present values.
117-
None means no discount rate.
118-
119-
Notes
120-
-----
121-
122-
Changing its value resets the metrics.
123-
"""
124-
return self._risk_disc
125-
126-
@risk_disc.setter
127-
def risk_disc(self, value, /):
128-
if not isinstance(value, DiscRates):
129-
raise ValueError("Risk discount needs to be a `DiscRates` object.")
130-
131-
self._reset_metrics()
132-
self._risk_disc = value
133-
134-
@property
135-
def risk_periods(self) -> list[CalcRiskPeriod]:
136-
"""The computed risk periods from the snapshots."""
93+
def _risk_periods(self) -> list[CalcRiskMetricsPeriod]:
94+
"""The risk periods computing objects."""
13795
if self._risk_periods_calculators is None or not self._risk_period_up_to_date:
13896
self._risk_periods_calculators = self._calc_risk_periods(self._snapshots)
13997
self._risk_period_up_to_date = True
14098

14199
return self._risk_periods_calculators
142100

143-
def _calc_risk_periods(self, snapshots: list[Snapshot]) -> list[CalcRiskPeriod]:
101+
def _calc_risk_periods(
102+
self, snapshots: list[Snapshot]
103+
) -> list[CalcRiskMetricsPeriod]:
144104
"""Creates the `CalcRiskPeriod` objects corresponding to a given list of snapshots."""
145105

146106
def pairwise(container: list):
@@ -169,7 +129,7 @@ def pairwise(container: list):
169129
LOGGER.debug(f"{self.__class__.__name__}: Calc risk periods")
170130
# impfset = self._merge_impfset(snapshots)
171131
return [
172-
CalcRiskPeriod(
132+
CalcRiskMetricsPeriod(
173133
start_snapshot,
174134
end_snapshot,
175135
time_resolution=self._time_resolution,
@@ -181,65 +141,27 @@ def pairwise(container: list):
181141
)
182142
]
183143

184-
@classmethod
185-
def npv_transform(cls, df: pd.DataFrame, risk_disc: DiscRates) -> pd.DataFrame:
186-
"""Apply discount rate to a metric `DataFrame`.
187-
188-
Parameters
189-
----------
190-
df : pd.DataFrame
191-
The `DataFrame` of the metric to discount.
192-
risk_disc : DiscRate
193-
The discount rate to apply.
194-
195-
Returns
196-
-------
197-
pd.DataFrame
198-
The discounted risk metric.
199-
200-
201-
"""
202-
203-
def _npv_group(group, disc):
204-
start_date = group.index.get_level_values("date").min()
205-
return calc_npv_cash_flows(group, start_date, disc)
206-
207-
df = df.set_index("date")
208-
grouper = cls._grouper
209-
if "group" in df.columns:
210-
grouper = ["group"] + grouper
211-
212-
df["risk"] = df.groupby(
213-
grouper,
214-
dropna=False,
215-
as_index=False,
216-
group_keys=False,
217-
observed=True,
218-
)["risk"].transform(_npv_group, risk_disc)
219-
df = df.reset_index()
220-
return df
221-
222144
def _generic_metrics(
223145
self,
224146
npv: bool = True,
225147
metric_name: str | None = None,
226148
metric_meth: str | None = None,
227149
**kwargs,
228-
) -> pd.DataFrame:
150+
) -> pd.DataFrame | None:
229151
"""Generic method to compute metrics based on the provided metric name and method."""
230152
if metric_name is None or metric_meth is None:
231153
raise ValueError("Both metric_name and metric_meth must be provided.")
232154

233-
if metric_name not in POSSIBLE_METRICS:
155+
if metric_name not in self.POSSIBLE_METRICS:
234156
raise NotImplementedError(
235-
f"{metric_name} not implemented ({POSSIBLE_METRICS})."
157+
f"{metric_name} not implemented ({self.POSSIBLE_METRICS})."
236158
)
237159

238160
# Construct the attribute name for storing the metric results
239161
attr_name = f"_{metric_name}_metrics"
240162

241163
tmp = []
242-
for calc_period in self.risk_periods:
164+
for calc_period in self._risk_periods:
243165
# Call the specified method on the calc_period object
244166
with log_level(level="WARNING", name_prefix="climada"):
245167
tmp.append(getattr(calc_period, metric_meth)(**kwargs))
@@ -487,7 +409,7 @@ def per_date_risk_metrics(
487409

488410
@staticmethod
489411
def _get_risk_periods(
490-
risk_periods: list[CalcRiskPeriod],
412+
risk_periods: list[CalcRiskMetricsPeriod],
491413
start_date: datetime.date,
492414
end_date: datetime.date,
493415
strict: bool = True,
@@ -813,53 +735,3 @@ def plot_waterfall(
813735
)
814736

815737
return ax
816-
817-
818-
def calc_npv_cash_flows(
819-
cash_flows: pd.DataFrame,
820-
start_date: datetime.date,
821-
disc: DiscRates | None = None,
822-
):
823-
"""Apply discount rate to cash flows.
824-
825-
If it is defined, applies a discount rate `disc` to a given cash flow
826-
`cash_flows` assuming present year corresponds to `start_date`.
827-
828-
Parameters
829-
----------
830-
cash_flows : pd.DataFrame
831-
The cash flow to apply the discount rate to.
832-
start_date : datetime.date
833-
The date representing the present.
834-
end_date : datetime.date, optional
835-
disc : DiscRates, optional
836-
The discount rate to apply.
837-
838-
Returns
839-
-------
840-
841-
A dataframe (copy) of `cash_flows` where values are discounted according to `disc`
842-
"""
843-
844-
if not disc:
845-
return cash_flows
846-
847-
if not isinstance(cash_flows.index, pd.DatetimeIndex):
848-
raise ValueError("cash_flows must be a pandas Series with a datetime index")
849-
850-
df = cash_flows.to_frame(name="cash_flow")
851-
df["year"] = df.index.year
852-
853-
# Merge with the discount rates based on the year
854-
tmp = df.merge(
855-
pd.DataFrame({"year": disc.years, "rate": disc.rates}), on="year", how="left"
856-
)
857-
tmp.index = df.index
858-
df = tmp.copy()
859-
start = pd.Timestamp(start_date)
860-
df["discount_factor"] = (1 / (1 + df["rate"])) ** ((df.index - start).days // 365)
861-
862-
# Apply the discount factors to the cash flows
863-
df["npv_cash_flow"] = df["cash_flow"] * df["discount_factor"]
864-
865-
return df["npv_cash_flow"]

0 commit comments

Comments
 (0)