44 changes: 44 additions & 0 deletions README.md
@@ -86,3 +86,47 @@ To run it every Monday at 5 AM, add this line to your crontab:
0 5 * * 1 /path/to/fmriprep_stats/scripts/update_plots.sh 2>> $HOME/var/log/update_plots.err >> $HOME/var/log/update_plots.log
```

## Migrating from MongoDB

The repository provides `scripts/migrate_mongo_to_parquet.py` to export
collections from MongoDB into Parquet files. Each issue collection is streamed
in batches and normalized with the same helpers used by the fetching CLI. Rows
that are missing identifiers or timestamps are skipped, while valid records are
written as `<dataset-root>/<event>/<YYYY-MM-DD>.parquet`. A manifest file named
`_manifest.parquet` is updated atomically so re-running the script is
idempotent. Example usage:

```bash
python scripts/migrate_mongo_to_parquet.py \
    --mongo-uri mongodb://localhost:27017 \
    --db fmriprep_stats \
    --dataset-root /data/fmriprep-parquet \
    --start-date 2022-01-01 --end-date 2022-02-01
```
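
To spot-check an export, a day's partition can be read back with pandas. The
sketch below is illustrative only: the event name and date are placeholders for
partitions your migration actually produced, while the `id`, `event`, and
`partition_date` columns are those added by the migration script.

```python
from pathlib import Path

import pandas as pd

dataset_root = Path("/data/fmriprep-parquet")

# Placeholder event name and date; substitute a partition written by your run.
partition = dataset_root / "started" / "2022-01-15.parquet"

df = pd.read_parquet(partition)
print(f"{len(df)} rows in {partition.name}")
print(df[["id", "event", "partition_date"]].head())
```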

Use the `--collections` flag to restrict the export to a subset of issue
collections while debugging, and adjust `--batch-size` (default: 1000) to
reduce memory pressure or make better use of fast disks. Streaming keeps only
one day's worth of events in memory at a time, but exporting a long history
still requires several gigabytes of disk space for the resulting Parquet files.
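
For example, to re-export only a couple of collections with smaller batches
(the collection names below are placeholders; valid names are the issue
collections known to the fetching code via its `ISSUES` mapping):

```bash
python scripts/migrate_mongo_to_parquet.py \
    --mongo-uri mongodb://localhost:27017 \
    --db fmriprep_stats \
    --dataset-root /data/fmriprep-parquet \
    --collections started success \
    --batch-size 250
```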

After the first migration, you can re-run the script with the same arguments
to confirm idempotency: the manifest ensures previously written partitions are
skipped, so no duplicates are created.
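
The manifest itself is an ordinary Parquet file, so it can be inspected
directly. This sketch assumes the columns written by
`scripts/migrate_mongo_to_parquet.py` (`event`, `date`, `path`, `rows`,
`min_id`, `max_id`):

```python
import pandas as pd

manifest = pd.read_parquet("/data/fmriprep-parquet/_manifest.parquet")

# One row per written partition; a re-run skips any path already listed here.
print(manifest[["event", "date", "path", "rows"]].sort_values("date").tail())
```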

## Updating plots from Parquet exports

Once a Parquet dataset exists you can generate plots without MongoDB. Either
use `src/run.py plot --dataset-root /path/to/dataset` directly or run the
`scripts/update_plots_parquet.sh` helper, which mirrors `update_plots.sh` but
injects the dataset path:

```bash
chmod +x scripts/update_plots_parquet.sh
scripts/update_plots_parquet.sh /data/fmriprep-parquet
```

This script accepts an optional repository URL (defaults to
`[email protected]:nipreps/nipreps.github.io.git`) and otherwise behaves like the
original weekly plot updater.
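
To schedule it weekly like the original updater, an analogous crontab entry can
be used (the paths and log locations below are assumptions to adapt):

```
0 5 * * 1 /path/to/fmriprep_stats/scripts/update_plots_parquet.sh /data/fmriprep-parquet 2>> $HOME/var/log/update_plots_parquet.err >> $HOME/var/log/update_plots_parquet.log
```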

3 changes: 2 additions & 1 deletion requirements.txt
@@ -1,7 +1,8 @@
pymongo
pandas
pyarrow
matplotlib
seaborn
notebook
nbconvert
requests
211 changes: 211 additions & 0 deletions scripts/migrate_mongo_to_parquet.py
@@ -0,0 +1,211 @@
#!/usr/bin/env python3
"""Export MongoDB collections into Parquet partitions."""

from __future__ import annotations

import argparse
import logging
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

import pandas as pd
from pymongo import MongoClient

REPO_ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = REPO_ROOT / "src"
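# Make the repository's src/ directory importable so the api helpers below can be reused.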
if str(SRC_DIR) not in sys.path:
    sys.path.insert(0, str(SRC_DIR))

from api import ( # noqa: E402 pylint: disable=wrong-import-position
    ISSUES,
    ManifestCache,
    _event_date,
    _normalize_event,
    _partition_path,
)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--mongo-uri", default="mongodb://localhost:27017", help="MongoDB URI")
    parser.add_argument("--db", default="fmriprep_stats", help="MongoDB database name")
    parser.add_argument(
        "--dataset-root",
        required=True,
        help="Target directory where Parquet files will be written",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=1000,
        help="Number of documents to stream from Mongo per batch",
    )
    parser.add_argument(
        "--start-date",
        type=lambda value: datetime.strptime(value, "%Y-%m-%d"),
        default=None,
        help="Only export events on/after this date (YYYY-MM-DD)",
    )
    parser.add_argument(
        "--end-date",
        type=lambda value: datetime.strptime(value, "%Y-%m-%d"),
        default=None,
        help="Only export events on/before this date (YYYY-MM-DD, inclusive)",
    )
    parser.add_argument(
        "--collections",
        nargs="+",
        default=None,
        help="Subset of collections to export (defaults to all supported issues)",
    )
    parser.add_argument(
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="Logging verbosity",
    )
    return parser.parse_args()


def _date_filter(args: argparse.Namespace) -> Optional[Dict[str, str]]:
    if not args.start_date and not args.end_date:
        return None

    window: Dict[str, str] = {}
    if args.start_date:
        start_iso = args.start_date.replace(hour=0, minute=0, second=0, microsecond=0)
        window["$gte"] = start_iso.strftime("%Y-%m-%dT%H:%M:%SZ")
    if args.end_date:
        end_iso = args.end_date.replace(hour=0, minute=0, second=0, microsecond=0)
        # The upper bound is the following midnight, so --end-date itself is included.
        window["$lt"] = (end_iso + timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
    return window


def _write_partition(
    dataset_root: Path,
    event_name: str,
    event_date,
    rows: List[Dict],
    manifest_cache: ManifestCache,
) -> None:
    if not rows:
        return

    target_path = _partition_path(dataset_root, event_name, event_date)
    relative_path = str(target_path.relative_to(dataset_root))
    if relative_path in manifest_cache.seen_paths:
        logging.info("Skipping %s – manifest already lists this partition", relative_path)
        return

    target_path.parent.mkdir(parents=True, exist_ok=True)
    df = pd.DataFrame(rows)
    # Write to a temporary file and rename it into place so readers never see a partial partition.
    tmp_path = target_path.with_suffix(".tmp")
    df.to_parquet(tmp_path, index=False)
    tmp_path.replace(target_path)

    ids = [row.get("id") for row in rows if row.get("id")]
    manifest_cache.add(
        {
            "event": event_name,
            "date": event_date.isoformat(),
            "path": relative_path,
            "rows": len(rows),
            "min_id": min(ids) if ids else None,
            "max_id": max(ids) if ids else None,
        }
    )
    logging.info("Wrote %s (%d rows)", relative_path, len(rows))


def export_collection(
    client: MongoClient,
    db_name: str,
    event_name: str,
    dataset_root: Path,
    batch_size: int,
    date_filter: Optional[Dict[str, str]],
    manifest_cache: ManifestCache,
) -> Dict[str, int]:
    stats = {"read": 0, "written": 0, "skipped": 0}
    collection = client[db_name][event_name]

    query: Dict[str, Dict[str, str]] = {}
    if date_filter:
        query["dateCreated"] = date_filter

    cursor = (
        collection.find(query)
        .sort("dateCreated", 1)
        .batch_size(batch_size)
    )

    # Documents arrive sorted by dateCreated, so a change of day means the
    # buffered partition is complete and can be flushed to disk.
    current_date = None
    buffer: List[Dict] = []
    for document in cursor:
        stats["read"] += 1
        normalized = _normalize_event(document)
        event_date = _event_date(document) or _event_date(normalized)
        event_id = normalized.get("id")
        if not event_id or event_date is None:
            stats["skipped"] += 1
            continue

        normalized.pop("_id", None)
        normalized["event"] = event_name
        normalized["partition_date"] = event_date.isoformat()

        if current_date is None:
            current_date = event_date
        if event_date != current_date:
            _write_partition(dataset_root, event_name, current_date, buffer, manifest_cache)
            stats["written"] += len(buffer)
            buffer = []
            current_date = event_date

        buffer.append(normalized)

    if buffer and current_date is not None:
        _write_partition(dataset_root, event_name, current_date, buffer, manifest_cache)
        stats["written"] += len(buffer)

    manifest_cache.flush()
    return stats


def main() -> None:
    args = parse_args()
    logging.basicConfig(level=getattr(logging, args.log_level))

    dataset_root = Path(args.dataset_root)
    dataset_root.mkdir(parents=True, exist_ok=True)

    client = MongoClient(args.mongo_uri)
    collections = args.collections or list(ISSUES.keys())

    manifest_cache = ManifestCache(dataset_root)
    date_filter = _date_filter(args)

    for event_name in collections:
        logging.info("Exporting collection '%s'", event_name)
        stats = export_collection(
            client,
            args.db,
            event_name,
            dataset_root,
            args.batch_size,
            date_filter,
            manifest_cache,
        )
        logging.info(
            "Finished %s: %d read, %d written, %d skipped",
            event_name,
            stats["read"],
            stats["written"],
            stats["skipped"],
        )


if __name__ == "__main__":
    main()
34 changes: 34 additions & 0 deletions scripts/update_plots_parquet.sh
@@ -0,0 +1,34 @@
#!/bin/bash
# scripts/update_plots_parquet.sh
# Generate plots from Parquet exports and push them to nipreps.github.io.

set -euo pipefail

DATASET_ROOT=${1:?"Usage: $0 <dataset-root> [repo-url]"}
REPO_URL="${2:[email protected]:nipreps/nipreps.github.io.git}"
TMP_REPO="$(mktemp -d)"

cleanup() {
    rm -rf "$TMP_REPO"
}
trap cleanup EXIT

git clone "$REPO_URL" "$TMP_REPO"

ASSETS_DIR="$TMP_REPO/docs/assets"
mkdir -p "$ASSETS_DIR"

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR/.."

"$( which conda )" run -n base src/run.py plot --dataset-root "$DATASET_ROOT" -o "$ASSETS_DIR"

cd "$TMP_REPO"
git pull --ff-only
git add docs/assets
if ! git diff --cached --quiet; then
    git commit -m "Update stats plots from Parquet"
    git push
else
    echo "No changes to commit."
fi