diff --git a/setup.cfg b/setup.cfg index 3c87fce..7770c15 100644 --- a/setup.cfg +++ b/setup.cfg @@ -19,7 +19,7 @@ install_requires = numpy >= 1.22.4 pyyaml >= 5 simplejson >= 3 - crontab >= 1 + python-crontab >= 3 python-dateutil >= 2.8.2 croniter >= 2.0.1 jmespath >= 1.0.1 diff --git a/src/hydroserverpy/api/client.py b/src/hydroserverpy/api/client.py index 914cd83..293ce53 100644 --- a/src/hydroserverpy/api/client.py +++ b/src/hydroserverpy/api/client.py @@ -12,8 +12,8 @@ SensorService, DatastreamService, OrchestrationSystemService, - DataSourceService, - DataArchiveService, + DataConnectionService, + TaskService, ) @@ -191,13 +191,13 @@ def orchestrationsystems(self): return OrchestrationSystemService(self) @property - def datasources(self): - """Utilities for managing HydroServer data sources.""" + def dataconnections(self): + """Utilities for managing HydroServer ETL data connections.""" - return DataSourceService(self) + return DataConnectionService(self) @property - def dataarchives(self): - """Utilities for managing HydroServer data archives.""" + def tasks(self): + """Utilities for managing HydroServer ETL tasks.""" - return DataArchiveService(self) + return TaskService(self) diff --git a/src/hydroserverpy/api/models/__init__.py b/src/hydroserverpy/api/models/__init__.py index caaf078..ca6af50 100644 --- a/src/hydroserverpy/api/models/__init__.py +++ b/src/hydroserverpy/api/models/__init__.py @@ -13,8 +13,10 @@ from .sta.thing import Thing from .sta.unit import Unit from .etl.orchestration_system import OrchestrationSystem -from .etl.data_source import DataSource -from .etl.data_archive import DataArchive +from .etl.data_connection import DataConnection +from .etl.run import TaskRun +from .etl.task import Task +from .etl.mapping import TaskMapping Workspace.model_rebuild() Role.model_rebuild() diff --git a/src/hydroserverpy/api/models/etl/__init__.py b/src/hydroserverpy/api/models/etl/__init__.py index 6221f59..af0c0f9 100644 --- 
a/src/hydroserverpy/api/models/etl/__init__.py +++ b/src/hydroserverpy/api/models/etl/__init__.py @@ -1,26 +1,13 @@ -from .extractors import Extractor, HTTPExtractor, LocalFileExtractor, FTPExtractor -from .transformers import JSONTransformer, CSVTransformer, Transformer -from .loaders import HydroServerLoader, Loader - -from .etl_configuration import EtlConfiguration -from .schedule import Schedule -from .status import Status from .orchestration_system import OrchestrationSystem -from .data_source import DataSource +from .data_connection import DataConnection +from .task import Task +from .schedule import TaskSchedule +from .run import TaskRun __all__ = [ - "CSVTransformer", - "JSONTransformer", - "LocalFileExtractor", - "FTPExtractor", - "HTTPExtractor", - "Extractor", - "Transformer", - "Loader", - "HydroServerLoader", - "EtlConfiguration", - "Schedule", - "Status", "OrchestrationSystem", - "DataSource", + "DataConnection", + "Task", + "TaskSchedule", + "TaskRun" ] diff --git a/src/hydroserverpy/api/models/etl/data_archive.py b/src/hydroserverpy/api/models/etl/data_archive.py deleted file mode 100644 index 85c3e20..0000000 --- a/src/hydroserverpy/api/models/etl/data_archive.py +++ /dev/null @@ -1,77 +0,0 @@ -import uuid -from typing import Union, ClassVar, Optional, TYPE_CHECKING, List -from pydantic import Field -from .orchestration_system import OrchestrationSystem -from .orchestration_configuration import OrchestrationConfigurationFields -from ..sta.datastream import Datastream -from ..base import HydroServerBaseModel - -if TYPE_CHECKING: - from hydroserverpy import HydroServer - from hydroserverpy.api.models import Workspace - - -class DataArchive( - HydroServerBaseModel, OrchestrationConfigurationFields -): - name: str = Field(..., max_length=255) - settings: Optional[dict] = None - orchestration_system_id: uuid.UUID - workspace_id: uuid.UUID - - _editable_fields: ClassVar[set[str]] = { - "name", "settings", "interval", "interval_units", "crontab", 
"start_time", "end_time", "last_run_successful", - "last_run_message", "last_run", "next_run", "paused" - } - - def __init__(self, client: "HydroServer", **data): - super().__init__(client=client, service=client.dataarchives, **data) - - self._workspace = None - self._orchestration_system = None - self._datastreams = None - - @classmethod - def get_route(cls): - return "data-archives" - - @property - def workspace(self) -> "Workspace": - """The workspace this data archive belongs to.""" - - if self._workspace is None: - self._workspace = self.client.workspaces.get(uid=self.workspace_id) - - return self._workspace - - @property - def orchestration_system(self) -> "OrchestrationSystem": - """The orchestration system that manages this data archive.""" - - if self._orchestration_system is None: - self._orchestration_system = self.client.orchestrationsystems.get(uid=self.orchestration_system_id) - - return self._orchestration_system - - @property - def datastreams(self) -> List["Datastream"]: - """The datastreams this data archive provides data for.""" - - if self._datastreams is None: - self._datastreams = self.client.datastreams.list(data_archive=self.uid, fetch_all=True).items - - return self._datastreams - - def add_datastream(self, datastream: Union["Datastream", uuid.UUID, str]): - """Add a datastream to this data archive.""" - - self.client.dataarchives.add_datastream( - uid=self.uid, datastream=datastream - ) - - def remove_datastream(self, datastream: Union["Datastream", uuid.UUID, str]): - """Remove a datastream from this data archive.""" - - self.client.dataarchives.remove_datastream( - uid=self.uid, datastream=datastream - ) diff --git a/src/hydroserverpy/api/models/etl/data_connection.py b/src/hydroserverpy/api/models/etl/data_connection.py new file mode 100644 index 0000000..7f308e2 --- /dev/null +++ b/src/hydroserverpy/api/models/etl/data_connection.py @@ -0,0 +1,63 @@ +import uuid +from typing import ClassVar, List, Optional, TYPE_CHECKING +from pydantic 
import Field, AliasPath, AliasChoices +from ..base import HydroServerBaseModel + +if TYPE_CHECKING: + from hydroserverpy import HydroServer + from hydroserverpy.api.models import Workspace, Task + + +class DataConnection(HydroServerBaseModel): + name: str = Field(..., max_length=255) + data_connection_type: str = Field(..., max_length=255, alias="type") + workspace_id: Optional[uuid.UUID] = Field( + None, validation_alias=AliasChoices("workspaceId", AliasPath("workspace", "id")) + ) + extractor_type: str = Field(..., max_length=255, validation_alias=AliasPath("extractor", "type")) + extractor_settings: dict = Field(default_factory=dict, validation_alias=AliasPath("extractor", "settings")) + transformer_type: str = Field(..., max_length=255, validation_alias=AliasPath("transformer", "type")) + transformer_settings: dict = Field(default_factory=dict, validation_alias=AliasPath("transformer", "settings")) + loader_type: str = Field(..., max_length=255, validation_alias=AliasPath("loader", "type")) + loader_settings: dict = Field(default_factory=dict, validation_alias=AliasPath("loader", "settings")) + + _editable_fields: ClassVar[set[str]] = { + "name", + "data_connection_type", + "extractor_type", + "extractor_settings", + "transformer_type", + "transformer_settings", + "loader_type", + "loader_settings", + } + + def __init__(self, client: "HydroServer", **data): + super().__init__(client=client, service=client.dataconnections, **data) + + self._workspace = None + self._tasks = None + + @classmethod + def get_route(cls): + return "etl-data-connections" + + @property + def workspace(self) -> "Workspace": + """The workspace this ETL data connection belongs to.""" + + if self._workspace is None and self.workspace_id: + self._workspace = self.client.workspaces.get(uid=self.workspace_id) + + return self._workspace + + @property + def tasks(self) -> List["Task"]: + """The ETL tasks associated with this ETL data connection.""" + + if self._tasks is None: + self._tasks = 
self.client.tasks.list( + data_connection=self.uid, fetch_all=True + ).items + + return self._tasks diff --git a/src/hydroserverpy/api/models/etl/data_source.py b/src/hydroserverpy/api/models/etl/data_source.py deleted file mode 100644 index 03c1d6d..0000000 --- a/src/hydroserverpy/api/models/etl/data_source.py +++ /dev/null @@ -1,146 +0,0 @@ -from __future__ import annotations -from datetime import datetime, timedelta, timezone -from functools import cached_property -import logging -import uuid -from typing import ClassVar, TYPE_CHECKING, List, Optional, Union -import croniter -import pandas as pd -from pydantic import Field - -from ..base import HydroServerBaseModel -from ..sta.datastream import Datastream -from .orchestration_system import OrchestrationSystem -from .etl_configuration import EtlConfiguration -from .schedule import Schedule -from .status import Status -from .factories import extractor_factory, transformer_factory, loader_factory -from .loaders import HydroServerLoader - -if TYPE_CHECKING: - from hydroserverpy import HydroServer - from hydroserverpy.api.models import Workspace - - -class DataSource(HydroServerBaseModel): - name: str = Field(..., max_length=255) - settings: EtlConfiguration - orchestration_system_id: uuid.UUID - schedule: Schedule - status: Status - workspace_id: uuid.UUID - - _editable_fields: ClassVar[set[str]] = { - "name", - "settings", - "status", - "schedule", - "interval", - "interval_units", - "crontab", - "start_time", - "end_time", - "last_run_successful", - "last_run_message", - "last_run", - "next_run", - "paused", - } - - def __init__(self, client: HydroServer, **data): - super().__init__(client=client, service=client.datasources, **data) - - @classmethod - def get_route(cls): - return "data-sources" - - @cached_property - def workspace(self) -> Workspace: - return self.client.workspaces.get(uid=self.workspace_id) - - @cached_property - def orchestration_system(self) -> OrchestrationSystem: - return 
self.client.orchestrationsystems.get(uid=self.orchestration_system_id) - - @cached_property - def datastreams(self) -> List[Datastream]: - return self.client.datastreams.list(data_source=self.uid, fetch_all=True).items - - # TODO: Add functions like add_payload, add_mapping, etc. and don't allow the user to manually - # link or unlink datastreams - handle that automatically. - def add_datastream(self, datastream: Union["Datastream", uuid.UUID, str]): - """Add a datastream to this data source.""" - - self.client.datasources.add_datastream(uid=self.uid, datastream=datastream) - - def remove_datastream(self, datastream: Union["Datastream", uuid.UUID, str]): - """Remove a datastream from this data source.""" - - self.client.datasources.remove_datastream(uid=self.uid, datastream=datastream) - - def _next_run(self) -> Optional[str]: - now = datetime.now(timezone.utc) - if cron := self.schedule.crontab: - return croniter.croniter(cron, now).get_next(datetime).isoformat() - if iv := self.schedule.interval: - unit = self.schedule.interval_units or "minutes" - return (now + timedelta(**{unit: iv})).isoformat() - return None - - def _update_status(self, loader: HydroServerLoader, success: bool, msg: str): - short_msg = msg if len(msg) <= 255 else msg[:252] + "…" - loader.client.datasources.update( - uid=self.uid, - last_run=datetime.now(timezone.utc).isoformat(), - last_run_successful=success, - last_run_message=short_msg, - next_run=self._next_run(), - ) - - def is_empty(self, data): - if data is None: - return True - if isinstance(data, pd.DataFrame) and data.empty: - return True - return False - - def load_data(self, payload_name: str = None): - """Load data for this data source.""" - if self.status.paused is True: - return - - if payload_name: - self.load_data_for_payload(payload_name) - else: - for p in self.settings.payloads: - self.load_data_for_payload(p.name) - - def load_data_for_payload(self, payload_name: str): - payload = next(p for p in self.settings.payloads if 
p.name == payload_name) - - extractor_cls = extractor_factory(self.settings.extractor) - transformer_cls = transformer_factory(self.settings.transformer) - loader_cls = loader_factory(self.settings.loader, self.client, self.uid) - - try: - logging.info("Starting extract") - data = extractor_cls.extract(payload, loader_cls) - if self.is_empty(data): - self._update_status( - loader_cls, True, "No data returned from the extractor" - ) - return - - logging.info("Starting transform") - data = transformer_cls.transform(data, payload.mappings) - if self.is_empty(data): - self._update_status( - loader_cls, True, "No data returned from the transformer" - ) - return - - logging.info("Starting load") - loader_cls.load(data, payload) - self._update_status(loader_cls, True, "OK") - except Exception as e: - self._update_status(loader_cls, False, str(e)) diff --git a/src/hydroserverpy/api/models/etl/mapping.py b/src/hydroserverpy/api/models/etl/mapping.py new file mode 100644 index 0000000..751ee42 --- /dev/null +++ b/src/hydroserverpy/api/models/etl/mapping.py @@ -0,0 +1,27 @@ +from typing import List +from pydantic import BaseModel, Field, ConfigDict +from pydantic.alias_generators import to_camel + + +class TaskMappingPath(BaseModel): + target_identifier: str + data_transformations: list = Field(default_factory=list) + + model_config = ConfigDict( + validate_assignment=True, + populate_by_name=True, + str_strip_whitespace=True, + alias_generator=to_camel, + ) + + +class TaskMapping(BaseModel): + source_identifier: str + paths: List[TaskMappingPath] + + model_config = ConfigDict( + validate_assignment=True, + populate_by_name=True, + str_strip_whitespace=True, + alias_generator=to_camel, + ) diff --git a/src/hydroserverpy/api/models/etl/orchestration_configuration.py b/src/hydroserverpy/api/models/etl/orchestration_configuration.py deleted file mode 100644 index e5ceaf8..0000000 --- a/src/hydroserverpy/api/models/etl/orchestration_configuration.py +++ /dev/null @@ -1,35 +0,0 @@ 
-from pydantic import AliasPath -from typing import Optional, Literal -from datetime import datetime -from pydantic import BaseModel, Field - - -class OrchestrationConfigurationFields(BaseModel): - interval: Optional[int] = Field( - None, gt=0, validation_alias=AliasPath("schedule", "interval") - ) - interval_units: Optional[Literal["minutes", "hours", "days"]] = Field( - None, validation_alias=AliasPath("schedule", "intervalUnits") - ) - crontab: Optional[str] = Field( - None, max_length=255, validation_alias=AliasPath("schedule", "crontab") - ) - start_time: Optional[datetime] = Field( - None, validation_alias=AliasPath("schedule", "startTime") - ) - end_time: Optional[datetime] = Field( - None, validation_alias=AliasPath("schedule", "endTime") - ) - last_run_successful: Optional[bool] = Field( - None, validation_alias=AliasPath("status", "lastRunSuccessful") - ) - last_run_message: Optional[str] = Field( - None, max_length=255, validation_alias=AliasPath("status", "lastRunMessage") - ) - last_run: Optional[datetime] = Field( - None, validation_alias=AliasPath("status", "lastRun") - ) - next_run: Optional[datetime] = Field( - None, validation_alias=AliasPath("status", "nextRun") - ) - paused: bool = Field(False, validation_alias=AliasPath("status", "paused")) diff --git a/src/hydroserverpy/api/models/etl/orchestration_system.py b/src/hydroserverpy/api/models/etl/orchestration_system.py index c117238..f5920cb 100644 --- a/src/hydroserverpy/api/models/etl/orchestration_system.py +++ b/src/hydroserverpy/api/models/etl/orchestration_system.py @@ -1,16 +1,11 @@ import uuid -from typing import Optional, ClassVar, List, TYPE_CHECKING -from pydantic import BaseModel, Field +from typing import ClassVar, List, Optional, TYPE_CHECKING +from pydantic import Field from ..base import HydroServerBaseModel if TYPE_CHECKING: from hydroserverpy import HydroServer - from hydroserverpy.api.models import Workspace, DataSource, DataArchive - - -class 
OrchestrationSystemFields(BaseModel): - name: str = Field(..., max_length=255) - orchestration_system_type: str = Field(..., max_length=255, alias="type") + from hydroserverpy.api.models import Workspace, Task class OrchestrationSystem(HydroServerBaseModel): @@ -24,12 +19,11 @@ def __init__(self, client: "HydroServer", **data): super().__init__(client=client, service=client.orchestrationsystems, **data) self._workspace = None - self._datasources = None - self._dataarchives = None + self._tasks = None @classmethod def get_route(cls): - return "orchestration-systems" + return "etl-orchestration-systems" @property def workspace(self) -> "Workspace": @@ -41,23 +35,12 @@ def workspace(self) -> "Workspace": return self._workspace @property - def datasources(self) -> List["DataSource"]: - """The data sources associated with this workspace.""" - - if self._datasources is None: - self._datasources = self.client.datasources.list( - orchestration_system=self.uid, fetch_all=True - ).items - - return self._datasources - - @property - def dataarchives(self) -> List["DataArchive"]: - """The data archives associated with this workspace.""" + def tasks(self) -> List["Task"]: + """The ETL tasks associated with this orchestration system.""" - if self._dataarchives is None: - self._dataarchives = self.client.dataarchives.list( + if self._tasks is None: + self._tasks = self.client.tasks.list( orchestration_system=self.uid, fetch_all=True ).items - return self._dataarchives + return self._tasks diff --git a/src/hydroserverpy/api/models/etl/run.py b/src/hydroserverpy/api/models/etl/run.py new file mode 100644 index 0000000..e2e28ee --- /dev/null +++ b/src/hydroserverpy/api/models/etl/run.py @@ -0,0 +1,15 @@ +import uuid +from datetime import datetime +from typing import Optional +from pydantic import BaseModel, Field + + +class TaskRun(BaseModel): + id: uuid.UUID + status: str + result: Optional[dict] = None + started_at: Optional[datetime] = Field(None, alias="startedAt") + finished_at: 
Optional[datetime] = Field(None, alias="finishedAt") + + class Config: + populate_by_name = True diff --git a/src/hydroserverpy/api/models/etl/schedule.py b/src/hydroserverpy/api/models/etl/schedule.py index 84cc82d..0442cf5 100644 --- a/src/hydroserverpy/api/models/etl/schedule.py +++ b/src/hydroserverpy/api/models/etl/schedule.py @@ -3,14 +3,15 @@ from pydantic import BaseModel, Field -class Schedule(BaseModel): - interval: int = Field(..., gt=0) - interval_units: Optional[Literal["minutes", "hours", "days"]] = Field( - None, alias="intervalUnits" +class TaskSchedule(BaseModel): + start_time: Optional[datetime] = Field(None, alias="startTime") + next_run_at: Optional[datetime] = Field(None, alias="nextRunAt") + paused: bool = False + interval: Optional[int] = Field(None, gt=0) + interval_period: Optional[Literal["minutes", "hours", "days"]] = Field( + None, alias="intervalPeriod" ) crontab: Optional[str] - start_time: Optional[datetime] = Field(None, alias="startTime") - end_time: Optional[datetime] = Field(None, alias="endTime") class Config: populate_by_name = True diff --git a/src/hydroserverpy/api/models/etl/status.py b/src/hydroserverpy/api/models/etl/status.py deleted file mode 100644 index b2c16d0..0000000 --- a/src/hydroserverpy/api/models/etl/status.py +++ /dev/null @@ -1,14 +0,0 @@ -from datetime import datetime -from typing import Optional -from pydantic import BaseModel, Field - - -class Status(BaseModel): - paused: bool = Field(False) - last_run_successful: Optional[bool] = Field(None, alias="lastRunSuccessful") - last_run_message: Optional[str] = Field(None, alias="lastRunMessage") - last_run: Optional[datetime] = Field(None, alias="lastRun") - next_run: Optional[datetime] = Field(None, alias="nextRun") - - class Config: - populate_by_name = True diff --git a/src/hydroserverpy/api/models/etl/task.py b/src/hydroserverpy/api/models/etl/task.py new file mode 100644 index 0000000..e0c4f7a --- /dev/null +++ b/src/hydroserverpy/api/models/etl/task.py @@ 
-0,0 +1,248 @@ +from __future__ import annotations +from functools import cached_property +import uuid +import logging +import croniter +import pandas as pd +from typing import ClassVar, TYPE_CHECKING, List, Optional, Literal, Union +from datetime import datetime, timedelta, timezone +from pydantic import Field, AliasPath, AliasChoices, TypeAdapter +from hydroserverpy.etl.factories import extractor_factory, transformer_factory, loader_factory +from hydroserverpy.etl.etl_configuration import ExtractorConfig, TransformerConfig, LoaderConfig, SourceTargetMapping, MappingPath +from ..base import HydroServerBaseModel +from .orchestration_system import OrchestrationSystem +from .data_connection import DataConnection +from .run import TaskRun + + +if TYPE_CHECKING: + from hydroserverpy import HydroServer + from hydroserverpy.api.models import Workspace + + +class Task(HydroServerBaseModel): + name: str = Field(..., max_length=255) + extractor_settings: dict = Field(default_factory=dict, alias="extractorSettings") + transformer_settings: dict = Field(default_factory=dict, alias="transformerSettings") + loader_settings: dict = Field(default_factory=dict, alias="loaderSettings") + data_connection_id: uuid.UUID = Field( + None, validation_alias=AliasChoices("dataConnectionId", AliasPath("dataConnection", "id")) + ) + orchestration_system_id: uuid.UUID = Field( + None, validation_alias=AliasChoices("orchestrationSystemId", AliasPath("orchestrationSystem", "id")) + ) + workspace_id: uuid.UUID = Field( + None, validation_alias=AliasChoices("workspaceId", AliasPath("workspace", "id")) + ) + start_time: Optional[datetime] = Field(None, validation_alias=AliasPath("schedule", "startTime")) + next_run_at: Optional[datetime] = Field(None, validation_alias=AliasPath("schedule", "nextRunAt")) + paused: bool = Field(False, validation_alias=AliasPath("schedule", "paused")) + interval: Optional[int] = Field(None, gt=0, validation_alias=AliasPath("schedule", "interval")) + interval_period: 
Optional[Literal["minutes", "hours", "days"]] = Field( + None, validation_alias=AliasPath("schedule", "intervalPeriod") + ) + crontab: Optional[str] = Field(None, validation_alias=AliasPath("schedule", "crontab")) + latest_run: Optional[TaskRun] = None + mappings: List[dict] + + _editable_fields: ClassVar[set[str]] = { + "name", + "extractor_settings", + "transformer_settings", + "loader_settings", + "data_connection_id", + "orchestration_system_id", + "start_time", + "next_run_at", + "paused", + "interval", + "interval_period", + "crontab", + "mappings" + } + + def __init__(self, client: HydroServer, **data): + super().__init__(client=client, service=client.tasks, **data) + + @classmethod + def get_route(cls): + return "etl-tasks" + + @cached_property + def workspace(self) -> Workspace: + return self.client.workspaces.get(uid=self.workspace_id) + + @cached_property + def orchestration_system(self) -> Optional[OrchestrationSystem]: + return self.client.orchestrationsystems.get(uid=self.orchestration_system_id) + + @cached_property + def data_connection(self) -> Optional[DataConnection]: + return self.client.dataconnections.get(uid=self.data_connection_id) + + def get_task_runs( + self, + page: int = ..., + page_size: int = 100, + order_by: List[str] = ..., + status: str = ..., + started_at_max: datetime = ..., + started_at_min: datetime = ..., + finished_at_max: datetime = ..., + finished_at_min: datetime = ..., + ): + """Get a collection of task runs associated with this task.""" + + return self.client.tasks.get_task_runs( + uid=self.uid, + page=page, + page_size=page_size, + order_by=order_by, + status=status, + started_at_max=started_at_max, + started_at_min=started_at_min, + finished_at_max=finished_at_max, + finished_at_min=finished_at_min, + ) + + def create_task_run( + self, + status: Literal["RUNNING", "SUCCESS", "FAILURE"], + started_at: datetime, + finished_at: datetime = ..., + result: dict = ..., + ): + """Create a new task run for this task.""" + + 
return self.client.tasks.create_task_run( + uid=self.uid, + status=status, + started_at=started_at, + finished_at=finished_at, + result=result, + ) + + def get_task_run( + self, + uid: Union[uuid.UUID, str], + ): + """Get a task run record for this task.""" + + return self.client.tasks.get_task_run(uid=self.uid, task_run_id=uid) + + def update_task_run( + self, + uid: Union[uuid.UUID, str], + status: Literal["RUNNING", "SUCCESS", "FAILURE"] = ..., + started_at: datetime = ..., + finished_at: datetime = ..., + result: dict = ..., + ): + """Update a task run record of this task.""" + + return self.client.tasks.update_task_run( + uid=self.uid, + task_run_id=uid, + status=status, + started_at=started_at, + finished_at=finished_at, + result=result, + ) + + def delete_task_run( + self, + uid: Union[uuid.UUID, str], + ): + """Delete a task run record of this task.""" + + return self.client.tasks.delete_task_run(uid=self.uid, task_run_id=uid) + + def run(self): + """Trigger HydroServer to run this task.""" + + return self.client.tasks.run(uid=self.uid) + + def run_local(self): + """Run this task locally.""" + + if self.paused is True: + return + + extractor_cls = extractor_factory(TypeAdapter(ExtractorConfig).validate_python({ + "type": self.data_connection.extractor_type, + **self.data_connection.extractor_settings + })) + transformer_cls = transformer_factory(TypeAdapter(TransformerConfig).validate_python({ + "type": self.data_connection.transformer_type, + **self.data_connection.transformer_settings + })) + loader_cls = loader_factory(TypeAdapter(LoaderConfig).validate_python({ + "type": self.data_connection.loader_type, + **self.data_connection.loader_settings + }), self.client, str(self.uid)) + + task_run = self.create_task_run(status="RUNNING", started_at=datetime.now(timezone.utc)) + + try: + logging.info("Starting extract") + + task_mappings = [ + SourceTargetMapping( + source_identifier=task_mapping["sourceIdentifier"], + paths=[ + MappingPath( + 
target_identifier=task_mapping_path["targetIdentifier"], + data_transformations=task_mapping_path["dataTransformations"], + ) for task_mapping_path in task_mapping["paths"] + ] + ) for task_mapping in self.mappings + ] + + data = extractor_cls.extract(self, loader_cls) + if self.is_empty(data): + self._update_status( + task_run, True, "No data returned from the extractor" + ) + return + + logging.info("Starting transform") + data = transformer_cls.transform(data, task_mappings) + if self.is_empty(data): + self._update_status( + task_run, True, "No data returned from the transformer" + ) + return + + logging.info("Starting load") + loader_cls.load(data, self) + self._update_status(task_run, True, "OK") + except Exception as e: + self._update_status(task_run, False, str(e)) + + @staticmethod + def is_empty(data): + if data is None: + return True + + if isinstance(data, pd.DataFrame) and data.empty: + return True + + return False + + def _update_status(self, task_run: TaskRun, success: bool, msg: str): + self.update_task_run( + task_run.id, + status="SUCCESS" if success else "FAILURE", + result={"message": msg} + ) + self.next_run_at = self._next_run() + self.save() + + def _next_run(self) -> Optional[str]: + now = datetime.now(timezone.utc) + if cron := self.crontab: + return croniter.croniter(cron, now).get_next(datetime).isoformat() + if iv := self.interval: + unit = self.interval_period or "minutes" + return (now + timedelta(**{unit: iv})).isoformat() + return None diff --git a/src/hydroserverpy/api/models/iam/workspace.py b/src/hydroserverpy/api/models/iam/workspace.py index 85b7b94..91eb050 100644 --- a/src/hydroserverpy/api/models/iam/workspace.py +++ b/src/hydroserverpy/api/models/iam/workspace.py @@ -19,8 +19,8 @@ ResultQualifier, Datastream, OrchestrationSystem, - DataSource, - DataArchive, + Task, + DataConnection, ) @@ -50,8 +50,8 @@ def __init__(self, client: "HydroServer", **data): self._sensors = None self._datastreams = None
self._orchestrationsystems = None - self._datasources = None - self._dataarchives = None + self._dataconnections = None + self._tasks = None @classmethod def get_route(cls): @@ -166,22 +166,22 @@ def orchestrationsystems(self) -> List["OrchestrationSystem"]: return self._orchestrationsystems @property - def datasources(self) -> List["DataSource"]: - """The data sources associated with this workspace.""" + def dataconnections(self) -> List["DataConnection"]: + """The ETL data connections associated with this workspace.""" - if self._datasources is None: - self._datasources = self.client.datasources.list(workspace=self.uid, fetch_all=True).items + if self._dataconnections is None: + self._dataconnections = self.client.dataconnections.list(workspace=self.uid, fetch_all=True).items - return self._datasources + return self._dataconnections @property - def dataarchives(self) -> List["DataArchive"]: - """The data archives associated with this workspace.""" + def tasks(self) -> List["Task"]: + """The ETL tasks associated with this workspace.""" - if self._dataarchives is None: - self._dataarchives = self.client.dataarchives.list(workspace=self.uid, fetch_all=True).items + if self._tasks is None: + self._tasks = self.client.tasks.list(workspace=self.uid, fetch_all=True).items - return self._dataarchives + return self._tasks def create_api_key( self, diff --git a/src/hydroserverpy/api/models/sta/datastream.py b/src/hydroserverpy/api/models/sta/datastream.py index a85a662..de9cc1e 100644 --- a/src/hydroserverpy/api/models/sta/datastream.py +++ b/src/hydroserverpy/api/models/sta/datastream.py @@ -16,8 +16,6 @@ ObservedProperty, Unit, ProcessingLevel, - DataSource, - DataArchive ) @@ -43,7 +41,6 @@ class Datastream(HydroServerBaseModel): intended_time_spacing_unit: Optional[ Literal["seconds", "minutes", "hours", "days"] ] = None - data_source_id: Optional[uuid.UUID] = None thing_id: uuid.UUID workspace_id: uuid.UUID sensor_id: uuid.UUID @@ -70,8 +67,6 @@ def __init__(self, 
client: "HydroServer", **data): self._unit = None self._processing_level = None self._sensor = None - self._data_source = None - self._data_archives = None @classmethod def get_route(cls): @@ -171,24 +166,6 @@ def processing_level(self, processing_level: Union["ProcessingLevel", UUID, str] self.processing_level_id = normalize_uuid(processing_level) self._processing_level = None - @property - def data_source(self) -> Optional["DataSource"]: - """The data source of this datastream.""" - - if self._data_source is None and self.data_source_id is not None: - self._data_source = self.client.datasources.get(uid=self.data_source_id) - - return self._data_source - - @property - def data_archives(self) -> List["DataArchive"]: - """The data archives of this datastream.""" - - if self._data_archives is None: - self._data_archives = self.client.dataarchives.list(datastream=self.uid, fetch_all=True).items - - return self._data_archives - def get_observations( self, page: int = ..., diff --git a/src/hydroserverpy/api/services/__init__.py b/src/hydroserverpy/api/services/__init__.py index 224f516..4842ebf 100644 --- a/src/hydroserverpy/api/services/__init__.py +++ b/src/hydroserverpy/api/services/__init__.py @@ -8,5 +8,5 @@ from .sta.sensor import SensorService from .sta.datastream import DatastreamService from .etl.orchestration_system import OrchestrationSystemService -from .etl.data_source import DataSourceService -from .etl.data_archive import DataArchiveService +from .etl.data_connection import DataConnectionService +from .etl.task import TaskService diff --git a/src/hydroserverpy/api/services/etl/data_archive.py b/src/hydroserverpy/api/services/etl/data_archive.py deleted file mode 100644 index 231f744..0000000 --- a/src/hydroserverpy/api/services/etl/data_archive.py +++ /dev/null @@ -1,166 +0,0 @@ -from typing import Optional, Union, List, Literal, TYPE_CHECKING -from uuid import UUID -from datetime import datetime -from hydroserverpy.api.models import DataArchive -from 
hydroserverpy.api.utils import normalize_uuid -from ..base import HydroServerBaseService - -if TYPE_CHECKING: - from hydroserverpy import HydroServer - from hydroserverpy.api.models import Workspace, OrchestrationSystem, Datastream - - -class DataArchiveService(HydroServerBaseService): - def __init__(self, client: "HydroServer"): - self.model = DataArchive - super().__init__(client) - - def list( - self, - page: int = ..., - page_size: int = ..., - order_by: List[str] = ..., - workspace: Optional[Union["Workspace", UUID, str]] = ..., - datastream: Optional[Union["Datastream", UUID, str]] = ..., - orchestration_system: Optional[Union["OrchestrationSystem", UUID, str]] = ..., - fetch_all: bool = False, - ) -> List["DataArchive"]: - """Fetch a collection of data archives.""" - - return super().list( - page=page, - page_size=page_size, - order_by=order_by, - workspace_id=normalize_uuid(workspace), - datastream_id=normalize_uuid(datastream), - orchestration_system_id=normalize_uuid(orchestration_system), - fetch_all=fetch_all, - ) - - def create( - self, - name: str, - workspace: Union["Workspace", UUID, str], - orchestration_system: Union["OrchestrationSystem", UUID, str], - settings: Optional[dict] = None, - interval: Optional[int] = None, - interval_units: Optional[Literal["minutes", "hours", "days"]] = None, - crontab: Optional[str] = None, - start_time: Optional[datetime] = None, - end_time: Optional[datetime] = None, - last_run_successful: Optional[bool] = None, - last_run_message: Optional[str] = None, - last_run: Optional[datetime] = None, - next_run: Optional[datetime] = None, - paused: bool = False, - datastreams: Optional[List[Union["Datastream", UUID, str]]] = None, - ) -> "DataArchive": - """Create a new data archive.""" - - body = { - "name": name, - "workspaceId": normalize_uuid(workspace), - "orchestrationSystemId": normalize_uuid(orchestration_system), - "settings": settings, - "schedule": { - "interval": interval, - "intervalUnits": interval_units, - 
"crontab": crontab, - "startTime": start_time, - "endTime": end_time, - }, - "status": { - "lastRunSuccessful": last_run_successful, - "lastRunMessage": last_run_message, - "lastRun": last_run, - "nextRun": next_run, - "paused": paused, - }, - "datastreamIds": ( - [ - normalize_uuid(datastream) - for datastream in datastreams - ] - if datastreams - else [] - ), - } - - return super().create(**body) - - def update( - self, - uid: Union[UUID, str], - name: str = ..., - orchestration_system: Union["OrchestrationSystem", UUID, str] = ..., - settings: Optional[dict] = ..., - interval: Optional[int] = ..., - interval_units: Optional[Literal["minutes", "hours", "days"]] = ..., - crontab: Optional[str] = ..., - start_time: Optional[datetime] = ..., - end_time: Optional[datetime] = ..., - last_run_successful: Optional[bool] = ..., - last_run_message: Optional[str] = ..., - last_run: Optional[datetime] = ..., - next_run: Optional[datetime] = ..., - paused: bool = ..., - ) -> "DataArchive": - """Update a data archive.""" - - status_body = { - k: v - for k, v in { - "lastRunSuccessful": last_run_successful, - "lastRunMessage": last_run_message, - "lastRun": last_run, - "nextRun": next_run, - "paused": paused, - }.items() - if v is not ... - } - status_body = status_body if status_body else ... - - schedule_body = { - k: v - for k, v in { - "interval": interval, - "intervalUnits": interval_units, - "crontab": crontab, - "startTime": start_time, - "endTime": end_time, - }.items() - if v is not ... - } - schedule_body = schedule_body if schedule_body else ... - - body = { - k: v - for k, v in { - "name": name, - "orchestrationSystemId": getattr( - orchestration_system, "uid", orchestration_system - ), - "settings": settings, - "schedule": schedule_body, - "status": status_body, - }.items() - if v is not ... 
- } - - return super().update(uid=str(uid), **body) - - def add_datastream( - self, uid: Union[UUID, str], datastream: Union["Datastream", UUID, str] - ) -> None: - """Add a datastream to this data archive.""" - - path = f"/{self.client.base_route}/{self.model.get_route()}/{str(uid)}/datastreams/{normalize_uuid(datastream)}" - self.client.request("put", path) - - def remove_datastream( - self, uid: Union[UUID, str], datastream: Union["Datastream", UUID, str] - ) -> None: - """Remove a datastream from this data archive.""" - - path = f"/{self.client.base_route}/{self.model.get_route()}/{str(uid)}/datastreams/{normalize_uuid(datastream)}" - self.client.request("delete", path) diff --git a/src/hydroserverpy/api/services/etl/data_connection.py b/src/hydroserverpy/api/services/etl/data_connection.py new file mode 100644 index 0000000..f4624aa --- /dev/null +++ b/src/hydroserverpy/api/services/etl/data_connection.py @@ -0,0 +1,114 @@ +from typing import Union, List, Optional, TYPE_CHECKING +from uuid import UUID +from hydroserverpy.api.models import DataConnection +from hydroserverpy.api.utils import normalize_uuid +from ..base import HydroServerBaseService + +if TYPE_CHECKING: + from hydroserverpy import HydroServer + from hydroserverpy.api.models import Workspace + + +class DataConnectionService(HydroServerBaseService): + def __init__(self, client: "HydroServer"): + self.model = DataConnection + super().__init__(client) + + def list( + self, + page: int = ..., + page_size: int = ..., + order_by: List[str] = ..., + workspace: Union["Workspace", UUID, str] = ..., + data_connection_type: str = ..., + extractor_type: str = ..., + transformer_type: str = ..., + loader_type: str = ..., + fetch_all: bool = False, + ) -> List["DataConnection"]: + """Fetch a collection of ETL data connections.""" + + return super().list( + page=page, + page_size=page_size, + order_by=order_by, + workspace_id=normalize_uuid(workspace), + type=data_connection_type, + 
extractor_type=extractor_type, + transformer_type=transformer_type, + loader_type=loader_type, + fetch_all=fetch_all, + ) + + def create( + self, + name: str, + data_connection_type: str, + workspace: Optional[Union["Workspace", UUID, str]] = None, + extractor_type: str = ..., + extractor_settings: dict = ..., + transformer_type: str = ..., + transformer_settings: dict = ..., + loader_type: str = ..., + loader_settings: dict = ..., + ) -> "DataConnection": + """Create a new data connection.""" + + body = { + "name": name, + "type": data_connection_type, + "workspaceId": normalize_uuid(workspace), + "extractor": { + "type": extractor_type, + "settings": extractor_settings, + }, + "transformer": { + "type": transformer_type, + "settings": transformer_settings, + }, + "loader": { + "type": loader_type, + "settings": loader_settings, + } + } + + return super().create(**body) + + def update( + self, + uid: Union[UUID, str], + name: str = ..., + data_connection_type: str = ..., + extractor_type: str = ..., + extractor_settings: dict = ..., + transformer_type: str = ..., + transformer_settings: dict = ..., + loader_type: str = ..., + loader_settings: dict = ..., + ) -> "DataConnection": + """Update an ETL data connection.""" + + body = { + "name": name, + "type": data_connection_type, + } + + if extractor_type is not ... or extractor_settings is not ...: + body["extractor"] = { + "type": extractor_type, + "settings": extractor_settings, + } + + if transformer_type is not ... or transformer_settings is not ...: + body["transformer"] = { + "type": transformer_type, + "settings": transformer_settings, + } + + if loader_type is not ... 
or loader_settings is not ...: + body["loader"] = { + "type": loader_type, + "settings": loader_settings, + } + + return super().update(uid=str(uid), **body) diff --git a/src/hydroserverpy/api/services/etl/data_source.py b/src/hydroserverpy/api/services/etl/data_source.py deleted file mode 100644 index 726e4e5..0000000 --- a/src/hydroserverpy/api/services/etl/data_source.py +++ /dev/null @@ -1,163 +0,0 @@ -from typing import Optional, Union, List, Literal, TYPE_CHECKING -from uuid import UUID -from datetime import datetime -from hydroserverpy.api.models import DataSource -from hydroserverpy.api.utils import normalize_uuid -from ..base import HydroServerBaseService - -if TYPE_CHECKING: - from hydroserverpy import HydroServer - from hydroserverpy.api.models import Workspace, OrchestrationSystem, Datastream - - -class DataSourceService(HydroServerBaseService): - def __init__(self, client: "HydroServer"): - self.model = DataSource - super().__init__(client) - - def list( - self, - page: int = ..., - page_size: int = ..., - order_by: List[str] = ..., - workspace: Optional[Union["Workspace", UUID, str]] = ..., - datastream: Optional[Union["Datastream", UUID, str]] = ..., - orchestration_system: Optional[Union["OrchestrationSystem", UUID, str]] = ..., - fetch_all: bool = False, - ) -> List["DataSource"]: - """Fetch a collection of data sources.""" - - return super().list( - page=page, - page_size=page_size, - order_by=order_by, - workspace_id=normalize_uuid(workspace), - datastream_id=normalize_uuid(datastream), - orchestration_system_id=normalize_uuid(orchestration_system), - fetch_all=fetch_all, - ) - - def create( - self, - name: str, - workspace: Union["Workspace", UUID, str], - orchestration_system: Union["OrchestrationSystem", UUID, str], - settings: Optional[dict] = None, - interval: Optional[int] = None, - interval_units: Optional[Literal["minutes", "hours", "days"]] = None, - crontab: Optional[str] = None, - start_time: Optional[datetime] = None, - end_time: 
Optional[datetime] = None, - last_run_successful: Optional[bool] = None, - last_run_message: Optional[str] = None, - last_run: Optional[datetime] = None, - next_run: Optional[datetime] = None, - paused: bool = False, - datastreams: Optional[List[Union["Datastream", UUID, str]]] = None, - ) -> "DataSource": - """Create a new data source.""" - - body = { - "name": name, - "workspaceId": normalize_uuid(workspace), - "orchestrationSystemId": normalize_uuid(orchestration_system), - "settings": settings, - "schedule": { - "interval": interval, - "intervalUnits": interval_units, - "crontab": crontab, - "startTime": start_time, - "endTime": end_time, - }, - "status": { - "lastRunSuccessful": last_run_successful, - "lastRunMessage": last_run_message, - "lastRun": last_run, - "nextRun": next_run, - "paused": paused, - }, - "datastreamIds": ( - [normalize_uuid(datastream) for datastream in datastreams] - if datastreams - else [] - ), - } - - return super().create(**body) - - def update( - self, - uid: Union[UUID, str], - name: str = ..., - orchestration_system: Union["OrchestrationSystem", UUID, str] = ..., - settings: Optional[dict] = ..., - interval: Optional[int] = ..., - interval_units: Optional[Literal["minutes", "hours", "days"]] = ..., - crontab: Optional[str] = ..., - start_time: Optional[datetime] = ..., - end_time: Optional[datetime] = ..., - last_run_successful: Optional[bool] = ..., - last_run_message: Optional[str] = ..., - last_run: Optional[datetime] = ..., - next_run: Optional[datetime] = ..., - paused: bool = ..., - ) -> "DataSource": - """Update a data source.""" - - status_body = { - k: v - for k, v in { - "lastRunSuccessful": last_run_successful, - "lastRunMessage": last_run_message, - "lastRun": last_run, - "nextRun": next_run, - "paused": paused, - }.items() - if v is not ... - } - status_body = status_body if status_body else ... 
- - schedule_body = { - k: v - for k, v in { - "interval": interval, - "intervalUnits": interval_units, - "crontab": crontab, - "startTime": start_time, - "endTime": end_time, - }.items() - if v is not ... - } - schedule_body = schedule_body if schedule_body else ... - - body = { - k: v - for k, v in { - "name": name, - "orchestrationSystemId": getattr( - orchestration_system, "uid", orchestration_system - ), - "settings": settings, - "schedule": schedule_body, - "status": status_body, - }.items() - if v is not ... - } - - return super().update(uid=str(uid), **body) - - def add_datastream( - self, uid: Union[UUID, str], datastream: Union["Datastream", UUID, str] - ) -> None: - """Add a datastream to this data source.""" - - path = f"/{self.client.base_route}/{self.model.get_route()}/{str(uid)}/datastreams/{normalize_uuid(datastream)}" - self.client.request("put", path) - - def remove_datastream( - self, uid: Union[UUID, str], datastream: Union["Datastream", UUID, str] - ) -> None: - """Remove a datastream from this data source.""" - - path = f"/{self.client.base_route}/{self.model.get_route()}/{str(uid)}/datastreams/{normalize_uuid(datastream)}" - self.client.request("delete", path) diff --git a/src/hydroserverpy/api/services/etl/orchestration_system.py b/src/hydroserverpy/api/services/etl/orchestration_system.py index 6afe970..3bcdfff 100644 --- a/src/hydroserverpy/api/services/etl/orchestration_system.py +++ b/src/hydroserverpy/api/services/etl/orchestration_system.py @@ -1,4 +1,4 @@ -from typing import Optional, Union, List, TYPE_CHECKING +from typing import Union, List, Optional, TYPE_CHECKING from uuid import UUID from hydroserverpy.api.models import OrchestrationSystem from hydroserverpy.api.utils import normalize_uuid @@ -19,7 +19,7 @@ def list( page: int = ..., page_size: int = ..., order_by: List[str] = ..., - workspace: Optional[Union["Workspace", UUID, str]] = ..., + workspace: Union["Workspace", UUID, str] = ..., orchestration_system_type: str = ..., 
fetch_all: bool = False, ) -> List["OrchestrationSystem"]: diff --git a/src/hydroserverpy/api/services/etl/task.py b/src/hydroserverpy/api/services/etl/task.py new file mode 100644 index 0000000..69eefc8 --- /dev/null +++ b/src/hydroserverpy/api/services/etl/task.py @@ -0,0 +1,273 @@ +import json +from typing import Literal, Union, Optional, List, Dict, Any, TYPE_CHECKING +from uuid import UUID +from datetime import datetime +from pydantic import Field +from hydroserverpy.api.models import DataConnection, Task, TaskRun, TaskMapping, OrchestrationSystem +from hydroserverpy.api.utils import normalize_uuid +from ..base import HydroServerBaseService + +if TYPE_CHECKING: + from hydroserverpy import HydroServer + from hydroserverpy.api.models import Workspace + + +class TaskService(HydroServerBaseService): + def __init__(self, client: "HydroServer"): + self.model = Task + super().__init__(client) + + def list( + self, + page: int = ..., + page_size: int = ..., + order_by: List[str] = ..., + workspace: Optional[Union["Workspace", UUID, str]] = ..., + orchestration_system: Optional[Union["OrchestrationSystem", UUID, str]] = ..., + orchestration_system_type: str = ..., + data_connection: Union["Workspace", UUID, str] = ..., + data_connection_type: str = ..., + extractor_type: str = ..., + transformer_type: str = ..., + loader_type: str = ..., + source_identifier: str = ..., + target_identifier: str = ..., + latest_run_status: str = ..., + latest_run_started_at_max: datetime = ..., + latest_run_started_at_min: datetime = ..., + latest_run_finished_at_max: Optional[datetime] = ..., + latest_run_finished_at_min: Optional[datetime] = ..., + start_time_max: Optional[datetime] = ..., + start_time_min: Optional[datetime] = ..., + next_run_at_max: Optional[datetime] = ..., + next_run_at_min: Optional[datetime] = ..., + paused: bool = ..., + fetch_all: bool = False, + ) -> List["Task"]: + """Fetch a collection of ETL tasks.""" + + return super().list( + page=page, + 
page_size=page_size, + order_by=order_by, + workspace_id=normalize_uuid(workspace), + orchestration_system_id=normalize_uuid(orchestration_system), + orchestration_system_type=orchestration_system_type, + data_connection_id=normalize_uuid(data_connection), + data_connection_type=data_connection_type, + extractor_type=extractor_type, + transformer_type=transformer_type, + loader_type=loader_type, + source_identifier=source_identifier, + target_identifier=target_identifier, + latest_run_status=latest_run_status, + latest_run_started_at_max=latest_run_started_at_max, + latest_run_started_at_min=latest_run_started_at_min, + latest_run_finished_at_max=latest_run_finished_at_max, + latest_run_finished_at_min=latest_run_finished_at_min, + start_time_max=start_time_max, + start_time_min=start_time_min, + next_run_at_max=next_run_at_max, + next_run_at_min=next_run_at_min, + paused=paused, + fetch_all=fetch_all, + ) + + def create( + self, + name: str, + workspace: Union["Workspace", UUID, str], + data_connection: Union["DataConnection", UUID, str], + orchestration_system: Union["OrchestrationSystem", UUID, str], + extractor_variables: dict = Field(default_factory=dict), + transformer_variables: dict = Field(default_factory=dict), + loader_variables: dict = Field(default_factory=dict), + paused: bool = False, + start_time: Optional[datetime] = None, + next_run_at: Optional[datetime] = None, + crontab: Optional[str] = None, + interval: Optional[int] = None, + interval_period: Optional[str] = None, + mappings: List[dict] = Field(default_factory=list), + ) -> "Task": + """Create a new ETL task.""" + + body = { + "name": name, + "workspaceId": normalize_uuid(workspace), + "dataConnectionId": normalize_uuid(data_connection), + "orchestrationSystemId": normalize_uuid(orchestration_system), + "extractorVariables": extractor_variables, + "transformerVariables": transformer_variables, + "loaderVariables": loader_variables, + "schedule": { + "paused": paused, + "startTime": 
start_time, + "nextRunAt": next_run_at, + "crontab": crontab, + "interval": interval, + "intervalPeriod": interval_period, + } if interval or crontab else ..., + "mappings": mappings if mappings else [] + } + + return super().create(**body) + + def update( + self, + uid: Union[UUID, str], + name: str = ..., + data_connection: Union["DataConnection", UUID, str] = ..., + orchestration_system: Union["OrchestrationSystem", UUID, str] = ..., + extractor_variables: dict = ..., + transformer_variables: dict = ..., + loader_variables: dict = ..., + paused: bool = ..., + start_time: Optional[datetime] = ..., + next_run_at: Optional[datetime] = ..., + crontab: Optional[str] = ..., + interval: Optional[int] = ..., + interval_period: Optional[str] = ..., + mappings: List[dict] = ... + ) -> "DataConnection": + """Update an ETL task.""" + + body: Dict[str, Any] = { + "name": name, + "dataConnectionId": normalize_uuid(data_connection), + "orchestrationSystemId": normalize_uuid(orchestration_system), + "extractorVariables": extractor_variables, + "transformerVariables": transformer_variables, + "loaderVariables": loader_variables, + "mappings": mappings + } + + if crontab is None and interval is None: + body["schedule"] = None + elif any(value is not ... 
for value in [paused, start_time, next_run_at, crontab, interval, interval_period]): + body["schedule"] = { + "paused": paused, + "startTime": start_time, + "nextRunAt": next_run_at, + "crontab": crontab, + "interval": interval, + "intervalPeriod": interval_period, + } + + return super().update(uid=str(uid), **body) + + def run(self, uid: Union[UUID, str]): + """Run an ETL task.""" + + self.client.request( + "post", f"/{self.client.base_route}/{self.model.get_route()}/{str(uid)}" + ) + + def get_task_runs( + self, + uid: Union[UUID, str], + page: int = ..., + page_size: int = 100, + order_by: List[str] = ..., + status: str = ..., + started_at_max: datetime = ..., + started_at_min: datetime = ..., + finished_at_max: datetime = ..., + finished_at_min: datetime = ..., + ) -> List["TaskRun"]: + """Retrieve task runs of a task.""" + + params = { + "page": page, + "page_size": page_size, + "order_by": ",".join(order_by) if order_by is not ... else order_by, + "status": status, + "started_at_max": started_at_max, + "started_at_min": started_at_min, + "finished_at_max": finished_at_max, + "finished_at_min": finished_at_min, + } + params = { + k: ("null" if v is None else v) + for k, v in params.items() + if v is not ... + } + + path = f"/{self.client.base_route}/{self.model.get_route()}/{str(uid)}/runs" + + return [ + TaskRun(**task_run) for task_run in self.client.request("get", path, params=params).json() + ] + + def create_task_run( + self, + uid: Union[UUID, str], + status: Literal["RUNNING", "SUCCESS", "FAILURE"], + started_at: datetime, + finished_at: datetime = ..., + result: dict = ..., + ) -> TaskRun: + """Create a task run record for a task.""" + + path = f"/{self.client.base_route}/{self.model.get_route()}/{str(uid)}/runs" + headers = {"Content-type": "application/json"} + body = { + k: ("null" if v is None else v) for k, v in { + "status": status, + "started_at": started_at, + "finished_at": finished_at, + "result": result, + }.items() if v is not ... 
+ } + + return TaskRun(**self.client.request( + "post", path, headers=headers, data=json.dumps(body, default=self.default_serializer) + ).json()) + + def get_task_run( + self, + uid: Union[UUID, str], + task_run_id: Union[UUID, str] + ) -> TaskRun: + """Get a task run record for a task.""" + + return TaskRun(**self.client.request( + "get", f"/{self.client.base_route}/{self.model.get_route()}/{str(uid)}/runs/{str(task_run_id)}" + ).json()) + + def update_task_run( + self, + uid: Union[UUID, str], + task_run_id: Union[UUID, str], + status: Literal["RUNNING", "SUCCESS", "FAILURE"] = ..., + started_at: datetime = ..., + finished_at: datetime = ..., + result: dict = ..., + ) -> TaskRun: + """Update a task run record for a task.""" + + path = f"/{self.client.base_route}/{self.model.get_route()}/{str(uid)}/runs/{str(task_run_id)}" + headers = {"Content-type": "application/json"} + body = { + k: ("null" if v is None else v) for k, v in { + "status": status, + "started_at": started_at, + "finished_at": finished_at, + "result": result, + }.items() if v is not ... 
+ } + + return TaskRun(**self.client.request( + "patch", path, headers=headers, data=json.dumps(body, default=self.default_serializer) + ).json()) + + def delete_task_run( + self, + uid: Union[UUID, str], + task_run_id: Union[UUID, str] + ) -> None: + """Delete a task run record for a task.""" + + self.client.request( + "delete", f"/{self.client.base_route}/{self.model.get_route()}/{str(uid)}/runs/{str(task_run_id)}" + ) diff --git a/src/hydroserverpy/api/services/sta/datastream.py b/src/hydroserverpy/api/services/sta/datastream.py index b95c7d0..f2d3d94 100644 --- a/src/hydroserverpy/api/services/sta/datastream.py +++ b/src/hydroserverpy/api/services/sta/datastream.py @@ -17,8 +17,6 @@ Sensor, ObservedProperty, ProcessingLevel, - DataSource, - DataArchive ) @@ -38,8 +36,6 @@ def list( observed_property: Union["ObservedProperty", UUID, str] = ..., processing_level: Union["ProcessingLevel", UUID, str] = ..., unit: Union["Unit", UUID, str] = ..., - data_source: Optional[Union["DataSource", UUID, str]] = ..., - data_archive: Optional[Union["DataArchive", UUID, str]] = ..., observation_type: str = ..., sampled_medium: str = ..., status: Optional[str] = ..., @@ -69,8 +65,6 @@ def list( observed_property_id=normalize_uuid(observed_property), processing_level_id=normalize_uuid(processing_level), unit_id=normalize_uuid(unit), - data_source_id=normalize_uuid(data_source), - data_archive_id=normalize_uuid(data_archive), observation_type=observation_type, sampled_medium=sampled_medium, status=status, diff --git a/src/hydroserverpy/api/models/etl/README.md b/src/hydroserverpy/etl/README.md similarity index 100% rename from src/hydroserverpy/api/models/etl/README.md rename to src/hydroserverpy/etl/README.md diff --git a/src/hydroserverpy/etl/__init__.py b/src/hydroserverpy/etl/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/hydroserverpy/api/models/etl/etl_configuration.py b/src/hydroserverpy/etl/etl_configuration.py similarity index 93% rename from 
src/hydroserverpy/api/models/etl/etl_configuration.py rename to src/hydroserverpy/etl/etl_configuration.py index b737284..34ca9e7 100644 --- a/src/hydroserverpy/api/models/etl/etl_configuration.py +++ b/src/hydroserverpy/etl/etl_configuration.py @@ -1,3 +1,4 @@ +import uuid from typing import Annotated, Dict, List, Literal, Optional, Union from pydantic import BaseModel, Field, field_validator from enum import Enum @@ -84,9 +85,9 @@ def check_timezone(cls, timezone_value, info): return timezone_value -class PerPayloadPlaceholder(BaseModel): +class PerTaskPlaceholder(BaseModel): name: str - type: Literal["perPayload"] + type: Literal["perTask"] class RunTimePlaceholder(BaseModel): @@ -100,7 +101,7 @@ class Config: PlaceholderVariable = Annotated[ - Union[PerPayloadPlaceholder, RunTimePlaceholder], + Union[PerTaskPlaceholder, RunTimePlaceholder], Field(discriminator="type"), ] @@ -205,20 +206,19 @@ class Config: populate_by_name = True -class Payload(BaseModel): +class Task(BaseModel): + uid: uuid.UUID = Field(..., alias="id") name: str = "" mappings: List[SourceTargetMapping] = Field(default_factory=list) extractor_variables: Dict[str, str] = Field( default_factory=dict, alias="extractorVariables" ) + transformer_variables: Dict[str, str] = Field( + default_factory=dict, alias="transformerVariables" + ) + loader_variables: Dict[str, str] = Field( + default_factory=dict, alias="loaderVariables" + ) class Config: populate_by_name = True - - -class EtlConfiguration(BaseModel): - type: WorkflowType - extractor: ExtractorConfig - transformer: TransformerConfig - loader: LoaderConfig - payloads: List[Payload] diff --git a/src/hydroserverpy/api/models/etl/extractors/__init__.py b/src/hydroserverpy/etl/extractors/__init__.py similarity index 100% rename from src/hydroserverpy/api/models/etl/extractors/__init__.py rename to src/hydroserverpy/etl/extractors/__init__.py diff --git a/src/hydroserverpy/api/models/etl/extractors/base.py b/src/hydroserverpy/etl/extractors/base.py 
similarity index 74% rename from src/hydroserverpy/api/models/etl/extractors/base.py rename to src/hydroserverpy/etl/extractors/base.py index 527dc44..5eeeebb 100644 --- a/src/hydroserverpy/api/models/etl/extractors/base.py +++ b/src/hydroserverpy/etl/extractors/base.py @@ -2,7 +2,7 @@ import logging import pandas as pd from datetime import datetime -from ..etl_configuration import ExtractorConfig, Payload +from ..etl_configuration import ExtractorConfig, Task from ..timestamp_parser import TimestampParser @@ -10,7 +10,7 @@ class Extractor: def __init__(self, extractor_config: ExtractorConfig): self.cfg = extractor_config - def resolve_placeholder_variables(self, payload: Payload, loader): + def resolve_placeholder_variables(self, task: Task, loader): logging.info(f"Creating runtime variables...") filled = {} for placeholder in self.cfg.placeholder_variables: @@ -19,14 +19,14 @@ def resolve_placeholder_variables(self, payload: Payload, loader): if placeholder.type == "runTime": logging.info(f"Resolving runtime var: {name}") if placeholder.run_time_value == "latestObservationTimestamp": - value = loader.earliest_begin_date(payload) + value = loader.earliest_begin_date(task) elif placeholder.run_time_value == "jobExecutionTime": value = pd.Timestamp.now(tz="UTC") - elif placeholder.type == "perPayload": - logging.info(f"Resolving payload var: {name}") - if name not in payload.extractor_variables: - raise KeyError(f"Missing per-payload variable '{name}'") - value = payload.extractor_variables[name] + elif placeholder.type == "perTask": + logging.info(f"Resolving task var: {name}") + if name not in task.extractor_variables: + raise KeyError(f"Missing per-task variable '{name}'") + value = task.extractor_variables[name] else: continue diff --git a/src/hydroserverpy/api/models/etl/extractors/ftp_extractor.py b/src/hydroserverpy/etl/extractors/ftp_extractor.py similarity index 100% rename from src/hydroserverpy/api/models/etl/extractors/ftp_extractor.py rename to 
src/hydroserverpy/etl/extractors/ftp_extractor.py diff --git a/src/hydroserverpy/api/models/etl/extractors/http_extractor.py b/src/hydroserverpy/etl/extractors/http_extractor.py similarity index 80% rename from src/hydroserverpy/api/models/etl/extractors/http_extractor.py rename to src/hydroserverpy/etl/extractors/http_extractor.py index d35a6bf..8181848 100644 --- a/src/hydroserverpy/api/models/etl/extractors/http_extractor.py +++ b/src/hydroserverpy/etl/extractors/http_extractor.py @@ -2,7 +2,7 @@ import requests from io import BytesIO -from ..etl_configuration import Payload +from ..etl_configuration import Task from .base import Extractor, ExtractorConfig @@ -10,11 +10,11 @@ class HTTPExtractor(Extractor): def __init__(self, settings: ExtractorConfig): super().__init__(settings) - def extract(self, payload: Payload, loader=None): + def extract(self, task: Task, loader=None): """ Downloads the file from the HTTP/HTTPS server and returns a file-like object. """ - url = self.resolve_placeholder_variables(payload, loader) + url = self.resolve_placeholder_variables(task, loader) logging.info(f"Requesting data from → {url}") response = requests.get(url) diff --git a/src/hydroserverpy/api/models/etl/extractors/local_file_extractor.py b/src/hydroserverpy/etl/extractors/local_file_extractor.py similarity index 100% rename from src/hydroserverpy/api/models/etl/extractors/local_file_extractor.py rename to src/hydroserverpy/etl/extractors/local_file_extractor.py diff --git a/src/hydroserverpy/api/models/etl/factories.py b/src/hydroserverpy/etl/factories.py similarity index 100% rename from src/hydroserverpy/api/models/etl/factories.py rename to src/hydroserverpy/etl/factories.py diff --git a/src/hydroserverpy/api/models/etl/loaders/__init__.py b/src/hydroserverpy/etl/loaders/__init__.py similarity index 100% rename from src/hydroserverpy/api/models/etl/loaders/__init__.py rename to src/hydroserverpy/etl/loaders/__init__.py diff --git 
a/src/hydroserverpy/api/models/etl/loaders/base.py b/src/hydroserverpy/etl/loaders/base.py similarity index 73% rename from src/hydroserverpy/api/models/etl/loaders/base.py rename to src/hydroserverpy/etl/loaders/base.py index 9f70729..85c9fe8 100644 --- a/src/hydroserverpy/api/models/etl/loaders/base.py +++ b/src/hydroserverpy/etl/loaders/base.py @@ -7,5 +7,5 @@ def load(self, *args, **kwargs) -> None: pass @abstractmethod - def earliest_begin_date(self, payload_mappings) -> str: + def earliest_begin_date(self, task_mappings) -> str: pass diff --git a/src/hydroserverpy/api/models/etl/loaders/hydroserver_loader.py b/src/hydroserverpy/etl/loaders/hydroserver_loader.py similarity index 80% rename from src/hydroserverpy/api/models/etl/loaders/hydroserver_loader.py rename to src/hydroserverpy/etl/loaders/hydroserver_loader.py index 398875c..b4e942b 100644 --- a/src/hydroserverpy/api/models/etl/loaders/hydroserver_loader.py +++ b/src/hydroserverpy/etl/loaders/hydroserver_loader.py @@ -4,7 +4,7 @@ from .base import Loader import logging import pandas as pd -from ..etl_configuration import Payload, SourceTargetMapping +from ..etl_configuration import Task, SourceTargetMapping if TYPE_CHECKING: from hydroserverpy.api.client import HydroServer @@ -15,17 +15,17 @@ class HydroServerLoader(Loader): A class that extends the HydroServer client with ETL-specific functionalities. """ - def __init__(self, client: HydroServer, data_source_id): + def __init__(self, client: HydroServer, task_id): self.client = client self._begin_cache: dict[str, pd.Timestamp] = {} - self.data_source_id = data_source_id + self.task_id = task_id - def load(self, data: pd.DataFrame, payload: Payload) -> None: + def load(self, data: pd.DataFrame, task: Task) -> None: """ Load observations from a DataFrame to the HydroServer. :param data: A Pandas DataFrame where each column corresponds to a datastream. 
""" - begin_date = self.earliest_begin_date(payload) + begin_date = self.earliest_begin_date(task) new_data = data[data["timestamp"] > begin_date] for col in new_data.columns.difference(["timestamp"]): datastream = self.client.datastreams.get( @@ -77,26 +77,23 @@ def load(self, data: pd.DataFrame, payload: Payload) -> None: def _fetch_earliest_begin( self, mappings: list[SourceTargetMapping] ) -> pd.Timestamp: - logging.info("Querying HydroServer for earliest begin date for payload...") + logging.info("Querying HydroServer for earliest begin date for task...") timestamps = [] - datastreams = self.client.datastreams.list( - data_source=self.data_source_id - ).items - ds_by_uid = {str(ds.uid): ds for ds in datastreams} for m in mappings: - for p in m.paths: - datastream = ds_by_uid[str(p.target_identifier)] + for p in m["paths"] if isinstance(m, dict) else m.paths: + datastream_id = p["targetIdentifier"] if isinstance(p, dict) else p.target_identifier + datastream = self.client.datastreams.get(datastream_id) raw = datastream.phenomenon_end_time or "1970-01-01" ts = pd.to_datetime(raw, utc=True) timestamps.append(ts) logging.info(f"Found earliest begin date: {min(timestamps)}") return min(timestamps) - def earliest_begin_date(self, payload: Payload) -> pd.Timestamp: + def earliest_begin_date(self, task: Task) -> pd.Timestamp: """ - Return earliest begin date for a payload, or compute+cache it on first call. + Return earliest begin date for a task, or compute+cache it on first call. 
""" - key = payload.name + key = task.name if key not in self._begin_cache: - self._begin_cache[key] = self._fetch_earliest_begin(payload.mappings) + self._begin_cache[key] = self._fetch_earliest_begin(task.mappings) return self._begin_cache[key] diff --git a/src/hydroserverpy/api/models/etl/timestamp_parser.py b/src/hydroserverpy/etl/timestamp_parser.py similarity index 100% rename from src/hydroserverpy/api/models/etl/timestamp_parser.py rename to src/hydroserverpy/etl/timestamp_parser.py diff --git a/src/hydroserverpy/api/models/etl/transformers/__init__.py b/src/hydroserverpy/etl/transformers/__init__.py similarity index 100% rename from src/hydroserverpy/api/models/etl/transformers/__init__.py rename to src/hydroserverpy/etl/transformers/__init__.py diff --git a/src/hydroserverpy/api/models/etl/transformers/base.py b/src/hydroserverpy/etl/transformers/base.py similarity index 96% rename from src/hydroserverpy/api/models/etl/transformers/base.py rename to src/hydroserverpy/etl/transformers/base.py index 69c61eb..42a754a 100644 --- a/src/hydroserverpy/api/models/etl/transformers/base.py +++ b/src/hydroserverpy/etl/transformers/base.py @@ -69,9 +69,9 @@ def standardize_dataframe( self, df: pd.DataFrame, mappings: List[SourceTargetMapping] ): if not df.empty: - logging.info(f"Read payload into dataframe: {df.iloc[0].to_dict()}") + logging.info(f"Read task into dataframe: {df.iloc[0].to_dict()}") else: - logging.info("Read payload into dataframe: [empty dataframe]") + logging.info("Read task into dataframe: [empty dataframe]") # 1) Normalize timestamp column df.rename(columns={self.timestamp.key: "timestamp"}, inplace=True) diff --git a/src/hydroserverpy/api/models/etl/transformers/csv_transformer.py b/src/hydroserverpy/etl/transformers/csv_transformer.py similarity index 100% rename from src/hydroserverpy/api/models/etl/transformers/csv_transformer.py rename to src/hydroserverpy/etl/transformers/csv_transformer.py diff --git 
a/src/hydroserverpy/api/models/etl/transformers/json_transformer.py b/src/hydroserverpy/etl/transformers/json_transformer.py similarity index 100% rename from src/hydroserverpy/api/models/etl/transformers/json_transformer.py rename to src/hydroserverpy/etl/transformers/json_transformer.py diff --git a/src/hydroserverpy/api/models/etl/types.py b/src/hydroserverpy/etl/types.py similarity index 100% rename from src/hydroserverpy/api/models/etl/types.py rename to src/hydroserverpy/etl/types.py