Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions data/state/logs/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*
!.gitignore
65 changes: 56 additions & 9 deletions library/src/iqb/cli/cache_pull.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
"""Cache pull command."""

# TODO(bassosimone): this download logic overlaps with ghremote/cache.py;
# consider unifying into a shared helper once both implementations stabilise.

import hashlib
import json
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryDirectory

Expand All @@ -29,13 +35,24 @@ def _short_name(file: str) -> str:
return "/".join(parts[-2:]) if len(parts) >= 2 else file


def _now() -> str:
"""Return the current time formatted for metrics spans."""
return datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %z")


def _download_one(
entry: DiffEntry,
data_dir: Path,
session: requests.Session,
progress: Progress,
) -> str:
"""Download a single file, verify SHA256, atomic-replace. Returns the file path."""
) -> dict[str, object]:
"""Download a single file, verify SHA256, atomic-replace. Returns a metrics span."""
t0 = _now()
worker_id = threading.get_ident()
total_bytes = 0
content_length: int | None = None
ok = True
error: str | None = None
assert entry.url is not None
assert entry.remote_sha256 is not None
dest = data_dir / entry.file
Expand All @@ -46,24 +63,39 @@ def _download_one(
tmp_file = Path(tmp_dir) / dest.name
resp = session.get(entry.url, stream=True)
resp.raise_for_status()
content_length = resp.headers.get("Content-Length")
if content_length is not None:
progress.update(task_id, total=int(content_length))
cl = resp.headers.get("Content-Length")
if cl is not None:
content_length = int(cl)
progress.update(task_id, total=content_length)
sha256 = hashlib.sha256()
with open(tmp_file, "wb") as fp:
for chunk in resp.iter_content(chunk_size=8192):
fp.write(chunk)
sha256.update(chunk)
total_bytes += len(chunk)
progress.update(task_id, advance=len(chunk))
got = sha256.hexdigest()
if got != entry.remote_sha256:
raise ValueError(
f"SHA256 mismatch for {entry.file}: expected {entry.remote_sha256}, got {got}"
)
os.replace(tmp_file, dest)
except Exception as exc:
ok = False
error = str(exc)
finally:
progress.remove_task(task_id)
return entry.file
return {
"t0": t0,
"t": _now(),
"worker_id": worker_id,
"file": entry.file,
"url": entry.url,
"content_length": content_length,
"bytes": total_bytes,
"ok": ok,
"error": error,
}


@cache.command()
Expand All @@ -90,6 +122,7 @@ def pull(data_dir: str | None, force: bool, jobs: int) -> None:

session = requests.Session()
failed: list[tuple[str, str]] = []
spans: list[dict[str, object]] = []
t0 = time.monotonic()
with (
Progress(
Expand All @@ -107,13 +140,27 @@ def pull(data_dir: str | None, force: bool, jobs: int) -> None:
for future in as_completed(futures):
entry = futures[future]
try:
future.result()
span = future.result()
spans.append(span)
if not span["ok"]:
failed.append((str(span["file"]), str(span["error"] or "unknown")))
except Exception as exc:
# Defensive: bug in _download_one itself
failed.append((entry.file, str(exc)))
elapsed = time.monotonic() - t0

ok = len(targets) - len(failed)
click.echo(f"Downloaded {ok}/{len(targets)} file(s) in {elapsed:.1f}s.")
# Write metrics
log_dir = resolved / "state" / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
now = datetime.now().astimezone().strftime("%Y%m%dT%H%M%S")
log_file = log_dir / f"{now}_{time.time_ns()}_pull.jsonl"
with open(log_file, "w") as fp:
for span in spans:
fp.write(json.dumps(span) + "\n")

ok_count = len(targets) - len(failed)
click.echo(f"Downloaded {ok_count}/{len(targets)} file(s) in {elapsed:.1f}s.")
click.echo(f"Detailed logs: {log_file.relative_to(resolved)}")

if failed:
click.echo(f"{len(failed)} download(s) failed:", err=True)
Expand Down
121 changes: 121 additions & 0 deletions library/tests/iqb/cli/cache_pull_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import hashlib
import json
from datetime import datetime
from pathlib import Path
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -36,6 +37,24 @@ def _make_cache_file(data_dir: Path, rel_path: str, content: bytes) -> Path:
_FILE_B = f"cache/v1/{_TS1}/{_TS2}/uploads/data.parquet"


def _find_log_file(data_dir: Path) -> Path | None:
"""Find the single JSONL log file under state/logs/."""
log_dir = data_dir / "state" / "logs"
if not log_dir.exists():
return None
files = list(log_dir.glob("*_pull.jsonl"))
return files[0] if len(files) == 1 else None


def _read_spans(log_file: Path) -> list[dict]:
"""Read all spans from a JSONL log file."""
spans = []
for line in log_file.read_text().splitlines():
if line.strip():
spans.append(json.loads(line))
return spans


def _fake_response(content: bytes) -> MagicMock:
"""Create a mock response that yields content in chunks."""
resp = MagicMock()
Expand Down Expand Up @@ -82,6 +101,15 @@ def test_downloads_only_remote(self, mock_session_cls: MagicMock, tmp_path: Path
# Session.get was called with the right URL
mock_session.get.assert_called_once_with(url, stream=True)

# Metrics log should exist with one span
log_file = _find_log_file(tmp_path)
assert log_file is not None
spans = _read_spans(log_file)
assert len(spans) == 1
assert spans[0]["ok"] is True
assert spans[0]["file"] == _FILE_A
assert spans[0]["bytes"] == len(content)


class TestCachePullMatchingSkipped:
"""MATCHING entries are not downloaded."""
Expand Down Expand Up @@ -178,6 +206,15 @@ def test_sha256_failure(self, mock_session_cls: MagicMock, tmp_path: Path):
# File should NOT exist on disk (atomic write prevented partial file)
assert not (tmp_path / _FILE_A).exists()

# Metrics log should record the failure
log_file = _find_log_file(tmp_path)
assert log_file is not None
spans = _read_spans(log_file)
assert len(spans) == 1
assert spans[0]["ok"] is False
assert spans[0]["error"] is not None
assert "SHA256 mismatch" in spans[0]["error"]


class TestCachePullMultipleFiles:
"""Multiple ONLY_REMOTE entries are all downloaded."""
Expand Down Expand Up @@ -243,3 +280,87 @@ def test_no_partial_file_on_failure(self, mock_session_cls: MagicMock, tmp_path:

# No partial file should exist
assert not (tmp_path / _FILE_A).exists()

# Metrics log should record the connection failure
log_file = _find_log_file(tmp_path)
assert log_file is not None
spans = _read_spans(log_file)
assert len(spans) == 1
assert spans[0]["ok"] is False
assert "connection refused" in spans[0]["error"]


class TestCachePullMetrics:
    """Download metrics JSONL file is created with correct schema."""

    # The exact key set each span dict must carry — no more, no less.
    _EXPECTED_KEYS = {
        "t0",
        "t",
        "worker_id",
        "file",
        "url",
        "content_length",
        "bytes",
        "ok",
        "error",
    }

    @patch("iqb.cli.cache_pull.requests.Session")
    def test_two_files_produce_two_spans(self, mock_session_cls: MagicMock, tmp_path: Path):
        # Two remote-only files, each served from its own URL.
        url_a = "https://example.com/a"
        url_b = "https://example.com/b"
        payloads = {url_a: b"content a", url_b: b"content b"}
        _write_manifest(
            tmp_path,
            {
                _FILE_A: {"sha256": _sha256(payloads[url_a]), "url": url_a},
                _FILE_B: {"sha256": _sha256(payloads[url_b]), "url": url_b},
            },
        )

        # Route each GET to the matching payload (unknown URLs fall back to b,
        # mirroring the original if/else dispatch).
        session = MagicMock()
        session.get.side_effect = lambda url, **kwargs: _fake_response(
            payloads.get(url, payloads[url_b])
        )
        mock_session_cls.return_value = session

        result = CliRunner().invoke(cli, ["cache", "pull", "-d", str(tmp_path)])
        assert result.exit_code == 0

        log_file = _find_log_file(tmp_path)
        assert log_file is not None
        assert log_file.name.endswith("_pull.jsonl")
        spans = _read_spans(log_file)
        assert len(spans) == 2

        for span in spans:
            # Exact schema match: every expected key, nothing extra.
            assert set(span.keys()) == self._EXPECTED_KEYS

            # Successful download: ok flag set, no error recorded.
            assert span["ok"] is True
            assert span["error"] is None

            # worker_id is a thread identity, i.e. an integer.
            assert isinstance(span["worker_id"], int)

            # Timestamps round-trip through the span format and are ordered.
            started = datetime.strptime(span["t0"], "%Y-%m-%d %H:%M:%S %z")
            finished = datetime.strptime(span["t"], "%Y-%m-%d %H:%M:%S %z")
            assert started <= finished

            # Each span records one of the two URLs it fetched.
            assert span["url"] in (url_a, url_b)

        # Per-file byte counts and Content-Length match the payload sizes.
        spans_by_file = {s["file"]: s for s in spans}
        assert spans_by_file[_FILE_A]["bytes"] == len(payloads[url_a])
        assert spans_by_file[_FILE_A]["content_length"] == len(payloads[url_a])
        assert spans_by_file[_FILE_B]["bytes"] == len(payloads[url_b])
        assert spans_by_file[_FILE_B]["content_length"] == len(payloads[url_b])
Loading