Skip to content

Commit f050eb7

Browse files
Ken LippoldKen Lippold
authored and committed
Added ETL local exec option
1 parent d125ee4 commit f050eb7

File tree

3 files changed

+98
-11
lines changed

3 files changed

+98
-11
lines changed

src/hydroserverpy/api/models/etl/task.py

Lines changed: 93 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
from __future__ import annotations
22
from functools import cached_property
33
import uuid
4+
import logging
5+
import croniter
6+
import pandas as pd
47
from typing import ClassVar, TYPE_CHECKING, List, Optional, Literal, Union
5-
from datetime import datetime
6-
from pydantic import Field, AliasPath, AliasChoices
8+
from datetime import datetime, timedelta, timezone
9+
from pydantic import Field, AliasPath, AliasChoices, TypeAdapter
10+
from hydroserverpy.etl.factories import extractor_factory, transformer_factory, loader_factory
11+
from hydroserverpy.etl.etl_configuration import ExtractorConfig, TransformerConfig, LoaderConfig, SourceTargetMapping, MappingPath
712
from ..base import HydroServerBaseModel
813
from .orchestration_system import OrchestrationSystem
914
from .data_connection import DataConnection
@@ -153,6 +158,91 @@ def delete_task_run(
153158
return self.client.tasks.delete_task_run(uid=self.uid, task_run_id=uid)
154159

155160
def run(self):
    """Ask the HydroServer API to execute this task remotely.

    The actual ETL work happens server-side; this only triggers it.
    """
    task_service = self.client.tasks
    return task_service.run(uid=self.uid)
164+
165+
def run_local(self):
    """Run this ETL task locally instead of triggering it on HydroServer.

    Builds the extractor, transformer, and loader from the task's data
    connection settings, records a RUNNING task run, executes the
    extract -> transform -> load pipeline, and finally updates the task
    run's status with the outcome. Does nothing if the task is paused.
    """

    if self.paused is True:
        return

    # Validate the connection settings into typed configs, then build the
    # concrete ETL components from them.
    extractor_cls = extractor_factory(TypeAdapter(ExtractorConfig).validate_python({
        "type": self.data_connection.extractor_type,
        **self.data_connection.extractor_settings
    }))
    transformer_cls = transformer_factory(TypeAdapter(TransformerConfig).validate_python({
        "type": self.data_connection.transformer_type,
        **self.data_connection.transformer_settings
    }))
    loader_cls = loader_factory(TypeAdapter(LoaderConfig).validate_python({
        "type": self.data_connection.loader_type,
        **self.data_connection.loader_settings
    }), self.client, str(self.uid))

    # Record the run up front so a failure in any stage is still reported.
    task_run = self.create_task_run(status="RUNNING", started_at=datetime.now(timezone.utc))

    try:
        logging.info("Starting extract")

        # Convert the camelCase mapping dicts into typed mapping objects
        # for the transformer.
        task_mappings = [
            SourceTargetMapping(
                source_identifier=task_mapping["sourceIdentifier"],
                paths=[
                    MappingPath(
                        target_identifier=task_mapping_path["targetIdentifier"],
                        data_transformations=task_mapping_path["dataTransformations"],
                    ) for task_mapping_path in task_mapping["paths"]
                ]
            ) for task_mapping in self.mappings
        ]

        data = extractor_cls.extract(self, loader_cls)
        if self.is_empty(data):
            # BUG FIX: this call previously passed `loader_cls`, but
            # _update_status expects the TaskRun being finalized.
            self._update_status(
                task_run, True, "No data returned from the extractor"
            )
            return

        logging.info("Starting transform")
        data = transformer_cls.transform(data, task_mappings)
        if self.is_empty(data):
            # BUG FIX: same as above — pass the TaskRun, not the loader.
            self._update_status(
                task_run, True, "No data returned from the transformer"
            )
            return

        logging.info("Starting load")
        loader_cls.load(data, self)
        self._update_status(task_run, True, "OK")
    except Exception as e:
        # Log the traceback before recording the failure on the task run.
        logging.exception("Local task run failed")
        self._update_status(task_run, False, str(e))
221+
222+
@staticmethod
def is_empty(data):
    """Return True when *data* is None or an empty pandas DataFrame.

    Any other value (including falsy non-DataFrame values) counts as
    non-empty.
    """
    if isinstance(data, pd.DataFrame):
        return data.empty
    return data is None
231+
232+
def _update_status(self, task_run: TaskRun, success: bool, msg: str):
    """Record a task run's outcome and reschedule the task.

    Marks the run SUCCESS or FAILURE with *msg* as its result message,
    then recomputes ``next_run_at`` and persists the task.
    """
    outcome = "SUCCESS" if success else "FAILURE"
    # NOTE(review): the rest of this file addresses runs by `uid`
    # (e.g. delete_task_run) — confirm TaskRun exposes `id` here.
    self.update_task_run(
        task_run.id,
        status=outcome,
        result={"message": msg},
    )
    self.next_run_at = self._next_run()
    self.save()
240+
241+
def _next_run(self) -> Optional[str]:
242+
now = datetime.now(timezone.utc)
243+
if cron := self.crontab:
244+
return croniter.croniter(cron, now).get_next(datetime).isoformat()
245+
if iv := self.interval:
246+
unit = self.interval_period or "minutes"
247+
return (now + timedelta(**{unit: iv})).isoformat()
248+
return None

src/hydroserverpy/api/services/etl/task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ def list(
5151
page_size=page_size,
5252
order_by=order_by,
5353
workspace_id=normalize_uuid(workspace),
54-
orchestration_system=normalize_uuid(orchestration_system),
54+
orchestration_system_id=normalize_uuid(orchestration_system),
5555
orchestration_system_type=orchestration_system_type,
56-
data_connection=normalize_uuid(data_connection),
56+
data_connection_id=normalize_uuid(data_connection),
5757
data_connection_type=data_connection_type,
5858
extractor_type=extractor_type,
5959
transformer_type=transformer_type,

src/hydroserverpy/etl/loaders/hydroserver_loader.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,10 @@ def _fetch_earliest_begin(
7575
) -> pd.Timestamp:
7676
logging.info("Querying HydroServer for earliest begin date for task...")
7777
timestamps = []
78-
datastreams = self.client.datastreams.list(
79-
data_source=self.data_source_id
80-
).items
81-
ds_by_uid = {str(ds.uid): ds for ds in datastreams}
8278
for m in mappings:
83-
for p in m.paths:
84-
datastream = ds_by_uid[str(p.target_identifier)]
79+
for p in m["paths"] if isinstance(m, dict) else m.paths:
80+
datastream_id = p["targetIdentifier"] if isinstance(p, dict) else p.target_identifier
81+
datastream = self.client.datastreams.get(datastream_id)
8582
raw = datastream.phenomenon_end_time or "1970-01-01"
8683
ts = pd.to_datetime(raw, utc=True)
8784
timestamps.append(ts)

0 commit comments

Comments
 (0)