Skip to content

Commit c4d7b89

Browse files
committed
fixup! chore(etl): separate past event extraction into different class
1 parent 3c874a1 commit c4d7b89

23 files changed

+219
-98
lines changed

alert_system/etl/Gdacs_flood/config.py

Lines changed: 0 additions & 11 deletions
This file was deleted.

alert_system/etl/Usgs_earthquake/config.py

Lines changed: 0 additions & 10 deletions
This file was deleted.

alert_system/etl/base/config.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from typing import TypedDict, Dict
2+
3+
class ExtractionConfig(TypedDict):
4+
event_collection_type: str
5+
hazard_collection_type: str | None
6+
impact_collection_type: str | None
7+
8+
filter_event: Dict | None
9+
filter_hazard: Dict | None
10+
filter_impact: Dict | None
11+
12+
people_exposed_threshold : int

alert_system/etl/base/extraction.py

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
import logging
22
from abc import ABC
3-
from datetime import datetime, timedelta, timezone
4-
from typing import Dict, Generator, List, Optional
3+
from datetime import timedelta
4+
from typing import Dict, Generator, List, Optional, Type
55

66
import httpx
77
from django.db import transaction
8+
from django.utils import timezone
89

910
from alert_system.helpers import build_stac_search
1011
from alert_system.models import Connector, ExtractionItem, LoadItem
1112

13+
from .config import ExtractionConfig
1214
from .loader import BaseLoaderClass
1315
from .transform import BaseTransformerClass
1416

@@ -29,19 +31,28 @@ class BaseExtractionClass(ABC):
2931
"""
3032

3133
event_collection_type: str
32-
hazard_collection_type: str | None
33-
impact_collection_type: str | None
34+
transformer_class: Type[BaseTransformerClass]
35+
loader_class: Type[BaseLoaderClass]
36+
37+
hazard_collection_type: Optional[str] = None
38+
impact_collection_type: Optional[str] = None
39+
3440
filter_event: Optional[Dict] = None
3541
filter_hazard: Optional[Dict] = None
3642
filter_impact: Optional[Dict] = None
37-
transformer_class: type[BaseTransformerClass]
38-
loader_class: type[BaseLoaderClass]
43+
44+
config: ExtractionConfig
3945

4046
def __init__(self, connector: Connector):
4147
self.connector = connector
4248
self.base_url = connector.source_url.rstrip("/")
49+
self.load_config()
4350
self._validate_required_attributes()
4451

52+
def load_config(self):
    """Copy every entry of ``self.config`` onto the instance as an attribute.

    Called from ``__init__`` before ``_validate_required_attributes`` so that
    values supplied through the config mapping are visible to validation.

    A subclass that does not define ``config`` (the base class only annotates
    it) is tolerated: nothing is copied, and any attributes declared directly
    on the class are left for the validation step to check, instead of
    failing here with a bare ``AttributeError``.
    """
    config = getattr(self, "config", None)
    if not config:
        return

    for name, value in config.items():
        setattr(self, name, value)
55+
4556
def _validate_required_attributes(self):
4657
missing_attr = []
4758
if not getattr(self, "event_collection_type", None):
@@ -96,10 +107,10 @@ def get_datetime_filter(self) -> str:
96107
ISO 8601 datetime range string
97108
"""
98109

99-
now = datetime.now(timezone.utc)
110+
now = timezone.now()
100111
last_run = self.connector.last_success_run
101112

102-
start_time = last_run if last_run else (now - timedelta(days=20)) # NOTE: Arbitrary value for failure case.
113+
start_time = last_run if last_run else (now - timedelta(days=30)) # NOTE: Arbitrary value for failure case.
103114
return f"{start_time.isoformat()}/{now.isoformat()}"
104115

105116
def _save_stac_item(self, stac_id: str, defaults: Dict) -> Optional[ExtractionItem]:
@@ -200,7 +211,7 @@ def process_event_items(self, extraction_run_id: str, correlation_id: str | None
200211
),
201212
)
202213
except Exception as e:
203-
logger.error(f"Failed to fetch events: {e}")
214+
logger.warning(f"Failed to fetch events: {e}")
204215
raise
205216

206217
for feature in event_items:
@@ -233,23 +244,20 @@ def process_event_items(self, extraction_run_id: str, correlation_id: str | None
233244
logger.info(f"Successfully processed event {event_id}")
234245

235246
except Exception as e:
236-
logger.error(f"Failed to process event {event_id}: {e}", exc_info=True)
247+
logger.warning(f"Failed to process event {event_id}: {e}", exc_info=True)
237248
raise
238249

239250
def run(self, extraction_run_id: str, correlation_id: str | None = None, is_past_event: bool = False) -> None:
240251
"""Main entry point for running the connector."""
241-
logger.info(f"Starting connector run for {self.connector}")
242-
243252
try:
244253
self.process_event_items(extraction_run_id, correlation_id, is_past_event)
245-
logger.info("Connector run completed successfully")
246254
except Exception as e:
247-
logger.error(f"Connector run failed: {e}", exc_info=True)
255+
logger.warning(f"Connector run failed: {e}", exc_info=True)
248256
raise
249257

250258

251259
class PastEventExtractionClass:
252-
LOOKBACK_WEEKS = 16
260+
LOOKBACK_WEEKS = 520
253261

254262
def __init__(self, extractor: BaseExtractionClass):
255263
self.extractor = extractor
@@ -268,6 +276,13 @@ def _impact_filter(self, impact_metadata: list[dict]) -> str:
268276

269277
return " OR ".join(f"({filter})" for filter in filters)
270278

279+
def _country_filter(self, country_codes) -> list[str]:
280+
filters = []
281+
if country_codes:
282+
country_cql = " OR ".join(f"a_contains(monty:country_codes, '{code}')" for code in country_codes)
283+
filters.append(f"({country_cql})")
284+
return filters
285+
271286
def _hazard_filter(self, unit: str, value: int) -> str:
272287
return f"monty:hazard_detail.severity_unit = '{unit}' AND " f"monty:hazard_detail.severity_value >= {value}"
273288

@@ -280,21 +295,31 @@ def _collect_corr_ids(self, features, exclude: str) -> set[str]:
280295
return corr_ids
281296

282297
def find_related_corr_ids(self, load_obj: LoadItem) -> set[str]:
283-
start = datetime.now(timezone.utc) - timedelta(weeks=self.LOOKBACK_WEEKS)
284-
end = datetime.now(timezone.utc)
298+
start = timezone.now() - timedelta(weeks=self.LOOKBACK_WEEKS)
299+
end = timezone.now()
285300

286301
corr_ids = set()
287302

288303
if self.extractor.impact_collection_type:
289304
impact_filter = self._impact_filter(load_obj.impact_metadata)
305+
country_filters = self._country_filter(load_obj.country_codes)
306+
307+
additional_filters = []
308+
309+
if impact_filter:
310+
additional_filters.append(impact_filter)
311+
312+
additional_filters.extend(country_filters)
313+
290314
features = self.extractor.fetch_stac_data(
291315
self.base_url,
292316
build_stac_search(
293317
collections=self.extractor.impact_collection_type,
294-
additional_filters=[impact_filter] if impact_filter else [],
318+
additional_filters=additional_filters,
295319
datetime_range=f"{start.isoformat()}/{end.isoformat()}",
296320
),
297321
)
322+
298323
corr_ids |= self._collect_corr_ids(features, load_obj.correlation_id)
299324

300325
# NOTE: Returns too many correlation_ids.

alert_system/etl/base/loader.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,15 @@ def load(self, transformed_data: Dict, connector: Connector, run_id: str, is_pas
4141
"total_people_exposed": transformed_data.get("people_exposed"),
4242
"total_buildings_exposed": transformed_data.get("buildings_exposed"),
4343
"impact_metadata": transformed_data.get("impact_metadata"),
44+
"start_datetime": transformed_data.get("start_datetime"),
45+
"end_datetime": transformed_data.get("end_datetime"),
4446
"item_eligible": is_item_eligible,
4547
"is_past_event": is_past_event,
4648
"extraction_run_id": run_id,
4749
},
4850
)
4951

5052
action = "Created" if created else "Updated"
51-
logger.info(f"{action} Event for correlation_id={correlation_id}")
53+
logger.info(f"{action} Event for {correlation_id=}")
5254

5355
return load_obj

alert_system/etl/base/transform.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
from abc import ABC, abstractmethod
3+
from datetime import datetime
34
from typing import List, Optional, TypedDict
45

56
from alert_system.models import ExtractionItem
@@ -23,6 +24,8 @@ class EventType(TypedDict):
2324
title: str
2425
description: str
2526
country: str
27+
start_datetime: datetime
28+
end_datetime: datetime
2629

2730
def __init__(
2831
self, event_obj: ExtractionItem, hazard_obj: Optional[ExtractionItem] = None, impact_obj: List[ExtractionItem] = []
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# NOTE: Store Config files here. Might need to refactor if source supports filtering with hazards.
2+
from alert_system.etl.base.config import ExtractionConfig
3+
4+
gdacs_flood_config: ExtractionConfig = {
5+
"event_collection_type": "gdacs-events",
6+
"hazard_collection_type": "gdacs-hazards",
7+
"impact_collection_type": "gdacs-impacts",
8+
"filter_event": {
9+
"hazard_codes": ["FL", "MH0600", "nat-hyd-flo-flo"]
10+
},
11+
"filter_hazard": None,
12+
"filter_impact": None,
13+
"people_exposed_threshold": 500,
14+
}

alert_system/etl/Gdacs_flood/extraction.py renamed to alert_system/etl/gdacs_flood/extraction.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@
1010

1111

1212
class GdacsFloodExtraction(BaseExtractionClass):
13-
event_collection_type = gdacs_flood_config.event_collection_type
14-
hazard_collection_type = getattr(gdacs_flood_config, "hazard_collection_type", None)
15-
impact_collection_type = getattr(gdacs_flood_config, "impact_collection_type", None)
16-
filter_event = getattr(gdacs_flood_config, "filter_event", None)
13+
config = gdacs_flood_config
1714
transformer_class = GdacsTransformer
1815
loader_class = GdacsLoader
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44

55

66
class GdacsLoader(BaseLoaderClass):
7-
people_exposed_threshold = gdacs_flood_config.people_exposed_threshold
87

98
# NOTE: Add additional changes to the filter here. This is an example only.
109
def filter_eligible_items(self, load_obj):
    """Return True when the item exposes more people than the configured threshold.

    Args:
        load_obj: Mapping read with ``.get("people_exposed")``.

    Returns:
        ``True`` if ``people_exposed`` is strictly greater than
        ``gdacs_flood_config["people_exposed_threshold"]``, otherwise
        ``False``. A missing or ``None`` exposure count is treated as not
        eligible rather than raising ``TypeError`` on the comparison.
    """
    people_exposed = load_obj.get("people_exposed")
    if people_exposed is None:
        return False
    return people_exposed > gdacs_flood_config["people_exposed_threshold"]
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,6 @@ def process_event(self, event_item) -> BaseTransformerClass.EventType:
7676
"title": properties.get("title", ""),
7777
"description": properties.get("description", ""),
7878
"country": properties.get("monty:country_codes", ""),
79+
"start_datetime": properties.get("start_datetime"),
80+
"end_datetime": properties.get("end_datetime"),
7981
}

0 commit comments

Comments
 (0)