Commit 3c975fc

improve resolution processing code
1 parent 6ee7333 commit 3c975fc

File tree

2 files changed: +273 -2 lines changed


scripts/ej/cmr_processing.py

Lines changed: 66 additions & 2 deletions
@@ -116,11 +116,75 @@ def _process_spatial_info(self) -> SpatialInfo:
             for rect in rectangles
         )
 
-        resolution_system = horizontal_domain.get("ResolutionAndCoordinateSystem", {})
-        resolution = resolution_system.get("HorizontalDataResolution", "")
+        resolution = self._extract_spatial_resolution(horizontal_domain)
 
         return SpatialInfo(is_global, resolution, rectangles)
 
+    def _extract_spatial_resolution(self, horizontal_domain: dict) -> str:
+        """
+        Extract and format spatial resolution from horizontal domain data.
+
+        Args:
+            horizontal_domain: Dictionary containing resolution information
+
+        Returns:
+            Formatted resolution string, or an empty string if not available
+        """
+        resolution_system = horizontal_domain.get("ResolutionAndCoordinateSystem", {})
+        resolution_data = resolution_system.get("HorizontalDataResolution", {})
+
+        if not resolution_data:
+            return ""
+
+        # Check for a "Varies" resolution
+        if resolution_data.get("VariesResolution") == "Varies":
+            return "Varies"
+
+        # Check for GriddedRangeResolutions (use maximum values)
+        gridded_range = resolution_data.get("GriddedRangeResolutions", [])
+        if gridded_range:
+            # I spot-checked 200 datasets and never saw more than one entry,
+            # so I'm just using the first one for now, for simplicity.
+            range_data = gridded_range[0]
+            # In a gridded range, MinimumXDimension is also available;
+            # however, I have chosen to use the coarser MaximumXDimension.
+            max_x = range_data.get("MaximumXDimension")
+            max_y = range_data.get("MaximumYDimension")
+            unit = range_data.get("Unit", "").lower()
+            if max_x and max_y and unit:
+                # Use the larger of the two dimensions
+                max_dim = max(max_x, max_y)
+                return f"{max_dim} {unit}"
+            return ""
+
+        # Check for GriddedResolutions
+        gridded = resolution_data.get("GriddedResolutions", [])
+        if gridded:
+            grid_data = gridded[0]
+            x_dim = grid_data.get("XDimension")
+            y_dim = grid_data.get("YDimension")
+            unit = grid_data.get("Unit", "").lower()
+            if x_dim and y_dim and unit:
+                # If dimensions differ, use the larger one
+                max_dim = max(x_dim, y_dim)
+                return f"{max_dim} {unit}"
+            return ""
+
+        # Check for GenericResolutions
+        generic = resolution_data.get("GenericResolutions", [])
+        if generic:
+            generic_data = generic[0]
+            x_dim = generic_data.get("XDimension")
+            y_dim = generic_data.get("YDimension")
+            unit = generic_data.get("Unit", "").lower()
+            if x_dim and y_dim and unit:
+                # If dimensions differ, use the larger one
+                max_dim = max(x_dim, y_dim)
+                return f"{max_dim} {unit}"
+            return ""
+
+        return ""
+
     def _process_download_info(self) -> DownloadInfo:
         """Process all download and visualization information."""
         has_distribution = False
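
For context, the sketch below replays the precedence the new helper follows (VariesResolution, then GriddedRangeResolutions, then GriddedResolutions, then GenericResolutions) against a made-up UMM-C-style horizontal domain. The key names match the fields read by _extract_spatial_resolution above, but the sample values and the standalone extract_resolution function are illustrative only and are not part of this commit.

# Hypothetical UMM-C style payload; key names follow the fields read above,
# but the values here are invented for illustration.
sample_domain = {
    "ResolutionAndCoordinateSystem": {
        "HorizontalDataResolution": {
            "GriddedResolutions": [
                {"XDimension": 0.25, "YDimension": 0.5, "Unit": "Degrees"},
            ],
        },
    },
}


def extract_resolution(horizontal_domain: dict) -> str:
    """Standalone re-statement of the precedence used by _extract_spatial_resolution."""
    data = horizontal_domain.get("ResolutionAndCoordinateSystem", {}).get(
        "HorizontalDataResolution", {}
    )
    if not data:
        return ""
    if data.get("VariesResolution") == "Varies":
        return "Varies"
    # Each branch mirrors the diff: first entry only, larger of the two
    # dimensions, empty string when any field is missing.
    for key, x_key, y_key in (
        ("GriddedRangeResolutions", "MaximumXDimension", "MaximumYDimension"),
        ("GriddedResolutions", "XDimension", "YDimension"),
        ("GenericResolutions", "XDimension", "YDimension"),
    ):
        entries = data.get(key, [])
        if entries:
            first = entries[0]
            x, y = first.get(x_key), first.get(y_key)
            unit = first.get("Unit", "").lower()
            return f"{max(x, y)} {unit}" if x and y and unit else ""
    return ""


print(extract_resolution(sample_domain))  # -> "0.5 degrees"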
Lines changed: 207 additions & 0 deletions
@@ -0,0 +1,207 @@
+"""Unit tests for threshold processing functionality."""
+
+import pytest
+from threshold_processing import ThresholdProcessor
+
+
+class TestThresholdProcessor:
+    """Test suite for ThresholdProcessor class."""
+
+    @pytest.fixture
+    def default_thresholds(self):
+        """Default thresholds for testing."""
+        return {
+            "Not EJ": 0.80,
+            "Urban Flooding": 0.50,
+            "Extreme Heat": 0.50,
+            "Water Availability": 0.80,
+            "Health & Air Quality": 0.90,
+            "Disasters": 0.80,
+            "Food Availability": 0.80,
+            "Human Dimensions": 0.80,
+        }
+
+    @pytest.fixture
+    def authorized_classifications(self):
+        """Authorized classifications for testing."""
+        return [
+            "Urban Flooding",
+            "Extreme Heat",
+            "Water Availability",
+            "Health & Air Quality",
+            "Disasters",
+            "Food Availability",
+            "Human Dimensions",
+        ]
+
+    @pytest.fixture
+    def processor(self, default_thresholds):
+        """Create a ThresholdProcessor instance with test thresholds."""
+        return ThresholdProcessor(thresholds=default_thresholds)
+
+    @pytest.fixture
+    def custom_processor(self):
+        """Create a ThresholdProcessor instance with simplified test thresholds."""
+        custom_thresholds = {
+            "Not EJ": 0.75,
+            "Test Category 1": 0.60,
+            "Test Category 2": 0.80,
+        }
+        return ThresholdProcessor(thresholds=custom_thresholds)
+
+    def test_initialization_with_thresholds(self, processor, default_thresholds):
+        """Test initialization with provided thresholds."""
+        assert processor.thresholds == default_thresholds
+        assert "Not EJ" in processor.thresholds
+        assert processor.thresholds["Not EJ"] == 0.80
+
+    def test_initialization_custom_thresholds(self, custom_processor):
+        """Test initialization with custom thresholds."""
+        assert custom_processor.thresholds["Not EJ"] == 0.75
+        assert custom_processor.thresholds["Test Category 1"] == 0.60
+        assert custom_processor.thresholds["Test Category 2"] == 0.80
+
+    def test_single_high_scoring_not_ej(self, processor):
+        """Test when 'Not EJ' has the highest score."""
+        predictions = [
+            {"label": "Not EJ", "score": 0.90},
+            {"label": "Urban Flooding", "score": 0.85},
+            {"label": "Water Availability", "score": 0.82},
+        ]
+        result = processor.process_predictions(predictions)
+        assert result == ["Not EJ"]
+        assert len(result) == 1
+
+    def test_multiple_indicators_above_threshold(self, processor):
+        """Test when multiple indicators exceed their thresholds."""
+        predictions = [
+            {"label": "Not EJ", "score": 0.30},
+            {"label": "Urban Flooding", "score": 0.75},  # Above 0.50 threshold
+            {"label": "Extreme Heat", "score": 0.60},  # Above 0.50 threshold
+            {"label": "Water Availability", "score": 0.85},  # Above 0.80 threshold
+        ]
+        result = processor.process_predictions(predictions)
+        assert len(result) == 3
+        assert "Urban Flooding" in result
+        assert "Extreme Heat" in result
+        assert "Water Availability" in result
+
+    def test_no_indicators_above_threshold(self, processor):
+        """Test when no indicators meet their thresholds."""
+        predictions = [
+            {"label": "Not EJ", "score": 0.70},
+            {"label": "Urban Flooding", "score": 0.45},  # Below 0.50 threshold
+            {"label": "Water Availability", "score": 0.75},  # Below 0.80 threshold
+        ]
+        result = processor.process_predictions(predictions)
+        assert result == ["Not EJ"]
+
+    def test_mixed_threshold_scenarios(self, processor):
+        """Test various mixed scenarios of threshold checking."""
+        predictions = [
+            {"label": "Not EJ", "score": 0.60},
+            {"label": "Urban Flooding", "score": 0.55},  # Above 0.50 threshold
+            {"label": "Extreme Heat", "score": 0.45},  # Below 0.50 threshold
+            {"label": "Water Availability", "score": 0.85},  # Above 0.80 threshold
+        ]
+        result = processor.process_predictions(predictions)
+        assert len(result) == 2
+        assert "Urban Flooding" in result
+        assert "Water Availability" in result
+        assert "Extreme Heat" not in result
+
+    def test_authorized_classifications_filtering(self, processor, authorized_classifications):
+        """Test filtering of authorized classifications."""
+        # Monkey patch the authorized classifications for this test
+        import threshold_processing
+
+        original_authorized = threshold_processing.AUTHORIZED_CLASSIFICATIONS
+        threshold_processing.AUTHORIZED_CLASSIFICATIONS = authorized_classifications
+
+        test_classifications = ["Urban Flooding", "Invalid Category", "Water Availability", "Another Invalid"]
+        result = processor.filter_authorized_classifications(test_classifications)
+        assert len(result) == 2
+        assert all(r in authorized_classifications for r in result)
+        assert "Invalid Category" not in result
+        assert "Another Invalid" not in result
+
+        # Restore original authorized classifications
+        threshold_processing.AUTHORIZED_CLASSIFICATIONS = original_authorized
+
+    def test_process_and_filter_complete_pipeline(self, processor, authorized_classifications):
+        """Test the complete processing pipeline with unauthorized categories."""
+        # Monkey patch the authorized classifications for this test
+        import threshold_processing
+
+        original_authorized = threshold_processing.AUTHORIZED_CLASSIFICATIONS
+        threshold_processing.AUTHORIZED_CLASSIFICATIONS = authorized_classifications
+
+        predictions = [
+            {"label": "Not EJ", "score": 0.30},
+            {"label": "Urban Flooding", "score": 0.75},
+            {"label": "Invalid Category", "score": 0.95},
+            {"label": "Water Availability", "score": 0.85},
+        ]
+        result = processor.process_and_filter(predictions)
+        assert len(result) == 2
+        assert "Urban Flooding" in result
+        assert "Water Availability" in result
+        assert "Invalid Category" not in result
+
+        # Restore original authorized classifications
+        threshold_processing.AUTHORIZED_CLASSIFICATIONS = original_authorized
+
+    def test_edge_case_empty_predictions(self, processor):
+        """Test handling of an empty predictions list."""
+        result = processor.process_predictions([])
+        assert result == ["Not EJ"]
+
+    def test_edge_case_missing_scores(self, processor):
+        """Test handling of predictions with missing scores."""
+        predictions = [{"label": "Urban Flooding"}, {"label": "Water Availability", "score": 0.85}]  # Missing score
+        with pytest.raises(KeyError):
+            processor.process_predictions(predictions)
+
+    def test_edge_case_invalid_score_values(self, processor):
+        """Test handling of invalid score values."""
+        predictions = [{"label": "Not EJ", "score": "invalid"}, {"label": "Urban Flooding", "score": 0.75}]
+        with pytest.raises(TypeError):
+            processor.process_predictions(predictions)
+
+    def test_threshold_boundary_conditions(self, processor):
+        """Test classification at exact threshold boundaries."""
+        predictions = [
+            {"label": "Not EJ", "score": 0.30},
+            {"label": "Urban Flooding", "score": 0.50},  # Exactly at threshold
+            {"label": "Water Availability", "score": 0.80},  # Exactly at threshold
+            {"label": "Health & Air Quality", "score": 0.89},  # Just below threshold
+        ]
+        result = processor.process_predictions(predictions)
+        assert len(result) == 2
+        assert "Urban Flooding" in result
+        assert "Water Availability" in result
+        assert "Health & Air Quality" not in result
+
+    def test_all_indicators_same_score(self, processor):
+        """Test behavior when all indicators have the same score."""
+        predictions = [
+            {"label": "Not EJ", "score": 0.85},
+            {"label": "Urban Flooding", "score": 0.85},
+            {"label": "Water Availability", "score": 0.85},
+        ]
+        result = processor.process_predictions(predictions)
+        assert result == ["Not EJ"]  # Since "Not EJ" is the highest-scoring (tied) prediction
+
+    def test_high_scores_below_threshold(self, processor):
+        """Test when scores are high but still below their respective thresholds."""
+        predictions = [
+            {"label": "Not EJ", "score": 0.70},
+            {"label": "Health & Air Quality", "score": 0.89},  # High but below 0.90 threshold
+            {"label": "Water Availability", "score": 0.79},  # High but below 0.80 threshold
+        ]
+        result = processor.process_predictions(predictions)
+        assert result == ["Not EJ"]
+
+
+if __name__ == "__main__":
+    pytest.main([__file__])
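
The ThresholdProcessor implementation itself is not included in this commit, so the following is a minimal, hypothetical threshold_processing.py sketch consistent with the behavior these tests pin down: an empty list or a (tied) top-scoring "Not EJ" yields ["Not EJ"]; otherwise every label meeting its per-label threshold is returned, falling back to ["Not EJ"] when nothing qualifies. The real module may be implemented differently.

"""Hypothetical sketch of threshold_processing.py, inferred only from the tests above."""

# Module-level list that the tests monkey-patch; the default contents are an assumption.
AUTHORIZED_CLASSIFICATIONS = [
    "Urban Flooding",
    "Extreme Heat",
    "Water Availability",
    "Health & Air Quality",
    "Disasters",
    "Food Availability",
    "Human Dimensions",
]

NOT_EJ = "Not EJ"


class ThresholdProcessor:
    def __init__(self, thresholds: dict):
        self.thresholds = thresholds

    def process_predictions(self, predictions: list) -> list:
        if not predictions:
            return [NOT_EJ]
        # Direct ["score"] access so a missing key raises KeyError; comparing a
        # string score against a float raises TypeError, matching the edge-case tests.
        top_score = max(p["score"] for p in predictions)
        not_ej_score = next(
            (p["score"] for p in predictions if p["label"] == NOT_EJ), None
        )
        # A (tied) top-scoring "Not EJ" wins outright.
        if not_ej_score is not None and not_ej_score >= top_score:
            return [NOT_EJ]
        # Otherwise keep every label that meets or exceeds its own threshold.
        # Unknown labels default to passing here (an assumption); the
        # authorization filter below is what removes them.
        passing = [
            p["label"]
            for p in predictions
            if p["label"] != NOT_EJ and p["score"] >= self.thresholds.get(p["label"], 0.0)
        ]
        return passing or [NOT_EJ]

    def filter_authorized_classifications(self, classifications: list) -> list:
        # Looked up at call time so the tests can monkey-patch the module attribute.
        return [c for c in classifications if c in AUTHORIZED_CLASSIFICATIONS]

    def process_and_filter(self, predictions: list) -> list:
        return self.filter_authorized_classifications(self.process_predictions(predictions))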

0 commit comments
