Skip to content

Commit 1d69ff6

Browse files
Ken Lippold
authored and committed
Fixed old references to payloads
1 parent 8fe11d3 commit 1d69ff6

File tree

6 files changed

+26
-36
lines changed

6 files changed

+26
-36
lines changed

src/hydroserverpy/etl/etl_configuration.py

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ def check_timezone(cls, timezone_value, info):
8585
return timezone_value
8686

8787

88-
class PerPayloadPlaceholder(BaseModel):
88+
class PerTaskPlaceholder(BaseModel):
8989
name: str
90-
type: Literal["perPayload"]
90+
type: Literal["perTask"]
9191

9292

9393
class RunTimePlaceholder(BaseModel):
@@ -101,7 +101,7 @@ class Config:
101101

102102

103103
PlaceholderVariable = Annotated[
104-
Union[PerPayloadPlaceholder, RunTimePlaceholder],
104+
Union[PerTaskPlaceholder, RunTimePlaceholder],
105105
Field(discriminator="type"),
106106
]
107107

@@ -206,7 +206,7 @@ class Config:
206206
populate_by_name = True
207207

208208

209-
class Payload(BaseModel):
209+
class Task(BaseModel):
210210
uid: uuid.UUID = Field(..., alias="id")
211211
name: str = ""
212212
mappings: List[SourceTargetMapping] = Field(default_factory=list)
@@ -222,13 +222,3 @@ class Payload(BaseModel):
222222

223223
class Config:
224224
populate_by_name = True
225-
226-
227-
class EtlConfiguration(BaseModel):
228-
uid: uuid.UUID = Field(..., alias="id")
229-
name: str
230-
type: WorkflowType
231-
extractor: ExtractorConfig
232-
transformer: TransformerConfig
233-
loader: LoaderConfig
234-
payloads: List[Payload]

src/hydroserverpy/etl/extractors/base.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22
import logging
33
import pandas as pd
44
from datetime import datetime
5-
from ..etl_configuration import ExtractorConfig, Payload
5+
from ..etl_configuration import ExtractorConfig, Task
66
from ..timestamp_parser import TimestampParser
77

88

99
class Extractor:
1010
def __init__(self, extractor_config: ExtractorConfig):
1111
self.cfg = extractor_config
1212

13-
def resolve_placeholder_variables(self, payload: Payload, loader):
13+
def resolve_placeholder_variables(self, task: Task, loader):
1414
logging.info(f"Creating runtime variables...")
1515
filled = {}
1616
for placeholder in self.cfg.placeholder_variables:
@@ -19,14 +19,14 @@ def resolve_placeholder_variables(self, payload: Payload, loader):
1919
if placeholder.type == "runTime":
2020
logging.info(f"Resolving runtime var: {name}")
2121
if placeholder.run_time_value == "latestObservationTimestamp":
22-
value = loader.earliest_begin_date(payload)
22+
value = loader.earliest_begin_date(task)
2323
elif placeholder.run_time_value == "jobExecutionTime":
2424
value = pd.Timestamp.now(tz="UTC")
25-
elif placeholder.type == "perPayload":
26-
logging.info(f"Resolving payload var: {name}")
27-
if name not in payload.extractor_variables:
28-
raise KeyError(f"Missing per-payload variable '{name}'")
29-
value = payload.extractor_variables[name]
25+
elif placeholder.type == "perTask":
26+
logging.info(f"Resolving task var: {name}")
27+
if name not in task.extractor_variables:
28+
raise KeyError(f"Missing per-task variable '{name}'")
29+
value = task.extractor_variables[name]
3030
else:
3131
continue
3232

src/hydroserverpy/etl/extractors/http_extractor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,19 @@
22
import requests
33
from io import BytesIO
44

5-
from ..etl_configuration import Payload
5+
from ..etl_configuration import Task
66
from .base import Extractor, ExtractorConfig
77

88

99
class HTTPExtractor(Extractor):
1010
def __init__(self, settings: ExtractorConfig):
1111
super().__init__(settings)
1212

13-
def extract(self, payload: Payload, loader=None):
13+
def extract(self, task: Task, loader=None):
1414
"""
1515
Downloads the file from the HTTP/HTTPS server and returns a file-like object.
1616
"""
17-
url = self.resolve_placeholder_variables(payload, loader)
17+
url = self.resolve_placeholder_variables(task, loader)
1818
logging.info(f"Requesting data from → {url}")
1919

2020
response = requests.get(url)

src/hydroserverpy/etl/loaders/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ def load(self, *args, **kwargs) -> None:
77
pass
88

99
@abstractmethod
10-
def earliest_begin_date(self, payload_mappings) -> str:
10+
def earliest_begin_date(self, task_mappings) -> str:
1111
pass

src/hydroserverpy/etl/loaders/hydroserver_loader.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from .base import Loader
55
import logging
66
import pandas as pd
7-
from ..etl_configuration import Payload, SourceTargetMapping
7+
from ..etl_configuration import Task, SourceTargetMapping
88

99
if TYPE_CHECKING:
1010
from hydroserverpy.api.client import HydroServer
@@ -20,12 +20,12 @@ def __init__(self, client: HydroServer, task_id):
2020
self._begin_cache: dict[str, pd.Timestamp] = {}
2121
self.task_id = task_id
2222

23-
def load(self, data: pd.DataFrame, payload: Payload) -> None:
23+
def load(self, data: pd.DataFrame, task: Task) -> None:
2424
"""
2525
Load observations from a DataFrame to the HydroServer.
2626
:param data: A Pandas DataFrame where each column corresponds to a datastream.
2727
"""
28-
begin_date = self.earliest_begin_date(payload)
28+
begin_date = self.earliest_begin_date(task)
2929
new_data = data[data["timestamp"] > begin_date]
3030
for col in new_data.columns.difference(["timestamp"]):
3131
df = (
@@ -73,7 +73,7 @@ def load(self, data: pd.DataFrame, payload: Payload) -> None:
7373
def _fetch_earliest_begin(
7474
self, mappings: list[SourceTargetMapping]
7575
) -> pd.Timestamp:
76-
logging.info("Querying HydroServer for earliest begin date for payload...")
76+
logging.info("Querying HydroServer for earliest begin date for task...")
7777
timestamps = []
7878
datastreams = self.client.datastreams.list(
7979
data_source=self.data_source_id
@@ -88,11 +88,11 @@ def _fetch_earliest_begin(
8888
logging.info(f"Found earliest begin date: {min(timestamps)}")
8989
return min(timestamps)
9090

91-
def earliest_begin_date(self, payload: Payload) -> pd.Timestamp:
91+
def earliest_begin_date(self, task: Task) -> pd.Timestamp:
9292
"""
93-
Return earliest begin date for a payload, or compute+cache it on first call.
93+
Return earliest begin date for a task, or compute+cache it on first call.
9494
"""
95-
key = payload.name
95+
key = task.name
9696
if key not in self._begin_cache:
97-
self._begin_cache[key] = self._fetch_earliest_begin(payload.mappings)
97+
self._begin_cache[key] = self._fetch_earliest_begin(task.mappings)
9898
return self._begin_cache[key]

src/hydroserverpy/etl/transformers/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ def standardize_dataframe(
6969
self, df: pd.DataFrame, mappings: List[SourceTargetMapping]
7070
):
7171
if not df.empty:
72-
logging.info(f"Read payload into dataframe: {df.iloc[0].to_dict()}")
72+
logging.info(f"Read task into dataframe: {df.iloc[0].to_dict()}")
7373
else:
74-
logging.info("Read payload into dataframe: [empty dataframe]")
74+
logging.info("Read task into dataframe: [empty dataframe]")
7575

7676
# 1) Normalize timestamp column
7777
df.rename(columns={self.timestamp.key: "timestamp"}, inplace=True)

0 commit comments

Comments
 (0)