11"""Tests for calibration module"""
22
33import unittest
4- from unittest .mock import create_autospec
4+ from unittest .mock import create_autospec , patch , MagicMock
5+ from typing import Optional , List
56
67import numpy as np
78import numpy .testing as npt
89import pandas as pd
910from scipy .sparse import csr_matrix
10- from shapely . geometry import Point
11+ from scipy . optimize import OptimizeResult
1112
1213from climada .entity import Exposures , ImpactFuncSet
1314from climada .hazard import Hazard , Centroids
1415
15- from . .impact_func import Input , ScipyMinimizeOptimizer
16+ from climada . util . calibrate .impact_func import Input , ScipyMinimizeOptimizer
1617
1718
1819def hazard ():
@@ -21,13 +22,20 @@ def hazard():
2122 lon = [0 , 1 ]
2223 centroids = Centroids .from_lat_lon (lat = lat , lon = lon )
2324 event_id = np .array ([1 , 3 , 10 ])
24- intensity = csr_matrix ([[1 , 1 ], [2 , 2 ], [3 , 3 ]])
25+ intensity = csr_matrix ([[1 , 0. 1 ], [2 , 0. 2 ], [3 , 2 ]])
2526 return Hazard (event_id = event_id , centroids = centroids , intensity = intensity )
2627
2728
2829def exposure ():
2930 """Create a dummy exposure instance"""
30- return Exposures (data = dict (longitude = [0 , 1 , 100 ], latitude = [1 , 2 , 50 ]))
31+ return Exposures (
32+ data = dict (
33+ longitude = [0 , 1 , 100 ],
34+ latitude = [1 , 2 , 50 ],
35+ value = [1 , 0.1 , 1e6 ],
36+ impf_ = [1 , 1 , 1 ],
37+ )
38+ )
3139
3240
3341class TestInputPostInit (unittest .TestCase ):
@@ -49,12 +57,16 @@ def setUp(self):
4957 self .cost_func = lambda impact , data : 1.0
5058 self .impact_func_gen = lambda ** kwargs : ImpactFuncSet ()
5159
52- def test_post_init_calls (self ):
60+ @patch ("climada.util.calibrate.impact_func.np.setdiff1d" )
61+ def test_post_init_calls (self , setdiff1d_mock ):
5362 """Test if post_init calls stuff correctly using mocks"""
5463 # Create mocks
55- hazard_mock_1 = create_autospec (Hazard , instance = True )
56- hazard_mock_2 = create_autospec (Hazard , instance = True )
57- exposure_mock = create_autospec (Exposures , instance = True )
64+ hazard_mock_1 = create_autospec (Hazard ())
65+ event_id = [10 ]
66+ hazard_mock_1 .event_id = event_id
67+ hazard_mock_2 = create_autospec (Hazard ())
68+ exposure_mock = create_autospec (Exposures ())
69+ setdiff1d_mock .return_value = np .array ([])
5870
5971 # Make first hazard mock return another instance
6072 hazard_mock_1 .select .return_value = hazard_mock_2
@@ -69,6 +81,8 @@ def test_post_init_calls(self):
6981 )
7082
7183 # Query checks
84+ npt .assert_array_equal (setdiff1d_mock .call_args .args [0 ], self .data_events )
85+ npt .assert_array_equal (setdiff1d_mock .call_args .args [1 ], event_id )
7286 hazard_mock_1 .select .assert_called_once_with (event_id = self .data_events )
7387 self .assertNotEqual (input .hazard , hazard_mock_1 )
7488 self .assertEqual (input .hazard , hazard_mock_2 )
@@ -90,49 +104,137 @@ def test_post_init(self):
90104 self .assertIn ("centr_" , input .exposure .gdf )
91105 npt .assert_array_equal (input .exposure .gdf ["centr_" ], [0 , 1 , - 1 ])
92106
107+ def test_non_matching_events (self ):
108+ """Test if non-matching events result in errors"""
109+ data = pd .DataFrame (data = {"a" : [1 , 2 , 3 ]}, index = [9 , 3 , 12 ])
110+ input_kwargs = {
111+ "hazard" : self .hazard ,
112+ "exposure" : self .exposure ,
113+ "data" : data ,
114+ "cost_func" : self .cost_func ,
115+ "impact_func_gen" : self .impact_func_gen ,
116+ "align" : False ,
117+ }
118+
119+ # No error without alignment
120+ Input (** input_kwargs )
121+
122+ # Error with alignment
123+ input_kwargs .update (align = True )
124+ with self .assertRaises (RuntimeError ) as cm :
125+ Input (** input_kwargs )
126+
127+ self .assertIn (
128+ "Event IDs in 'data' do not match event IDs in 'hazard'" , str (cm .exception )
129+ )
130+ self .assertIn ("9" , str (cm .exception ))
131+ self .assertIn ("12" , str (cm .exception ))
132+ self .assertNotIn ("3" , str (cm .exception ))
133+
93134
94135class TestScipyMinimizeOptimizer (unittest .TestCase ):
95136 """Tests for the optimizer based on scipy.optimize.minimize"""
96137
97138 def setUp (self ):
98139 """Mock the input and create the optimizer"""
99- self .input = create_autospec (Input , instance = True )
100- self .optimizer = ScipyMinimizeOptimizer (self .input )
101-
102- def test_kwargs_to_impact_func_gen (self ):
103- """Test the _kwargs_to_impact_func_gen method"""
104- # _param_names is empty in the beginning
105- x = np .array ([1 , 2 , 3 ])
106- self .assertDictEqual (self .optimizer ._kwargs_to_impact_func_gen (x ), {})
107-
108- # Now populate it and try again
109- self .optimizer ._param_names = ["x_2" , "x_1" , "x_3" ]
110- result = {"x_2" : 1 , "x_1" : 2 , "x_3" : 3 }
111- self .assertDictEqual (self .optimizer ._kwargs_to_impact_func_gen (x ), result )
112-
113- # Other arguments are ignored
114- self .assertDictEqual (
115- self .optimizer ._kwargs_to_impact_func_gen (x , x + 3 ), result
140+ self .input = MagicMock (
141+ spec_set = Input (
142+ hazard = create_autospec (Hazard , instance = True ),
143+ exposure = create_autospec (Exposures , instance = True ),
144+ data = create_autospec (pd .DataFrame , instance = True ),
145+ cost_func = MagicMock (),
146+ impact_func_gen = MagicMock (),
147+ align = False ,
148+ )
116149 )
150+ self .optimizer = ScipyMinimizeOptimizer (self .input )
117151
118- # Array is flattened, iterator stops
119- self .assertDictEqual (
120- self .optimizer ._kwargs_to_impact_func_gen (np .array ([[1 , 2 ], [3 , 4 ]])),
121- result ,
152+ @patch ("climada.util.calibrate.impact_func.ImpactCalc" , autospec = True )
153+ def test_kwargs_to_impact_func_gen (self , _ ):
154+ """Test transform of minimize func arguments to impact_func_gen arguments
155+
156+ We test the method '_kwargs_to_impact_func_gen' through 'run' because it is
157+ private.
158+ """
159+ # Create stubs
160+ self .input .constraints = None
161+ self .input .bounds = None
162+ self .input .cost_func .return_value = 1.0
163+
164+ # Call 'run', make sure that 'minimize' is only with these parameters
165+ params_init = {"x_2" : 1 , "x 1" : 2 , "x_3" : 3 } # NOTE: Also works with whitespace
166+ self .optimizer .run (params_init = params_init , options = {"maxiter" : 1 })
167+
168+ # Check call to '_kwargs_to_impact_func_gen'
169+ self .input .impact_func_gen .assert_any_call (** params_init )
170+
171+ def test_output (self ):
172+ """Check output reporting"""
173+ params_init = {"x_2" : 1 , "x 1" : 2 , "x_3" : 3 }
174+ target_func_value = 1.12
175+ self .input .constraints = None
176+ self .input .bounds = None
177+
178+ # Mock the optimization function and call 'run'
179+ with patch .object (self .optimizer , "_opt_func" ) as opt_func_mock :
180+ opt_func_mock .return_value = target_func_value
181+ output = self .optimizer .run (params_init = params_init , options = {"maxiter" : 1 })
182+
183+ # Assert output
184+ self .assertListEqual (list (output .params .keys ()), list (params_init .keys ()))
185+ npt .assert_allclose (list (output .params .values ()), list (params_init .values ()))
186+ self .assertEqual (output .target , target_func_value )
187+ self .assertTrue (output .success ) # NOTE: For scipy.optimize, this means no error
188+ self .assertIsInstance (output .result , OptimizeResult )
189+
190+ @patch ("climada.util.calibrate.impact_func.minimize" , autospec = True )
191+ def test_bounds_select (self , minimize_mock ):
192+ """Test the _select_by_param_names method
193+
194+ We test the method '_select_by_param_names' through 'run' because it is private.
195+ """
196+
197+ def assert_bounds_in_call (bounds : Optional [List ]):
198+ """Check if scipy.optimize.minimize was called with the expected kwargs"""
199+ call_kwargs = minimize_mock .call_args .kwargs
200+ print (minimize_mock .call_args )
201+
202+ if bounds is None :
203+ self .assertIsNone (call_kwargs ["bounds" ])
204+ else :
205+ self .assertListEqual (call_kwargs ["bounds" ], bounds )
206+
207+ # Initialize params and mock return value
208+ params_init = {"x_2" : 1 , "x_1" : 2 , "x_3" : 3 }
209+ minimize_mock .return_value = OptimizeResult (
210+ x = np .array (list (params_init .values ())), fun = 0 , success = True
122211 )
123212
124- def test_select_by_keys (self ):
125- """Test the _select_by_keys method"""
126- param_names = ["a" , "b" , "c" , "d" ]
127- mapping = dict (zip (param_names , [1 , "2" , (1 , 2 )]))
128-
129- # _param_names is empty in the beginning
130- self .assertListEqual (self .optimizer ._select_by_param_names (mapping ), [])
131-
132- # Set _param_names
133- self .optimizer ._param_names = param_names
134-
135- # Check result
136- self .assertListEqual (
137- self .optimizer ._select_by_param_names (mapping ), [1 , "2" , (1 , 2 ), None ]
138- )
213+ # Set constraints and bounds to None (default)
214+ self .input .bounds = None
215+
216+ # Call the optimizer (constraints and bounds are None)
217+ self .optimizer .run (params_init = params_init )
218+ self .assertListEqual (self .optimizer ._param_names , list (params_init .keys ()))
219+ minimize_mock .assert_called_once ()
220+ assert_bounds_in_call (None )
221+ minimize_mock .reset_mock ()
222+
223+ # Set new bounds and constraints
224+ self .input .bounds = {"x_1" : "a" , "x_4" : "b" , "x_3" : (1 , 2 )}
225+ self .input .constraints = {"x_5" : [1 ], "x_2" : 2 }
226+
227+ # Call the optimizer
228+ self .optimizer .run (params_init = params_init )
229+ self .assertListEqual (self .optimizer ._param_names , list (params_init .keys ()))
230+ minimize_mock .assert_called_once ()
231+ assert_bounds_in_call (bounds = [None , "a" , (1 , 2 )])
232+
233+
234+ # Execute Tests
235+ if __name__ == "__main__" :
236+ TESTS = unittest .TestLoader ().loadTestsFromTestCase (TestInputPostInit )
237+ TESTS .addTests (
238+ unittest .TestLoader ().loadTestsFromTestCase (TestScipyMinimizeOptimizer )
239+ )
240+ unittest .TextTestRunner (verbosity = 2 ).run (TESTS )
0 commit comments