
Commit 3c874a1

chore(etl): separate past event extraction into different class
1 parent 0ba1868

File tree: 6 files changed, +98 −68 lines

alert_system/etl/Gdacs_flood/config.py

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@ def __init__(self):
         self.event_collection_type = "gdacs-events"
         self.hazard_collection_type = "gdacs-hazards"
         self.impact_collection_type = "gdacs-impacts"
-        self.people_exposed_threshold = 5
+        self.people_exposed_threshold = 500
         self.filter_event = {"hazard_codes": ["FL", "MH0600", "nat-hyd-flo-flo"]}
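The change raises the alerting cutoff from 5 to 500 people exposed. For context, a minimal sketch of how such a threshold is typically consumed downstream; the should_alert helper is hypothetical and not part of this commit, while the people_exposed key mirrors what process_impact returns in transform.py:

# Hypothetical illustration only: gate alerting on the configured threshold.
def should_alert(impact: dict, threshold: int = 500) -> bool:
    """Return True when estimated exposure meets the alerting threshold."""
    return impact.get("people_exposed", 0) >= threshold

assert should_alert({"people_exposed": 1200})    # above the new threshold
assert not should_alert({"people_exposed": 50})  # would have alerted at the old value of 5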

alert_system/etl/Gdacs_flood/transform.py

Lines changed: 4 additions & 7 deletions
@@ -31,25 +31,22 @@ def compute_buildings_exposed(self, metadata_list) -> int:
     # NOTE: This logic will change with changes in montandon.
     def process_impact(self, impact_items) -> BaseTransformerClass.ImpactType:
         metadata = []
-        largest_values_metadata = {}
         for item in impact_items:
             properties = item.resp_data.get("properties", {})
             impact_detail = properties.get("monty:impact_detail", {})
             category = impact_detail.get("category")
             type_ = impact_detail.get("type")
             value = impact_detail.get("value")
-            if category and type_:
-                key = (category, type_)
-
-                if key not in largest_values_metadata or value > largest_values_metadata[key]["value"]:
-                    largest_values_metadata[key] = {
+            if category == "people" and type_ == "affected_total":
+                metadata = [
+                    {
                         "category": category,
                         "type": type_,
                         "value": value,
                         "unit": impact_detail.get("unit", ""),
                         "estimate_type": impact_detail.get("estimate_type", ""),
                     }
-        metadata.extend(largest_values_metadata.values())
+                ]
         return {
             "people_exposed": self.compute_people_exposed(metadata),
             "buildings_exposed": self.compute_buildings_exposed(metadata),

alert_system/etl/Usgs_earthquake/transform.py

Lines changed: 1 addition & 0 deletions
@@ -28,6 +28,7 @@ def compute_buildings_exposed(self, metadata_list) -> int:
                 return data["value"]
         return 0

+    # NOTE: To be changed.
     def process_impact(self, impact_items) -> BaseTransformerClass.ImpactType:
         metadata = []
         values_metadata = {}

alert_system/etl/base/extraction.py

Lines changed: 86 additions & 50 deletions
@@ -99,7 +99,7 @@ def get_datetime_filter(self) -> str:
         now = datetime.now(timezone.utc)
         last_run = self.connector.last_success_run

-        start_time = last_run if last_run else (now - timedelta(days=10))  # NOTE: Arbitrary value for failure case.
+        start_time = last_run if last_run else (now - timedelta(days=20))  # NOTE: Arbitrary value for failure case.
         return f"{start_time.isoformat()}/{now.isoformat()}"

     def _save_stac_item(self, stac_id: str, defaults: Dict) -> Optional[ExtractionItem]:
@@ -236,74 +236,110 @@ def process_event_items(self, extraction_run_id: str, correlation_id: str | None
             logger.error(f"Failed to process event {event_id}: {e}", exc_info=True)
             raise

-    def _construct_filter_for_past_events(self, impact_metadata: list[dict]) -> str:
-        filters = []
+    def run(self, extraction_run_id: str, correlation_id: str | None = None, is_past_event: bool = False) -> None:
+        """Main entry point for running the connector."""
+        logger.info(f"Starting connector run for {self.connector}")
+
+        try:
+            self.process_event_items(extraction_run_id, correlation_id, is_past_event)
+            logger.info("Connector run completed successfully")
+        except Exception as e:
+            logger.error(f"Connector run failed: {e}", exc_info=True)
+            raise

-        for detail in impact_metadata:
-            category = detail.get("category")
-            type_ = detail.get("type")
-            value = detail.get("value")

-            if category and type_ and value is not None:
+class PastEventExtractionClass:
+    LOOKBACK_WEEKS = 16
+
+    def __init__(self, extractor: BaseExtractionClass):
+        self.extractor = extractor
+        self.base_url = extractor.base_url
+
+    def _impact_filter(self, impact_metadata: list[dict]) -> str:
+        filters = []
+
+        for data in impact_metadata or []:
+            if data.get("category") and data.get("type") and data.get("value") is not None:
                 filters.append(
-                    f"monty:impact_detail.category = '{category}' AND "
-                    f"monty:impact_detail.type = '{type_}' AND "
-                    f"monty:impact_detail.value >= {value}"
+                    f"monty:impact_detail.category = '{data['category']}' AND "
+                    f"monty:impact_detail.type = '{data['type']}' AND "
+                    f"monty:impact_detail.value >= {data['value']}"
                )

-        return " OR ".join(f"({f})" for f in filters) if filters else ""
+        return " OR ".join(f"({filter})" for filter in filters)

-    def fetch_past_events(self, load_obj):
-        if not self.impact_collection_type:
-            logger.warning(f"Impact does not exist for event {load_obj}")
-            return
-        start_time = datetime.now(timezone.utc) - timedelta(weeks=16)  # NOTE: Arbitrary value for lookback.
-        filters = [self._construct_filter_for_past_events(load_obj.impact_metadata)]
-        impact_data = self.fetch_stac_data(
-            self.base_url,
-            build_stac_search(
-                collections=self.impact_collection_type,
-                additional_filters=filters,
-                datetime_range=f"{start_time.isoformat()}/{datetime.now(timezone.utc).isoformat()}",
-            ),
-        )
-        load_obj_corr_id = load_obj.correlation_id
-        related_ids = []
-        logger.info(f"Fetching past event for event={load_obj.id}")
+    def _hazard_filter(self, unit: str, value: int) -> str:
+        return f"monty:hazard_detail.severity_unit = '{unit}' AND " f"monty:hazard_detail.severity_value >= {value}"
+
+    def _collect_corr_ids(self, features, exclude: str) -> set[str]:
         corr_ids = set()
-        for feature in impact_data:
-            corr_id = self._get_correlation_id(feature)
-            if corr_id and corr_id != load_obj_corr_id:
+        for feature in features or []:
+            corr_id = self.extractor._get_correlation_id(feature)
+            if corr_id and corr_id != exclude:
                 corr_ids.add(corr_id)
+        return corr_ids
+
+    def find_related_corr_ids(self, load_obj: LoadItem) -> set[str]:
+        start = datetime.now(timezone.utc) - timedelta(weeks=self.LOOKBACK_WEEKS)
+        end = datetime.now(timezone.utc)
+
+        corr_ids = set()
+
+        if self.extractor.impact_collection_type:
+            impact_filter = self._impact_filter(load_obj.impact_metadata)
+            features = self.extractor.fetch_stac_data(
+                self.base_url,
+                build_stac_search(
+                    collections=self.extractor.impact_collection_type,
+                    additional_filters=[impact_filter] if impact_filter else [],
+                    datetime_range=f"{start.isoformat()}/{end.isoformat()}",
+                ),
+            )
+            corr_ids |= self._collect_corr_ids(features, load_obj.correlation_id)
+
+        # NOTE: Returns too many correlation_ids.
+        # if self.extractor.hazard_collection_type:
+        #     hazard_filter = self._hazard_filter(
+        #         load_obj.severity_unit,
+        #         load_obj.severity_value,
+        #     )
+        #     features = self.extractor.fetch_stac_data(
+        #         self.base_url,
+        #         build_stac_search(
+        #             collections=self.extractor.hazard_collection_type,
+        #             additional_filters=[hazard_filter],
+        #             datetime_range=f"{start.isoformat()}/{end.isoformat()}",
+        #         ),
+        #     )
+        #     corr_ids |= self._collect_corr_ids(features, load_obj.correlation_id)
+
+        return corr_ids
+
+    def extract_past_events(self, load_obj: LoadItem) -> None:
+        corr_ids = self.find_related_corr_ids(load_obj)

         if not corr_ids:
             return

         existing_items = LoadItem.objects.filter(correlation_id__in=corr_ids)
-        existing_map = {item.correlation_id: item for item in existing_items}
+        existing_map = {i.correlation_id: i for i in existing_items}
+
+        related_ids = []

         for corr_id in corr_ids:
             item = existing_map.get(corr_id)
+
+            if not item:
+                self.extractor.run(
+                    extraction_run_id=load_obj.extraction_run_id,
+                    correlation_id=corr_id,
+                    is_past_event=True,
+                )
+                item = LoadItem.objects.filter(correlation_id=corr_id).first()
+
             if item:
                 related_ids.append(item.id)
                 item.related_montandon_events.add(load_obj.id)
-            else:
-                self.run(extraction_run_id=load_obj.extraction_run_id, correlation_id=corr_id, is_past_event=True)
-                new_item = LoadItem.objects.filter(correlation_id=corr_id).first()
-                if new_item:
-                    related_ids.append(new_item.id)
-                    new_item.related_montandon_events.add(load_obj.id)

         if related_ids:
             load_obj.related_montandon_events.set(related_ids)
-
-    def run(self, extraction_run_id: str, correlation_id: str | None = None, is_past_event: bool = False) -> None:
-        """Main entry point for running the connector."""
-        logger.info(f"Starting connector run for {self.connector}")
-
-        try:
-            self.process_event_items(extraction_run_id, correlation_id, is_past_event)
-            logger.info("Connector run completed successfully")
-        except Exception as e:
-            logger.error(f"Connector run failed: {e}", exc_info=True)
-            raise
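A minimal usage sketch of the new split; the extractor instance and load_obj are assumed to be configured elsewhere, and only the class, method, and constant names come from this commit:

# Hedged sketch: the filter string _impact_filter builds for one impact detail.
impact_metadata = [{"category": "people", "type": "affected_total", "value": 1500}]

expected = (
    "(monty:impact_detail.category = 'people' AND "
    "monty:impact_detail.type = 'affected_total' AND "
    "monty:impact_detail.value >= 1500)"
)

past_events = PastEventExtractionClass(extractor)  # `extractor` assumed configured
assert past_events._impact_filter(impact_metadata) == expected

# End-to-end: look back LOOKBACK_WEEKS for at-least-as-severe impacts, re-run
# the extractor for unseen correlation_ids, and link the LoadItems both ways.
past_events.extract_past_events(load_obj=load_obj)  # `load_obj` is a LoadItem

Dropping the old explicit `if filters else ""` is behavior-preserving, since `" OR ".join` on an empty list already returns an empty string and find_related_corr_ids skips the search filter when that string is empty.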

alert_system/mappings.py

Lines changed: 3 additions & 9 deletions
@@ -1,24 +1,18 @@
 from dataclasses import dataclass

 from alert_system.etl.base.extraction import BaseExtractionClass
-from alert_system.etl.base.loader import BaseLoaderClass
-from alert_system.etl.base.transform import BaseTransformerClass
 from alert_system.etl.Gdacs_flood.extraction import GdacsFloodExtraction
-from alert_system.etl.Gdacs_flood.loader import GdacsLoader
-from alert_system.etl.Gdacs_flood.transform import GdacsTransformer
+from alert_system.etl.Usgs_earthquake.extraction import USGSEarthquakeExtraction
 from alert_system.models import Connector


 # NOTE: Store all the mappings here.
 @dataclass
 class ConnectorClasses:
     extractor: type[BaseExtractionClass]
-    transfomer: type[BaseTransformerClass]
-    loader: type[BaseLoaderClass]


 CONNECTOR_REGISTRY = {
-    Connector.ConnectorType.GDACS_FLOOD: ConnectorClasses(
-        extractor=GdacsFloodExtraction, transfomer=GdacsTransformer, loader=GdacsLoader
-    ),
+    Connector.ConnectorType.GDACS_FLOOD: ConnectorClasses(extractor=GdacsFloodExtraction),
+    Connector.ConnectorType.USGS_EARTHQUAKE: ConnectorClasses(extractor=USGSEarthquakeExtraction),
 }
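With the transformer and loader fields dropped from the dataclass, resolution from the registry reduces to a single attribute; a hedged sketch of the lookup (the consuming code is assumed usage, not shown in this commit):

# Hypothetical lookup: resolve the extractor class for a connector type.
classes = CONNECTOR_REGISTRY[Connector.ConnectorType.USGS_EARTHQUAKE]
extractor_cls = classes.extractor  # -> USGSEarthquakeExtraction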

alert_system/tasks.py

Lines changed: 3 additions & 1 deletion
@@ -5,6 +5,7 @@
 from celery.exceptions import MaxRetriesExceededError
 from django.db import transaction

+from alert_system.etl.base.extraction import PastEventExtractionClass
 from api.models import Event

 from .helpers import get_connector_processor, set_connector_status
@@ -75,6 +76,7 @@ def fetch_past_events_from_monty(self, extraction_run_id):
     connector_id = first_item.connector.id

     processor, _ = get_connector_processor(connector_id)
+    past_event_extraction_service = PastEventExtractionClass(processor)

     # Process each eligible item
     processed = 0
@@ -83,7 +85,7 @@ def fetch_past_events_from_monty(self, extraction_run_id):
     for load_obj in eligible_items.iterator():
         try:
             with transaction.atomic():
-                processor.fetch_past_events(load_obj)
+                past_event_extraction_service.extract_past_events(load_obj=load_obj)
             processed += 1
         except Exception as e:
             failed += 1
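Because each call runs inside transaction.atomic(), a failure for one item rolls back only that item's writes and the loop continues; a minimal sketch of that per-item isolation (the counter and iterator names mirror the task, the rest is assumed context):

# Per-item rollback boundary, as in fetch_past_events_from_monty.
processed, failed = 0, 0
for load_obj in eligible_items.iterator():
    try:
        with transaction.atomic():  # failure here undoes only this item's links
            past_event_extraction_service.extract_past_events(load_obj=load_obj)
        processed += 1
    except Exception:
        failed += 1  # this item's writes were rolled back; move on to the next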
