Skip to content

Commit 82c6e8c

Browse files
authored
Merge pull request #10 from nsidc/ingest-qol
Ingest quality of life improvements
2 parents 439059c + 06a98c8 commit 82c6e8c

File tree

5 files changed

+72
-38
lines changed

5 files changed

+72
-38
lines changed

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,7 @@ browser to `http://localhost:8080` and enter:
153153
### Run ingest
154154
155155
```bash
156-
docker compose run cli init # Create empty tables (deleting any pre-existing ones)
157-
docker compose run cli load # Load the tables from event files
156+
docker compose run cli init
158157
```
159158
160159
From a fast disk, this should take under 2 minutes.

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ classifiers = [
2323
dynamic = ["version"]
2424
dependencies = [
2525
"loguru",
26+
"tqdm",
2627
"fastapi ~=0.111.0",
2728
"pydantic ~=2.0",
2829
"pydantic-settings",
@@ -41,6 +42,7 @@ test = [
4142
"pytest >=6",
4243
"pytest-cov >=3",
4344
"mypy >=1.10",
45+
"types-tqdm",
4446
]
4547
dev = [
4648
"pytest >=6",

src/aross_stations_db/cli.py

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import click
22
from loguru import logger
33
from sqlalchemy.orm import Session
4+
from tqdm import tqdm
45

5-
from aross_stations_db.config import CliLoadSettings, Settings
6+
from aross_stations_db.config import CliLoadSettings
67
from aross_stations_db.db.setup import (
8+
generate_event_object,
79
load_events,
810
load_stations,
911
recreate_tables,
@@ -19,31 +21,44 @@ def cli() -> None:
1921
pass
2022

2123

24+
@click.option(
25+
"--skip-load",
26+
help="Skip loading data; only initialize tables.",
27+
is_flag=True,
28+
)
2229
@cli.command
23-
def init() -> None:
24-
"""Create the database tables, dropping any that pre-exist."""
30+
def init(skip_load: bool = False) -> None:
31+
"""Load the database from files on disk."""
2532
# TODO: False-positive. Remove type-ignore.
2633
# See: https://github.com/pydantic/pydantic/issues/6713
27-
config = Settings() # type:ignore[call-arg]
34+
config = CliLoadSettings() # type:ignore[call-arg]
2835

2936
with Session(config.db_engine) as db_session:
3037
recreate_tables(db_session)
3138

32-
logger.success("Database initialized")
39+
logger.info("Database tables initialized")
3340

41+
if skip_load:
42+
logger.warning("Skipping data load.")
43+
return
3444

35-
@cli.command
36-
def load() -> None:
37-
"""Load the database tables from files on disk."""
38-
# TODO: False-positive. Remove type-ignore.
39-
# See: https://github.com/pydantic/pydantic/issues/6713
40-
config = CliLoadSettings() # type:ignore[call-arg]
41-
42-
stations = get_stations(config.stations_metadata_filepath)
43-
events = get_events(config.events_dir)
45+
raw_stations = get_stations(config.stations_metadata_filepath)
46+
raw_events = get_events(config.events_dir)
4447

4548
with Session(config.db_engine) as db_session:
46-
load_stations(stations, session=db_session)
49+
load_stations(raw_stations, session=db_session)
50+
logger.info("Loaded stations")
51+
52+
# The event processing steps are split into stages to provide better feedback at
53+
# runtime. On slower systems, it can be unclear what the bottleneck is. In the
54+
# long run, we should try to optimize this after learning more.
55+
events = [
56+
generate_event_object(e) for e in tqdm(raw_events, desc="Reading events")
57+
]
58+
59+
# TODO: Is there any way we can monitor this process with a progress bar?
60+
logger.info("Loading events; this can take a minute or so")
4761
load_events(events, session=db_session)
62+
logger.info("Loaded events")
4863

49-
logger.success("Data loaded")
64+
logger.success("Database load complete")

src/aross_stations_db/db/setup.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import datetime as dt
2-
from collections.abc import Iterator
32

4-
from sqlalchemy import MetaData
3+
from sqlalchemy import MetaData, insert
54
from sqlalchemy.orm import Session
65

76
from aross_stations_db.db.tables import Base, Event, Station
@@ -65,22 +64,33 @@ def load_stations(stations: list[dict[str, str]], *, session: Session) -> None:
6564
session.commit()
6665

6766

68-
def load_events(events: Iterator[dict[str, str]], *, session: Session) -> None:
69-
session.add_all(
70-
[
71-
Event(
72-
station_id=event["station_id"],
73-
time_start=dt.datetime.fromisoformat(event["start"]),
74-
time_end=dt.datetime.fromisoformat(event["end"]),
75-
snow_on_ground=_snow_on_ground_status(event["sog"]),
76-
rain_hours=int(event["RA"]),
77-
freezing_rain_hours=int(event["FZRA"]),
78-
solid_precipitation_hours=int(event["SOLID"]),
79-
unknown_precipitation_hours=int(event["UP"]),
80-
)
81-
for event in events
82-
]
67+
def generate_event_object(raw_event: dict[str, str]) -> Event:
68+
return Event(
69+
station_id=raw_event["station_id"],
70+
time_start=dt.datetime.fromisoformat(raw_event["start"]),
71+
time_end=dt.datetime.fromisoformat(raw_event["end"]),
72+
snow_on_ground=_snow_on_ground_status(raw_event["sog"]),
73+
rain_hours=int(raw_event["RA"]),
74+
freezing_rain_hours=int(raw_event["FZRA"]),
75+
solid_precipitation_hours=int(raw_event["SOLID"]),
76+
unknown_precipitation_hours=int(raw_event["UP"]),
77+
)
78+
79+
80+
def load_events(events: list[Event], *, session: Session) -> None:
81+
"""Load events into the database.
82+
83+
Trying to follow the bulk load instructions, but it's hard to tell why this step
84+
takes as long as it does. When using tqdm to monitor progress, things "stall" for
85+
some time after the iterable is consumed. I expected this would not happen because
86+
of under-the-hood batching, so I'm not really sure how to make this more performant,
87+
or if we can.
88+
"""
89+
session.execute(
90+
insert(Event),
91+
[event.__dict__ for event in events],
8392
)
93+
8494
session.commit()
8595

8696

src/aross_stations_db/source_data.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,22 @@
33
from collections.abc import Iterator
44
from pathlib import Path
55

6+
from loguru import logger
7+
68

79
def get_stations(metadata_fp: Path) -> list[dict[str, str]]:
810
stations_metadata_str = metadata_fp.read_text()
9-
return list(csv.DictReader(io.StringIO(stations_metadata_str)))
11+
stations_metadata = list(csv.DictReader(io.StringIO(stations_metadata_str)))
12+
13+
logger.info(f"Found {len(stations_metadata)} stations")
14+
return stations_metadata
15+
1016

17+
def get_event_files(events_dir: Path) -> list[Path]:
18+
event_files = list(events_dir.glob("*.event.csv"))
1119

12-
def get_event_files(events_dir: Path) -> Iterator[Path]:
13-
return events_dir.glob("*.event.csv")
20+
logger.info(f"Found {len(event_files)} event files")
21+
return event_files
1422

1523

1624
def get_events(events_dir: Path) -> Iterator[dict[str, str]]:

0 commit comments

Comments (0)