Commit 59906f8

feat: add iqb cache usage subcommand (#140)
This new subcommand lets us see the size of the cache, the number of bytes billed for generating each entry, and the total query time. This information helps with reasoning about cache size and impact. While there, fix the usage examples inside `data/README.md`.
Parent: 454849f
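
For context, the per-entry numbers this subcommand reports come from a small stats file stored alongside each cached parquet file. A minimal sketch of what such a file might contain, assuming the field names read by `cache_usage.py` below (the values and the path are purely illustrative):

```python
# Hypothetical per-dataset stats file; field names match what
# cache_usage.py reads, but the values and the path are made up.
import json
from pathlib import Path

stats = {
    "total_bytes_billed": 12_582_912,   # BigQuery bytes billed for this entry
    "query_duration_seconds": 4.7,      # wall-clock query time in seconds
}
Path("stats.json").write_text(json.dumps(stats))
```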

4 files changed: +466 −8 lines

data/README.md

Lines changed: 21 additions & 8 deletions

````diff
@@ -26,12 +26,12 @@ The [state/ghremote/manifest.json](state/ghremote/manifest.json) file lists
 all the query results already cached at GCS. Run:

 ```bash
-uv run iqb cache pull -d ..
+uv run iqb cache pull -d .
 ```

 to sync files from GCS to the local copy.

-Omit `-d ..` if running from the top-level directory.
+Omit `-d .` if running from the top-level directory.

 Run `uv run iqb cache pull --help` for more help.

@@ -40,14 +40,14 @@ Run `uv run iqb cache pull --help` for more help.
 Run the pipeline to query BigQuery and populate the local cache:

 ```bash
-uv run iqb pipeline run -d ..
+uv run iqb pipeline run -d .
 ```

 This command loads `pipeline.yaml` to determine the query matrix and
 executes BigQuery to generate data. If the cache already contains data, we
 do not execute BigQuery to avoid burning cloud credits.

-Omit `-d ..` if running from the top-level directory.
+Omit `-d .` if running from the top-level directory.

 Run `uv run iqb pipeline run --help` for more help.

@@ -56,25 +56,38 @@ Run `uv run iqb pipeline run --help` for more help.
 Show which entries are local, remote, or missing:

 ```bash
-uv run iqb cache status -d ..
+uv run iqb cache status -d .
 ```

-Omit `-d ..` if running from the top-level directory.
+Omit `-d .` if running from the top-level directory.

 Run `uv run iqb cache status --help` for more help.

+## `iqb cache usage` - Cache Disk and BigQuery Usage
+
+Show per-period cache statistics including parquet file sizes,
+cumulative BigQuery bytes billed, and query durations:
+
+```bash
+uv run iqb cache usage -d .
+```
+
+Omit `-d .` if running from the top-level directory.
+
+Run `uv run iqb cache usage --help` for more help.
+
 ### `iqb cache push` - Publishing Data

 After generating new cache files locally using `iqb pipeline run`, push
 them to GCS and update the manifest:

 ```bash
-uv run iqb cache push -d ..
+uv run iqb cache push -d .
 ```

 Then commit the updated `state/ghremote/manifest.json`.

-Omit `-d ..` if running from the top-level directory.
+Omit `-d .` if running from the top-level directory.

 Run `uv run iqb cache push --help` for more help.
````
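To make "per-period" in the new README section concrete: cache entries are grouped by the `(start, end)` timestamp pair in their cache path, and sizes are summed within each group. A rough sketch of that grouping, assuming the `cache/v1/{start}/{end}/{dataset}/` layout scanned by `cache_usage.py` below (glob pattern and filenames here are illustrative, not the real cache constants):

```python
# Illustrative only: sum parquet sizes per (start, end) period.
# The real command also reads per-entry BigQuery stats; see cache_usage.py.
from pathlib import Path

period_bytes: dict[tuple[str, str], int] = {}
for parquet in Path("cache/v1").glob("*/*/*/*.parquet"):
    start, end = parquet.parts[-4], parquet.parts[-3]
    period_bytes[(start, end)] = period_bytes.get((start, end), 0) + parquet.stat().st_size

for (start, end), size in sorted(period_bytes.items()):
    print(f"{start} .. {end}: {size} bytes")
```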

library/src/iqb/cli/__init__.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -36,5 +36,6 @@ def version_cmd() -> None:
 from . import cache_pull as _cache_pull  # noqa: E402, F401
 from . import cache_push as _cache_push  # noqa: E402, F401
 from . import cache_status as _cache_status  # noqa: E402, F401
+from . import cache_usage as _cache_usage  # noqa: E402, F401
 from . import pipeline as _pipeline  # noqa: E402, F401
 from . import pipeline_run as _pipeline_run  # noqa: E402, F401
```
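This is a side-effect import: loading `cache_usage` executes its `@cache.command()` decorator, which attaches the new `usage` subcommand to the existing `cache` group. A stripped-down sketch of that Click pattern (standalone names here are illustrative, not the project's actual module layout):

```python
# Minimal illustration of registration-by-import with Click.
import click

@click.group()
def cache() -> None:
    """Cache commands."""

# Importing the module that contains this definition runs the decorator,
# which registers `usage` under the `cache` group; nothing else is needed.
@cache.command()
def usage() -> None:
    """Show cache disk and BigQuery usage statistics."""
    click.echo("...")
```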

library/src/iqb/cli/cache_usage.py

Lines changed: 199 additions & 0 deletions

```python
"""Cache usage command."""

from __future__ import annotations

import json
import re
from dataclasses import dataclass, field
from pathlib import Path

import click
from rich.console import Console
from rich.table import Table

from ..pipeline.cache import (
    PIPELINE_CACHE_DATA_FILENAME,
    PIPELINE_CACHE_STATS_FILENAME,
    data_dir_or_default,
)
from .cache import cache

_TS_RE = re.compile(r"^\d{8}T\d{6}Z$")
_DATASET_RE = re.compile(r"^[a-z0-9_]+$")


@dataclass
class _DatasetStats:
    """Per-dataset raw statistics."""

    name: str
    parquet_size: int
    bq_bytes_billed: int
    query_duration_seconds: float


@dataclass
class _PeriodStats:
    """Aggregated statistics for a (start, end) time period."""

    start_ts: str
    end_ts: str
    datasets: list[_DatasetStats] = field(default_factory=list)

    @property
    def total_parquet_size(self) -> int:
        return sum(d.parquet_size for d in self.datasets)

    @property
    def total_bq_bytes_billed(self) -> int:
        return sum(d.bq_bytes_billed for d in self.datasets)

    @property
    def total_query_duration(self) -> float:
        return sum(d.query_duration_seconds for d in self.datasets)


def _read_stats_json(stats_path: Path) -> tuple[int, float]:
    """Read stats.json and return (total_bytes_billed, query_duration_seconds).

    Tolerates missing files, corrupt JSON, and null field values by
    returning zeros for any value that cannot be read.
    """
    if not stats_path.exists():
        return 0, 0.0
    try:
        data = json.loads(stats_path.read_text())
    except (json.JSONDecodeError, OSError):
        return 0, 0.0
    bq_bytes = data.get("total_bytes_billed")
    duration = data.get("query_duration_seconds")
    return (
        int(bq_bytes) if bq_bytes is not None else 0,
        float(duration) if duration is not None else 0.0,
    )


def _scan_periods(data_dir: Path) -> list[_PeriodStats]:
    """Walk cache/v1/{start}/{end}/{dataset}/ and collect statistics."""
    cache_root = data_dir / "cache" / "v1"
    if not cache_root.is_dir():
        return []

    periods: dict[tuple[str, str], _PeriodStats] = {}

    for start_dir in sorted(cache_root.iterdir()):
        if not start_dir.is_dir() or not _TS_RE.match(start_dir.name):
            continue
        for end_dir in sorted(start_dir.iterdir()):
            if not end_dir.is_dir() or not _TS_RE.match(end_dir.name):
                continue
            for dataset_dir in sorted(end_dir.iterdir()):
                if not dataset_dir.is_dir() or not _DATASET_RE.match(dataset_dir.name):
                    continue
                parquet_path = dataset_dir / PIPELINE_CACHE_DATA_FILENAME
                if not parquet_path.exists():
                    continue
                parquet_size = parquet_path.stat().st_size
                stats_path = dataset_dir / PIPELINE_CACHE_STATS_FILENAME
                bq_bytes, duration = _read_stats_json(stats_path)
                key = (start_dir.name, end_dir.name)
                if key not in periods:
                    periods[key] = _PeriodStats(start_ts=key[0], end_ts=key[1])
                periods[key].datasets.append(
                    _DatasetStats(
                        name=dataset_dir.name,
                        parquet_size=parquet_size,
                        bq_bytes_billed=bq_bytes,
                        query_duration_seconds=duration,
                    )
                )

    return [periods[k] for k in sorted(periods)]


def _format_bytes(n: int) -> str:
    """Format a byte count using SI-like suffixes."""
    if n == 0:
        return "0 B"
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if abs(n) < 1024:
            if n == int(n):
                return f"{int(n)} {unit}"
            return f"{n:.1f} {unit}"
        n_f = n / 1024
        n = n_f  # type: ignore[assignment]
    return f"{n:.1f} PB"


def _format_duration(seconds: float) -> str:
    """Format a duration in seconds to a human-readable string."""
    if seconds == 0:
        return "0s"
    if seconds < 60:
        return f"{seconds:.1f}s"
    minutes = int(seconds // 60)
    remaining = seconds - minutes * 60
    return f"{minutes}m {remaining:.1f}s"


def _format_period(start_ts: str, end_ts: str) -> str:
    """Format a pair of RFC3339-ish timestamps for display.

    Converts '20241001T000000Z' to '2024-10-01' style.
    """
    start = f"{start_ts[:4]}-{start_ts[4:6]}-{start_ts[6:8]}"
    end = f"{end_ts[:4]}-{end_ts[4:6]}-{end_ts[6:8]}"
    return f"{start} .. {end}"


def _build_table(periods: list[_PeriodStats]) -> Table:
    """Construct a Rich Table from the scanned period stats."""
    table = Table()
    table.add_column("Period", style="cyan")
    table.add_column("Datasets", justify="right")
    table.add_column("Parquet Size", justify="right")
    table.add_column("BQ Bytes Billed", justify="right")
    table.add_column("Query Duration", justify="right")

    total_datasets = 0
    total_parquet = 0
    total_bq = 0
    total_duration = 0.0

    for period in periods:
        count = len(period.datasets)
        total_datasets += count
        total_parquet += period.total_parquet_size
        total_bq += period.total_bq_bytes_billed
        total_duration += period.total_query_duration
        table.add_row(
            _format_period(period.start_ts, period.end_ts),
            str(count),
            _format_bytes(period.total_parquet_size),
            _format_bytes(period.total_bq_bytes_billed),
            _format_duration(period.total_query_duration),
        )

    table.add_section()
    table.add_row(
        "[bold]Total[/bold]",
        f"[bold]{total_datasets}[/bold]",
        f"[bold]{_format_bytes(total_parquet)}[/bold]",
        f"[bold]{_format_bytes(total_bq)}[/bold]",
        f"[bold]{_format_duration(total_duration)}[/bold]",
    )

    return table


@cache.command()
@click.option("-d", "--dir", "data_dir", default=None, help="Data directory (default: .iqb)")
def usage(data_dir: str | None) -> None:
    """Show cache disk and BigQuery usage statistics."""
    resolved = data_dir_or_default(data_dir)
    periods = _scan_periods(resolved)
    if not periods:
        click.echo("No cached data found.")
        return
    console = Console()
    console.print(_build_table(periods))
```
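
As a quick sanity check of the formatting helpers above, here is what they produce for a few illustrative inputs (a sketch, assuming the module is importable as `iqb.cli.cache_usage`; the private imports are for demonstration only):

```python
# Expected behaviour of the formatting helpers; inputs are illustrative.
from iqb.cli.cache_usage import _format_bytes, _format_duration, _format_period

assert _format_bytes(2048) == "2 KB"          # whole values drop the decimal
assert _format_bytes(1536) == "1.5 KB"        # otherwise one decimal place
assert _format_duration(75.3) == "1m 15.3s"   # minutes split out past 60 seconds
assert _format_period("20241001T000000Z", "20241101T000000Z") == "2024-10-01 .. 2024-11-01"
```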
