Skip to content

Commit 70242c7

Browse files
authored
Merge pull request #259 from hydroserver2/revert-257-revert-256-367-aggregation
Revert "Revert "367 aggregation""
2 parents 631af02 + a9b56e4 commit 70242c7

File tree

8 files changed

+1140
-47
lines changed

8 files changed

+1140
-47
lines changed

domains/etl/aggregation.py

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
from __future__ import annotations
2+
3+
from dataclasses import dataclass
4+
from datetime import datetime, time, timedelta, timezone as dt_timezone, tzinfo
5+
import math
6+
import re
7+
from bisect import bisect_left
8+
from typing import Iterable
9+
from zoneinfo import ZoneInfo
10+
11+
12+
# Supported daily-aggregation statistics; see aggregate_daily_window for the
# exact semantics of each.
AGGREGATION_STATISTICS = {
    "simple_mean",
    "time_weighted_daily_mean",
    "last_value_of_day",
}
# Supported interpretations of the configured timezone: a fixed UTC offset
# ("+HHMM"/"-HHMM") vs. an IANA zone that observes daylight savings.
AGGREGATION_TIMEZONE_MODES = {"fixedOffset", "daylightSavings"}
# Matches fixed UTC offsets of the form +HHMM / -HHMM (sign required).
_FIXED_OFFSET_RE = re.compile(r"^([+-])(\d{2})(\d{2})$")
19+
20+
21+
@dataclass(frozen=True)
class AggregationTransformation:
    """Validated, immutable configuration for a daily aggregation transformation."""

    # One of AGGREGATION_STATISTICS.
    aggregation_statistic: str
    # One of AGGREGATION_TIMEZONE_MODES ("fixedOffset" or "daylightSavings").
    timezone_mode: str
    # "+HHMM"/"-HHMM" offset string, or an IANA zone name, per timezone_mode.
    timezone: str
26+
27+
28+
def _first_non_empty(mapping: dict, keys: Iterable[str]) -> str | None:
29+
for key in keys:
30+
value = mapping.get(key)
31+
if value is None:
32+
continue
33+
if isinstance(value, str):
34+
value = value.strip()
35+
if not value:
36+
continue
37+
return value
38+
return None
39+
40+
41+
def _parse_fixed_offset(offset: str) -> tzinfo:
42+
match = _FIXED_OFFSET_RE.fullmatch(offset)
43+
if not match:
44+
raise ValueError("fixedOffset timezone must match +/-HHMM")
45+
46+
sign, hours_raw, minutes_raw = match.groups()
47+
hours = int(hours_raw)
48+
minutes = int(minutes_raw)
49+
if minutes >= 60:
50+
raise ValueError("fixedOffset timezone minutes must be between 00 and 59")
51+
52+
offset_delta = timedelta(hours=hours, minutes=minutes)
53+
if sign == "-":
54+
offset_delta = -offset_delta
55+
56+
return dt_timezone(offset_delta)
57+
58+
59+
def timezone_info_for_transformation(transform: AggregationTransformation) -> tzinfo:
    """Resolve the tzinfo described by *transform*.

    Raises ValueError for an unsupported timezone mode or an unparseable
    timezone value.
    """
    mode = transform.timezone_mode

    if mode == "fixedOffset":
        return _parse_fixed_offset(transform.timezone)

    if mode == "daylightSavings":
        try:
            return ZoneInfo(transform.timezone)
        except Exception as exc:  # pragma: no cover - platform-specific internals
            raise ValueError(
                "daylightSavings timezone must be a valid IANA timezone"
            ) from exc

    raise ValueError(f"Unsupported timezoneMode: {mode}")
72+
73+
74+
def normalize_aggregation_transformation(raw: dict) -> dict:
    """Validate a raw aggregation-transformation mapping and return its canonical form.

    Accepts both camelCase and snake_case key spellings for the statistic and
    timezone mode, and returns a dict with the canonical camelCase keys:
    ``type``, ``aggregationStatistic``, ``timezoneMode``, ``timezone``.

    Raises:
        ValueError: if *raw* is not a dict, is not type='aggregation', or any
            of the three required fields is missing/invalid (including a
            timezone string that cannot be resolved to a tzinfo).
    """
    if not isinstance(raw, dict):
        raise ValueError("Aggregation transformation must be an object")

    transform_type = raw.get("type")
    if transform_type != "aggregation":
        raise ValueError("Aggregation transformation must set type='aggregation'")

    # Both key spellings are accepted; blank/whitespace-only values count as missing.
    aggregation_statistic = _first_non_empty(
        raw, ("aggregationStatistic", "aggregation_statistic")
    )
    if not isinstance(aggregation_statistic, str) or aggregation_statistic not in AGGREGATION_STATISTICS:
        allowed = ", ".join(sorted(AGGREGATION_STATISTICS))
        raise ValueError(f"aggregationStatistic must be one of: {allowed}")

    timezone_mode = _first_non_empty(raw, ("timezoneMode", "timezone_mode"))
    if not isinstance(timezone_mode, str) or timezone_mode not in AGGREGATION_TIMEZONE_MODES:
        allowed = ", ".join(sorted(AGGREGATION_TIMEZONE_MODES))
        raise ValueError(f"timezoneMode must be one of: {allowed}")

    timezone_value = _first_non_empty(raw, ("timezone",))
    if not isinstance(timezone_value, str):
        raise ValueError("timezone is required for aggregation transformations")

    normalized = {
        "type": "aggregation",
        "aggregationStatistic": aggregation_statistic,
        "timezoneMode": timezone_mode,
        "timezone": timezone_value,
    }

    # Validate timezone now so malformed configs fail early.
    timezone_info_for_transformation(
        AggregationTransformation(
            aggregation_statistic=aggregation_statistic,
            timezone_mode=timezone_mode,
            timezone=timezone_value,
        )
    )

    return normalized
115+
116+
117+
def parse_aggregation_transformation(raw: dict) -> AggregationTransformation:
    """Validate *raw* and build an immutable AggregationTransformation from it.

    Raises ValueError (via normalize_aggregation_transformation) when the
    mapping is malformed.
    """
    clean = normalize_aggregation_transformation(raw)
    return AggregationTransformation(
        aggregation_statistic=clean["aggregationStatistic"],
        timezone_mode=clean["timezoneMode"],
        timezone=clean["timezone"],
    )
124+
125+
126+
def _local_midnight(timestamp_utc: datetime, tz: tzinfo) -> datetime:
127+
local = timestamp_utc.astimezone(tz)
128+
return datetime.combine(local.date(), time.min, tzinfo=tz)
129+
130+
131+
def closed_window_end_utc(source_end_utc: datetime, transform: AggregationTransformation) -> datetime:
    """UTC instant of local midnight on *source_end_utc*'s day.

    This marks the end of the last fully closed daily window.
    """
    zone = timezone_info_for_transformation(transform)
    midnight_local = _local_midnight(source_end_utc, zone)
    return midnight_local.astimezone(dt_timezone.utc)
134+
135+
136+
def first_window_start_utc(source_begin_utc: datetime, transform: AggregationTransformation) -> datetime:
    """UTC instant of local midnight on *source_begin_utc*'s day.

    This marks the start of the first daily aggregation window.
    """
    zone = timezone_info_for_transformation(transform)
    midnight_local = _local_midnight(source_begin_utc, zone)
    return midnight_local.astimezone(dt_timezone.utc)
139+
140+
141+
def next_window_start_utc(destination_end_utc: datetime, transform: AggregationTransformation) -> datetime:
    """UTC instant of local midnight on the day after *destination_end_utc*'s local day."""
    zone = timezone_info_for_transformation(transform)
    local_dt = destination_end_utc.astimezone(zone)
    following_day = local_dt.date() + timedelta(days=1)
    following_midnight = datetime.combine(following_day, time.min, tzinfo=zone)
    return following_midnight.astimezone(dt_timezone.utc)
147+
148+
149+
def iter_daily_windows_utc(
    start_utc: datetime,
    end_utc: datetime,
    transform: AggregationTransformation,
):
    """Yield ``(window_start_utc, window_end_utc, local_date)`` per local day.

    Windows run from local midnight to the next local midnight in the
    transform's timezone, converted to UTC.  Iteration begins at the local
    midnight of *start_utc*'s day and stops before the local midnight of
    *end_utc*'s day, so the final (possibly partial) day is excluded.

    NOTE(review): for "daylightSavings" zones a window spanning a DST
    transition will be 23 or 25 hours long in UTC — confirm downstream
    consumers expect that.
    """
    tz = timezone_info_for_transformation(transform)

    current_local = _local_midnight(start_utc, tz)
    end_local = _local_midnight(end_utc, tz)

    while current_local < end_local:
        # Midnight of the following local calendar day.
        next_local = datetime.combine(
            current_local.date() + timedelta(days=1),
            time.min,
            tzinfo=tz,
        )
        yield (
            current_local.astimezone(dt_timezone.utc),
            next_local.astimezone(dt_timezone.utc),
            current_local.date(),
        )
        current_local = next_local
171+
172+
173+
def _boundary_value(
174+
target: datetime,
175+
timestamps: list[datetime],
176+
values: list[float],
177+
prev_idx: int | None,
178+
next_idx: int | None,
179+
) -> float | None:
180+
prev = None
181+
nxt = None
182+
183+
if prev_idx is not None and 0 <= prev_idx < len(timestamps):
184+
prev = (timestamps[prev_idx], values[prev_idx])
185+
if next_idx is not None and 0 <= next_idx < len(timestamps):
186+
nxt = (timestamps[next_idx], values[next_idx])
187+
188+
if prev and prev[0] == target:
189+
return prev[1]
190+
if nxt and nxt[0] == target:
191+
return nxt[1]
192+
193+
if prev and nxt:
194+
t0, v0 = prev
195+
t1, v1 = nxt
196+
span = (t1 - t0).total_seconds()
197+
if span <= 0:
198+
return v1
199+
ratio = (target - t0).total_seconds() / span
200+
return v0 + ratio * (v1 - v0)
201+
202+
if prev:
203+
return prev[1]
204+
if nxt:
205+
return nxt[1]
206+
207+
return None
208+
209+
210+
def aggregate_daily_window(
    timestamps: list[datetime],
    values: list[float],
    window_start_utc: datetime,
    window_end_utc: datetime,
    statistic: str,
) -> float | None:
    """Aggregate the observations falling in [window_start_utc, window_end_utc).

    Assumes *timestamps* is sorted ascending and aligned index-for-index with
    *values* (bisect is used for the window bounds — TODO confirm callers
    guarantee sort order).

    Returns None (meaning "skip this day") when inputs are empty/mismatched,
    the window is degenerate, or the window contains no observations.

    Raises:
        ValueError: if *statistic* is not one of AGGREGATION_STATISTICS.
    """
    if statistic not in AGGREGATION_STATISTICS:
        raise ValueError(f"Unsupported aggregationStatistic '{statistic}'")

    if not timestamps or len(timestamps) != len(values):
        return None

    if window_end_utc <= window_start_utc:
        return None

    # Half-open slice of observations inside the window: start inclusive,
    # end exclusive (both bounds found with bisect_left).
    left = bisect_left(timestamps, window_start_utc)
    right = bisect_left(timestamps, window_end_utc)

    # No observations in this day -> skip writing this day.
    if left == right:
        return None

    window_values = values[left:right]

    if statistic == "simple_mean":
        return sum(window_values) / len(window_values)

    if statistic == "last_value_of_day":
        return window_values[-1]

    # Time-weighted daily mean using trapezoidal integration over the daily window.
    # Boundary values are interpolated from the samples straddling each edge.
    start_value = _boundary_value(
        target=window_start_utc,
        timestamps=timestamps,
        values=values,
        prev_idx=(left - 1) if left > 0 else None,
        next_idx=left,
    )
    end_value = _boundary_value(
        target=window_end_utc,
        timestamps=timestamps,
        values=values,
        prev_idx=(right - 1) if right > 0 else None,
        next_idx=right if right < len(timestamps) else None,
    )

    if start_value is None or end_value is None:
        return None

    # Build the integration polyline: boundary point, in-window samples,
    # boundary point.  A sample exactly at the window start replaces the
    # interpolated start point.
    area_points: list[tuple[datetime, float]] = [(window_start_utc, start_value)]
    for idx in range(left, right):
        ts = timestamps[idx]
        val = values[idx]
        if ts == window_start_utc:
            area_points[0] = (ts, val)
            continue
        area_points.append((ts, val))

    if area_points[-1][0] == window_end_utc:
        area_points[-1] = (window_end_utc, end_value)
    else:
        area_points.append((window_end_utc, end_value))

    # Trapezoidal rule; zero/negative spans (duplicate timestamps) add no area.
    total_area = 0.0
    for idx in range(1, len(area_points)):
        t0, v0 = area_points[idx - 1]
        t1, v1 = area_points[idx]
        span = (t1 - t0).total_seconds()
        if span <= 0:
            continue
        total_area += (v0 + v1) * 0.5 * span

    duration = (window_end_utc - window_start_utc).total_seconds()
    if duration <= 0:
        return None

    result = total_area / duration
    # Guard against non-finite values leaking out of the division.
    if math.isnan(result) or math.isinf(result):
        return None

    return result
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from django.db import migrations, models
2+
import django.db.models.deletion
3+
4+
5+
class Migration(migrations.Migration):
    """Add Task.task_type and make Task.data_connection optional."""

    dependencies = [
        ("etl", "0003_remove_datasource_orchestration_system_and_more"),
    ]

    operations = [
        # New discriminator column; existing rows get the default "ETL".
        migrations.AddField(
            model_name="task",
            name="task_type",
            field=models.CharField(default="ETL", max_length=32),
        ),
        # data_connection becomes nullable/blank so tasks need not reference one.
        migrations.AlterField(
            model_name="task",
            name="data_connection",
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.CASCADE,
                related_name="tasks",
                to="etl.dataconnection",
            ),
        ),
    ]

domains/etl/models/task.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,17 @@ def visible(self, principal: Union["User", "APIKey"]):
5959
class Task(models.Model, PermissionChecker):
6060
id = models.UUIDField(primary_key=True, default=uuid6.uuid7, editable=False)
6161
name = models.CharField(max_length=255)
62+
task_type = models.CharField(max_length=32, default="ETL")
6263
workspace = models.ForeignKey(
6364
"iam.Workspace", related_name="tasks", on_delete=models.CASCADE
6465
)
65-
data_connection = models.ForeignKey(DataConnection, on_delete=models.CASCADE, related_name="tasks")
66+
data_connection = models.ForeignKey(
67+
DataConnection,
68+
on_delete=models.CASCADE,
69+
related_name="tasks",
70+
null=True,
71+
blank=True,
72+
)
6673
orchestration_system = models.ForeignKey(
6774
OrchestrationSystem, on_delete=models.CASCADE, related_name="tasks"
6875
)
    def get_principal_permissions(
        self, principal: Union["User", "APIKey", None]
    ) -> list[Literal["edit", "delete", "view"]]:
        """Return the permissions *principal* holds on this Task.

        Permissions are resolved against the task's own workspace rather than
        data_connection.workspace, since data_connection may now be null.
        """
        permissions = self.check_object_permissions(
            principal=principal, workspace=self.workspace, resource_type="Task"
        )

        return permissions

0 commit comments

Comments
 (0)