
Commit 6654358

Add ensemble calibrators
1 parent b6357a6 commit 6654358

File tree

1 file changed: +283 −0 lines changed

Lines changed: 283 additions & 0 deletions
@@ -0,0 +1,283 @@
"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---
Cross-calibration on top of a single calibration module
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass, InitVar, field
from typing import Optional, List, Mapping, Any, Tuple, Union, Sequence
from copy import copy, deepcopy
from pathlib import Path

import numpy as np
from numpy.random import default_rng
import pandas as pd

from ...engine.unsequa.input_var import InputVar
from .base import Optimizer, Output, Input

# TODO: derived classes for average and tragedy


def sample_data(data: pd.DataFrame, sample: List[Tuple[int, int]]):
    """Return a sample of the data"""
    # Create all-NaN data
    data_sampled = pd.DataFrame(np.nan, columns=data.columns, index=data.index)

    # Extract sample values from data
    for x, y in sample:
        data_sampled.iloc[x, y] = data.iloc[x, y]

    return data_sampled


@dataclass
class EnsembleOptimizerOutput:
    data: pd.DataFrame

    @classmethod
    def from_outputs(cls, outputs: Sequence[Output]):
        """Build data from a list of outputs"""
        cols = pd.MultiIndex.from_tuples(
            [("Parameters", p_name) for p_name in outputs[0].params.keys()]
            + [("Event", p_name) for p_name in outputs[0].event_info]
        )
        data = pd.DataFrame(columns=cols)

        # Fill with data
        data["Parameters"] = pd.DataFrame.from_records([out.params for out in outputs])
        data["Event"] = pd.DataFrame.from_records([out.event_info for out in outputs])

        return cls(data=data)
        # return cls(data=pd.DataFrame.from_records([out.params for out in outputs]))

    @classmethod
    def from_csv(cls, filepath):
        """Load data from CSV"""
        return cls(data=pd.read_csv(filepath, header=[0, 1]))

    def to_csv(self, filepath):
        """Store data as CSV"""
        self.data.to_csv(filepath, index=None)

    def to_input_var(self, impact_func_gen, **impfset_kwargs):
        """Build Unsequa InputVar from the parameters stored in this object"""
        # Only the "Parameters" columns hold keyword arguments for the impact
        # function generator; the "Event" columns are metadata.
        impf_set_list = [
            impact_func_gen(**params)
            for _, params in self.data["Parameters"].iterrows()
        ]
        return InputVar.impfset(impf_set_list, **impfset_kwargs)

    # Build MultiIndex DataFrame
    # data = pd.DataFrame(
    #     columns=pd.MultiIndex.from_tuples(
    #         [("Parameters", p_name) for p_name in outputs[0].params.keys()]
    #     )
    # )

    # Insert Parameters
    # params = pd.DataFrame.from_records([out.params for out in outputs])
    # for p_name in params.columns:
    #     data["Parameters", p_name] = params[p_name]

    # Insert

    # return cls(data=pd.DataFrame.from_records([out.params for out in outputs]))


@dataclass
class EnsembleOptimizer(ABC):
    """Base class for running an optimizer on an ensemble of data samples"""

    input: Input
    optimizer_type: Any
    optimizer_init_kwargs: Mapping[str, Any] = field(default_factory=dict)
    samples: List[List[Tuple[int, int]]] = field(init=False)

    def __post_init__(self):
        """Check that the samples have been set by the subclass"""
        if self.samples is None:
            raise RuntimeError("Samples must be set!")

    def run(self, **optimizer_run_kwargs) -> EnsembleOptimizerOutput:
        outputs = []
        for idx, sample in enumerate(self.samples):
            input = self.input_from_sample(sample)

            # Run optimizer
            opt = self.optimizer_type(input, **self.optimizer_init_kwargs)
            out = opt.run(**optimizer_run_kwargs)

            out.event_info = self.event_info_from_input(input)
            print(f"Ensemble: {idx}, Params: {out.params}")
            outputs.append(out)

        return EnsembleOptimizerOutput.from_outputs(outputs)

    @abstractmethod
    def input_from_sample(self, sample: List[Tuple[int, int]]):
        """Create the calibration Input for a single sample"""

    def event_info_from_input(self, input: Input):
        """Get information on the event(s) for which we calibrated"""
        # Get region and event IDs
        data = input.data.dropna(axis="columns", how="all").dropna(
            axis="index", how="all"
        )
        event_ids = data.index
        region_ids = data.columns

        # Get event name
        event_names = input.hazard.select(event_id=event_ids.to_list()).event_name

        # Return data
        return {
            "event_id": event_ids,
            "region_id": region_ids,
            "event_name": event_names,
        }


@dataclass
class AverageEnsembleOptimizer(EnsembleOptimizer):
    """Ensemble optimizer where each member is calibrated on a random subset of the data"""

    sample_fraction: InitVar[float] = 0.8
    ensemble_size: InitVar[int] = 20
    random_state: InitVar[int] = 1

    def __post_init__(self, sample_fraction, ensemble_size, random_state):
        """Create the samples"""
        if sample_fraction <= 0 or sample_fraction >= 1:
            raise ValueError("Sample fraction must be in (0, 1)")
        if ensemble_size < 1:
            raise ValueError("Ensemble size must be >=1")

        # Find out number of samples
        notna_idx = np.argwhere(self.input.data.notna().to_numpy())
        num_notna = notna_idx.shape[0]
        num_samples = int(np.rint(num_notna * sample_fraction))

        # Create samples
        rng = default_rng(random_state)
        self.samples = [
            rng.choice(notna_idx, size=num_samples, replace=False)
            for _ in range(ensemble_size)
        ]

        return super().__post_init__()

    def input_from_sample(self, sample: List[Tuple[int, int]]):
        """Shallow-copy the input and update the data"""
        input = copy(self.input)  # NOTE: Shallow copy!
        input.data = sample_data(input.data, sample)
        return input


@dataclass
class TragedyEnsembleOptimizer(EnsembleOptimizer):
    """Ensemble optimizer with one member per single data point (event and region)"""

    def __post_init__(self):
        """Create the single samples"""
        notna_idx = np.argwhere(self.input.data.notna().to_numpy())
        self.samples = notna_idx[:, np.newaxis].tolist()  # Must extend by one dimension

        return super().__post_init__()

    def input_from_sample(self, sample: List[Tuple[int, int]]):
        """Subselect all input"""
        # Data
        input = copy(self.input)  # NOTE: Shallow copy!
        data = sample_data(input.data, sample)
        input.data = data.dropna(axis="columns", how="all").dropna(
            axis="index", how="all"
        )

        # Select single hazard event
        input.hazard = input.hazard.select(event_id=input.data.index)

        # Select single region in exposure
        # NOTE: This breaks impact_at_reg with pre-defined region IDs!!
        # exp = input.exposure.copy(deep=False)
        # exp.gdf = exp.gdf[exp.gdf["region_id"] == input.data.columns[0]]
        # input.exposure = exp

        return input


# @dataclass
# class CrossCalibration:
#     """A class for running multiple calibration tasks on data subsets"""

#     input: Input
#     optimizer_type: Any
#     sample_size: int = 1
#     ensemble_size: Optional[int] = None
#     random_state: InitVar[int] = 1
#     optimizer_init_kwargs: Mapping[str, Any] = field(default_factory=dict)

#     def __post_init__(self, random_state):
#         """"""
#         if self.sample_size < 1:
#             raise ValueError("Sample size must be >=1")
#         if self.sample_size > 1 and self.ensemble_size is None:
#             raise ValueError("Ensemble size must be set if sample size > 1")

#         # Copy the original data
#         self.data = self.input.data.copy()
#         notna_idx = np.argwhere(self.data.notna().to_numpy())

#         # Create the samples
#         if self.ensemble_size is not None:
#             rng = default_rng(random_state)
#             self.samples = [
#                 rng.choice(notna_idx, size=self.sample_size, replace=False)
#                 for _ in range(self.ensemble_size)
#             ]
#         else:
#             self.samples = notna_idx.tolist()

#         print("Samples:\n", self.samples)

#     def run(self, **optimizer_run_kwargs) -> List[Output]:
#         """Run the optimizer for the ensemble"""
#         outputs = []
#         for idx, sample in enumerate(self.samples):
#             # Select data samples
#             data_sample = self.data.copy()
#             data_sample.iloc[:, :] = np.nan  # Set all to NaN
#             for x, y in sample:
#                 data_sample.iloc[x, y] = self.data.iloc[x, y]

#             # Run the optimizer
#             input = deepcopy(self.input)
#             input.data = data_sample

#             # NOTE: NOO assign_centroids
#             opt = self.optimizer_type(input, **self.optimizer_init_kwargs)
#             out = opt.run(**optimizer_run_kwargs)
#             outputs.append(out)
#             print(f"Ensemble: {idx}, Params: {out.params}")

#         return outputs


# # TODO: Tragedy: Localize exposure and hazards!
# @dataclass
# class TragedyEnsembleCrossCalibration(CrossCalibration):
#     """Cross calibration for computing an ensemble of tragedies"""
