
Commit fcc9bdd

chore(etl): separate past event extraction into different class
1 parent dcae4c6 commit fcc9bdd

23 files changed (+302, -154 lines)

alert_system/etl/Gdacs_flood/config.py

Lines changed: 0 additions & 11 deletions
This file was deleted.

alert_system/etl/Usgs_earthquake/config.py

Lines changed: 0 additions & 10 deletions
This file was deleted.

alert_system/etl/base/config.py

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+from typing import Dict, TypedDict
+
+
+class ExtractionConfig(TypedDict):
+    event_collection_type: str
+    hazard_collection_type: str | None
+    impact_collection_type: str | None
+
+    filter_event: Dict | None
+    filter_hazard: Dict | None
+    filter_impact: Dict | None
+
+    people_exposed_threshold: int
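
At runtime a TypedDict is an ordinary dict, which is what lets load_config() in BaseExtractionClass (later in this diff) copy its entries onto the instance with .items(). A minimal sketch of filling in this shape; the collection id and threshold below are hypothetical placeholders, not values from this commit:

    from alert_system.etl.base.config import ExtractionConfig

    example_config: ExtractionConfig = {
        "event_collection_type": "example-events",  # hypothetical collection id
        "hazard_collection_type": None,
        "impact_collection_type": None,
        "filter_event": None,
        "filter_hazard": None,
        "filter_impact": None,
        "people_exposed_threshold": 100,  # hypothetical threshold
    }

    # A TypedDict is a plain dict at runtime, so it iterates like any mapping.
    for key, value in example_config.items():
        print(key, value)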

alert_system/etl/base/extraction.py

Lines changed: 120 additions & 59 deletions
@@ -1,14 +1,16 @@
 import logging
 from abc import ABC
-from datetime import datetime, timedelta, timezone
-from typing import Dict, Generator, List, Optional
+from datetime import timedelta
+from typing import Dict, Generator, List, Optional, Type

 import httpx
 from django.db import transaction
+from django.utils import timezone

 from alert_system.helpers import build_stac_search
 from alert_system.models import Connector, ExtractionItem, LoadItem

+from .config import ExtractionConfig
 from .loader import BaseLoaderClass
 from .transform import BaseTransformerClass

@@ -29,19 +31,28 @@ class BaseExtractionClass(ABC):
     """

     event_collection_type: str
-    hazard_collection_type: str | None
-    impact_collection_type: str | None
+    transformer_class: Type[BaseTransformerClass]
+    loader_class: Type[BaseLoaderClass]
+
+    hazard_collection_type: Optional[str] = None
+    impact_collection_type: Optional[str] = None
+
     filter_event: Optional[Dict] = None
     filter_hazard: Optional[Dict] = None
     filter_impact: Optional[Dict] = None
-    transformer_class: type[BaseTransformerClass]
-    loader_class: type[BaseLoaderClass]
+
+    config: ExtractionConfig

     def __init__(self, connector: Connector):
         self.connector = connector
         self.base_url = connector.source_url.rstrip("/")
+        self.load_config()
         self._validate_required_attributes()

+    def load_config(self):
+        for key, value in self.config.items():
+            setattr(self, key, value)
+
     def _validate_required_attributes(self):
         missing_attr = []
         if not getattr(self, "event_collection_type", None):
@@ -96,10 +107,10 @@ def get_datetime_filter(self) -> str:
         ISO 8601 datetime range string
         """

-        now = datetime.now(timezone.utc)
+        now = timezone.now()
         last_run = self.connector.last_success_run

-        start_time = last_run if last_run else (now - timedelta(days=10))  # NOTE: Arbitrary value for failure case.
+        start_time = last_run if last_run else (now - timedelta(days=30))  # NOTE: Arbitrary value for failure case.
         return f"{start_time.isoformat()}/{now.isoformat()}"

     def _save_stac_item(self, stac_id: str, defaults: Dict) -> Optional[ExtractionItem]:
@@ -200,7 +211,7 @@ def process_event_items(self, extraction_run_id: str, correlation_id: str | None
                 ),
             )
         except Exception as e:
-            logger.error(f"Failed to fetch events: {e}")
+            logger.warning(f"Failed to fetch events: {e}")
             raise

         for feature in event_items:
@@ -233,77 +244,127 @@ def process_event_items(self, extraction_run_id: str, correlation_id: str | None
                logger.info(f"Successfully processed event {event_id}")

            except Exception as e:
-               logger.error(f"Failed to process event {event_id}: {e}", exc_info=True)
+               logger.warning(f"Failed to process event {event_id}: {e}", exc_info=True)
                raise

-    def _construct_filter_for_past_events(self, impact_metadata: list[dict]) -> str:
-        filters = []
+    def run(self, extraction_run_id: str, correlation_id: str | None = None, is_past_event: bool = False) -> None:
+        """Main entry point for running the connector."""
+        try:
+            self.process_event_items(extraction_run_id, correlation_id, is_past_event)
+        except Exception as e:
+            logger.warning(f"Connector run failed: {e}", exc_info=True)
+            raise
+

-        for detail in impact_metadata:
-            category = detail.get("category")
-            type_ = detail.get("type")
-            value = detail.get("value")
+class PastEventExtractionClass:
+    LOOKBACK_WEEKS = 520

-            if category and type_ and value is not None:
+    def __init__(self, extractor: BaseExtractionClass):
+        self.extractor = extractor
+        self.base_url = extractor.base_url
+
+    def _impact_filter(self, impact_metadata: list[dict]) -> str:
+        filters = []
+
+        for data in impact_metadata or []:
+            if data.get("category") and data.get("type") and data.get("value") is not None:
                filters.append(
-                    f"monty:impact_detail.category = '{category}' AND "
-                    f"monty:impact_detail.type = '{type_}' AND "
-                    f"monty:impact_detail.value >= {value}"
+                    f"monty:impact_detail.category = '{data['category']}' AND "
+                    f"monty:impact_detail.type = '{data['type']}' AND "
+                    f"monty:impact_detail.value >= {data['value']}"
                )

-        return " OR ".join(f"({f})" for f in filters) if filters else ""
+        return " OR ".join(f"({filter})" for filter in filters)

-    def fetch_past_events(self, load_obj):
-        if not self.impact_collection_type:
-            logger.warning(f"Impact does not exist for event {load_obj}")
-            return
-        start_time = datetime.now(timezone.utc) - timedelta(weeks=16)  # NOTE: Arbitrary value for lookback.
-        filters = [self._construct_filter_for_past_events(load_obj.impact_metadata)]
-        impact_data = self.fetch_stac_data(
-            self.base_url,
-            build_stac_search(
-                collections=self.impact_collection_type,
-                additional_filters=filters,
-                datetime_range=f"{start_time.isoformat()}/{datetime.now(timezone.utc).isoformat()}",
-            ),
-        )
-        load_obj_corr_id = load_obj.correlation_id
-        related_ids = []
-        logger.info(f"Fetching past event for event={load_obj.id}")
+    def _country_filter(self, country_codes) -> list[str]:
+        filters = []
+        if country_codes:
+            country_cql = " OR ".join(f"a_contains(monty:country_codes, '{code}')" for code in country_codes)
+            filters.append(f"({country_cql})")
+        return filters
+
+    def _hazard_filter(self, unit: str, value: int) -> str:
+        return f"monty:hazard_detail.severity_unit = '{unit}' AND " f"monty:hazard_detail.severity_value >= {value}"
+
+    def _collect_corr_ids(self, features, exclude: str) -> set[str]:
        corr_ids = set()
-        for feature in impact_data:
-            corr_id = self._get_correlation_id(feature)
-            if corr_id and corr_id != load_obj_corr_id:
+        for feature in features or []:
+            corr_id = self.extractor._get_correlation_id(feature)
+            if corr_id and corr_id != exclude:
                corr_ids.add(corr_id)
+        return corr_ids
+
+    def find_related_corr_ids(self, load_obj: LoadItem) -> set[str]:
+        start = timezone.now() - timedelta(weeks=self.LOOKBACK_WEEKS)
+        end = timezone.now()
+
+        corr_ids = set()
+
+        if self.extractor.impact_collection_type:
+            impact_filter = self._impact_filter(load_obj.impact_metadata)
+            country_filters = self._country_filter(load_obj.country_codes)
+
+            additional_filters = []
+
+            if impact_filter:
+                additional_filters.append(impact_filter)
+
+            additional_filters.extend(country_filters)
+
+            features = self.extractor.fetch_stac_data(
+                self.base_url,
+                build_stac_search(
+                    collections=self.extractor.impact_collection_type,
+                    additional_filters=additional_filters,
+                    datetime_range=f"{start.isoformat()}/{end.isoformat()}",
+                ),
+            )
+
+            corr_ids |= self._collect_corr_ids(features, load_obj.correlation_id)
+
+        # NOTE: Returns too many correlation_ids.
+        # if self.extractor.hazard_collection_type:
+        #     hazard_filter = self._hazard_filter(
+        #         load_obj.severity_unit,
+        #         load_obj.severity_value,
+        #     )
+        #     features = self.extractor.fetch_stac_data(
+        #         self.base_url,
+        #         build_stac_search(
+        #             collections=self.extractor.hazard_collection_type,
+        #             additional_filters=[hazard_filter],
+        #             datetime_range=f"{start.isoformat()}/{end.isoformat()}",
+        #         ),
+        #     )
+        #     corr_ids |= self._collect_corr_ids(features, load_obj.correlation_id)
+
+        return corr_ids
+
+    def extract_past_events(self, load_obj: LoadItem) -> None:
+        corr_ids = self.find_related_corr_ids(load_obj)

        if not corr_ids:
            return

        existing_items = LoadItem.objects.filter(correlation_id__in=corr_ids)
-        existing_map = {item.correlation_id: item for item in existing_items}
+        existing_map = {i.correlation_id: i for i in existing_items}
+
+        related_ids = []

        for corr_id in corr_ids:
            item = existing_map.get(corr_id)
+
+            if not item:
+                self.extractor.run(
+                    extraction_run_id=load_obj.extraction_run_id,
+                    correlation_id=corr_id,
+                    is_past_event=True,
+                )
+                item = LoadItem.objects.filter(correlation_id=corr_id).first()
+
            if item:
                related_ids.append(item.id)
                item.related_montandon_events.add(load_obj.id)
-            else:
-                self.run(extraction_run_id=load_obj.extraction_run_id, correlation_id=corr_id, is_past_event=True)
-                new_item = LoadItem.objects.filter(correlation_id=corr_id).first()
-                if new_item:
-                    related_ids.append(new_item.id)
-                    new_item.related_montandon_events.add(load_obj.id)

        if related_ids:
            load_obj.related_montandon_events.set(related_ids)
-
-    def run(self, extraction_run_id: str, correlation_id: str | None = None, is_past_event: bool = False) -> None:
-        """Main entry point for running the connector."""
-        logger.info(f"Starting connector run for {self.connector}")
-
-        try:
-            self.process_event_items(extraction_run_id, correlation_id, is_past_event)
-            logger.info("Connector run completed successfully")
-        except Exception as e:
-            logger.error(f"Connector run failed: {e}", exc_info=True)
-            raise
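
Taken together, the pieces above would be wired up roughly as follows (a sketch only: connector, load_obj, and the run id are placeholders; GdacsFloodExtraction is the subclass shown further down in this commit):

    from alert_system.etl.base.extraction import PastEventExtractionClass
    from alert_system.etl.gdacs_flood.extraction import GdacsFloodExtraction

    extractor = GdacsFloodExtraction(connector)  # __init__ now calls load_config()
    extractor.run(extraction_run_id="run-001")   # forward extraction, unchanged entry point

    # Past-event lookup now lives in its own wrapper class.
    past = PastEventExtractionClass(extractor)
    past.extract_past_events(load_obj)  # links related historical LoadItems to load_obj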

alert_system/etl/base/loader.py

Lines changed: 3 additions & 1 deletion
@@ -41,13 +41,15 @@ def load(self, transformed_data: Dict, connector: Connector, run_id: str, is_pas
                "total_people_exposed": transformed_data.get("people_exposed"),
                "total_buildings_exposed": transformed_data.get("buildings_exposed"),
                "impact_metadata": transformed_data.get("impact_metadata"),
+                "start_datetime": transformed_data.get("start_datetime"),
+                "end_datetime": transformed_data.get("end_datetime"),
                "item_eligible": is_item_eligible,
                "is_past_event": is_past_event,
                "extraction_run_id": run_id,
            },
        )

        action = "Created" if created else "Updated"
-        logger.info(f"{action} Event for correlation_id={correlation_id}")
+        logger.info(f"{action} Event for {correlation_id=}")

        return load_obj
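
The log line now uses the self-documenting f-string form from Python 3.8: f"{correlation_id=}" expands to the expression text plus the repr of its value, so the output gains quotes around the id. For example:

    correlation_id = "gdacs-123"  # hypothetical id
    print(f"Created Event for correlation_id={correlation_id}")  # old: ... correlation_id=gdacs-123
    print(f"Created Event for {correlation_id=}")                # new: ... correlation_id='gdacs-123'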

alert_system/etl/base/transform.py

Lines changed: 3 additions & 0 deletions
@@ -1,5 +1,6 @@
 import logging
 from abc import ABC, abstractmethod
+from datetime import datetime
 from typing import List, Optional, TypedDict

 from alert_system.models import ExtractionItem
@@ -23,6 +24,8 @@ class EventType(TypedDict):
        title: str
        description: str
        country: str
+        start_datetime: datetime
+        end_datetime: datetime

    def __init__(
        self, event_obj: ExtractionItem, hazard_obj: Optional[ExtractionItem] = None, impact_obj: List[ExtractionItem] = []
alert_system/etl/gdacs_flood/config.py

Lines changed: 12 additions & 0 deletions

@@ -0,0 +1,12 @@
+# NOTE: Store Config files here. Might need to refactor if source supports filtering with hazards.
+from alert_system.etl.base.config import ExtractionConfig
+
+gdacs_flood_config: ExtractionConfig = {
+    "event_collection_type": "gdacs-events",
+    "hazard_collection_type": "gdacs-hazards",
+    "impact_collection_type": "gdacs-impacts",
+    "filter_event": {"hazard_codes": ["FL", "MH0600", "nat-hyd-flo-flo"]},
+    "filter_hazard": None,
+    "filter_impact": None,
+    "people_exposed_threshold": 500,
+}

alert_system/etl/Gdacs_flood/extraction.py renamed to alert_system/etl/gdacs_flood/extraction.py

Lines changed: 1 addition & 4 deletions
@@ -10,9 +10,6 @@


 class GdacsFloodExtraction(BaseExtractionClass):
-    event_collection_type = gdacs_flood_config.event_collection_type
-    hazard_collection_type = getattr(gdacs_flood_config, "hazard_collection_type", None)
-    impact_collection_type = getattr(gdacs_flood_config, "impact_collection_type", None)
-    filter_event = getattr(gdacs_flood_config, "filter_event", None)
+    config = gdacs_flood_config
    transformer_class = GdacsTransformer
    loader_class = GdacsLoader
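
Since load_config() copies every config key onto the instance, declaring config = gdacs_flood_config is roughly equivalent to the class attributes it replaces. A sketch of the effective result (the connector argument is a placeholder):

    extractor = GdacsFloodExtraction(connector)

    # Attributes set by load_config() from gdacs_flood_config:
    assert extractor.event_collection_type == "gdacs-events"
    assert extractor.hazard_collection_type == "gdacs-hazards"
    assert extractor.filter_event == {"hazard_codes": ["FL", "MH0600", "nat-hyd-flo-flo"]}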
alert_system/etl/gdacs_flood/loader.py

Lines changed: 1 addition & 2 deletions

@@ -4,10 +4,9 @@


 class GdacsLoader(BaseLoaderClass):
-    people_exposed_threshold = gdacs_flood_config.people_exposed_threshold

    # NOTE: Add additional changes to the filter here. This is example only.
    def filter_eligible_items(self, load_obj):
-        if load_obj.get("people_exposed") > GdacsLoader.people_exposed_threshold:
+        if load_obj.get("people_exposed") > gdacs_flood_config["people_exposed_threshold"]:
            return True
        return False
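
One caveat with this comparison: dict.get returns None for a missing key, and None > int raises TypeError on Python 3. A defensive variant might default the value (a sketch, not part of this commit):

    def filter_eligible_items(self, load_obj):
        people_exposed = load_obj.get("people_exposed") or 0  # treat missing/None as 0
        return people_exposed > gdacs_flood_config["people_exposed_threshold"]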
alert_system/etl/gdacs_flood/transform.py

Lines changed: 6 additions & 7 deletions

@@ -31,25 +31,22 @@ def compute_buildings_exposed(self, metadata_list) -> int:
    # NOTE: This logic will change with changes in montandon.
    def process_impact(self, impact_items) -> BaseTransformerClass.ImpactType:
        metadata = []
-        largest_values_metadata = {}
        for item in impact_items:
            properties = item.resp_data.get("properties", {})
            impact_detail = properties.get("monty:impact_detail", {})
            category = impact_detail.get("category")
            type_ = impact_detail.get("type")
            value = impact_detail.get("value")
-            if category and type_:
-                key = (category, type_)
-
-                if key not in largest_values_metadata or value > largest_values_metadata[key]["value"]:
-                    largest_values_metadata[key] = {
+            if category == "people" and type_ == "affected_total":
+                metadata = [
+                    {
                        "category": category,
                        "type": type_,
                        "value": value,
                        "unit": impact_detail.get("unit", ""),
                        "estimate_type": impact_detail.get("estimate_type", ""),
                    }
-        metadata.extend(largest_values_metadata.values())
+                ]
        return {
            "people_exposed": self.compute_people_exposed(metadata),
            "buildings_exposed": self.compute_buildings_exposed(metadata),
@@ -79,4 +76,6 @@ def process_event(self, event_item) -> BaseTransformerClass.EventType:
            "title": properties.get("title", ""),
            "description": properties.get("description", ""),
            "country": properties.get("monty:country_codes", ""),
+            "start_datetime": properties.get("start_datetime"),
+            "end_datetime": properties.get("end_datetime"),
        }
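
The rewrite narrows process_impact from keeping the largest value per (category, type) pair to keeping a single people/affected_total entry; because metadata is reassigned inside the loop, only the last matching impact item survives. An illustration with made-up impact details:

    details = [
        {"category": "people", "type": "affected_total", "value": 1200, "unit": "count", "estimate_type": "primary"},
        {"category": "buildings", "type": "destroyed_total", "value": 40, "unit": "count", "estimate_type": "primary"},
    ]

    # Old behavior: both entries retained (largest value per (category, type) pair).
    # New behavior: only the people/affected_total entry ends up in metadata.
    expected_metadata = [details[0]]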
