Skip to content

Commit 2ae818f

Browse files
committed
Add better logging
1 parent 26b5dc5 commit 2ae818f

File tree

13 files changed

+497
-140
lines changed

13 files changed

+497
-140
lines changed

src/hydroserverpy/api/models/etl/task.py

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from datetime import datetime, timedelta, timezone
99
from pydantic import Field, AliasPath, AliasChoices, TypeAdapter
1010
from hydroserverpy.etl.factories import extractor_factory, transformer_factory, loader_factory
11+
from hydroserverpy.etl.loaders.hydroserver_loader import LoadSummary
1112
from hydroserverpy.etl.etl_configuration import ExtractorConfig, TransformerConfig, LoaderConfig, SourceTargetMapping, MappingPath
1213
from ..base import HydroServerBaseModel
1314
from .orchestration_system import OrchestrationSystem
@@ -222,11 +223,11 @@ def run_local(self):
222223
return
223224

224225
logging.info("Starting load")
225-
load_stats = loader_cls.load(data, self)
226+
load_summary = loader_cls.load(data, self)
226227
self._update_status(
227228
task_run,
228229
True,
229-
self._success_message(load_stats),
230+
self._success_message(load_summary),
230231
runtime_source_uri=runtime_source_uri,
231232
)
232233
except Exception as e:
@@ -271,35 +272,26 @@ def _update_status(
271272
self.save()
272273

273274
@staticmethod
274-
def _success_message(load_stats: Optional[dict]) -> str:
275-
if not isinstance(load_stats, dict):
276-
return "OK"
277-
278-
loaded = load_stats.get("observations_loaded")
279-
datastreams_loaded = load_stats.get("datastreams_loaded")
280-
available = load_stats.get("observations_available")
281-
timestamps_total = load_stats.get("timestamps_total")
282-
timestamps_after_cutoff = load_stats.get("timestamps_after_cutoff")
283-
284-
if loaded is None:
275+
def _success_message(load: Optional[LoadSummary]) -> str:
276+
if not load:
285277
return "OK"
286278

279+
loaded = load.observations_loaded
287280
if loaded == 0:
288-
if timestamps_total and timestamps_after_cutoff == 0:
289-
cutoff = load_stats.get("cutoff")
290-
if cutoff:
281+
if load.timestamps_total and load.timestamps_after_cutoff == 0:
282+
if load.cutoff:
291283
return (
292284
"No new observations to load "
293-
f"(all timestamps were at or before {cutoff})."
285+
f"(all timestamps were at or before {load.cutoff})."
294286
)
295287
return "No new observations to load (all timestamps were at or before the cutoff)."
296-
if available == 0:
288+
if load.observations_available == 0:
297289
return "No new observations to load."
298290
return "No new observations were loaded."
299291

300-
if datastreams_loaded is not None:
292+
if load.datastreams_loaded:
301293
return (
302-
f"Load completed successfully ({loaded} rows across {datastreams_loaded} datastreams)."
294+
f"Load completed successfully ({loaded} rows across {load.datastreams_loaded} datastreams)."
303295
)
304296
return f"Load completed successfully ({loaded} rows loaded)."
305297

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,65 @@
1-
## Possible error states:
2-
3-
Config file validation
4-
Tell the user exactly which configuration variables are missing or invalid
5-
Could not connect to the source system.
6-
The source system did not respond before the timeout.
7-
Authentication with the source system failed; credentials may be invalid or expired.
8-
The requested payload was not found on the source system.
9-
The source system returned no data.
10-
11-
The source returned a format different from what this job expects.
12-
The payload’s expected fields were not found.
13-
For CSV:
1+
## Possible Needs Attention states:
2+
3+
These are the most important end-user messages the ETL system can return for a task run
4+
that requires user action.
5+
6+
### Configuration / Setup
7+
8+
- Invalid extractor configuration. Tell the user exactly which field is missing or invalid.
9+
- Invalid transformer configuration. Tell the user exactly which field is missing or invalid.
10+
- A required configuration value is missing.
11+
- A required configuration value is null where a value is expected.
12+
- Missing required per-task extractor variable "<name>".
13+
- Extractor source URI contains a placeholder "<name>", but it was not provided.
14+
- Task configuration is missing required daylight savings offset (when using daylightSavings mode).
15+
16+
### Data Source (Connectivity / Authentication)
17+
18+
- Could not connect to the source system.
19+
- The source system did not respond before the timeout.
20+
- Authentication with the source system failed; credentials may be invalid or expired.
21+
- The requested payload was not found on the source system.
22+
- The source system returned no data.
1423

24+
### Source Data Did Not Match The Task
25+
26+
- The source returned a format different from what this job expects.
27+
- The payload's expected fields were not found.
28+
- One or more timestamps could not be read with the current settings.
29+
- This job references a resource that no longer exists.
30+
- The file structure does not match the configuration.
31+
32+
For CSV:
1533
- The header row contained unexpected values and could not be processed.
1634
- One or more data rows contained unexpected values and could not be processed.
35+
- Timestamp column "<key>" was not found in the extracted data.
36+
- A mapping source index is out of range for the extracted data.
37+
- A mapping source column was not found in the extracted data.
1738

1839
For JSON:
40+
- The timestamp or value key could not be found with the specified query.
41+
- Transformer did not receive any extracted data to parse.
42+
43+
### Targets / HydroServer
44+
45+
- HydroServer rejected some or all of the data.
46+
- The target data series (datastream) could not be found.
47+
- This may happen if the datastream was deleted or the mapping points to the wrong target.
1948

20-
- The timestamp or value key couldn’t be found with the specified JMESPath query
49+
### Unexpected System Error
2150

22-
This job references a resource that no longer exists.
23-
The file structure does not match the configuration.
51+
- An internal system error occurred while processing the job.
52+
- The job stopped before completion.
2453

25-
HydroServer rejected some or all of the data.
26-
The target datastream could not be found.
27-
An internal system error occurred while processing the job.
28-
The job stopped before completion.
54+
## Possible OK states:
2955

30-
## Possible warning states:
56+
These are the most important end-user messages the ETL system can return for a successful task run.
3157

32-
## Possible success states:
58+
- Load completed successfully.
59+
- Load completed successfully (<n> rows loaded).
60+
- Load completed successfully (<n> rows across <m> datastreams).
61+
- No new observations to load.
62+
- No new observations were loaded.
63+
- No new observations to load (all timestamps were at or before <cutoff>).
64+
- No data returned from the extractor. Nothing to load.
65+
- Transform produced no rows. Nothing to load.

src/hydroserverpy/etl/etl_configuration.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import Annotated, Dict, List, Literal, Optional, Union
33
from pydantic import BaseModel, Field, field_validator
44
from enum import Enum
5+
from zoneinfo import ZoneInfo
56

67
WorkflowType = Literal["ETL", "Aggregation", "Virtual", "SDL"]
78
CSVDelimiterType = Literal[",", "|", "\t", ";", " "]
@@ -76,12 +77,28 @@ class Timestamp(BaseModel):
7677

7778
class Config:
7879
populate_by_name = True
80+
validate_default = True
7981

80-
@field_validator("timezone")
82+
@field_validator("timezone", mode="after")
8183
def check_timezone(cls, timezone_value, info):
8284
mode = info.data.get("timezone_mode")
83-
if mode == TimezoneMode.fixedOffset and timezone_value is None:
84-
raise ValueError("`timezone` must be set when timezoneMode is fixedOffset")
85+
if mode == TimezoneMode.fixedOffset:
86+
if timezone_value is None:
87+
raise ValueError(
88+
"`timezone` must be set when timezoneMode is fixedOffset (e.g. '-0700')"
89+
)
90+
if mode == TimezoneMode.daylightSavings:
91+
if timezone_value is None or str(timezone_value).strip() == "":
92+
raise ValueError(
93+
"Task configuration is missing required daylight savings offset (when using daylightSavings mode)."
94+
)
95+
# Validate it's a real IANA tz name early to avoid cryptic ZoneInfo errors later.
96+
try:
97+
ZoneInfo(str(timezone_value))
98+
except Exception:
99+
raise ValueError(
100+
f"Invalid timezone {timezone_value!r}. Use an IANA timezone like 'America/Denver'."
101+
)
85102
return timezone_value
86103

87104

src/hydroserverpy/etl/extractors/base.py

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
from datetime import datetime
55
from ..etl_configuration import ExtractorConfig, Task
66
from ..timestamp_parser import TimestampParser
7+
from ..logging_utils import redact_url, summarize_list
8+
9+
10+
logger = logging.getLogger(__name__)
711

812

913
class Extractor:
@@ -12,21 +16,32 @@ def __init__(self, extractor_config: ExtractorConfig):
1216
self.runtime_source_uri = None
1317

1418
def resolve_placeholder_variables(self, task: Task, loader):
15-
logging.info("Resolving extractor runtime variables...")
19+
placeholders = list(self.cfg.placeholder_variables or [])
1620
filled = {}
17-
for placeholder in self.cfg.placeholder_variables:
21+
runtime_names: set[str] = set()
22+
task_names: set[str] = set()
23+
for placeholder in placeholders:
1824
name = placeholder.name
1925

2026
if placeholder.type == "runTime":
21-
logging.info("Resolving runtime var: %s", name)
27+
logger.debug("Resolving runtime var: %s", name)
28+
runtime_names.add(name)
2229
if placeholder.run_time_value == "latestObservationTimestamp":
2330
value = loader.earliest_begin_date(task)
2431
elif placeholder.run_time_value == "jobExecutionTime":
2532
value = pd.Timestamp.now(tz="UTC")
2633
elif placeholder.type == "perTask":
27-
logging.info("Resolving task var: %s", name)
34+
logger.debug("Resolving task var: %s", name)
35+
task_names.add(name)
2836
if name not in task.extractor_variables:
29-
raise KeyError(f"Missing per-task variable '{name}'")
37+
logger.error(
38+
"Missing per-task extractor variable '%s'. Provided extractorVariables keys=%s",
39+
name,
40+
summarize_list(sorted((task.extractor_variables or {}).keys())),
41+
)
42+
raise ValueError(
43+
f"Missing required per-task extractor variable '{name}'."
44+
)
3045
value = task.extractor_variables[name]
3146
else:
3247
continue
@@ -36,21 +51,39 @@ def resolve_placeholder_variables(self, task: Task, loader):
3651
value = parser.utc_to_string(value)
3752

3853
filled[name] = value
54+
55+
if runtime_names:
56+
names = ", ".join(sorted(runtime_names))
57+
logger.debug(
58+
"Runtime variables resolved (%s): %s", len(runtime_names), names
59+
)
60+
if task_names:
61+
names = ", ".join(sorted(task_names))
62+
logger.debug("Task variables resolved (%s): %s", len(task_names), names)
63+
3964
if not filled:
4065
uri = self.cfg.source_uri
4166
else:
4267
uri = self.format_uri(filled)
4368

4469
self.runtime_source_uri = uri
45-
logging.info("Resolved runtime source URI: %s", uri)
70+
# Keep a stable log prefix for downstream parsing, but redact secrets.
71+
logger.info("Resolved runtime source URI: %s", redact_url(uri))
4672
return uri
4773

4874
def format_uri(self, placeholder_variables):
4975
try:
5076
uri = self.cfg.source_uri.format(**placeholder_variables)
5177
except KeyError as e:
5278
missing_key = e.args[0]
53-
raise KeyError(f"Missing placeholder variable: {missing_key}")
79+
logger.error(
80+
"Failed to format sourceUri: missing placeholder '%s'. Provided placeholders=%s",
81+
missing_key,
82+
summarize_list(sorted(placeholder_variables.keys())),
83+
)
84+
raise ValueError(
85+
f"Extractor source URI contains a placeholder '{missing_key}', but it was not provided."
86+
)
5487
return uri
5588

5689
@abstractmethod

src/hydroserverpy/etl/extractors/ftp_extractor.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
import logging
2-
from ftplib import FTP
2+
from ftplib import FTP, error_perm
33
from io import BytesIO
44
from typing import Dict
55

66
from .base import Extractor
77
from ..types import TimeRange
88

99

10+
logger = logging.getLogger(__name__)
11+
12+
1013
class FTPExtractor(Extractor):
1114
def __init__(
1215
self,
@@ -33,18 +36,33 @@ def extract(self):
3336
try:
3437
ftp.connect(self.host, self.port)
3538
ftp.login(user=self.username, passwd=self.password)
36-
logging.info(f"Connected to FTP server: {self.host}:{self.port}")
39+
logger.debug("Connected to FTP server %s:%s", self.host, self.port)
3740

3841
data = BytesIO()
3942
ftp.retrbinary(f"RETR {self.filepath}", data.write)
40-
logging.info(
41-
f"Successfully downloaded file '{self.filepath}' from FTP server."
43+
logger.debug(
44+
"Successfully downloaded file %r from FTP server.",
45+
self.filepath,
4246
)
4347
data.seek(0)
48+
if data.getbuffer().nbytes == 0:
49+
raise ValueError("The source system returned no data.")
4450
return data
51+
except error_perm as e:
52+
msg = str(e)
53+
# Common FTP status codes:
54+
# 530 = not logged in / auth failure
55+
# 550 = file unavailable
56+
if msg.startswith("530"):
57+
raise ValueError(
58+
"Authentication with the source system failed; credentials may be invalid or expired."
59+
) from e
60+
if msg.startswith("550"):
61+
raise ValueError("The requested payload was not found on the source system.") from e
62+
raise ValueError("The source system returned an error.") from e
4563
except Exception as e:
46-
logging.error(f"Error retrieving file from FTP server: {e}")
47-
return None
64+
logger.error("Error retrieving file from FTP server: %s", e, exc_info=True)
65+
raise ValueError("Could not connect to the source system.") from e
4866
finally:
4967
if ftp:
5068
ftp.quit()

0 commit comments

Comments
 (0)