Skip to content

Commit 0fb0962

Browse files
committed
More explicit run messages
1 parent 7cc4748 commit 0fb0962

File tree

2 files changed

+99
-10
lines changed

2 files changed

+99
-10
lines changed

src/hydroserverpy/api/models/etl/task.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,12 @@ def run_local(self):
222222
return
223223

224224
logging.info("Starting load")
225-
loader_cls.load(data, self)
225+
load_stats = loader_cls.load(data, self)
226226
self._update_status(
227-
task_run, True, "OK", runtime_source_uri=runtime_source_uri
227+
task_run,
228+
True,
229+
self._success_message(load_stats),
230+
runtime_source_uri=runtime_source_uri,
228231
)
229232
except Exception as e:
230233
self._update_status(
@@ -267,6 +270,39 @@ def _update_status(
267270
self.next_run_at = self._next_run()
268271
self.save()
269272

273+
@staticmethod
def _success_message(load_stats: Optional[dict]) -> str:
    """Turn loader statistics into a short, human-readable status message.

    :param load_stats: The dict returned by the loader's ``load`` method,
        or any other value (``None`` included) when no stats are available.
    :return: A one-line summary suitable for the task-run status field.
    """
    # Anything that is not a stats dict gets the generic status.
    if not isinstance(load_stats, dict):
        return "OK"

    rows_loaded = load_stats.get("observations_loaded")
    if rows_loaded is None:
        # Loader did not report a row count; fall back to the generic status.
        return "OK"

    if rows_loaded != 0:
        stream_count = load_stats.get("datastreams_loaded")
        if stream_count is None:
            return f"Load completed successfully ({rows_loaded} rows loaded)."
        return (
            f"Load completed successfully ({rows_loaded} rows across {stream_count} datastreams)."
        )

    # Zero rows loaded: explain why when the stats make the reason clear.
    all_before_cutoff = (
        bool(load_stats.get("timestamps_total"))
        and load_stats.get("timestamps_after_cutoff") == 0
    )
    if all_before_cutoff:
        cutoff = load_stats.get("cutoff")
        if not cutoff:
            return "No new observations to load (all timestamps were at or before the cutoff)."
        return (
            "No new observations to load "
            f"(all timestamps were at or before {cutoff})."
        )
    if load_stats.get("observations_available") == 0:
        return "No new observations to load."
    return "No new observations were loaded."
305+
270306
def _next_run(self) -> Optional[str]:
271307
now = datetime.now(timezone.utc)
272308
if cron := self.crontab:

src/hydroserverpy/etl/loaders/hydroserver_loader.py

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from __future__ import annotations
2-
from typing import TYPE_CHECKING
2+
from typing import TYPE_CHECKING, Any, Dict
33

44
from .base import Loader
55
import logging
@@ -20,30 +20,70 @@ def __init__(self, client: HydroServer, task_id):
2020
self._begin_cache: dict[str, pd.Timestamp] = {}
2121
self.task_id = task_id
2222

23-
def load(self, data: pd.DataFrame, task: Task) -> None:
23+
def load(self, data: pd.DataFrame, task: Task) -> Dict[str, Any]:
2424
"""
2525
Load observations from a DataFrame to the HydroServer.
2626
:param data: A Pandas DataFrame where each column corresponds to a datastream.
2727
"""
2828
begin_date = self.earliest_begin_date(task)
2929
new_data = data[data["timestamp"] > begin_date]
30+
31+
cutoff_value = (
32+
begin_date.isoformat()
33+
if hasattr(begin_date, "isoformat")
34+
else str(begin_date)
35+
)
36+
stats: Dict[str, Any] = {
37+
"cutoff": cutoff_value,
38+
"timestamps_total": len(data),
39+
"timestamps_after_cutoff": len(new_data),
40+
"timestamps_filtered_by_cutoff": max(len(data) - len(new_data), 0),
41+
"observations_available": 0,
42+
"observations_loaded": 0,
43+
"observations_skipped": 0,
44+
"observations_filtered_by_end_time": 0,
45+
"datastreams_total": 0,
46+
"datastreams_available": 0,
47+
"datastreams_loaded": 0,
48+
"per_datastream": {},
49+
}
50+
3051
for col in new_data.columns.difference(["timestamp"]):
31-
datastream = self.client.datastreams.get(
32-
uid=str(col)
33-
)
52+
stats["datastreams_total"] += 1
53+
datastream = self.client.datastreams.get(uid=str(col))
3454
ds_cutoff = datastream.phenomenon_end_time
35-
df = (
55+
56+
base_df = (
3657
new_data[["timestamp", col]]
37-
.loc[lambda d: d["timestamp"] > ds_cutoff if ds_cutoff else True]
3858
.rename(columns={col: "value"})
3959
.dropna(subset=["value"])
4060
)
61+
pre_count = len(base_df)
62+
if ds_cutoff:
63+
base_df = base_df.loc[base_df["timestamp"] > ds_cutoff]
64+
65+
filtered_by_end = pre_count - len(base_df)
66+
if filtered_by_end:
67+
stats["observations_filtered_by_end_time"] += filtered_by_end
68+
69+
df = base_df
70+
available = len(df)
71+
stats["observations_available"] += available
4172
if df.empty:
42-
logging.warning(f"No new data for {col}, skipping.")
73+
logging.warning(
74+
"No new data for %s after filtering; skipping.", col
75+
)
76+
stats["per_datastream"][str(col)] = {
77+
"available": 0,
78+
"loaded": 0,
79+
"skipped": 0,
80+
}
4381
continue
4482

83+
stats["datastreams_available"] += 1
4584
df = df.rename(columns={"timestamp": "phenomenon_time", "value": "result"})
4685

86+
loaded = 0
4787
# Chunked upload
4888
CHUNK_SIZE = 5000
4989
total = len(df)
@@ -61,6 +101,7 @@ def load(self, data: pd.DataFrame, task: Task) -> None:
61101
self.client.datastreams.load_observations(
62102
uid=str(col), observations=chunk
63103
)
104+
loaded += len(chunk)
64105
except Exception as e:
65106
status = getattr(e, "status_code", None) or getattr(
66107
getattr(e, "response", None), "status_code", None
@@ -74,6 +115,18 @@ def load(self, data: pd.DataFrame, task: Task) -> None:
74115
)
75116
raise
76117

118+
stats["observations_loaded"] += loaded
119+
stats["observations_skipped"] += max(available - loaded, 0)
120+
if loaded > 0:
121+
stats["datastreams_loaded"] += 1
122+
stats["per_datastream"][str(col)] = {
123+
"available": available,
124+
"loaded": loaded,
125+
"skipped": max(available - loaded, 0),
126+
}
127+
128+
return stats
129+
77130
def _fetch_earliest_begin(
78131
self, mappings: list[SourceTargetMapping]
79132
) -> pd.Timestamp:

0 commit comments

Comments
 (0)