diff --git a/README.md b/README.md
index a5bb5ed..02f85fc 100644
--- a/README.md
+++ b/README.md
@@ -86,3 +86,47 @@ To run it every Monday at 5 AM, add this line to your crontab:
 0 5 * * 1 /path/to/fmriprep_stats/scripts/update_plots.sh 2>> $HOME/var/log/update_plots.err >> $HOME/var/log/update_plots.log
 ```
+## Migrating from MongoDB
+
+The repository provides `scripts/migrate_mongo_to_parquet.py` to export
+collections from MongoDB into Parquet files. Each issue collection is streamed
+in batches and normalized with the same helpers used by the fetching CLI. Rows
+that are missing identifiers or timestamps are skipped, while valid records are
+written as `<event>/<YYYY-MM-DD>.parquet` under the dataset root. A manifest
+file named `_manifest.parquet` is updated atomically so re-running the script
+is idempotent. Example usage:
+
+```bash
+python scripts/migrate_mongo_to_parquet.py \
+    --mongo-uri mongodb://localhost:27017 \
+    --db fmriprep_stats \
+    --dataset-root /data/fmriprep-parquet \
+    --start-date 2022-01-01 --end-date 2022-02-01
+```
+
+Use the `--collections` flag to focus on a subset of issues while debugging,
+and adjust `--batch-size` (default: 1000) if you need to reduce memory pressure
+or make better use of fast disks. Streaming ensures only a day's worth of data
+is held in memory, but exporting a large history still requires several
+gigabytes of disk space for the resulting Parquet files.
+
+After the first migration you can re-run the script with the same arguments to
+confirm idempotency: the manifest ensures previously written partitions are
+skipped, so duplicates are avoided.
+
+## Updating plots from Parquet exports
+
+Once a Parquet dataset exists you can generate plots without MongoDB. Either
+use `src/run.py plot --dataset-root /path/to/dataset` directly or run the
+`scripts/update_plots_parquet.sh` helper, which mirrors `update_plots.sh` but
+injects the dataset path:
+
+```bash
+chmod +x scripts/update_plots_parquet.sh
+scripts/update_plots_parquet.sh /data/fmriprep-parquet
+```
+
+The helper accepts an optional second argument with the repository URL to push
+plots to (defaults to `git@github.com:nipreps/nipreps.github.io.git`) and
+otherwise behaves like the original weekly plot updater.
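+
+For ad-hoc checks you can also read an export with the same loader the CLI
+uses. The snippet below is a minimal sketch (not part of the scripts above): it
+assumes it is run from the repository root, that the dataset lives at
+`/data/fmriprep-parquet`, and that a `started` collection was exported with its
+`dateCreated` field intact:
+
+```python
+import sys
+
+sys.path.insert(0, "src")  # make the in-repo modules importable
+
+from db import load_event_from_parquet
+
+# Reads every daily partition of the "started" event; duplicated run_uuid
+# values are dropped because unique defaults to True.
+started = load_event_from_parquet("/data/fmriprep-parquet", "started")
+print(f"{len(started)} records from {started['dateCreated'].min()} "
+      f"to {started['dateCreated'].max()}")
+```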
+
diff --git a/requirements.txt b/requirements.txt
index fde755a..c1129f6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,8 @@
 pymongo
 pandas
+pyarrow
 matplotlib
 seaborn
 notebook
 nbconvert
-requests
\ No newline at end of file
+requests
diff --git a/scripts/migrate_mongo_to_parquet.py b/scripts/migrate_mongo_to_parquet.py
new file mode 100755
index 0000000..ecf3df7
--- /dev/null
+++ b/scripts/migrate_mongo_to_parquet.py
@@ -0,0 +1,211 @@
+#!/usr/bin/env python3
+"""Export MongoDB collections into Parquet partitions."""
+
+from __future__ import annotations
+
+import argparse
+import logging
+import sys
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import Dict, List, Optional
+
+import pandas as pd
+from pymongo import MongoClient
+
+REPO_ROOT = Path(__file__).resolve().parents[1]
+SRC_DIR = REPO_ROOT / "src"
+if str(SRC_DIR) not in sys.path:
+    sys.path.insert(0, str(SRC_DIR))
+
+from api import (  # noqa: E402 pylint: disable=wrong-import-position
+    ISSUES,
+    ManifestCache,
+    _event_date,
+    _normalize_event,
+    _partition_path,
+)
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument("--mongo-uri", default="mongodb://localhost:27017", help="MongoDB URI")
+    parser.add_argument("--db", default="fmriprep_stats", help="MongoDB database name")
+    parser.add_argument(
+        "--dataset-root",
+        required=True,
+        help="Target directory where Parquet files will be written",
+    )
+    parser.add_argument(
+        "--batch-size",
+        type=int,
+        default=1000,
+        help="Number of documents to stream from Mongo per batch",
+    )
+    parser.add_argument(
+        "--start-date",
+        type=lambda value: datetime.strptime(value, "%Y-%m-%d"),
+        default=None,
+        help="Only export events on/after this date (YYYY-MM-DD)",
+    )
+    parser.add_argument(
+        "--end-date",
+        type=lambda value: datetime.strptime(value, "%Y-%m-%d"),
+        default=None,
+        help="Only export events on/before this date (YYYY-MM-DD, inclusive)",
+    )
+    parser.add_argument(
+        "--collections",
+        nargs="+",
+        default=None,
+        help="Subset of collections to export (defaults to all supported issues)",
+    )
+    parser.add_argument(
+        "--log-level",
+        default="INFO",
+        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
+        help="Logging verbosity",
+    )
+    return parser.parse_args()
+
+
+def _date_filter(args: argparse.Namespace) -> Optional[Dict[str, str]]:
+    if not args.start_date and not args.end_date:
+        return None
+
+    window: Dict[str, str] = {}
+    if args.start_date:
+        start_iso = args.start_date.replace(hour=0, minute=0, second=0, microsecond=0)
+        window["$gte"] = start_iso.strftime("%Y-%m-%dT%H:%M:%SZ")
+    if args.end_date:
+        end_iso = args.end_date.replace(hour=0, minute=0, second=0, microsecond=0)
+        window["$lt"] = (end_iso + timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
+    return window
+
+
+def _write_partition(
+    dataset_root: Path,
+    event_name: str,
+    event_date,
+    rows: List[Dict],
+    manifest_cache: ManifestCache,
+) -> None:
+    if not rows:
+        return
+
+    target_path = _partition_path(dataset_root, event_name, event_date)
+    relative_path = str(target_path.relative_to(dataset_root))
+    if relative_path in manifest_cache.seen_paths:
+        logging.info("Skipping %s – manifest already lists this partition", relative_path)
+        return
+
+    target_path.parent.mkdir(parents=True, exist_ok=True)
+    df = pd.DataFrame(rows)
+    tmp_path = target_path.with_suffix(".tmp")
+    df.to_parquet(tmp_path, index=False)
+    tmp_path.replace(target_path)
+
+    ids = [row.get("id") for row in rows if row.get("id")]
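+    # record this partition in the shared manifest, including the id range it covers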
+    manifest_cache.add(
+        {
+            "event": event_name,
+            "date": event_date.isoformat(),
+            "path": relative_path,
+            "rows": len(rows),
+            "min_id": min(ids) if ids else None,
+            "max_id": max(ids) if ids else None,
+        }
+    )
+    logging.info("Wrote %s (%d rows)", relative_path, len(rows))
+
+
+def export_collection(
+    client: MongoClient,
+    db_name: str,
+    event_name: str,
+    dataset_root: Path,
+    batch_size: int,
+    date_filter: Optional[Dict[str, str]],
+    manifest_cache: ManifestCache,
+) -> Dict[str, int]:
+    stats = {"read": 0, "written": 0, "skipped": 0}
+    collection = client[db_name][event_name]
+
+    query: Dict[str, Dict[str, str]] = {}
+    if date_filter:
+        query["dateCreated"] = date_filter
+
+    cursor = (
+        collection.find(query)
+        .sort("dateCreated", 1)
+        .batch_size(batch_size)
+    )
+
+    current_date = None
+    buffer: List[Dict] = []
+    for document in cursor:
+        stats["read"] += 1
+        normalized = _normalize_event(document)
+        event_date = _event_date(document) or _event_date(normalized)
+        event_id = normalized.get("id")
+        if not event_id or event_date is None:
+            stats["skipped"] += 1
+            continue
+
+        normalized.pop("_id", None)
+        normalized["event"] = event_name
+        normalized["partition_date"] = event_date.isoformat()
+
+        if current_date is None:
+            current_date = event_date
+        if event_date != current_date:
+            _write_partition(dataset_root, event_name, current_date, buffer, manifest_cache)
+            stats["written"] += len(buffer)
+            buffer = []
+            current_date = event_date
+
+        buffer.append(normalized)
+
+    if buffer and current_date is not None:
+        _write_partition(dataset_root, event_name, current_date, buffer, manifest_cache)
+        stats["written"] += len(buffer)
+
+    manifest_cache.flush()
+    return stats
+
+
+def main() -> None:
+    args = parse_args()
+    logging.basicConfig(level=getattr(logging, args.log_level))
+
+    dataset_root = Path(args.dataset_root)
+    dataset_root.mkdir(parents=True, exist_ok=True)
+
+    client = MongoClient(args.mongo_uri)
+    collections = args.collections or list(ISSUES.keys())
+
+    manifest_cache = ManifestCache(dataset_root)
+    date_filter = _date_filter(args)
+
+    for event_name in collections:
+        logging.info("Exporting collection '%s'", event_name)
+        stats = export_collection(
+            client,
+            args.db,
+            event_name,
+            dataset_root,
+            args.batch_size,
+            date_filter,
+            manifest_cache,
+        )
+        logging.info(
+            "Finished %s: %d read, %d written, %d skipped",
+            event_name,
+            stats["read"],
+            stats["written"],
+            stats["skipped"],
+        )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/update_plots_parquet.sh b/scripts/update_plots_parquet.sh
new file mode 100755
index 0000000..578bc92
--- /dev/null
+++ b/scripts/update_plots_parquet.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# scripts/update_plots_parquet.sh
+# Generate plots from Parquet exports and push them to nipreps.github.io.
+
+set -euo pipefail
+
+DATASET_ROOT=${1:?"Usage: $0 <dataset-root> [repo-url]"}
+REPO_URL="${2:-git@github.com:nipreps/nipreps.github.io.git}"
+TMP_REPO="$(mktemp -d)"
+
+cleanup() {
+  rm -rf "$TMP_REPO"
+}
+trap cleanup EXIT
+
+git clone "$REPO_URL" "$TMP_REPO"
+
+ASSETS_DIR="$TMP_REPO/docs/assets"
+mkdir -p "$ASSETS_DIR"
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+cd "$SCRIPT_DIR/.."
+
+"$(which conda)" run -n base src/run.py plot --dataset-root "$DATASET_ROOT" -o "$ASSETS_DIR"
+
+cd "$TMP_REPO"
+git pull --ff-only
+git add docs/assets
+if ! git diff --cached --quiet; then
+  git commit -m "Update stats plots from Parquet"
+  git push
+else
+  echo "No changes to commit."
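+  # (nothing staged differs from HEAD, so the published plots are already current)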
+fi
diff --git a/src/api.py b/src/api.py
index 5862d6c..a9281f1 100644
--- a/src/api.py
+++ b/src/api.py
@@ -25,9 +25,13 @@
 import os
 import sys
 from time import sleep
+import datetime
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Optional, Set, Union
+
+import pandas as pd
 import requests
 from requests.utils import parse_header_links
-import datetime
 from pymongo import MongoClient
 from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -40,6 +44,137 @@
     "sigkill": "854282951",
 }
 
+MANIFEST_FILENAME = "_manifest.parquet"
+
+
+def _sanitize_key(key: str) -> str:
+    return key.replace(" ", "_").replace(".", "_")
+
+
+def _flatten(prefix: str, value: Any, dest: Dict[str, Any]) -> None:
+    if isinstance(value, dict):
+        for sub_key, sub_value in value.items():
+            new_key = _sanitize_key(f"{prefix}_{sub_key}" if prefix else sub_key)
+            _flatten(new_key, sub_value, dest)
+        return
+    if isinstance(value, (list, tuple)):
+        for index, item in enumerate(value):
+            new_key = _sanitize_key(f"{prefix}_{index}")
+            _flatten(new_key, item, dest)
+        return
+    dest[_sanitize_key(prefix)] = value
+
+
+def _normalize_event(event: Dict[str, Any]) -> Dict[str, Any]:
+    normalized: Dict[str, Any] = {}
+    for key, value in event.items():
+        if key == "tags":
+            continue
+        if key == "_id":
+            normalized["_id"] = str(value)
+            continue
+        _flatten(key, value, normalized)
+
+    tags = event.get("tags", []) or []
+    for tag in tags:
+        tag_key = _sanitize_key(tag.get("key", ""))
+        if tag_key:
+            normalized[tag_key] = tag.get("value")
+
+    normalized.pop("environment", None)
+    return normalized
+
+
+def _coerce_datetime(value: Any) -> Optional[datetime.datetime]:
+    if isinstance(value, datetime.datetime):
+        if not value.tzinfo:
+            return value.replace(tzinfo=datetime.timezone.utc)
+        return value
+    if not value:
+        return None
+    if isinstance(value, str):
+        txt = value.strip()
+        if txt.endswith("Z"):
+            txt = txt[:-1] + "+00:00"
+        try:
+            return datetime.datetime.fromisoformat(txt)
+        except ValueError:
+            return None
+    return None
+
+
+def _event_date(event: Dict[str, Any]) -> Optional[datetime.date]:
+    for key in ("dateCreated", "timestamp", "received", "datetime"):
+        dt = _coerce_datetime(event.get(key))
+        if dt:
+            return dt.date()
+    return None
+
+
+def _partition_path(dataset_root: Union[Path, str], event_name: str, day: datetime.date) -> Path:
+    root = Path(dataset_root)
+    return root / event_name / f"{day.isoformat()}.parquet"
+
+
+def _manifest_path(dataset_root: Union[Path, str]) -> Path:
+    return Path(dataset_root) / MANIFEST_FILENAME
+
+
+def _read_manifest(dataset_root: Union[Path, str]) -> pd.DataFrame:
+    manifest_file = _manifest_path(dataset_root)
+    if not manifest_file.exists():
+        return pd.DataFrame(columns=["event", "date", "path", "rows", "min_id", "max_id"])
+    return pd.read_parquet(manifest_file)
+
+
+def _existing_manifest_paths(dataset_root: Union[Path, str]) -> Set[str]:
+    manifest = _read_manifest(dataset_root)
+    if manifest.empty:
+        return set()
+    return set(manifest["path"].astype(str))
+
+
+def _write_manifest(dataset_root: Union[Path, str], entries: Iterable[Dict[str, Any]]) -> None:
+    entries = list(entries)
+    if not entries:
+        return
+
+    manifest_file = _manifest_path(dataset_root)
+    manifest_file.parent.mkdir(parents=True, exist_ok=True)
+    existing = _read_manifest(dataset_root)
+    df_new = pd.DataFrame(entries)
+    merged = pd.concat([existing, df_new], ignore_index=True)
+    tmp_path = manifest_file.with_suffix(".tmp")
+    merged.to_parquet(tmp_path, index=False)
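+    # atomically swap the temporary file into place so readers never see a partial manifest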
+    tmp_path.replace(manifest_file)
+
+
+class ManifestCache:
+    def __init__(self, dataset_root: Union[Path, str], flush_threshold: int = 10) -> None:
+        self.dataset_root = Path(dataset_root)
+        self.flush_threshold = max(1, flush_threshold)
+        self._cache: List[Dict[str, Any]] = []
+        self._seen_paths: Set[str] = _existing_manifest_paths(self.dataset_root)
+
+    @property
+    def seen_paths(self) -> Set[str]:
+        return self._seen_paths
+
+    def add(self, entry: Dict[str, Any]) -> None:
+        path_key = entry.get("path")
+        if path_key in self._seen_paths:
+            return
+        self._cache.append(entry)
+        if len(self._cache) >= self.flush_threshold:
+            self.flush()
+
+    def flush(self) -> None:
+        if not self._cache:
+            return
+        _write_manifest(self.dataset_root, self._cache)
+        self._seen_paths.update(entry["path"] for entry in self._cache)
+        self._cache.clear()
+
 
 def filter_new(events, collection):
     """Return the subset of *events* not already cached in *collection*."""
diff --git a/src/db.py b/src/db.py
index ae36c9d..c1818f8 100644
--- a/src/db.py
+++ b/src/db.py
@@ -4,26 +4,53 @@
 from __future__ import annotations
 
 import datetime
-from typing import Tuple
+from pathlib import Path
+from typing import Tuple, Union
 
 import pandas as pd
 from pymongo import MongoClient
 
 
+def _prepare_dataframe(data: pd.DataFrame, unique: bool = True) -> pd.DataFrame:
+    if "dateCreated" in data.columns:
+        data["dateCreated"] = pd.to_datetime(data["dateCreated"])
+        data["date_minus_time"] = data["dateCreated"].apply(
+            lambda df: datetime.datetime(year=df.year, month=df.month, day=df.day)
+        )
+    if unique and "run_uuid" in data.columns:
+        data = data.drop_duplicates(subset=["run_uuid"])
+    return data
+
+
 def load_event(event_name: str, unique: bool = True) -> pd.DataFrame:
     """Load one event collection from MongoDB."""
     db = MongoClient().fmriprep_stats
     data = pd.DataFrame(list(db[event_name].find()))
     if len(data) == 0:
         raise RuntimeError(f"No records of event '{event_name}'")
+    return _prepare_dataframe(data, unique=unique)
 
-    data["dateCreated"] = pd.to_datetime(data["dateCreated"])
-    data["date_minus_time"] = data["dateCreated"].apply(
-        lambda df: datetime.datetime(year=df.year, month=df.month, day=df.day)
-    )
-    if unique:
-        data = data.drop_duplicates(subset=["run_uuid"])
-    return data
+
+def load_event_from_parquet(
+    dataset_root: Union[str, Path], event_name: str, unique: bool = True
+) -> pd.DataFrame:
+    """Load event records stored as Parquet files."""
+
+    dataset_root = Path(dataset_root)
+    event_dir = dataset_root / event_name
+    if not event_dir.exists():
+        raise RuntimeError(f"No Parquet export found for '{event_name}' in {dataset_root}")
+
+    files = sorted(event_dir.glob("*.parquet"))
+    if not files:
+        raise RuntimeError(
+            f"No Parquet files found for '{event_name}' under {event_dir}"
+        )
+
+    frames = [pd.read_parquet(path) for path in files]
+    data = pd.concat(frames, ignore_index=True)
+    return _prepare_dataframe(data, unique=unique)
 
 
 def massage_versions(
diff --git a/src/run.py b/src/run.py
index 1c7a1f2..e53bbe3 100644
--- a/src/run.py
+++ b/src/run.py
@@ -28,7 +28,7 @@
 import click
 
 from api import parallel_fetch, ISSUES, DEFAULT_MAX_ERRORS
-from db import load_event, massage_versions
+from db import load_event, load_event_from_parquet, massage_versions
 from viz import plot_performance, plot_version_stream
 
 DEFAULT_DAYS_WINDOW = 90
@@ -152,16 +152,32 @@ def get(event, start_date, end_date, days, chunk_days, jobs, max_errors, cached_
 
 
 @cli.command()
-@click.option("-o", "--output-dir", type=click.Path(file_okay=False, dir_okay=True, writable=True), default=".")
+@click.option(
+    "-o",
+    "--output-dir",
+    type=click.Path(file_okay=False, dir_okay=True, writable=True),
+    default=".",
+)
 @click.option("--drop-cutoff", default=None, help="Ignore versions older than this")
-def plot(output_dir, drop_cutoff):
-    """Generate plots using records stored in MongoDB."""
+@click.option(
+    "--dataset-root",
+    type=click.Path(file_okay=False, dir_okay=True),
+    default=None,
+    help="Read Parquet exports from this directory instead of MongoDB.",
+)
+def plot(output_dir, drop_cutoff, dataset_root):
+    """Generate plots using records stored in MongoDB or Parquet exports."""
     today = datetime.now().date().strftime("%Y%m%d")
     out_perf = os.path.join(output_dir, f"{today}_weekly.png")
     out_ver = os.path.join(output_dir, f"{today}_versionstream.png")
 
-    unique_started = load_event("started")
-    unique_success = load_event("success")
+    if dataset_root:
+        loader = lambda name: load_event_from_parquet(dataset_root, name)
+    else:
+        loader = load_event
+
+    unique_started = loader("started")
+    unique_success = loader("success")
 
     plot_performance(unique_started, unique_success, drop_cutoff=drop_cutoff, out_file=out_perf)
     click.echo(f"Saved {out_perf}.")