Commit 03b0e19

Ken Lippold authored and committed
Changed transformer output DataFrame format to better support aggregation operations.
1 parent a3b5d0a commit 03b0e19

19 files changed (+906, -629 lines)

src/hydroserverpy/etl/README.md

Lines changed: 34 additions & 29 deletions
@@ -55,7 +55,7 @@ extractor = LocalFileExtractor(

### Transformers

-Transformers parse the extracted payload into a DataFrame where the first column is `timestamp` and the remaining columns are named by their target datastream ID.
+Transformers parse the extracted payload into a DataFrame with columns `timestamp`, `value`, and `target_id`.

**CSV:**
```python
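The reshaped output described in this hunk is the standard pandas "long" format. As a minimal sketch (with made-up datastream IDs, not hydroserverpy code), this is why the new shape supports aggregation more naturally than one column per datastream:

```python
import pandas as pd

# Hypothetical long-format transformer output: one row per observation,
# with the target datastream ID in its own column.
df = pd.DataFrame({
    "timestamp": pd.to_datetime([
        "2024-01-15T00:00:00Z", "2024-01-15T06:00:00Z",
        "2024-01-15T00:00:00Z", "2024-01-15T06:00:00Z",
    ]),
    "value": [1.0, 3.0, 10.0, 20.0],
    "target_id": ["ds-a", "ds-a", "ds-b", "ds-b"],
})

# Long format makes per-target aggregation a plain groupby, with no need
# to keep every series aligned on a shared timestamp column.
daily_means = (
    df.groupby(["target_id", pd.Grouper(key="timestamp", freq="1D")])["value"]
    .mean()
    .reset_index()
)
```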
@@ -71,6 +71,8 @@ transformer = CSVTransformer(

Use `identifier_type="index"` to reference columns by 1-based position instead of name:
```python
+from hydroserverpy.etl.transformers import CSVTransformer
+
transformer = CSVTransformer(
    timestamp_key="1",
    identifier_type="index",
@@ -102,6 +104,8 @@ Every transformer requires a `timestamp_key` identifying the source column that
| `"custom"` | Parses timestamps using a `strftime`-compatible format string provided via `timestamp_format`. Required when the source timestamps are not in a standard ISO 8601 format. |

```python
+from hydroserverpy.etl.transformers import CSVTransformer
+
# Custom format example — timestamps like "01/15/2024 08:30:00"
transformer = CSVTransformer(
    timestamp_key="datetime",
@@ -121,15 +125,17 @@ transformer = CSVTransformer(
| `"iana"` | Treats timestamps as naive and applies a named IANA timezone. Strips any embedded offset if present. | `timezone` as a valid IANA name |

```python
+from hydroserverpy.etl.transformers import CSVTransformer
+
# Fixed UTC offset — timestamps are in US Mountain Standard Time
-transformer = CSVTransformer(
+utc_transformer = CSVTransformer(
    timestamp_key="datetime",
    timezone_type="offset",
    timezone="-0700",
)

# IANA timezone — handles daylight saving time automatically
-transformer = CSVTransformer(
+iana_transformer = CSVTransformer(
    timestamp_key="datetime",
    timezone_type="iana",
    timezone="America/Denver",
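The difference between the two modes in this hunk can be sketched with plain pandas (an illustration of the semantics, not hydroserverpy's implementation):

```python
from datetime import timedelta, timezone

import pandas as pd

naive = pd.to_datetime(["2024-07-01 00:00:00", "2024-12-01 00:00:00"])

# Fixed offset: every timestamp shifts by exactly 7 hours, year-round.
fixed = naive.tz_localize(timezone(timedelta(hours=-7))).tz_convert("UTC")

# IANA zone: July is MDT (UTC-6), December is MST (UTC-7),
# so the UTC conversion differs across the DST boundary.
iana = naive.tz_localize("America/Denver").tz_convert("UTC")
```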
@@ -148,7 +154,7 @@ All timestamps are normalized to UTC before loading regardless of the source tim

### Data Mappings

-Data mappings connect source columns (by name or index) to HydroServer datastream IDs. Each mapping can fan out to multiple target datastreams, and optional data operations can be applied along the way.
+Data mappings connect source columns (by name or index) to HydroServer datastream IDs. Each mapping can fan out to multiple target datastreams, and optional data operations can be applied per target path.

```python
from hydroserverpy.etl.transformers import ETLDataMapping, ETLTargetPath
@@ -171,18 +177,18 @@ data_mappings = [

#### Data Operations

-Target paths can include a sequence of data operations applied to the source values before loading. Supported operations are arithmetic expressions and rating curves.
+Target paths can include a sequence of data operations applied to the source values before loading. Operations are applied in order — the output of each becomes the input of the next. Supported operations are arithmetic expressions, rating curves, and temporal aggregation.

**Arithmetic expression** — applies a Python arithmetic expression where `x` represents the source value. Only `+`, `-`, `*`, `/`, numeric literals, and the variable `x` are permitted.

```python
+from hydroserverpy.etl.transformers import ETLTargetPath
from hydroserverpy.etl.operations import ArithmeticExpressionOperation

ETLTargetPath(
    target_identifier="<datastream-uuid>",
    data_operations=[
        ArithmeticExpressionOperation(
-            type="arithmetic_expression",
            expression="(x - 32) / 1.8", # Fahrenheit to Celsius
            target_identifier="<datastream-uuid>",
        )
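One common way to evaluate a restricted expression like this safely is an AST whitelist. The sketch below is a hypothetical stand-in, not the library's actual implementation, showing how only `+`, `-`, `*`, `/`, numeric literals, and `x` can be permitted:

```python
import ast
import operator

# Allowed AST operator nodes mapped to their Python implementations.
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub,
        ast.Mult: operator.mul, ast.Div: operator.truediv,
        ast.USub: operator.neg, ast.UAdd: operator.pos}

def eval_expression(expression: str, x: float) -> float:
    """Evaluate an arithmetic expression in `x`, rejecting anything else."""
    def walk(node):
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](walk(node.left), walk(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](walk(node.operand))
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.Name) and node.id == "x":
            return x
        raise ValueError(f"Disallowed expression element: {ast.dump(node)}")
    return walk(ast.parse(expression, mode="eval"))
```

Because only whitelisted node types are walked, attempts to call functions or access attributes raise `ValueError` instead of executing.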
@@ -193,44 +199,40 @@ ETLTargetPath(
**Rating curve** — maps input values to output values using linear interpolation against a two-column CSV lookup table (input, output), retrieved from a URL.

```python
+from hydroserverpy.etl.transformers import ETLTargetPath
from hydroserverpy.etl.operations import RatingCurveDataOperation

ETLTargetPath(
    target_identifier="<datastream-uuid>",
    data_operations=[
        RatingCurveDataOperation(
-            type="rating_curve",
            rating_curve_url="https://example.com/curves/stage-discharge.csv",
            target_identifier="<datastream-uuid>",
        )
    ],
)
```

-Operations are applied in order. The output of each operation becomes the input of the next.
-
-### Temporal Aggregation
-
-Temporal aggregation is an optional step that reduces the per-observation DataFrame produced by the transformer into period-level summaries before loading. When configured, the same aggregation is applied uniformly to every target series in the pipeline.
+**Temporal aggregation** — reduces per-observation values into period-level summaries. When included, it should be the last operation in the sequence, as it changes the shape of the data from one row per observation to one row per aggregation window.

```python
-from hydroserverpy.etl.models import TemporalAggregation
+from hydroserverpy.etl.transformers import ETLTargetPath
+from hydroserverpy.etl.operations import TemporalAggregationOperation

-aggregation = TemporalAggregation(
-    aggregation_statistic="simple_mean",
-    aggregation_interval=1,
-    aggregation_interval_unit="day",
+ETLTargetPath(
+    target_identifier="<datastream-uuid>",
+    data_operations=[
+        TemporalAggregationOperation(
+            aggregation_statistic="simple_mean",
+            aggregation_interval=1,
+            aggregation_interval_unit="day",
+            target_identifier="<datastream-uuid>",
+        )
+    ],
)
```

-Pass it to the transformer at construction time:
-
-```python
-transformer = CSVTransformer(
-    timestamp_key="datetime",
-    temporal_aggregation=aggregation,
-)
-```
+Because temporal aggregation is a per-target operation, different targets fed by the same source can use different statistics, intervals, or timezone alignments independently.

#### Aggregation statistic

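The rating-curve interpolation described in the hunk above can be sketched with `numpy.interp` (hypothetical table values standing in for the CSV fetched from `rating_curve_url`; this is an illustration, not hydroserverpy code):

```python
import numpy as np

# Hypothetical two-column lookup table: stage (input) -> discharge (output).
stage = np.array([0.0, 1.0, 2.0, 3.0])
discharge = np.array([0.0, 5.0, 20.0, 45.0])

def apply_rating_curve(values):
    # Linear interpolation between table rows, as the operation describes.
    return np.interp(values, stage, discharge)

result = apply_rating_curve(np.array([0.5, 1.5, 2.5]))
```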
@@ -256,37 +258,40 @@ Window boundaries are aligned to local midnight in the configured timezone. The
| `"iana"` | Local midnight in a named timezone, handling DST automatically | `timezone` as a valid IANA name |

```python
+from hydroserverpy.etl.operations import TemporalAggregationOperation
+
# Daily windows aligned to US Mountain Time (UTC-7, DST-aware)
-aggregation = TemporalAggregation(
+TemporalAggregationOperation(
    aggregation_statistic="simple_mean",
    aggregation_interval=1,
    aggregation_interval_unit="day",
    timezone_type="iana",
    timezone="America/Denver",
+    target_identifier="<datastream-uuid>",
)

# Daily windows at a fixed offset (no DST adjustment)
-aggregation = TemporalAggregation(
+TemporalAggregationOperation(
    aggregation_statistic="time_weighted_mean",
    aggregation_interval=1,
    aggregation_interval_unit="day",
    timezone_type="offset",
    timezone="-0700",
+    target_identifier="<datastream-uuid>",
)
```

**Window boundary semantics:** Windows run from the local midnight that contains the first observation to the local midnight that contains the last observation. The last observation defines the exclusive upper boundary — observations on that final local day are not aggregated. Ensure your source data extends at least one day past the last period you want included, or that the last observation falls on the day following the final window.

Days with no observations are omitted from the output rather than filled with null values.

-
### Loader

```python
from hydroserverpy import HydroServer
from hydroserverpy.etl.loaders import HydroServerLoader

-hs = HydroServer(host="https://my-hydroserver.com", username="user", password="pass")
+hs = HydroServer(host="https://playground.hydroserver.org", email="user@example.com", password="pass")

loader = HydroServerLoader(
    client=hs,
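The window boundary and empty-day semantics described in this hunk can be sketched with pandas (an illustration of the rule, not hydroserverpy's implementation):

```python
import pandas as pd

# Observations on Jan 1, Jan 3, and one early on Jan 4. The local day
# containing the last observation (Jan 4) is the exclusive upper boundary.
obs = pd.DataFrame({
    "timestamp": pd.to_datetime([
        "2024-01-01 06:00", "2024-01-01 18:00",
        "2024-01-03 06:00", "2024-01-04 00:30",
    ]),
    "value": [2.0, 4.0, 10.0, 99.0],
})

last_day = obs["timestamp"].max().normalize()
window = obs[obs["timestamp"] < last_day]  # drop the final (partial) local day

# Jan 2 has no observations, so it drops out instead of becoming a null row.
daily = window.resample("1D", on="timestamp")["value"].mean().dropna()
```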

src/hydroserverpy/etl/hydroserver.py

Lines changed: 28 additions & 63 deletions
@@ -4,8 +4,9 @@
from hydroserverpy.api.models import Task, DataConnection
from hydroserverpy.etl import extractors, transformers, loaders, ETLPipeline
from hydroserverpy.etl.transformers import ETLDataMapping, ETLTargetPath
-from hydroserverpy.etl.operations import DataOperation, RatingCurveDataOperation, ArithmeticExpressionOperation
-from hydroserverpy.etl.models import Timestamp, TemporalAggregation
+from hydroserverpy.etl.operations import (DataOperation, RatingCurveDataOperation, ArithmeticExpressionOperation,
+                                          TemporalAggregationOperation)
+from hydroserverpy.etl.models import Timestamp


def normalize_timestamp_kwargs(**kwargs) -> dict:
@@ -82,62 +83,23 @@ def resolve_data_operations(raw_etl_target_path: dict) -> list[DataOperation]:
                target_identifier=raw_etl_target_path["targetIdentifier"],
                rating_curve_url=data_operation["ratingCurveUrl"],
            ))
+        elif data_operation["type"] == "aggregation":
+            timezone_kwargs = normalize_timestamp_kwargs(format="iso", **data_operation)
+            aggregation_mapping = {
+                "simple_mean": "simple_mean",
+                "time_weighted_daily_mean": "time_weighted_mean",
+                "last_value_of_day": "last_value_of_period",
+            }
+            resolved_data_operations.append(TemporalAggregationOperation(
+                target_identifier=raw_etl_target_path["targetIdentifier"],
+                aggregation_statistic=aggregation_mapping[data_operation["aggregationStatistic"]],
+                timezone_type=timezone_kwargs.get("timezone_type"),
+                timezone=timezone_kwargs.get("timezone"),
+            ))

    return resolved_data_operations


-def resolve_temporal_aggregation(
-    data_mappings: list[dict]
-) -> Optional[TemporalAggregation]:
-    """
-    Extract and return a TemporalAggregation configuration from the data mappings,
-    or None if no aggregation transformation is configured.
-
-    Aggregation is applied uniformly across all series at the transformer level,
-    so only one aggregation configuration is permitted across all mappings. Raises
-    ValueError if conflicting aggregation configurations are found.
-    """
-
-    # TODO: Aggregation settings are currently stored in HydroServer under data transformations.
-    # In the ETL module, aggregation is configured at the transformer level and applied uniformly
-    # to all datastreams after per-column data operations. This ensures the DataFrame passed to
-    # the loader remains aligned in time — a shared timestamp column is only valid if all series are
-    # aggregated identically. We may want to update how aggregation settings are stored in HydroServer
-    # to match this. In the meantime, if multiple conflicting aggregation settings somehow get passed
-    # to the same task, this method will throw an exception rather than try to guess which
-    # aggregation configuration to use. Alternatively, if we want to allow varying aggregation on a per
-    # column basis, we'll need to retool how the pipeline passes data to the loader. Each datastream
-    # will either need a dedicated timestamp column, or multiple dataframes will need to be passed to
-    # the loader.
-
-    temporal_aggregation = None
-
-    for mapping in data_mappings:
-        for path in mapping["paths"]:
-            for transformation in path.get("dataTransformations", []):
-                if transformation["type"] == "aggregation":
-                    if temporal_aggregation is not None:
-                        raise ValueError(
-                            "Received multiple aggregation configurations from HydroServer for the transformer. "
-                            "Only one aggregation configuration per transformer is currently supported."
-                        )
-
-                    timezone_kwargs = normalize_timestamp_kwargs(**transformation)
-                    aggregation_mapping = {
-                        "simple_mean": "simple_mean",
-                        "time_weighted_daily_mean": "time_weighted_mean",
-                        "last_value_of_day": "last_value_of_period",
-                    }
-
-                    temporal_aggregation = TemporalAggregation(
-                        aggregation_statistic=aggregation_mapping[transformation["aggregationStatistic"]],
-                        timezone_type=timezone_kwargs.get("timezone_type"),
-                        timezone=timezone_kwargs.get("timezone"),
-                    )
-
-    return temporal_aggregation
-
-
def build_hydroserver_pipeline(
    task: Task,
    data_connection: DataConnection,
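The dispatch added in this hunk keys on the raw operation's `type` field and renames HydroServer's stored statistic names to the ETL module's names. A stand-alone sketch of that mapping logic, with plain dicts standing in for the operation classes and the payload key `dataOperations` assumed (the real payload schema isn't shown in this diff):

```python
def resolve_data_operations_sketch(raw_path: dict) -> list[dict]:
    # Maps HydroServer's stored statistic names to the ETL module's names.
    aggregation_mapping = {
        "simple_mean": "simple_mean",
        "time_weighted_daily_mean": "time_weighted_mean",
        "last_value_of_day": "last_value_of_period",
    }
    resolved = []
    for op in raw_path.get("dataOperations", []):
        if op["type"] == "aggregation":
            resolved.append({
                "kind": "temporal_aggregation",
                "target_identifier": raw_path["targetIdentifier"],
                "aggregation_statistic": aggregation_mapping[op["aggregationStatistic"]],
            })
    return resolved

ops = resolve_data_operations_sketch({
    "targetIdentifier": "ds-1",
    "dataOperations": [{"type": "aggregation",
                        "aggregationStatistic": "last_value_of_day"}],
})
```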
@@ -178,9 +140,9 @@ def build_hydroserver_pipeline(
    if loader_cls is None:
        loader_cls = getattr(loaders, f"{data_connection.loader_type}Loader")

-    extractor_settings = dict(data_connection.extractor_settings)
-    transformer_settings = dict(data_connection.transformer_settings)
-    loader_settings = dict(data_connection.loader_settings)
+    extractor_settings = dict(data_connection.extractor_settings) if data_connection else {}
+    transformer_settings = dict(data_connection.transformer_settings) if data_connection else {}
+    loader_settings = dict(data_connection.loader_settings) if data_connection else {}

    extractor_placeholders = extractor_settings.pop("placeholderVariables", [])
    extractor_variables = getattr(task, "extractor_settings", None) or getattr(task, "extractor_variables", {})
@@ -195,19 +157,22 @@ def build_hydroserver_pipeline(
        **{to_snake(k): v for k, v in extractor_settings.items()}
    )

-    timestamp_settings = transformer_settings.pop("timestamp", {})
+    if task.task_type == "Aggregation":
+        timestamp_settings = {
+            "key": "phenomenon_time",
+            "format": "iso",
+            "timezoneMode": "utc"
+        }
+    else:
+        timestamp_settings = transformer_settings.pop("timestamp", {})
+
    transformer_settings = {
        ("jmespath" if k == "JMESPath" else to_snake(k)): v
        for k, v in transformer_settings.items()
    }

-    temporal_aggregation = resolve_temporal_aggregation(
-        data_mappings=data_mappings
-    )
-
    transformer: transformers.Transformer = transformer_cls(
        timestamp_key=timestamp_settings["key"],
-        temporal_aggregation=temporal_aggregation,
        **transformer_settings,
        **normalize_timestamp_kwargs(**timestamp_settings)
    )
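The settings normalization above converts camelCase keys to snake_case, with `JMESPath` special-cased. A hypothetical sketch of that step (the real `to_snake` helper lives elsewhere in hydroserverpy and may differ):

```python
import re

def to_snake(name: str) -> str:
    # Hypothetical stand-in for the to_snake helper referenced above:
    # insert "_" before each interior uppercase letter, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

transformer_settings = {"JMESPath": "rows[*]", "identifierType": "name"}
normalized = {
    ("jmespath" if k == "JMESPath" else to_snake(k)): v
    for k, v in transformer_settings.items()
}
```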
