Skip to content

Commit 274e7f5

Browse files
authored
feat: implement iqb cache push (#137)
The `iqb cache push` command pushes local cache files to the remote GCS bucket that hosts the cache. By default, modified entries are not pushed; the user must pass `-f`/`--force` to push them. See #136 for the rationale for not pushing modified files unless `-f` is given. For now, as an MVP, publish each file independently rather than doing a `gcloud storage rsync` equivalent. While here, add two files uploaded with this command. Also, sync the manifest after every successful upload, making the code robust to crashes.
1 parent ffa7fd4 commit 274e7f5

File tree

6 files changed

+455
-0
lines changed

6 files changed

+455
-0
lines changed

data/state/ghremote/manifest.json

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1247,6 +1247,22 @@
12471247
"cache/v1/20251201T000000Z/20260101T000000Z/uploads_by_country_subdivision1_asn/stats.json": {
12481248
"sha256": "9365ed4b965c2b220de474cf882cf183de21e5f894a3359a2420def3458bef15",
12491249
"url": "https://storage.googleapis.com/mlab-sandbox-iqb-us-central1/cache/v1/20251201T000000Z/20260101T000000Z/uploads_by_country_subdivision1_asn/stats.json"
1250+
},
1251+
"cache/v1/20260101T000000Z/20260201T000000Z/downloads_by_country/data.parquet": {
1252+
"sha256": "4d34ee7e2b649e54254f03e805934dd58ac7c56fca79d1c0c2b45d12e8f2a6d8",
1253+
"url": "https://storage.googleapis.com/mlab-sandbox-iqb-us-central1/cache/v1/20260101T000000Z/20260201T000000Z/downloads_by_country/data.parquet"
1254+
},
1255+
"cache/v1/20260101T000000Z/20260201T000000Z/downloads_by_country/stats.json": {
1256+
"sha256": "7ae465039d0610a8e71e2ce8fb6eda5f0a93085df28babcba32452c2cbd41e27",
1257+
"url": "https://storage.googleapis.com/mlab-sandbox-iqb-us-central1/cache/v1/20260101T000000Z/20260201T000000Z/downloads_by_country/stats.json"
1258+
},
1259+
"cache/v1/20260101T000000Z/20260201T000000Z/uploads_by_country/data.parquet": {
1260+
"sha256": "e9047458ee022ba628d8d1c062256f058acf87f6820469ce8f9a3c2ae2c4c041",
1261+
"url": "https://storage.googleapis.com/mlab-sandbox-iqb-us-central1/cache/v1/20260101T000000Z/20260201T000000Z/uploads_by_country/data.parquet"
1262+
},
1263+
"cache/v1/20260101T000000Z/20260201T000000Z/uploads_by_country/stats.json": {
1264+
"sha256": "7c31a157d92724ea7fd69bbbb35c35dec91b74c9ea8c245292f7c6c0dd1e2bdd",
1265+
"url": "https://storage.googleapis.com/mlab-sandbox-iqb-us-central1/cache/v1/20260101T000000Z/20260201T000000Z/uploads_by_country/stats.json"
12501266
}
12511267
},
12521268
"v": 0

library/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ dependencies = [
2222
"rich>=14.2.0",
2323
"click>=8.1.0",
2424
"requests>=2.28.0",
25+
"google-cloud-storage>=2.0.0",
2526
]
2627

2728
[project.urls]

library/src/iqb/cli/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,5 @@ def version_cmd() -> None:
3434
# Register subcommands (must be after cli is defined)
3535
from . import cache as _cache # noqa: E402, F401
3636
from . import cache_pull as _cache_pull # noqa: E402, F401
37+
from . import cache_push as _cache_push # noqa: E402, F401
3738
from . import cache_status as _cache_status # noqa: E402, F401

library/src/iqb/cli/cache_push.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
"""Cache push command."""
2+
3+
import time
4+
from pathlib import Path
5+
6+
import click
7+
from google.cloud import storage
8+
from rich.progress import (
9+
BarColumn,
10+
DownloadColumn,
11+
Progress,
12+
TextColumn,
13+
TransferSpeedColumn,
14+
)
15+
16+
from ..ghremote import (
17+
DiffState,
18+
FileEntry,
19+
diff,
20+
load_manifest,
21+
manifest_path_for_data_dir,
22+
save_manifest,
23+
)
24+
from ..ghremote.diff import DiffEntry
25+
from ..pipeline.cache import data_dir_or_default
26+
from ..scripting import iqb_logging
27+
from .cache import cache
28+
29+
# Bucket name used when the user does not pass --bucket.
_DEFAULT_BUCKET = "mlab-sandbox-iqb-us-central1"
30+
# URL prefix joined with the bucket and object name to form the URL stored in the manifest.
_GCS_BASE_URL = "https://storage.googleapis.com"
31+
32+
33+
def _short_name(file: str) -> str:
    """Return the last two ``/``-separated components of *file* for display.

    A path with fewer than two components is returned unchanged.
    """
    # Splitting from the right at most twice is enough to isolate the tail.
    tail = file.rsplit("/", 2)[-2:]
    return "/".join(tail)
37+
38+
39+
class _ProgressReader:
    """File-like wrapper that advances a Rich progress task as bytes are read.

    Only ``read`` is intercepted. Every other public attribute (``tell``,
    ``seek``, ``name``, ...) is forwarded to the wrapped file object, so code
    that inspects or repositions the stream keeps working.

    NOTE(review): google-cloud-storage's resumable upload path is understood
    to call ``tell()`` (and ``seek()`` when recovering) on the stream; without
    forwarding, such uploads would fail with ``AttributeError``.
    """

    def __init__(self, fp, progress: "Progress", task_id) -> None:  # noqa: ANN001
        self._fp = fp
        self._progress = progress
        self._task_id = task_id

    def read(self, size: int = -1) -> bytes:
        """Read up to *size* bytes and advance the task by the amount read."""
        data = self._fp.read(size)
        if data:
            self._progress.update(self._task_id, advance=len(data))
        return data

    def __getattr__(self, name: str):  # noqa: ANN204
        """Forward any attribute not defined here to the wrapped file object.

        Raises:
            AttributeError: if *name* is private or the wrapped object does
                not provide it.
        """
        # __getattr__ only runs when normal lookup fails. Refusing private
        # names prevents infinite recursion if ``_fp`` itself is missing
        # (e.g. the instance was created without running __init__).
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._fp, name)
52+
53+
54+
def _upload_one(
    entry: DiffEntry,
    data_dir: Path,
    bucket: storage.Bucket,
    progress: Progress,
) -> str:
    """Send one local cache file to *bucket*, showing a progress task for it.

    The object name is ``entry.file`` and the bytes are read from
    ``data_dir / entry.file``. The progress task is removed once the upload
    ends, whether it succeeded or raised.

    Returns ``entry.file``.
    """
    assert entry.local_sha256 is not None
    local_path = data_dir / entry.file
    nbytes = local_path.stat().st_size
    task = progress.add_task(_short_name(entry.file), total=nbytes)
    try:
        target = bucket.blob(entry.file)
        with local_path.open("rb") as stream:
            # Wrap the stream so each read advances this file's progress task.
            target.upload_from_file(
                _ProgressReader(stream, progress, task),
                size=nbytes,
            )
    finally:
        progress.remove_task(task)
    return entry.file
73+
74+
75+
@cache.command()
@click.option("-d", "--dir", "data_dir", default=None, help="Data directory (default: .iqb)")
@click.option("--bucket", default=_DEFAULT_BUCKET, show_default=True, help="GCS bucket name")
@click.option("-f", "--force", is_flag=True, help="Re-upload files with mismatched hashes")
def push(data_dir: str | None, bucket: str, force: bool) -> None:
    """Upload new local cache files to GCS and update the manifest."""
    # NOTE: the docstring above doubles as the click help text; keep it short.
    iqb_logging.configure(False)
    root = data_dir_or_default(data_dir)
    manifest_file = manifest_path_for_data_dir(root)
    manifest = load_manifest(manifest_file)

    # Local-only files are always pushed; hash mismatches only with --force.
    targets: list[DiffEntry] = [
        item
        for item in diff(manifest, root)
        if item.state == DiffState.ONLY_LOCAL
        or (force and item.state == DiffState.SHA256_MISMATCH)
    ]

    if not targets:
        click.echo("Nothing to upload.")
        return

    gcs_bucket = storage.Client().bucket(bucket)

    failures: list[tuple[str, str]] = []
    started = time.monotonic()
    with Progress(
        TextColumn("{task.description}"),
        BarColumn(),
        DownloadColumn(),
        TransferSpeedColumn(),
    ) as progress:
        for item in targets:
            try:
                _upload_one(item, root, gcs_bucket, progress)
            except Exception as exc:
                # Record the failure and keep going with the remaining files.
                failures.append((item.file, str(exc)))
            else:
                # Persist the manifest after every successful upload so a
                # crash mid-run does not lose already-uploaded entries.
                assert item.local_sha256 is not None
                manifest.files[item.file] = FileEntry(
                    sha256=item.local_sha256,
                    url=f"{_GCS_BASE_URL}/{bucket}/{item.file}",
                )
                save_manifest(manifest, manifest_file)
        elapsed = time.monotonic() - started

    uploaded = len(targets) - len(failures)
    click.echo(f"Uploaded {uploaded}/{len(targets)} file(s) in {elapsed:.1f}s.")

    if not failures:
        return

    # Report every failure on stderr and exit non-zero.
    click.echo(f"{len(failures)} upload(s) failed:", err=True)
    for path, reason in failures:
        click.echo(f" {path}: {reason}", err=True)
    raise SystemExit(1)

0 commit comments

Comments
 (0)