
Commit 0ba6dca

feat(cli): record pull structured logs (#141)
Let us record structured logs during pulls by default. This gives us the data to address performance issues, if needed. For now, let us keep logging always on, until we have stabilized the library a bit more. The rationale is that, with always-on logs, we never need to ask anyone to re-download with logging enabled: the data is already there, so we avoid wasting people's time.
1 parent 59906f8 commit 0ba6dca

File tree

3 files changed: +179 -9 lines changed


data/state/logs/.gitignore

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+*
+!.gitignore
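
(A standard gitignore idiom: `*` ignores everything in state/logs/, while the `!.gitignore` negation keeps the .gitignore itself tracked, so the otherwise-empty log directory stays in git.)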

library/src/iqb/cli/cache_pull.py

Lines changed: 56 additions & 9 deletions
@@ -1,9 +1,15 @@
 """Cache pull command."""
 
+# TODO(bassosimone): this download logic overlaps with ghremote/cache.py;
+# consider unifying into a shared helper once both implementations stabilise.
+
 import hashlib
+import json
 import os
+import threading
 import time
 from concurrent.futures import ThreadPoolExecutor, as_completed
+from datetime import datetime
 from pathlib import Path
 from tempfile import TemporaryDirectory

@@ -29,13 +35,24 @@ def _short_name(file: str) -> str:
     return "/".join(parts[-2:]) if len(parts) >= 2 else file
 
 
+def _now() -> str:
+    """Return the current time formatted for metrics spans."""
+    return datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %z")
+
+
 def _download_one(
     entry: DiffEntry,
     data_dir: Path,
     session: requests.Session,
     progress: Progress,
-) -> str:
-    """Download a single file, verify SHA256, atomic-replace. Returns the file path."""
+) -> dict[str, object]:
+    """Download a single file, verify SHA256, atomic-replace. Returns a metrics span."""
+    t0 = _now()
+    worker_id = threading.get_ident()
+    total_bytes = 0
+    content_length: int | None = None
+    ok = True
+    error: str | None = None
     assert entry.url is not None
     assert entry.remote_sha256 is not None
     dest = data_dir / entry.file
@@ -46,24 +63,39 @@ def _download_one(
         tmp_file = Path(tmp_dir) / dest.name
         resp = session.get(entry.url, stream=True)
         resp.raise_for_status()
-        content_length = resp.headers.get("Content-Length")
-        if content_length is not None:
-            progress.update(task_id, total=int(content_length))
+        cl = resp.headers.get("Content-Length")
+        if cl is not None:
+            content_length = int(cl)
+            progress.update(task_id, total=content_length)
         sha256 = hashlib.sha256()
         with open(tmp_file, "wb") as fp:
             for chunk in resp.iter_content(chunk_size=8192):
                 fp.write(chunk)
                 sha256.update(chunk)
+                total_bytes += len(chunk)
                 progress.update(task_id, advance=len(chunk))
         got = sha256.hexdigest()
         if got != entry.remote_sha256:
             raise ValueError(
                 f"SHA256 mismatch for {entry.file}: expected {entry.remote_sha256}, got {got}"
             )
         os.replace(tmp_file, dest)
+    except Exception as exc:
+        ok = False
+        error = str(exc)
     finally:
         progress.remove_task(task_id)
-    return entry.file
+    return {
+        "t0": t0,
+        "t": _now(),
+        "worker_id": worker_id,
+        "file": entry.file,
+        "url": entry.url,
+        "content_length": content_length,
+        "bytes": total_bytes,
+        "ok": ok,
+        "error": error,
+    }
 
 
 @cache.command()
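
Each span serializes as one JSON object per line in the resulting JSONL file. An illustrative record (all values invented; the keys match the dict returned above):

{"t0": "2025-01-01 12:00:00 +0000", "t": "2025-01-01 12:00:03 +0000", "worker_id": 140143, "file": "cache/v1/.../data.parquet", "url": "https://example.com/a", "content_length": 1048576, "bytes": 1048576, "ok": true, "error": null}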
@@ -90,6 +122,7 @@ def pull(data_dir: str | None, force: bool, jobs: int) -> None:
 
     session = requests.Session()
     failed: list[tuple[str, str]] = []
+    spans: list[dict[str, object]] = []
     t0 = time.monotonic()
     with (
         Progress(
@@ -107,13 +140,27 @@ def pull(data_dir: str | None, force: bool, jobs: int) -> None:
         for future in as_completed(futures):
             entry = futures[future]
             try:
-                future.result()
+                span = future.result()
+                spans.append(span)
+                if not span["ok"]:
+                    failed.append((str(span["file"]), str(span["error"] or "unknown")))
             except Exception as exc:
+                # Defensive: bug in _download_one itself
                 failed.append((entry.file, str(exc)))
     elapsed = time.monotonic() - t0
 
-    ok = len(targets) - len(failed)
-    click.echo(f"Downloaded {ok}/{len(targets)} file(s) in {elapsed:.1f}s.")
+    # Write metrics
+    log_dir = resolved / "state" / "logs"
+    log_dir.mkdir(parents=True, exist_ok=True)
+    now = datetime.now().astimezone().strftime("%Y%m%dT%H%M%S")
+    log_file = log_dir / f"{now}_{time.time_ns()}_pull.jsonl"
+    with open(log_file, "w") as fp:
+        for span in spans:
+            fp.write(json.dumps(span) + "\n")
+
+    ok_count = len(targets) - len(failed)
+    click.echo(f"Downloaded {ok_count}/{len(targets)} file(s) in {elapsed:.1f}s.")
+    click.echo(f"Detailed logs: {log_file.relative_to(resolved)}")
 
     if failed:
         click.echo(f"{len(failed)} download(s) failed:", err=True)
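
A minimal sketch of consuming these logs offline (not part of this commit; the log path is an example, while the field names and timestamp format match `_download_one` and `_now()` above):

import json
from datetime import datetime
from pathlib import Path

FMT = "%Y-%m-%d %H:%M:%S %z"  # format written by _now()

# Example path; real names embed the wall-clock time and time.time_ns().
log = Path("data/state/logs/20250101T120000_1700000000000000000_pull.jsonl")
for line in log.read_text().splitlines():
    span = json.loads(line)
    # Per-file duration from the start/end timestamps (second resolution).
    dur = (datetime.strptime(span["t"], FMT) - datetime.strptime(span["t0"], FMT)).total_seconds()
    status = "ok" if span["ok"] else f"FAILED: {span['error']}"
    print(f'{span["file"]}: {span["bytes"]} bytes in {dur:.0f}s [{status}]')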

library/tests/iqb/cli/cache_pull_test.py

Lines changed: 121 additions & 0 deletions
@@ -2,6 +2,7 @@
 
 import hashlib
 import json
+from datetime import datetime
 from pathlib import Path
 from unittest.mock import MagicMock, patch
@@ -36,6 +37,24 @@ def _make_cache_file(data_dir: Path, rel_path: str, content: bytes) -> Path:
 _FILE_B = f"cache/v1/{_TS1}/{_TS2}/uploads/data.parquet"
 
 
+def _find_log_file(data_dir: Path) -> Path | None:
+    """Find the single JSONL log file under state/logs/."""
+    log_dir = data_dir / "state" / "logs"
+    if not log_dir.exists():
+        return None
+    files = list(log_dir.glob("*_pull.jsonl"))
+    return files[0] if len(files) == 1 else None
+
+
+def _read_spans(log_file: Path) -> list[dict]:
+    """Read all spans from a JSONL log file."""
+    spans = []
+    for line in log_file.read_text().splitlines():
+        if line.strip():
+            spans.append(json.loads(line))
+    return spans
+
+
 def _fake_response(content: bytes) -> MagicMock:
     """Create a mock response that yields content in chunks."""
     resp = MagicMock()
@@ -82,6 +101,15 @@ def test_downloads_only_remote(self, mock_session_cls: MagicMock, tmp_path: Path
         # Session.get was called with the right URL
         mock_session.get.assert_called_once_with(url, stream=True)
 
+        # Metrics log should exist with one span
+        log_file = _find_log_file(tmp_path)
+        assert log_file is not None
+        spans = _read_spans(log_file)
+        assert len(spans) == 1
+        assert spans[0]["ok"] is True
+        assert spans[0]["file"] == _FILE_A
+        assert spans[0]["bytes"] == len(content)
+
 
 class TestCachePullMatchingSkipped:
     """MATCHING entries are not downloaded."""
@@ -178,6 +206,15 @@ def test_sha256_failure(self, mock_session_cls: MagicMock, tmp_path: Path):
         # File should NOT exist on disk (atomic write prevented partial file)
         assert not (tmp_path / _FILE_A).exists()
 
+        # Metrics log should record the failure
+        log_file = _find_log_file(tmp_path)
+        assert log_file is not None
+        spans = _read_spans(log_file)
+        assert len(spans) == 1
+        assert spans[0]["ok"] is False
+        assert spans[0]["error"] is not None
+        assert "SHA256 mismatch" in spans[0]["error"]
+
 
 class TestCachePullMultipleFiles:
     """Multiple ONLY_REMOTE entries are all downloaded."""
@@ -243,3 +280,87 @@ def test_no_partial_file_on_failure(self, mock_session_cls: MagicMock, tmp_path:
 
         # No partial file should exist
         assert not (tmp_path / _FILE_A).exists()
+
+        # Metrics log should record the connection failure
+        log_file = _find_log_file(tmp_path)
+        assert log_file is not None
+        spans = _read_spans(log_file)
+        assert len(spans) == 1
+        assert spans[0]["ok"] is False
+        assert "connection refused" in spans[0]["error"]
+
+
+class TestCachePullMetrics:
+    """Download metrics JSONL file is created with correct schema."""
+
+    _EXPECTED_KEYS = {
+        "t0",
+        "t",
+        "worker_id",
+        "file",
+        "url",
+        "content_length",
+        "bytes",
+        "ok",
+        "error",
+    }
+
+    @patch("iqb.cli.cache_pull.requests.Session")
+    def test_two_files_produce_two_spans(self, mock_session_cls: MagicMock, tmp_path: Path):
+        content_a = b"content a"
+        content_b = b"content b"
+        url_a = "https://example.com/a"
+        url_b = "https://example.com/b"
+        _write_manifest(
+            tmp_path,
+            {
+                _FILE_A: {"sha256": _sha256(content_a), "url": url_a},
+                _FILE_B: {"sha256": _sha256(content_b), "url": url_b},
+            },
+        )
+
+        mock_session = MagicMock()
+
+        def side_effect(url: str, **kwargs):
+            if url == url_a:
+                return _fake_response(content_a)
+            return _fake_response(content_b)
+
+        mock_session.get.side_effect = side_effect
+        mock_session_cls.return_value = mock_session
+
+        runner = CliRunner()
+        result = runner.invoke(cli, ["cache", "pull", "-d", str(tmp_path)])
+        assert result.exit_code == 0
+
+        log_file = _find_log_file(tmp_path)
+        assert log_file is not None
+        assert log_file.name.endswith("_pull.jsonl")
+        spans = _read_spans(log_file)
+        assert len(spans) == 2
+
+        for span in spans:
+            # All expected keys are present
+            assert set(span.keys()) == self._EXPECTED_KEYS
+
+            # ok is True
+            assert span["ok"] is True
+            assert span["error"] is None
+
+            # worker_id is an integer
+            assert isinstance(span["worker_id"], int)
+
+            # Timestamps are parseable and t0 <= t
+            t0 = datetime.strptime(span["t0"], "%Y-%m-%d %H:%M:%S %z")
+            t = datetime.strptime(span["t"], "%Y-%m-%d %H:%M:%S %z")
+            assert t0 <= t
+
+            # URL is present
+            assert span["url"] in (url_a, url_b)
+
+        # Check per-file details
+        spans_by_file = {s["file"]: s for s in spans}
+        assert spans_by_file[_FILE_A]["bytes"] == len(content_a)
+        assert spans_by_file[_FILE_A]["content_length"] == len(content_a)
+        assert spans_by_file[_FILE_B]["bytes"] == len(content_b)
+        assert spans_by_file[_FILE_B]["content_length"] == len(content_b)
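
For completeness, a pull now always reports where the log landed. Assuming the console script is installed as `iqb` (an assumption; the tests above drive the Click `cli` object directly), a session would look roughly like:

$ iqb cache pull -d ./data
Downloaded 2/2 file(s) in 0.4s.
Detailed logs: state/logs/20250101T120000_1700000000000000000_pull.jsonl

The two `click.echo` calls in `pull()` produce exactly these two lines; the timing and file name here are invented.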
