Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 34 additions & 122 deletions data/ghcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
"""
Remote cache management tool for IQB data files.

Usage:
uv run python data/ghcache.py scan

This tool manages caching of large parquet/JSON files with local SHA256
verification. It scans locally generated files and updates the manifest
with correct GCS URLs for remote distribution.
Expand All @@ -29,72 +32,21 @@
"""

import argparse
import hashlib
import json
import os
import re
import subprocess
import sys
from pathlib import Path

from dacite import from_dict

from iqb.ghremote.cache import Manifest
from iqb.ghremote.diff import DiffState, diff

MANIFEST_PATH = Path("state") / "ghremote" / "manifest.json"
CACHE_DIR = Path("cache") / "v1"
GCS_BUCKET = "mlab-sandbox-iqb-us-central1"
GCS_BASE_URL = f"https://storage.googleapis.com/{GCS_BUCKET}"


def compute_sha256(file_path: Path) -> str:
    """Return the hexadecimal SHA256 digest of the file at *file_path*.

    The file is opened in binary mode and consumed in 8192-byte reads, so
    the whole content is never held in memory at once.
    """
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()


# Compact UTC timestamp (YYYYMMDDTHHMMSSZ), e.g. 20241001T000000Z.
# ASCII digits only; used with fullmatch() so no anchors are needed.
_CACHE_TIMESTAMP_RE = re.compile(r"[0-9]{8}T[0-9]{6}Z")

# Dataset name: one or more lowercase letters, digits, or underscores.
_CACHE_NAME_RE = re.compile(r"[a-z0-9_]+")

# The only file names allowed as the last path component.
_CACHE_FILE_NAMES = ("data.parquet", "stats.json")


def validate_cache_path(path: str) -> bool:
    """
    Validate that a path follows the cache/v1 format.

    Valid format:
        cache/v1/{timestamp}/{timestamp}/{name}/{file}

    Where:
    - Component 1: "cache"
    - Component 2: "v1"
    - Component 3: compact timestamp, YYYYMMDDTHHMMSSZ (e.g., 20241001T000000Z)
    - Component 4: compact timestamp, same format
    - Component 5: lowercase letters, numbers, and underscores [a-z0-9_]+
    - Component 6: "data.parquet" or "stats.json"

    Every component must match in full: a component carrying a trailing
    newline or non-ASCII digits is rejected.

    Args:
        path: Forward-slash separated relative path to check.

    Returns:
        True if the path has exactly the six components described above,
        False otherwise.
    """
    parts = path.split("/")
    if len(parts) != 6:
        return False

    root, version, start, end, name, filename = parts

    # Components 1-2: fixed prefix
    if root != "cache" or version != "v1":
        return False

    # Components 3-4: timestamps. fullmatch() is used instead of match()
    # with "$" because "$" also matches just before a trailing newline.
    if _CACHE_TIMESTAMP_RE.fullmatch(start) is None:
        return False
    if _CACHE_TIMESTAMP_RE.fullmatch(end) is None:
        return False

    # Component 5: lowercase letters, numbers, and underscores
    if _CACHE_NAME_RE.fullmatch(name) is None:
        return False

    # Component 6: data.parquet or stats.json
    return filename in _CACHE_FILE_NAMES


def load_manifest() -> dict:
"""Load manifest from state/ghremote/manifest.json, or return empty if not found."""
if not MANIFEST_PATH.exists():
Expand All @@ -112,27 +64,12 @@ def save_manifest(manifest: dict) -> None:
f.write("\n") # Trailing newline


def is_git_ignored(file_path: Path) -> bool:
    """Report whether git considers *file_path* to be ignored.

    Runs ``git check-ignore`` on the path with its output discarded and
    inspects only the exit status. This is a best-effort check: if the
    command cannot be run for any reason, the file is reported as not
    ignored.
    """
    try:
        completed = subprocess.run(
            ["git", "check-ignore", str(file_path)],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception:
        # git missing or any other failure: treat the file as not ignored.
        return False
    else:
        # A zero exit status is git's way of saying "this path is ignored".
        return completed.returncode == 0


def cmd_scan(args) -> int:
"""
Scan command: Scan local files and update manifest.

1. Load or create manifest
2. Scan cache/v1 for git-ignored files
2. Diff manifest against local cache/v1 files
3. For new or changed files:
- Compute SHA256
- Generate GCS URL
Expand All @@ -141,68 +78,40 @@ def cmd_scan(args) -> int:
5. Print gcloud storage rsync command for uploading
"""
_ = args
manifest = load_manifest()
files_dict = manifest.setdefault("files", {})

if not CACHE_DIR.exists():
print(f"Cache directory {CACHE_DIR} does not exist.")
return 1

print(f"Scanning {CACHE_DIR} for git-ignored files...")

# Find all files under cache/v1
all_files = list(CACHE_DIR.rglob("*"))
cache_files = [f for f in all_files if f.is_file()]

# Filter to only git-ignored files
ignored_files = [f for f in cache_files if is_git_ignored(f)]
manifest_dict = load_manifest()
files_dict = manifest_dict.setdefault("files", {})
manifest = from_dict(Manifest, manifest_dict)

if not ignored_files:
print("No git-ignored files found.")
return 0

print(f"Found {len(ignored_files)} git-ignored files.")
print("Scanning local cache files...")

updated_count = 0

for file_path in ignored_files:
# Convert to relative path string with forward slashes for cross-platform compatibility
rel_path = file_path.as_posix()

# Validate path format
if not validate_cache_path(rel_path):
print(f"Skipping invalid path format: {rel_path}")
continue

# Compute SHA256
sha256 = compute_sha256(file_path)

# Check if file is already in manifest with same SHA256
existing_entry = files_dict.get(rel_path)
if existing_entry and existing_entry["sha256"] == sha256:
print(f"Already in manifest: {rel_path}")
continue

# File is new or changed
url = f"{GCS_BASE_URL}/{rel_path}"
print(f"New/changed: {rel_path}")
print(f" SHA256: {sha256}")
print(f" URL: {url}")

files_dict[rel_path] = {"sha256": sha256, "url": url}
updated_count += 1
for entry in diff(manifest, Path(".")):
if entry.state == DiffState.MATCHING:
print(f"Already in manifest: {entry.file}")
elif entry.state in (DiffState.ONLY_LOCAL, DiffState.SHA256_MISMATCH):
sha256 = entry.local_sha256
url = f"{GCS_BASE_URL}/{entry.file}"
action = "Changed" if entry.state == DiffState.SHA256_MISMATCH else "New"
print(f"{action}: {entry.file}")
print(f" SHA256: {sha256}")
print(f" URL: {url}")
files_dict[entry.file] = {"sha256": sha256, "url": url}
updated_count += 1
elif entry.state == DiffState.ONLY_REMOTE:
print(f"In manifest but not on disk: {entry.file}")

# Save updated manifest
save_manifest(manifest)
save_manifest(manifest_dict)
print(f"\nManifest updated: {MANIFEST_PATH}")

if updated_count > 0:
print(f"\n{updated_count} file(s) added/updated in manifest.")
print("\nNext steps:")
print("1. Remove zero-length .lock files left over by the pipeline:")
print(f" find data/{CACHE_DIR} -type f -name .lock -delete")
print(" find data/cache/v1 -type f -name .lock -delete")
print("2. Upload files to GCS:")
print(f" gcloud storage rsync -r data/{CACHE_DIR} gs://{GCS_BUCKET}/{CACHE_DIR}")
print(f" gcloud storage rsync -r data/cache/v1 gs://{GCS_BUCKET}/cache/v1")
print(f"3. Commit updated data/{MANIFEST_PATH} to repository")

return 0
Expand All @@ -213,7 +122,10 @@ def main() -> int:
script_dir = Path(__file__).resolve().parent
os.chdir(script_dir)

parser = argparse.ArgumentParser(description="Remote cache management tool for IQB data files")
parser = argparse.ArgumentParser(
description="Remote cache management tool for IQB data files. "
"Run with: uv run python data/ghcache.py <command>",
)
subparsers = parser.add_subparsers(dest="command", help="Subcommand to run")

# Scan subcommand
Expand Down
4 changes: 4 additions & 0 deletions library/src/iqb/ghremote/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
"""

from .cache import IQBRemoteCache
from .diff import DiffEntry, DiffState, diff

# Backward compatibility alias
IQBGitHubRemoteCache = IQBRemoteCache

__all__ = [
"DiffEntry",
"DiffState",
"IQBGitHubRemoteCache",
"IQBRemoteCache",
"diff",
]
6 changes: 3 additions & 3 deletions library/src/iqb/ghremote/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def _sync_file_entry(entry: FileEntry, dest_path: Path):
"""Sync the given FileEntry with the remotely cached file."""
# Determine whether we need to download again
exists = dest_path.exists()
if not exists or entry.sha256 != _compute_sha256(dest_path):
if not exists or entry.sha256 != compute_sha256(dest_path):
dest_path.parent.mkdir(parents=True, exist_ok=True)
# Operate inside a temporary directory in the destination directory so
# `os.replace()` is atomic and we avoid cross-filesystem moves.
Expand Down Expand Up @@ -183,13 +183,13 @@ def _sync_file_entry_tmp(entry: FileEntry, tmp_file: Path):

# Make sure the sha256 matches
log.info("validating %s... start", entry)
sha256 = _compute_sha256(tmp_file)
sha256 = compute_sha256(tmp_file)
if sha256 != entry.sha256:
raise ValueError(f"SHA256 mismatch: expected {entry.sha256}, got {sha256}")
log.info("validating %s... ok", entry)


def _compute_sha256(path: Path) -> str:
def compute_sha256(path: Path) -> str:
"""Compute SHA256 hash of a file."""
sha256 = hashlib.sha256()
with open(path, "rb") as fp:
Expand Down
Loading
Loading