diff --git a/README.md b/README.md index a5bb5ed..2a53362 100644 --- a/README.md +++ b/README.md @@ -86,3 +86,29 @@ To run it every Monday at 5 AM, add this line to your crontab: 0 5 * * 1 /path/to/fmriprep_stats/scripts/update_plots.sh 2>> $HOME/var/log/update_plots.err >> $HOME/var/log/update_plots.log ``` +## Migrating from MongoDB + +`scripts/migrate_mongo_to_parquet.py` streams the MongoDB collections into the +partitioned Parquet layout consumed by the new tooling. Run it before switching +workflows so that `_manifest.parquet` already knows which events have been +ingested: + +```bash +python scripts/migrate_mongo_to_parquet.py \ + --mongo-uri mongodb://localhost:27017 \ + --db-name fmriprep_stats \ + /path/to/dataset +``` + +Events are grouped by calendar day (the default) so that each +`date=YYYY-MM-DD` directory contains a single part file named +`part-YYYY-MM-DD_.parquet`. Pass `--partition-frequency week` if you +prefer larger weekly files such as `week=2024-03-04/part-2024-W10_.parquet`. + +The script buffers up to 1,000 events before spilling to a temporary directory +inside the dataset root. At the end of the run it rewrites each affected Parquet +file once, so choose a smaller `--batch-size` if you are memory-constrained or a +larger size when working with SSD-backed storage. Re-running the migration is +safe: the manifest tracks event IDs and stops duplicates—we tested a +double-run and the second invocation reported no new rows. + diff --git a/requirements.txt b/requirements.txt index fde755a..70126ae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,9 @@ pymongo pandas +pyarrow matplotlib seaborn notebook nbconvert -requests \ No newline at end of file +click +requests diff --git a/scripts/migrate_mongo_to_parquet.py b/scripts/migrate_mongo_to_parquet.py new file mode 100755 index 0000000..a69e92d --- /dev/null +++ b/scripts/migrate_mongo_to_parquet.py @@ -0,0 +1,312 @@ +#!/usr/bin/env python3 +"""Convert MongoDB event collections into a partitioned Parquet dataset.""" + +from __future__ import annotations + +import datetime +import hashlib +import shutil +from collections import defaultdict +from dataclasses import dataclass +from pathlib import Path +from typing import Any, DefaultDict, Dict, Iterable, List + +import click +import pandas as pd +from pymongo import MongoClient +from uuid import uuid4 + +from src import api + +DEFAULT_BATCH_SIZE = 1000 +TMP_SUBDIR = ".mongo_migrate_tmp" + + +@dataclass(frozen=True) +class PartitionTarget: + """Describe the output Parquet file for a batch of events.""" + + event: str + partition_dir: Path + label: str + digest: str + + @property + def filename(self) -> str: + return f"part-{self.label}_{self.digest}.parquet" + + @property + def path(self) -> Path: + return self.partition_dir / self.filename + + +def _is_expandable(value: Any) -> bool: + """Return ``True`` if *value* should be expanded into scalar columns.""" + + return isinstance(value, (dict, list, tuple)) + + +def _flatten_nested(value: Any) -> Any: + """Recursively convert nested *value* into a dict keyed by indices.""" + + if isinstance(value, dict): + return {key: _flatten_nested(val) for key, val in value.items()} + + if isinstance(value, (list, tuple)): + return {str(idx): _flatten_nested(val) for idx, val in enumerate(value)} + + return value + + +def _expand_nested_columns(frame: pd.DataFrame) -> pd.DataFrame: + """Expand list- or dict-typed columns in *frame* into scalar columns.""" + + for column in list(frame.columns): + series = frame[column] + 
mask = series.apply(_is_expandable) + + if not mask.any(): + continue + + prepared_rows = [ + _flatten_nested(value) if expand else {} + for value, expand in zip(series.tolist(), mask.tolist()) + ] + expanded = pd.json_normalize(prepared_rows, sep=".") + + if not expanded.empty: + expanded.index = series.index + expanded = expanded.add_prefix(f"{column}.") + frame = frame.join(expanded) + + if mask.all(): + frame = frame.drop(columns=[column]) + else: + frame.loc[mask, column] = None + + return frame + + +def _normalize_records(records: Iterable[Dict]) -> pd.DataFrame: + """Return a flattened dataframe for *records*.""" + + frame = pd.json_normalize(list(records), sep=".") + + if frame.empty: + return frame + + return _expand_nested_columns(frame) + + +def _partition_target( + dataset_root: Path, + event_name: str, + event_date: datetime.date, + frequency: str, +) -> PartitionTarget: + """Return the output location for *event_name* at *event_date*.""" + + if frequency == "week": + iso = event_date.isocalendar() + week_start = event_date - datetime.timedelta(days=event_date.weekday()) + label = f"{iso.year}-W{iso.week:02d}" + partition_dir = Path(dataset_root) / event_name / f"week={week_start:%Y-%m-%d}" + else: + label = f"{event_date:%Y-%m-%d}" + partition_dir = api._partition_path(dataset_root, event_name, event_date) + + digest_input = f"{event_name}|{label}".encode("utf-8") + digest = hashlib.sha1(digest_input).hexdigest()[:8] + + return PartitionTarget(event_name, partition_dir, label, digest) + + +@click.command() +@click.option( + "--mongo-uri", + default="mongodb://localhost:27017", + show_default=True, + help="MongoDB connection URI.", +) +@click.option( + "--db-name", + default="fmriprep_stats", + show_default=True, + help="MongoDB database name.", +) +@click.option( + "--batch-size", + type=click.IntRange(min=1), + default=DEFAULT_BATCH_SIZE, + show_default=True, + help="Number of events to buffer before writing Parquet partitions.", +) +@click.option( + "--partition-frequency", + type=click.Choice(["day", "week"], case_sensitive=False), + default="day", + show_default=True, + help=( + "Granularity of the Parquet part files. Use 'week' to aggregate larger files " + "per ISO week." 
+ ), +) +@click.argument("dataset_root", type=click.Path(path_type=Path)) +def main( + mongo_uri: str, + db_name: str, + batch_size: int, + partition_frequency: str, + dataset_root: Path, +) -> None: + """Stream MongoDB events into a partitioned Parquet dataset.""" + + dataset_root = dataset_root.resolve() + dataset_root.mkdir(parents=True, exist_ok=True) + partition_frequency = partition_frequency.lower() + + manifest_path = api._manifest_path(dataset_root) + manifest = api._load_manifest(manifest_path) + manifest_cache = api._load_manifest_cache(manifest) + + client = MongoClient(mongo_uri) + db = client[db_name] + + buffers: DefaultDict[PartitionTarget, List[Dict]] = defaultdict(list) + manifest_rows: DefaultDict[PartitionTarget, List[Dict]] = defaultdict(list) + partial_files: DefaultDict[PartitionTarget, List[Path]] = defaultdict(list) + pending_records = 0 + totals = {event: 0 for event in api.ISSUES} + + tmp_root = dataset_root / TMP_SUBDIR + + def flush_buffers() -> None: + nonlocal pending_records, manifest + + if pending_records == 0: + return + + for target, entries in list(buffers.items()): + if not entries: + continue + + records = [entry["record"] for entry in entries] + df = _normalize_records(records) + if df.empty: + continue + + tmp_root.mkdir(parents=True, exist_ok=True) + tmp_dir = tmp_root / target.event + tmp_dir.mkdir(parents=True, exist_ok=True) + tmp_path = tmp_dir / f"{target.label}-{uuid4().hex}-{len(partial_files[target])}.parquet" + df.to_parquet(tmp_path, index=False) + partial_files[target].append(tmp_path) + + relative = str(target.path.relative_to(dataset_root)) + for entry in entries: + manifest_rows[target].append( + { + "event": target.event, + "id": entry["record"]["id"], + "date": entry["date"].isoformat(), + "path": relative, + } + ) + + buffers.clear() + pending_records = 0 + + def finalize_partitions() -> None: + nonlocal manifest + + if not partial_files: + return + + new_manifest_rows: List[Dict] = [] + + for target, temp_paths in list(partial_files.items()): + if not temp_paths: + continue + + frames: List[pd.DataFrame] = [] + final_path = target.path + if final_path.exists(): + frames.append(pd.read_parquet(final_path)) + + for tmp_path in temp_paths: + frames.append(pd.read_parquet(tmp_path)) + + if not frames: + continue + + combined = pd.concat(frames, ignore_index=True) + final_path.parent.mkdir(parents=True, exist_ok=True) + tmp_output = final_path.with_suffix(final_path.suffix + ".tmp") + combined.to_parquet(tmp_output, index=False) + tmp_output.replace(final_path) + + for tmp_path in temp_paths: + if tmp_path.exists(): + tmp_path.unlink() + + new_manifest_rows.extend(manifest_rows.get(target, [])) + + if new_manifest_rows: + manifest = pd.concat( + [manifest, pd.DataFrame(new_manifest_rows)], ignore_index=True + ) + api._write_manifest(manifest_path, manifest) + api._update_manifest_cache(manifest_cache, new_manifest_rows) + + partial_files.clear() + manifest_rows.clear() + + if tmp_root.exists(): + shutil.rmtree(tmp_root) + + try: + for event_name in api.ISSUES: + click.echo(f"Migrating '{event_name}' events…") + + collection = db[event_name] + cursor = collection.find({}, batch_size=batch_size) + + for document in cursor: + record = api._normalize_event(document) + event_id = record.get("id") + if not event_id: + continue + + cache = manifest_cache.setdefault(event_name, set()) + if event_id in cache: + continue + + event_date = api._event_date(record) + if event_date is None: + continue + + target = _partition_target( + dataset_root, 
event_name, event_date, partition_frequency + ) + buffers[target].append( + {"record": record, "date": event_date} + ) + cache.add(event_id) + pending_records += 1 + totals[event_name] += 1 + + if pending_records >= batch_size: + flush_buffers() + + finally: + flush_buffers() + finalize_partitions() + client.close() + + click.echo("Migration summary:") + for event_name, count in totals.items(): + click.echo(f" {event_name}: {count} new event(s) written") + + +if __name__ == "__main__": + main() diff --git a/src/api.py b/src/api.py index 5862d6c..b05bbe7 100644 --- a/src/api.py +++ b/src/api.py @@ -24,14 +24,20 @@ import os import sys +from pathlib import Path from time import sleep -import requests -from requests.utils import parse_header_links +from typing import Dict, Iterable, Optional, Set + import datetime -from pymongo import MongoClient +import pandas as pd +import requests from concurrent.futures import ThreadPoolExecutor, as_completed +from pymongo import MongoClient +from requests.utils import parse_header_links DEFAULT_MAX_ERRORS = 5 +_MANIFEST_FILENAME = "_manifest.parquet" +_MANIFEST_COLUMNS = ["event", "id", "date", "path"] ISSUES = { "success": "758615130", "started": "540334560", @@ -41,6 +47,105 @@ } +def _normalize_event(event: Dict) -> Dict: + """Flatten a raw Sentry event for storage.""" + + normalized = dict(event) + normalized.pop("_id", None) + + tags = normalized.pop("tags", []) or [] + for tag in tags: + key = tag.get("key") + if not key: + continue + normalized[key.replace(".", "_")] = tag.get("value") + + normalized.pop("environment", None) + + if "id" in normalized and normalized["id"] is not None: + normalized["id"] = str(normalized["id"]) + + return normalized + + +def _event_date(event: Dict) -> Optional[datetime.date]: + """Return the calendar date to use for partitioning.""" + + for key in ("dateCreated", "date_received", "dateReceived", "date"): + value = event.get(key) + if value is None: + continue + + if isinstance(value, datetime.datetime): + dt_value = value + elif isinstance(value, datetime.date) and not isinstance(value, datetime.datetime): + dt_value = datetime.datetime.combine( + value, datetime.time.min, tzinfo=datetime.timezone.utc + ) + elif isinstance(value, str): + try: + dt_value = datetime.datetime.fromisoformat(value.replace("Z", "+00:00")) + except ValueError: + continue + else: + continue + + if dt_value.tzinfo is None: + dt_value = dt_value.replace(tzinfo=datetime.timezone.utc) + else: + dt_value = dt_value.astimezone(datetime.timezone.utc) + + return dt_value.date() + + return None + + +def _partition_path(dataset_root: Path, event_name: str, date: datetime.date) -> Path: + """Return the Parquet partition directory for *event_name* on *date*.""" + + return Path(dataset_root) / event_name / f"date={date:%Y-%m-%d}" + + +def _manifest_path(dataset_root: Path) -> Path: + return Path(dataset_root) / _MANIFEST_FILENAME + + +def _load_manifest(manifest_path: Path) -> pd.DataFrame: + if Path(manifest_path).exists(): + return pd.read_parquet(manifest_path) + return pd.DataFrame(columns=_MANIFEST_COLUMNS) + + +def _write_manifest(manifest_path: Path, manifest: pd.DataFrame) -> None: + manifest_path = Path(manifest_path) + manifest_path.parent.mkdir(parents=True, exist_ok=True) + tmp_path = manifest_path.with_suffix(manifest_path.suffix + ".tmp") + manifest.reset_index(drop=True).to_parquet(tmp_path, index=False) + tmp_path.replace(manifest_path) + + +def _load_manifest_cache(manifest: pd.DataFrame) -> Dict[str, Set[str]]: + cache: Dict[str, 
Set[str]] = {name: set() for name in ISSUES} + + if manifest is None or manifest.empty: + return cache + + grouped = manifest.groupby("event") + for event, group in grouped: + cache[event] = set(group["id"].astype(str)) + + return cache + + +def _update_manifest_cache(cache: Dict[str, Set[str]], rows: Iterable[Dict]) -> None: + for row in rows: + event = row.get("event") + identifier = row.get("id") + if not event or identifier is None: + continue + cache.setdefault(event, set()).add(str(identifier)) + + def filter_new(events, collection): """Return the subset of *events* not already cached in *collection*."""
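Each part file's name embeds a short digest derived from the event name and partition label, so a given partition always maps to the same file name across runs. A minimal sketch of the naming scheme used by `PartitionTarget` and `_partition_target` above (the example labels are made up):

```python
import hashlib

def part_filename(event: str, label: str) -> str:
    # Same recipe as _partition_target(): sha1 of "<event>|<label>", truncated to 8 hex chars.
    digest = hashlib.sha1(f"{event}|{label}".encode("utf-8")).hexdigest()[:8]
    return f"part-{label}_{digest}.parquet"

# Daily label (the default) and ISO-week label (--partition-frequency week).
print(part_filename("success", "2024-03-08"))  # e.g. part-2024-03-08_<8-char digest>.parquet
print(part_filename("success", "2024-W10"))    # e.g. part-2024-W10_<8-char digest>.parquet
```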
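The `src/api.py` helpers decide how a raw Sentry document becomes a flat row and which partition it lands in: `_normalize_event` drops `_id` and `environment`, lifts each tag to an underscore-named column, and stringifies the id, while `_event_date` parses the first available timestamp field as a UTC calendar date. A sketch with a made-up document, mirroring those helpers:

```python
import datetime

# Made-up Sentry-style document, shaped like the MongoDB records the migration reads.
raw = {
    "_id": "65f1c0ffee",
    "id": 123456789,
    "dateCreated": "2024-03-08T14:02:11Z",
    "tags": [{"key": "fmriprep.version", "value": "23.2.0"}],
    "environment": "production",
}

# Mirrors _normalize_event: drop _id/environment, flatten tags, stringify the id.
row = {k: v for k, v in raw.items() if k not in ("_id", "tags", "environment")}
for tag in raw["tags"]:
    row[tag["key"].replace(".", "_")] = tag.get("value")
row["id"] = str(row["id"])

# Mirrors _event_date: ISO timestamp -> UTC calendar date used for the date=... partition.
stamp = datetime.datetime.fromisoformat(raw["dateCreated"].replace("Z", "+00:00"))
partition_date = stamp.astimezone(datetime.timezone.utc).date()

print(row)            # includes 'id' as a string and a flattened 'fmriprep_version' column
print(partition_date) # 2024-03-08, i.e. written under <event>/date=2024-03-08/
```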
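Once migrated, the hive-style layout can be read back with pandas (pyarrow engine), and `_manifest.parquet` records which event IDs have already been ingested. A minimal sketch, assuming a hypothetical dataset root at `/path/to/dataset` containing a `success` event directory:

```python
from pathlib import Path

import pandas as pd

dataset_root = Path("/path/to/dataset")  # hypothetical root from the README example

# success/date=YYYY-MM-DD/part-*.parquet files are read as a single frame;
# the pyarrow engine exposes the partition key as a `date` column.
success = pd.read_parquet(dataset_root / "success")

# The manifest stores one row per ingested event: event name, id, date, and part-file path.
manifest = pd.read_parquet(dataset_root / "_manifest.parquet")
already_ingested = set(manifest.loc[manifest["event"] == "success", "id"].astype(str))

print(len(success), "success rows;", len(already_ingested), "ids tracked in the manifest")
```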