Skip to content

Commit c0d95e5

Browse files
committed
Add tests for cross_calibrate.py
1 parent 5ab9e91 commit c0d95e5

File tree

2 files changed

+350
-8
lines changed

2 files changed

+350
-8
lines changed

climada/util/calibrate/cross_calibrate.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ def event_info_from_input(inp: Input) -> dict[str, Any]:
7878
"""Get information on the event(s) for which we calibrated
7979
8080
This tries to retrieve the event IDs, region IDs, and event names.
81-
For an average ensemble, they might be lists of lists.
8281
8382
Returns
8483
-------
@@ -91,10 +90,9 @@ def event_info_from_input(inp: Input) -> dict[str, Any]:
9190
region_ids = data.columns
9291

9392
# Get event name
94-
hazard = inp.hazard.select(event_id=event_ids.to_list())
95-
if hazard is not None:
96-
event_names = hazard.event_name
97-
else:
93+
try:
94+
event_names = inp.hazard.select(event_id=event_ids.to_list()).event_name
95+
except IndexError:
9896
event_names = []
9997

10098
# Return data

climada/util/calibrate/test/test_cross_calibrate.py

Lines changed: 347 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,29 @@
1717
---
1818
Tests for cross calibration module
1919
"""
20+
21+
import copy
2022
import unittest
21-
from tempfile import TemporaryDirectory
2223
from pathlib import Path
24+
from tempfile import TemporaryDirectory
25+
from unittest.mock import create_autospec, patch
2326

24-
import pandas as pd
25-
import pandas.testing as pdt
2627
import numpy as np
2728
import numpy.testing as npt
29+
import pandas as pd
30+
import pandas.testing as pdt
2831

32+
from climada.util.calibrate.base import Input, Output
2933
from climada.util.calibrate.cross_calibrate import (
34+
AverageEnsembleOptimizer,
35+
EnsembleOptimizer,
3036
EnsembleOptimizerOutput,
3137
SingleEnsembleOptimizerOutput,
38+
TragedyEnsembleOptimizer,
39+
event_info_from_input,
40+
sample_data,
3241
)
42+
from climada.util.calibrate.test.test_base import ConcreteOptimizer, exposure, hazard
3343

3444

3545
class TestEnsembleOptimizerOutput(unittest.TestCase):
@@ -107,3 +117,337 @@ def test_cycling_csv(self):
107117

108118
out_new = EnsembleOptimizerOutput.from_csv(filepath)
109119
pdt.assert_frame_equal(out.data, out_new.data)
120+
121+
def test_to_input_var(self):
122+
"""Test creating an Unsequa InputVar from the output"""
123+
124+
def impf_creator(**params):
125+
"""Stub impf creator"""
126+
return params
127+
128+
invar = EnsembleOptimizerOutput.from_outputs(
129+
[self.output1, self.output2]
130+
).to_input_var(
131+
impact_func_creator=impf_creator,
132+
haz_id_dict={"TC": [0, 1]},
133+
bounds_impfi=(0, 1),
134+
)
135+
self.assertDictEqual(invar.func(IFi=None, IL=0), self.output1.params)
136+
self.assertDictEqual(invar.func(IFi=None, IL=1), self.output2.params)
137+
self.assertListEqual(list(invar.distr_dict.keys()), ["IFi", "IL"])
138+
139+
140+
class TestSampleData(unittest.TestCase):
141+
"""Test sample_data function"""
142+
143+
def test_sample_data(self):
144+
"""Test sampling of a DataFrame"""
145+
df = pd.DataFrame([[0, 1, 2], [3, 4, 5]], index=[1, 2], columns=["a", "b", "c"])
146+
samples = [(0, 0), (0, 2), (1, 1)]
147+
148+
pdt.assert_frame_equal(
149+
sample_data(df, samples),
150+
pd.DataFrame(
151+
[[0, np.nan, 2], [np.nan, 4, np.nan]],
152+
index=df.index,
153+
columns=df.columns,
154+
),
155+
)
156+
157+
158+
class TestEventInfoFromInput(unittest.TestCase):
159+
"""Test retrieving event information from the input"""
160+
161+
def setUp(self):
162+
"""Create input"""
163+
self.input = Input(
164+
hazard=hazard(),
165+
exposure=exposure(),
166+
data=pd.DataFrame(
167+
[[1, np.nan], [10, np.nan], [np.nan, np.nan]],
168+
index=[1, 3, 10],
169+
columns=["a", "b"],
170+
),
171+
cost_func=lambda _: None,
172+
impact_func_creator=lambda _: None,
173+
impact_to_dataframe=lambda _: None,
174+
assign_centroids=True,
175+
)
176+
self.input.hazard.centroids.gdf["region_id"] = ["a", "b"]
177+
self.input.hazard.event_name = ["event1", "event2", 3]
178+
179+
def test_info_valid_hazard(self):
180+
"""Test retrieving event IDs, region IDs, and event names for a valid hazard"""
181+
info = event_info_from_input(self.input)
182+
self.assertListEqual(list(info.keys()), ["event_id", "region_id", "event_name"])
183+
npt.assert_array_equal(info["event_id"], np.array([1, 3]))
184+
npt.assert_array_equal(info["region_id"], np.array(["a"]))
185+
self.assertListEqual(info["event_name"], ["event1", "event2"])
186+
187+
def test_info_invalid_hazard(self):
188+
"""Test that event names are empty if selecting the events from the hazard fails"""
189+
self.input.data.set_index(pd.Index([100, 101, 102]), inplace=True)
190+
info = event_info_from_input(self.input)
191+
self.assertListEqual(info["event_name"], [])
192+
193+
194+
class ConcreteEnsembleOptimizer(EnsembleOptimizer):
195+
"""Concrete instantiation of an ensemble optimizer"""
196+
197+
def __post_init__(self, **__):
198+
self.samples = [[(0, 0)], [(1, 0), (1, 1)], [(2, 0)]]
199+
return super().__post_init__(**__)
200+
201+
def input_from_sample(self, sample):
202+
inp = copy.copy(self.input) # NOTE: Shallow copy!
203+
inp.data = sample_data(inp.data, sample)
204+
return inp
205+
206+
207+
@patch("climada.util.calibrate.bayesian_optimizer.BayesianOptimizer")
208+
class TestEnsembleOptimizer(unittest.TestCase):
209+
"""Test the EnsembleOptimizer (via ConcreteEnsembleOptimizer)"""
210+
211+
def setUp(self):
212+
"""Create input"""
213+
self.input = Input(
214+
hazard=hazard(),
215+
exposure=exposure(),
216+
data=pd.DataFrame(
217+
[[1, 2], [10, 11], [100, np.nan]],
218+
index=[1, 3, 10],
219+
columns=["a", "b"],
220+
),
221+
cost_func=lambda _: None,
222+
impact_func_creator=lambda _: None,
223+
impact_to_dataframe=lambda _: None,
224+
assign_centroids=False,
225+
)
226+
227+
@patch("climada.util.calibrate.cross_calibrate.ProcessPool")
228+
def test_run(self, pool_class_mock, opt_class_mock):
229+
"""Test running the optimizer sequentially and with a process pool"""
230+
# Mock the optimizer class
231+
opt_mock = opt_class_mock.return_value
232+
output = Output(params={"p1": 0.1, "p2": 2}, target=0.2)
233+
opt_mock.run.return_value = output
234+
235+
# Mock the process pool (context manager)
236+
pool_mock = pool_class_mock.return_value.__enter__.return_value
237+
pool_mock.imap.side_effect = map
238+
239+
self.opt = ConcreteEnsembleOptimizer(
240+
input=self.input,
241+
optimizer_type=opt_class_mock,
242+
optimizer_init_kwargs={"foo": "bar", "random_state": 2},
243+
)
244+
245+
outputs = []
246+
for proc in (1, 3):
247+
with self.subTest(processes=proc):
248+
ens_out = self.opt.run(processes=proc, bar="baz")
249+
outputs.append(ens_out)
250+
251+
if proc > 1:
252+
pool_class_mock.assert_called_once_with(nodes=proc)
253+
254+
self.assertEqual(ens_out.data.shape[0], len(self.opt.samples))
255+
256+
# Test passing init_kwargs
257+
self.assertEqual(opt_class_mock.call_args.kwargs["foo"], "bar")
258+
259+
# Test update_init_kwargs
260+
self.assertListEqual(
261+
[call[1]["random_state"] for call in opt_class_mock.call_args_list],
262+
[2, 3, 4],
263+
)
264+
265+
# Test passing run kwargs
266+
self.assertEqual(opt_mock.run.call_args.kwargs["bar"], "baz")
267+
268+
# Test passing the input and sampling
269+
pdt.assert_frame_equal(
270+
opt_class_mock.call_args_list[0][0][0].data,
271+
pd.DataFrame(
272+
[[1, np.nan], [np.nan, np.nan], [np.nan, np.nan]],
273+
index=[1, 3, 10],
274+
columns=["a", "b"],
275+
),
276+
)
277+
pdt.assert_frame_equal(
278+
opt_class_mock.call_args_list[1][0][0].data,
279+
pd.DataFrame(
280+
[[np.nan, np.nan], [10, 11], [np.nan, np.nan]],
281+
index=[1, 3, 10],
282+
columns=["a", "b"],
283+
),
284+
)
285+
pdt.assert_frame_equal(
286+
opt_class_mock.call_args_list[2][0][0].data,
287+
pd.DataFrame(
288+
[[np.nan, np.nan], [np.nan, np.nan], [100, np.nan]],
289+
index=[1, 3, 10],
290+
columns=["a", "b"],
291+
),
292+
)
293+
294+
# Reset mock calls
295+
opt_class_mock.reset_mock()
296+
opt_mock.reset_mock()
297+
pool_class_mock.reset_mock()
298+
pool_mock.reset_mock()
299+
300+
pdt.assert_frame_equal(outputs[0].data, outputs[1].data)
301+
302+
303+
class DummyInput:
304+
def __init__(self, df):
305+
self.data = df
306+
self.stub = "a"
307+
self.hazard = create_autospec(hazard())
308+
self.hazard.select.return_value = self.hazard
309+
310+
311+
class TestAverageEnsembleOptimizer(unittest.TestCase):
312+
"""Test the AverageEnsembleOptimizer"""
313+
314+
def setUp(self):
315+
# Sample DataFrame with some NaNs
316+
data = pd.DataFrame({"a": [1.0, None, None, 2.0], "b": [None, None, 3.0, 4.0]})
317+
self.input = DummyInput(data)
318+
319+
def test_post_init_sampling(self):
320+
opt = AverageEnsembleOptimizer(
321+
input=self.input, sample_fraction=0.5, optimizer_type=ConcreteOptimizer
322+
)
323+
samples = np.array(opt.samples)
324+
self.assertTupleEqual(samples.shape, (20, 2, 2))
325+
326+
opt = AverageEnsembleOptimizer(
327+
input=self.input,
328+
ensemble_size=11,
329+
sample_fraction=0.8, # Will cause rounding
330+
optimizer_type=ConcreteOptimizer,
331+
)
332+
samples = np.array(opt.samples)
333+
self.assertTupleEqual(samples.shape, (11, 3, 2))
334+
335+
opt = AverageEnsembleOptimizer(
336+
input=self.input,
337+
ensemble_size=2,
338+
sample_fraction=0.95, # Will cause rounding, always select all
339+
optimizer_type=ConcreteOptimizer,
340+
)
341+
342+
samples = [sorted([tuple(idx) for idx in arr]) for arr in opt.samples]
343+
npt.assert_array_equal(samples[0], [[0, 0], [2, 1], [3, 0], [3, 1]])
344+
npt.assert_array_equal(samples[0], samples[1])
345+
346+
def test_invalid_sample_fraction(self):
347+
with self.assertRaisesRegex(ValueError, "Sample fraction"):
348+
AverageEnsembleOptimizer(
349+
input=self.input,
350+
sample_fraction=1,
351+
optimizer_type=ConcreteOptimizer,
352+
)
353+
with self.assertRaisesRegex(ValueError, "Sample fraction"):
354+
AverageEnsembleOptimizer(
355+
input=self.input,
356+
sample_fraction=0,
357+
optimizer_type=ConcreteOptimizer,
358+
)
359+
360+
def test_invalid_ensemble_size(self):
361+
with self.assertRaisesRegex(ValueError, "Ensemble size must be >=1"):
362+
AverageEnsembleOptimizer(
363+
input=self.input,
364+
ensemble_size=0,
365+
optimizer_type=ConcreteOptimizer,
366+
)
367+
368+
def test_random_state_determinism(self):
369+
opt1 = AverageEnsembleOptimizer(
370+
input=self.input,
371+
random_state=123,
372+
optimizer_type=ConcreteOptimizer,
373+
)
374+
opt2 = AverageEnsembleOptimizer(
375+
input=self.input,
376+
random_state=123,
377+
optimizer_type=ConcreteOptimizer,
378+
)
379+
for s1, s2 in zip(opt1.samples, opt2.samples):
380+
np.testing.assert_array_equal(s1, s2)
381+
382+
def test_input_from_sample(self):
383+
opt = AverageEnsembleOptimizer(
384+
input=self.input,
385+
optimizer_type=ConcreteOptimizer,
386+
)
387+
inp = opt.input_from_sample([(0, 0)])
388+
self.assertIsNot(inp, self.input)
389+
self.assertIs(inp.stub, self.input.stub)
390+
391+
392+
class TestTragedyEnsembleOptimizer(unittest.TestCase):
393+
"""Test the TragedyEnsembleOptimizer"""
394+
395+
def setUp(self):
396+
# Sample DataFrame with some NaNs
397+
data = pd.DataFrame({"a": [1.0, None, None, 2.0], "b": [None, None, 3.0, 4.0]})
398+
self.input = DummyInput(data)
399+
400+
def test_post_init_sampling(self):
401+
opt = TragedyEnsembleOptimizer(
402+
input=self.input, optimizer_type=ConcreteOptimizer
403+
)
404+
samples = np.array(opt.samples)
405+
self.assertTupleEqual(samples.shape, (4, 1, 2))
406+
npt.assert_array_equal(samples, [[[0, 0]], [[2, 1]], [[3, 0]], [[3, 1]]])
407+
408+
opt = TragedyEnsembleOptimizer(
409+
input=self.input,
410+
ensemble_size=2,
411+
optimizer_type=ConcreteOptimizer,
412+
)
413+
samples = np.array(opt.samples)
414+
self.assertTupleEqual(samples.shape, (2, 1, 2))
415+
416+
def test_invalid_ensemble_size(self):
417+
with self.assertRaisesRegex(ValueError, "Ensemble size must be >=1"):
418+
TragedyEnsembleOptimizer(
419+
input=self.input,
420+
ensemble_size=0,
421+
optimizer_type=ConcreteOptimizer,
422+
)
423+
with self.assertRaisesRegex(ValueError, "here: 4"):
424+
TragedyEnsembleOptimizer(
425+
input=self.input,
426+
ensemble_size=5,
427+
optimizer_type=ConcreteOptimizer,
428+
)
429+
430+
def test_random_state_determinism(self):
431+
opt1 = TragedyEnsembleOptimizer(
432+
input=self.input,
433+
random_state=2,
434+
optimizer_type=ConcreteOptimizer,
435+
)
436+
opt2 = TragedyEnsembleOptimizer(
437+
input=self.input,
438+
random_state=2,
439+
optimizer_type=ConcreteOptimizer,
440+
)
441+
for s1, s2 in zip(opt1.samples, opt2.samples):
442+
np.testing.assert_array_equal(s1, s2)
443+
444+
def test_input_from_sample(self):
445+
opt = TragedyEnsembleOptimizer(
446+
input=self.input,
447+
optimizer_type=ConcreteOptimizer,
448+
)
449+
inp = opt.input_from_sample([(3, 0)])
450+
self.assertIsNot(inp, self.input)
451+
self.assertIs(inp.stub, self.input.stub)
452+
pdt.assert_frame_equal(inp.data, pd.DataFrame({"a": [2.0]}, index=[3]))
453+
inp.hazard.select.assert_called_once_with(event_id=pd.Index([3]))

0 commit comments

Comments
 (0)