
Commit 3136942

Ken Lippold (kjlippold) authored
367 data aggregation (#26)
* Added ETL data aggregation step * Updated ETL README to include aggregation step --------- Co-authored-by: Ken Lippold <klippold@Kens-MacBook-Pro-2.local>
1 parent bf52df3 commit 3136942

File tree

6 files changed: +1067 -62 lines changed

src/hydroserverpy/etl/README.md

Lines changed: 76 additions & 5 deletions
@@ -115,8 +115,8 @@ transformer = CSVTransformer(
 
 | `timezone_type` | Behaviour | Requires |
 |---|---|---|
-| `"utc"` (default) | Treats all timestamps as UTC. ||
-| `"embedded"` | Reads timezone offset from the timestamp string itself. Falls back to UTC if the timestamps are naive. ||
+| `None` (default) | Reads timezone offset from the timestamp string itself. Falls back to UTC if the timestamps are naive. ||
+| `"utc"` | Treats all timestamps as UTC. ||
 | `"offset"` | Treats timestamps as naive and applies a fixed UTC offset. Strips any embedded offset if present. | `timezone` in `±HHMM` or `±HH:MM` format |
 | `"iana"` | Treats timestamps as naive and applies a named IANA timezone. Strips any embedded offset if present. | `timezone` as a valid IANA name |
 
@@ -135,10 +135,10 @@ transformer = CSVTransformer(
     timezone="America/Denver",
 )
 
-# Embedded offset — timestamps include their own offset, e.g. "2024-01-15T08:30:00-07:00"
+# Embedded offsets — timestamps include their own offset, e.g. "2024-01-15T08:30:00-07:00"
+# Omit timezone_type (or set it to None) to read offsets from the timestamps directly.
 transformer = CSVTransformer(
     timestamp_key="datetime",
-    timezone_type="embedded",
 )
 ```
 
@@ -209,6 +209,77 @@ ETLTargetPath(
 
 Operations are applied in order. The output of each operation becomes the input of the next.
 
+### Temporal Aggregation
+
+Temporal aggregation is an optional step that reduces the per-observation DataFrame produced by the transformer into period-level summaries before loading. When configured, the same aggregation is applied uniformly to every target series in the pipeline.
+
+```python
+from hydroserverpy.etl.models import TemporalAggregation
+
+aggregation = TemporalAggregation(
+    aggregation_statistic="simple_mean",
+    aggregation_interval=1,
+    aggregation_interval_unit="day",
+)
+```
+
+Pass it to the transformer at construction time:
+
+```python
+transformer = CSVTransformer(
+    timestamp_key="datetime",
+    temporal_aggregation=aggregation,
+)
+```
+
+#### Aggregation statistic
+
+| `aggregation_statistic` | Behaviour |
+|---|---|
+| `"simple_mean"` | Arithmetic mean of all observations within the window. |
+| `"time_weighted_mean"` | Mean weighted by the time between observations, computed via trapezoidal integration. Values at window boundaries are estimated by linear interpolation from the nearest surrounding observations. |
+| `"last_value_of_period"` | The last observation within the window. |
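To make the difference between the statistics concrete, here is a small worked example in plain Python (an editor's sketch of the arithmetic, not the library's implementation):

```python
from datetime import datetime, timedelta

# Three observations, unevenly spaced across a 1-day window starting at t0.
t0 = datetime(2024, 1, 15)
obs = [(t0, 10.0), (t0 + timedelta(hours=1), 10.0), (t0 + timedelta(hours=23), 30.0)]

# simple_mean: every observation counts equally.
simple_mean = sum(v for _, v in obs) / len(obs)

# time_weighted_mean: trapezoidal integration over [t0, t0 + 1 day); the
# last value is carried flat to the window end (flat extrapolation).
points = obs + [(t0 + timedelta(days=1), obs[-1][1])]
area = sum(
    (v0 + v1) * 0.5 * (t1 - ta).total_seconds()
    for (ta, v0), (t1, v1) in zip(points, points[1:])
)
time_weighted_mean = area / timedelta(days=1).total_seconds()

# last_value_of_period: just the final observation in the window.
last_value = obs[-1][1]

print(simple_mean, time_weighted_mean, last_value)
```

With this uneven spacing the simple mean over-weights the two early readings (about 16.67), while the time-weighted mean integrates over the whole day and lands at 20.0, and the last value is 30.0.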
+
+#### Aggregation interval
+
+`aggregation_interval` (integer, default `1`) and `aggregation_interval_unit` (currently `"day"`) together define the window width. An `aggregation_interval` of `3` with unit `"day"` produces 3-day windows.
+
+#### Timezone
+
+Window boundaries are aligned to local midnight in the configured timezone. The timezone fields follow the same conventions as the transformer timestamp configuration, with `None` (the default) falling back to UTC-day boundaries.
+
+| `timezone_type` | Window boundary alignment | Requires |
+|---|---|---|
+| `None` (default) | UTC midnight ||
+| `"utc"` | UTC midnight ||
+| `"offset"` | Local midnight at a fixed UTC offset | `timezone` in `±HHMM` or `±HH:MM` format |
+| `"iana"` | Local midnight in a named timezone, handling DST automatically | `timezone` as a valid IANA name |
+
+```python
+# Daily windows aligned to US Mountain Time (UTC-7, DST-aware)
+aggregation = TemporalAggregation(
+    aggregation_statistic="simple_mean",
+    aggregation_interval=1,
+    aggregation_interval_unit="day",
+    timezone_type="iana",
+    timezone="America/Denver",
+)
+
+# Daily windows at a fixed offset (no DST adjustment)
+aggregation = TemporalAggregation(
+    aggregation_statistic="time_weighted_mean",
+    aggregation_interval=1,
+    aggregation_interval_unit="day",
+    timezone_type="offset",
+    timezone="-0700",
+)
+```
+
+**Window boundary semantics:** Windows run from local midnight of the day containing the first observation up to local midnight of the day containing the last observation. That upper boundary is exclusive, so observations on the final local day are not aggregated. Ensure your source data extends at least one day past the last period you want included, i.e. that the last observation falls on the day following the final window.
+
+Days with no observations are omitted from the output rather than filled with null values.
+
+
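A minimal sketch of this boundary rule, using plain `datetime` and UTC-day windows (hypothetical data, illustration only):

```python
from datetime import datetime, timedelta

# Hypothetical observations from Jan 1 00:00 through Jan 4 06:00 UTC.
first = datetime(2024, 1, 1, 0, 0)
last = datetime(2024, 1, 4, 6, 0)

# Daily windows from the midnight of the first observation's day up to
# (but excluding) the midnight of the last observation's day.
start = datetime.combine(first.date(), datetime.min.time())
end = datetime.combine(last.date(), datetime.min.time())

windows = []
cur = start
while cur < end:
    windows.append((cur, cur + timedelta(days=1)))
    cur += timedelta(days=1)

print([ws.date().isoformat() for ws, _ in windows])
# → ['2024-01-01', '2024-01-02', '2024-01-03']
```

Jan 1, 2, and 3 are aggregated; the Jan 4 observation only marks the exclusive upper boundary, so that day produces no output row.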
 ### Loader
 
 ```python
@@ -349,4 +420,4 @@ for target_id, target in context.results.target_results.items():
 | Error | Likely cause |
 |---|---|
 | `Missing datastream IDs: ...` | One or more target datastream UUIDs don't exist on the HydroServer instance |
-| `HydroServer loader failed to retrieve datastream` | A network or authentication error occurred while looking up a datastream |
+| `HydroServer loader failed to retrieve datastream` | A network or authentication error occurred while looking up a datastream |
Lines changed: 261 additions & 0 deletions
@@ -0,0 +1,261 @@

```python
import math
import pandas as pd
from typing import Literal, Optional
from bisect import bisect_left
from datetime import datetime, time, timedelta, timezone as dt_timezone
from .timestamp import Timezone


AggregationStatistic = Literal["simple_mean", "time_weighted_mean", "last_value_of_period"]
AggregationIntervalUnit = Literal["day"]


class TemporalAggregation(Timezone):
    aggregation_statistic: AggregationStatistic
    aggregation_interval: int = 1
    aggregation_interval_unit: AggregationIntervalUnit = "day"

    def apply(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Aggregate all non-timestamp columns in df over fixed-duration day windows.

        The df must have a UTC-normalized 'timestamp' column. The returned DataFrame
        has one row per window with the window-start UTC timestamp as the
        'timestamp' column. Windows where every target column has no observations
        are dropped entirely.
        """

        target_columns = [col for col in df.columns if col != "timestamp"]

        if df.empty:
            return pd.DataFrame(columns=["timestamp"] + target_columns)

        timestamps: list[datetime] = df["timestamp"].dt.to_pydatetime().tolist()
        start_utc = timestamps[0]
        end_utc = timestamps[-1]

        result: dict[str, list] = {"timestamp": [], **{col: [] for col in target_columns}}

        for ws, we in self._iter_windows(start_utc, end_utc):
            row = {
                col: self._aggregate_window(
                    timestamps,
                    pd.to_numeric(df[col], errors="coerce").tolist(),
                    ws,
                    we,
                )
                for col in target_columns
            }
            if all(v is None for v in row.values()):
                continue
            result["timestamp"].append(ws)
            for col, val in row.items():
                result[col].append(val)

        out = pd.DataFrame(result)
        out["timestamp"] = pd.to_datetime(out["timestamp"], utc=True)

        return out

    def _effective_tz(self):
        """
        Return the tzinfo to use for window boundaries, defaulting to UTC.
        """

        return self.tz or dt_timezone.utc

    def _interval_delta(self) -> timedelta:
        """
        Return the timedelta for the configured aggregation interval.
        """

        if self.aggregation_interval_unit == "day":
            return timedelta(days=self.aggregation_interval)

        raise NotImplementedError(
            f"Invalid temporal aggregation configuration. "
            f"Received unsupported aggregation interval unit: {self.aggregation_interval_unit!r}"
        )

    def _window_start(self, ts_utc: datetime) -> datetime:
        """
        Return the local datetime aligned to the start of the window containing ts_utc.
        """

        tz = self._effective_tz()
        local = ts_utc.astimezone(tz)

        if self.aggregation_interval_unit == "day":
            return datetime.combine(local.date(), time.min, tzinfo=tz)

        raise NotImplementedError(
            f"Invalid temporal aggregation configuration. "
            f"Received unsupported aggregation interval unit: {self.aggregation_interval_unit!r}"
        )

    def _next_window_start(self, current: datetime) -> datetime:
        """
        Return the local datetime of the next window boundary after current.
        """

        tz = self._effective_tz()

        if self.aggregation_interval_unit == "day":
            next_date = current.date() + timedelta(days=self.aggregation_interval)
            return datetime.combine(next_date, time.min, tzinfo=tz)

        raise NotImplementedError(
            f"Invalid temporal aggregation configuration. "
            f"Received unsupported aggregation interval unit: {self.aggregation_interval_unit!r}"
        )

    def _iter_windows(self, start_utc: datetime, end_utc: datetime):
        """
        Yield (window_start_utc, window_end_utc) pairs covering [start_utc, end_utc).

        Windows are aligned to unit boundaries in local time (e.g. midnight for days)
        and stepped using _next_window_start to handle DST transitions correctly.
        """

        current_local = self._window_start(start_utc)
        end_local = self._window_start(end_utc)

        while current_local < end_local:
            next_local = self._next_window_start(current_local)
            yield current_local.astimezone(dt_timezone.utc), next_local.astimezone(dt_timezone.utc)
            current_local = next_local

    @staticmethod
    def _boundary_value(
        target: datetime,
        timestamps: list[datetime],
        values: list[float],
        prev_idx: Optional[int],
        next_idx: Optional[int],
    ) -> Optional[float]:
        """
        Estimate the value at a window boundary by exact match or linear interpolation.

        If the observation immediately before (prev_idx) or after (next_idx) the boundary
        falls exactly on the target timestamp, that value is returned directly. Otherwise,
        if observations exist on both sides, the value is linearly interpolated. If only
        one side is available, that side's value is used as a flat extrapolation.

        Returns None if no usable observations are available on either side.
        """

        prev = None
        nxt = None

        if prev_idx is not None and 0 <= prev_idx < len(timestamps):
            prev = (timestamps[prev_idx], values[prev_idx])
        if next_idx is not None and 0 <= next_idx < len(timestamps):
            nxt = (timestamps[next_idx], values[next_idx])

        if prev is not None and prev[0] == target:
            return prev[1]
        if nxt is not None and nxt[0] == target:
            return nxt[1]

        if prev is not None and nxt is not None:
            t0, v0 = prev
            t1, v1 = nxt
            span = (t1 - t0).total_seconds()
            if span <= 0:
                return v1
            ratio = (target - t0).total_seconds() / span
            return v0 + ratio * (v1 - v0)

        if prev is not None:
            return prev[1]
        if nxt is not None:
            return nxt[1]

        return None

    def _aggregate_window(
        self,
        timestamps: list[datetime],
        values: list[float],
        window_start: datetime,
        window_end: datetime,
    ) -> Optional[float]:
        """
        Compute the configured aggregation statistic for a single window.

        Observations are selected from timestamps in [window_start, window_end) using
        binary search. Returns None if no observations fall within the window.

        For simple_mean: returns the arithmetic mean of all observations in the window.
        For last_value_of_period: returns the last observation in the window.
        For time_weighted_mean: computes a time-weighted mean via trapezoidal integration
        over the full window duration. Boundary values at window_start and window_end
        are estimated by _boundary_value if no observation falls exactly on those
        timestamps. Returns None if either boundary value cannot be determined.
        """

        if not timestamps or window_end <= window_start:
            return None

        left = bisect_left(timestamps, window_start)
        right = bisect_left(timestamps, window_end)

        if left == right:
            return None

        window_values = values[left:right]

        if self.aggregation_statistic == "simple_mean":
            return sum(window_values) / len(window_values)

        if self.aggregation_statistic == "last_value_of_period":
            return window_values[-1]

        # time_weighted_mean: trapezoidal integration over the window.
        start_value = self._boundary_value(
            target=window_start,
            timestamps=timestamps,
            values=values,
            prev_idx=(left - 1) if left > 0 else None,
            next_idx=left,
        )
        end_value = self._boundary_value(
            target=window_end,
            timestamps=timestamps,
            values=values,
            prev_idx=(right - 1) if right > 0 else None,
            next_idx=right if right < len(timestamps) else None,
        )

        if start_value is None or end_value is None:
            return None

        area_points: list[tuple[datetime, float]] = [(window_start, start_value)]
        for idx in range(left, right):
            ts = timestamps[idx]
            val = values[idx]
            if ts == window_start:
                area_points[0] = (ts, val)
                continue
            area_points.append((ts, val))

        if area_points[-1][0] == window_end:
            area_points[-1] = (window_end, end_value)
        else:
            area_points.append((window_end, end_value))

        total_area = 0.0
        for idx in range(1, len(area_points)):
            t0, v0 = area_points[idx - 1]
            t1, v1 = area_points[idx]
            span = (t1 - t0).total_seconds()
            if span > 0:
                total_area += (v0 + v1) * 0.5 * span

        duration = (window_end - window_start).total_seconds()
        if duration <= 0:
            return None

        result = total_area / duration

        return None if (math.isnan(result) or math.isinf(result)) else result
```
