Skip to content

Commit 3e94229

Browse files
kjlippold (Ken Lippold)
and others authored
Refactored etl module to improve error handling, and added testing. (#25)
* Refactored etl module to improve error handling, and added additional testing. * Add Python versions 3.12, 3.13, and 3.14 to CI * Updated data type check in base ETL transformer * 367 data aggregation (#26) * Added ETL data aggregation step * Updated ETL README to include aggregation step --------- Co-authored-by: Ken Lippold <klippold@Kens-MacBook-Pro-2.local> * Refactored ETL module to improve error handling, documentation, testing, and interfaces. * Changed transformer output DataFrame format to better support aggregation operations. * Bump version from 1.9.0b1 to 1.9.0b2 * Fixed unit test for Python 3.14 --------- Co-authored-by: Ken Lippold <klippold@Kens-MacBook-Pro-2.local>
1 parent de76bc3 commit 3e94229

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+6035
-2191
lines changed

.github/workflows/tests.yaml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@ jobs:
1010

1111
strategy:
1212
matrix:
13-
python-version: ["3.9", "3.10", "3.11"]
13+
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]
1414

1515
steps:
1616
- name: Checkout Repo

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
[metadata]
22
name = hydroserverpy
3-
version = 1.9.0b1
3+
version = 1.9.0b3
44
description = A Python client for managing HydroServer data
55
long_description_content_type = text/markdown
66
long_description = file: README.md
Lines changed: 5 additions & 335 deletions
Original file line number | Diff line number | Diff line change
@@ -1,23 +1,8 @@
1-
from __future__ import annotations
2-
from functools import cached_property
31
import uuid
4-
import logging
5-
import croniter
6-
import pandas as pd
2+
from functools import cached_property
73
from typing import ClassVar, TYPE_CHECKING, List, Optional, Literal, Union
8-
from datetime import datetime, timedelta, timezone
9-
from pydantic import Field, AliasPath, AliasChoices, TypeAdapter
10-
from hydroserverpy.etl.factories import extractor_factory, transformer_factory, loader_factory
11-
from hydroserverpy.etl.loaders.hydroserver_loader import LoadSummary
12-
from hydroserverpy.etl.etl_configuration import ExtractorConfig, TransformerConfig, LoaderConfig, SourceTargetMapping, MappingPath
13-
from hydroserverpy.etl.aggregation import (
14-
aggregate_daily_window,
15-
closed_window_end_utc,
16-
first_window_start_utc,
17-
iter_daily_windows_utc,
18-
next_window_start_utc,
19-
parse_aggregation_transformation,
20-
)
4+
from datetime import datetime
5+
from pydantic import Field, AliasPath, AliasChoices
216
from ..base import HydroServerBaseModel
227
from .orchestration_system import OrchestrationSystem
238
from .data_connection import DataConnection
@@ -72,15 +57,15 @@ class Task(HydroServerBaseModel):
7257
"mappings"
7358
}
7459

75-
def __init__(self, client: HydroServer, **data):
60+
def __init__(self, client: "HydroServer", **data):
7661
super().__init__(client=client, service=client.tasks, **data)
7762

7863
@classmethod
7964
def get_route(cls):
8065
return "etl-tasks"
8166

8267
@cached_property
83-
def workspace(self) -> Workspace:
68+
def workspace(self) -> "Workspace":
8469
return self.client.workspaces.get(uid=self.workspace_id)
8570

8671
@cached_property
@@ -174,318 +159,3 @@ def run(self):
174159
"""Trigger HydroServer to run this task."""
175160

176161
return self.client.tasks.run(uid=self.uid)
177-
178-
def run_local(self):
179-
"""Run this task locally."""
180-
181-
if self.paused is True:
182-
return
183-
184-
task_run = self.create_task_run(status="RUNNING", started_at=datetime.now(timezone.utc))
185-
runtime_source_uri: Optional[str] = None
186-
187-
try:
188-
if self.task_type == "Aggregation":
189-
summary = self._run_local_aggregation()
190-
if summary["rows_loaded"] == 0:
191-
self._update_status(
192-
task_run,
193-
True,
194-
"No new closed daily windows were available for aggregation.",
195-
)
196-
else:
197-
self._update_status(
198-
task_run,
199-
True,
200-
(
201-
f"Aggregated {summary['days_loaded']} day(s) and loaded "
202-
f"{summary['rows_loaded']} observation(s) across {summary['mappings_loaded']} mapping(s)."
203-
),
204-
)
205-
return
206-
207-
if not self.data_connection:
208-
raise ValueError("ETL tasks require a data connection.")
209-
210-
extractor_cls = extractor_factory(TypeAdapter(ExtractorConfig).validate_python({
211-
"type": self.data_connection.extractor_type,
212-
**self.data_connection.extractor_settings
213-
}))
214-
transformer_cls = transformer_factory(TypeAdapter(TransformerConfig).validate_python({
215-
"type": self.data_connection.transformer_type,
216-
**self.data_connection.transformer_settings
217-
}))
218-
loader_cls = loader_factory(TypeAdapter(LoaderConfig).validate_python({
219-
"type": self.data_connection.loader_type,
220-
**self.data_connection.loader_settings
221-
}), self.client, str(self.uid))
222-
223-
logging.info("Starting extract")
224-
225-
task_mappings = [
226-
SourceTargetMapping(
227-
source_identifier=task_mapping["sourceIdentifier"],
228-
paths=[
229-
MappingPath(
230-
target_identifier=task_mapping_path["targetIdentifier"],
231-
data_transformations=task_mapping_path["dataTransformations"],
232-
) for task_mapping_path in task_mapping["paths"]
233-
]
234-
) for task_mapping in self.mappings
235-
]
236-
237-
data = extractor_cls.extract(self, loader_cls)
238-
runtime_source_uri = getattr(extractor_cls, "runtime_source_uri", None)
239-
if self.is_empty(data):
240-
self._update_status(
241-
task_run,
242-
True,
243-
"No data returned from the extractor",
244-
runtime_source_uri=runtime_source_uri,
245-
)
246-
return
247-
248-
logging.info("Starting transform")
249-
data = transformer_cls.transform(data, task_mappings)
250-
if self.is_empty(data):
251-
self._update_status(
252-
task_run,
253-
True,
254-
"No data returned from the transformer",
255-
runtime_source_uri=runtime_source_uri,
256-
)
257-
return
258-
259-
logging.info("Starting load")
260-
load_summary = loader_cls.load(data, self)
261-
self._update_status(
262-
task_run,
263-
True,
264-
self._success_message(load_summary),
265-
runtime_source_uri=runtime_source_uri,
266-
)
267-
except Exception as e:
268-
self._update_status(
269-
task_run, False, str(e), runtime_source_uri=runtime_source_uri
270-
)
271-
272-
@staticmethod
273-
def is_empty(data):
274-
if data is None:
275-
return True
276-
277-
if isinstance(data, pd.DataFrame) and data.empty:
278-
return True
279-
280-
return False
281-
282-
def _fetch_observation_points(
283-
self,
284-
source_datastream_id: str,
285-
query_start_utc: datetime,
286-
query_end_utc: datetime,
287-
) -> tuple[list[datetime], list[float]]:
288-
observations = self.client.datastreams.get_observations(
289-
uid=source_datastream_id,
290-
order_by=["phenomenonTime"],
291-
phenomenon_time_min=query_start_utc,
292-
phenomenon_time_max=query_end_utc,
293-
fetch_all=True,
294-
).dataframe
295-
296-
prev_df = self.client.datastreams.get_observations(
297-
uid=source_datastream_id,
298-
order_by=["-phenomenonTime"],
299-
page=1,
300-
page_size=1,
301-
phenomenon_time_max=query_start_utc,
302-
fetch_all=False,
303-
).dataframe
304-
next_df = self.client.datastreams.get_observations(
305-
uid=source_datastream_id,
306-
order_by=["phenomenonTime"],
307-
page=1,
308-
page_size=1,
309-
phenomenon_time_min=query_end_utc,
310-
fetch_all=False,
311-
).dataframe
312-
313-
frames = [df for df in [prev_df, observations, next_df] if not df.empty]
314-
if not frames:
315-
return [], []
316-
317-
merged = pd.concat(frames, ignore_index=True)
318-
merged = merged[["phenomenon_time", "result"]].dropna(subset=["phenomenon_time", "result"])
319-
merged["phenomenon_time"] = pd.to_datetime(
320-
merged["phenomenon_time"], utc=True, errors="coerce"
321-
)
322-
merged["result"] = pd.to_numeric(merged["result"], errors="coerce")
323-
merged = merged.dropna(subset=["phenomenon_time", "result"])
324-
merged = merged.sort_values("phenomenon_time")
325-
merged = merged.drop_duplicates(subset=["phenomenon_time"], keep="last")
326-
327-
timestamps = [
328-
ts.to_pydatetime() for ts in merged["phenomenon_time"].tolist()
329-
]
330-
values = merged["result"].astype(float).tolist()
331-
return timestamps, values
332-
333-
def _run_local_aggregation(self) -> dict[str, int]:
334-
mappings = self.mappings or []
335-
if len(mappings) < 1:
336-
raise ValueError(
337-
"Aggregation tasks must include at least one mapping."
338-
)
339-
340-
rows_loaded = 0
341-
mappings_loaded = 0
342-
days_loaded = 0
343-
344-
for mapping in mappings:
345-
source_id = str(mapping["sourceIdentifier"])
346-
paths = mapping.get("paths", []) or []
347-
if len(paths) != 1:
348-
raise ValueError(
349-
"Aggregation mappings must include exactly one target path per source."
350-
)
351-
352-
path = paths[0]
353-
target_id = str(path["targetIdentifier"])
354-
transformations = path.get("dataTransformations", []) or []
355-
if not isinstance(transformations, list) or len(transformations) != 1:
356-
raise ValueError(
357-
"Aggregation mappings must include exactly one aggregation transformation."
358-
)
359-
transformation = parse_aggregation_transformation(transformations[0])
360-
361-
source_datastream = self.client.datastreams.get(uid=source_id)
362-
target_datastream = self.client.datastreams.get(uid=target_id)
363-
364-
source_end = source_datastream.phenomenon_end_time
365-
if source_end is None:
366-
continue
367-
368-
closed_end = closed_window_end_utc(source_end, transformation)
369-
destination_end = target_datastream.phenomenon_end_time
370-
source_begin = source_datastream.phenomenon_begin_time
371-
if source_begin is None:
372-
continue
373-
374-
query_start = first_window_start_utc(source_begin, transformation)
375-
if destination_end is None:
376-
start_window = query_start
377-
else:
378-
start_window = next_window_start_utc(destination_end, transformation)
379-
380-
if start_window >= closed_end:
381-
continue
382-
383-
timestamps, values = self._fetch_observation_points(
384-
source_datastream_id=source_id,
385-
query_start_utc=query_start,
386-
query_end_utc=closed_end,
387-
)
388-
if not timestamps:
389-
continue
390-
391-
output_rows: list[dict[str, object]] = []
392-
for day_start, day_end, _ in iter_daily_windows_utc(
393-
start_window,
394-
closed_end,
395-
transformation,
396-
):
397-
value = aggregate_daily_window(
398-
timestamps=timestamps,
399-
values=values,
400-
window_start_utc=day_start,
401-
window_end_utc=day_end,
402-
statistic=transformation.aggregation_statistic,
403-
)
404-
if value is None:
405-
continue
406-
output_rows.append(
407-
{
408-
"phenomenon_time": day_start,
409-
"result": float(value),
410-
}
411-
)
412-
413-
if not output_rows:
414-
continue
415-
416-
output_df = pd.DataFrame(output_rows)
417-
self.client.datastreams.load_observations(
418-
uid=target_id,
419-
observations=output_df,
420-
mode="append",
421-
)
422-
423-
rows_loaded += len(output_rows)
424-
days_loaded += len(output_rows)
425-
mappings_loaded += 1
426-
427-
return {
428-
"rows_loaded": rows_loaded,
429-
"days_loaded": days_loaded,
430-
"mappings_loaded": mappings_loaded,
431-
}
432-
433-
def _update_status(
434-
self,
435-
task_run: TaskRun,
436-
success: bool,
437-
msg: str,
438-
runtime_source_uri: Optional[str] = None,
439-
):
440-
result = {"message": msg}
441-
if runtime_source_uri:
442-
result.update(
443-
{
444-
"runtimeSourceUri": runtime_source_uri,
445-
"runtime_source_uri": runtime_source_uri,
446-
"runtimeUrl": runtime_source_uri,
447-
"runtime_url": runtime_source_uri,
448-
}
449-
)
450-
451-
self.update_task_run(
452-
task_run.id,
453-
status="SUCCESS" if success else "FAILURE",
454-
finished_at=datetime.now(timezone.utc),
455-
result=result
456-
)
457-
self.next_run_at = self._next_run()
458-
self.save()
459-
460-
@staticmethod
461-
def _success_message(load: Optional[LoadSummary]) -> str:
462-
if not load:
463-
return "OK"
464-
465-
loaded = load.observations_loaded
466-
if loaded == 0:
467-
if load.timestamps_total and load.timestamps_after_cutoff == 0:
468-
if load.cutoff:
469-
return (
470-
"Already up to date - no new observations loaded "
471-
f"(all timestamps were at or before {load.cutoff})."
472-
)
473-
return "Already up to date - no new observations loaded (all timestamps were at or before the cutoff)."
474-
if load.observations_available == 0:
475-
return "Already up to date - no new observations loaded."
476-
return "No new observations were loaded."
477-
478-
if load.datastreams_loaded:
479-
return (
480-
f"Load completed successfully ({loaded} rows across {load.datastreams_loaded} datastreams)."
481-
)
482-
return f"Load completed successfully ({loaded} rows loaded)."
483-
484-
def _next_run(self) -> Optional[str]:
485-
now = datetime.now(timezone.utc)
486-
if cron := self.crontab:
487-
return croniter.croniter(cron, now).get_next(datetime).isoformat()
488-
if iv := self.interval:
489-
unit = self.interval_period or "minutes"
490-
return (now + timedelta(**{unit: iv})).isoformat()
491-
return None

src/hydroserverpy/api/services/etl/task.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
from typing import Literal, Union, Optional, List, Dict, Any, TYPE_CHECKING
33
from uuid import UUID
44
from datetime import datetime
5-
from hydroserverpy.api.models import DataConnection, Task, TaskRun, TaskMapping, OrchestrationSystem
5+
from hydroserverpy.api.models import DataConnection, Task, TaskRun, OrchestrationSystem
66
from hydroserverpy.api.utils import normalize_uuid
77
from ..base import HydroServerBaseService
88

0 commit comments

Comments (0)