
Commit a0c4f0e

Add MongoDB migration to Parquet dataset
1 parent 88c7a38 commit a0c4f0e

File tree

4 files changed: +279, -4 lines changed

README.md

Lines changed: 20 additions & 0 deletions
@@ -86,3 +86,23 @@ To run it every Monday at 5 AM, add this line to your crontab:
0 5 * * 1 /path/to/fmriprep_stats/scripts/update_plots.sh 2>> $HOME/var/log/update_plots.err >> $HOME/var/log/update_plots.log
```

## Migrating from MongoDB

`scripts/migrate_mongo_to_parquet.py` streams the MongoDB collections into the
partitioned Parquet layout consumed by the new tooling. Run it before switching
workflows so that `_manifest.parquet` already knows which events have been
ingested:

```bash
python scripts/migrate_mongo_to_parquet.py \
    --mongo-uri mongodb://localhost:27017 \
    --db-name fmriprep_stats \
    /path/to/dataset
```

The script buffers up to 1,000 events (one Parquet file) at a time by default.
Reduce `--batch-size` if you are memory-constrained, or increase it on beefier
machines to reduce the number of tiny Parquet files. Re-running the migration
is safe: the manifest tracks event IDs and skips duplicates; we tested a
double run and the second invocation reported no new rows.
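
As a quick sanity check (for example after a re-run), the manifest can be read back directly. A minimal sketch, assuming `pandas` plus `pyarrow` from `requirements.txt` and the same `/path/to/dataset` as above:

```python
import pandas as pd

# _manifest.parquet holds one row per ingested event, with the
# columns "event", "id", "date", and "path".
manifest = pd.read_parquet("/path/to/dataset/_manifest.parquet")

# Number of events ingested so far, per collection.
print(manifest.groupby("event").size())
```

The per-event counts should match the summary printed by the migration script and stay unchanged after a second run.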

requirements.txt

Lines changed: 3 additions & 1 deletion
@@ -1,7 +1,9 @@
```diff
 pymongo
 pandas
+pyarrow
 matplotlib
 seaborn
 notebook
 nbconvert
-requests
+click
+requests
```
scripts/migrate_mongo_to_parquet.py

Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
```python
#!/usr/bin/env python3
"""Convert MongoDB event collections into a partitioned Parquet dataset."""

from __future__ import annotations

from collections import defaultdict
from pathlib import Path
from typing import Dict, Iterable, List, Tuple

import click
import pandas as pd
from pymongo import MongoClient
from uuid import uuid4

from src import api

DEFAULT_BATCH_SIZE = 1000


def _normalize_records(records: Iterable[Dict]) -> pd.DataFrame:
    """Return a flattened dataframe for *records*."""

    return pd.json_normalize(list(records), sep=".")


@click.command()
@click.option(
    "--mongo-uri",
    default="mongodb://localhost:27017",
    show_default=True,
    help="MongoDB connection URI.",
)
@click.option(
    "--db-name",
    default="fmriprep_stats",
    show_default=True,
    help="MongoDB database name.",
)
@click.option(
    "--batch-size",
    type=click.IntRange(min=1),
    default=DEFAULT_BATCH_SIZE,
    show_default=True,
    help="Number of events to buffer before writing Parquet partitions.",
)
@click.argument("dataset_root", type=click.Path(path_type=Path))
def main(mongo_uri: str, db_name: str, batch_size: int, dataset_root: Path) -> None:
    """Stream MongoDB events into a partitioned Parquet dataset."""

    dataset_root = dataset_root.resolve()
    dataset_root.mkdir(parents=True, exist_ok=True)

    manifest_path = api._manifest_path(dataset_root)
    manifest = api._load_manifest(manifest_path)
    manifest_cache = api._load_manifest_cache(manifest)

    client = MongoClient(mongo_uri)
    db = client[db_name]

    buffers: Dict[Tuple[str, Path], List[Dict]] = defaultdict(list)
    pending_records = 0
    totals = {event: 0 for event in api.ISSUES}

    def flush_buffers() -> None:
        nonlocal pending_records, manifest

        if pending_records == 0:
            return

        new_manifest_rows: List[Dict] = []

        for (event_name, partition_dir), entries in list(buffers.items()):
            if not entries:
                continue

            partition_dir.mkdir(parents=True, exist_ok=True)
            records = [entry["record"] for entry in entries]
            df = _normalize_records(records)
            if df.empty:
                continue

            part_path = partition_dir / f"part-{uuid4().hex}.parquet"
            df.to_parquet(part_path, index=False)
            relative = str(part_path.relative_to(dataset_root))

            totals[event_name] += len(entries)
            for entry in entries:
                new_manifest_rows.append(
                    {
                        "event": event_name,
                        "id": entry["record"]["id"],
                        "date": entry["date"].isoformat(),
                        "path": relative,
                    }
                )

        if new_manifest_rows:
            manifest = pd.concat(
                [manifest, pd.DataFrame(new_manifest_rows)], ignore_index=True
            )
            api._write_manifest(manifest_path, manifest)
            api._update_manifest_cache(manifest_cache, new_manifest_rows)

        buffers.clear()
        pending_records = 0

    try:
        for event_name in api.ISSUES:
            click.echo(f"Migrating '{event_name}' events…")

            collection = db[event_name]
            cursor = collection.find({}, batch_size=batch_size)

            for document in cursor:
                record = api._normalize_event(document)
                event_id = record.get("id")
                if not event_id:
                    continue

                cache = manifest_cache.setdefault(event_name, set())
                if event_id in cache:
                    continue

                event_date = api._event_date(record)
                if event_date is None:
                    continue

                partition_dir = api._partition_path(dataset_root, event_name, event_date)
                buffers[(event_name, partition_dir)].append(
                    {"record": record, "date": event_date}
                )
                cache.add(event_id)
                pending_records += 1

                if pending_records >= batch_size:
                    flush_buffers()

    finally:
        flush_buffers()
        client.close()

    click.echo("Migration summary:")
    for event_name, count in totals.items():
        click.echo(f" {event_name}: {count} new event(s) written")


if __name__ == "__main__":
    main()
```
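
The script lays files out as `<dataset_root>/<event>/date=YYYY-MM-DD/part-<hex>.parquet`, one directory per event type and day. As an illustrative sketch (not part of this commit) of reading one event type back with pandas, using the `success` collection name from `api.ISSUES`:

```python
from pathlib import Path

import pandas as pd

root = Path("/path/to/dataset")

# Gather every partition written for the "success" events
# (layout: success/date=YYYY-MM-DD/part-<hex>.parquet).
parts = sorted(root.glob("success/date=*/part-*.parquet"))
df = pd.concat((pd.read_parquet(p) for p in parts), ignore_index=True)
print(f"{len(df)} success events across {len(parts)} Parquet files")
```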

src/api.py

Lines changed: 108 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,20 @@
```diff
 
 import os
 import sys
+from pathlib import Path
 from time import sleep
-import requests
-from requests.utils import parse_header_links
+from typing import Dict, Iterable, Optional, Set
+
 import datetime
-from pymongo import MongoClient
+import pandas as pd
+import requests
 from concurrent.futures import ThreadPoolExecutor, as_completed
+from pymongo import MongoClient
+from requests.utils import parse_header_links
 
 DEFAULT_MAX_ERRORS = 5
+_MANIFEST_FILENAME = "_manifest.parquet"
+_MANIFEST_COLUMNS = ["event", "id", "date", "path"]
 ISSUES = {
     "success": "758615130",
     "started": "540334560",
```
@@ -41,6 +47,105 @@
```diff
 }
 
 
+def _normalize_event(event: Dict) -> Dict:
+    """Flatten a raw Sentry event for storage."""
+
+    normalized = dict(event)
+    normalized.pop("_id", None)
+
+    tags = normalized.pop("tags", []) or []
+    for tag in tags:
+        key = tag.get("key")
+        if not key:
+            continue
+        normalized[key.replace(".", "_")] = tag.get("value")
+
+    normalized.pop("environment", None)
+
+    if "id" in normalized and normalized["id"] is not None:
+        normalized["id"] = str(normalized["id"])
+
+    return normalized
+
+
+def _event_date(event: Dict) -> Optional[datetime.date]:
+    """Return the calendar date to use for partitioning."""
+
+    for key in ("dateCreated", "date_received", "dateReceived", "date"):
+        value = event.get(key)
+        if value is None:
+            continue
+
+        if isinstance(value, datetime.datetime):
+            dt_value = value
+        elif isinstance(value, datetime.date) and not isinstance(value, datetime.datetime):
+            dt_value = datetime.datetime.combine(
+                value, datetime.time.min, tzinfo=datetime.timezone.utc
+            )
+        elif isinstance(value, str):
+            try:
+                dt_value = datetime.datetime.fromisoformat(value.replace("Z", "+00:00"))
+            except ValueError:
+                continue
+        else:
+            continue
+
+        if dt_value.tzinfo is None:
+            dt_value = dt_value.replace(tzinfo=datetime.timezone.utc)
+        else:
+            dt_value = dt_value.astimezone(datetime.timezone.utc)
+
+        return dt_value.date()
+
+    return None
+
+
+def _partition_path(dataset_root: Path, event_name: str, date: datetime.date) -> Path:
+    """Return the Parquet partition directory for *event_name* on *date*."""
+
+    return Path(dataset_root) / event_name / f"date={date:%Y-%m-%d}"
+
+
+def _manifest_path(dataset_root: Path) -> Path:
+    return Path(dataset_root) / _MANIFEST_FILENAME
+
+
+def _load_manifest(manifest_path: Path) -> pd.DataFrame:
+    if Path(manifest_path).exists():
+        return pd.read_parquet(manifest_path)
+    return pd.DataFrame(columns=_MANIFEST_COLUMNS)
+
+
+def _write_manifest(manifest_path: Path, manifest: pd.DataFrame) -> None:
+    manifest_path = Path(manifest_path)
+    manifest_path.parent.mkdir(parents=True, exist_ok=True)
+    tmp_path = manifest_path.with_suffix(manifest_path.suffix + ".tmp")
+    manifest.reset_index(drop=True).to_parquet(tmp_path, index=False)
+    tmp_path.replace(manifest_path)
+
+
+def _load_manifest_cache(manifest: pd.DataFrame) -> Dict[str, Set[str]]:
+    cache: Dict[str, Set[str]] = {name: set() for name in ISSUES}
+
+    if manifest is None or manifest.empty:
+        return cache
+
+    grouped = manifest.groupby("event")
+    for event, group in grouped:
+        cache[event] = set(group["id"].astype(str))
+
+    return cache
+
+
+def _update_manifest_cache(cache: Dict[str, Set[str]], rows: Iterable[Dict]) -> None:
+    for row in rows:
+        event = row.get("event")
+        identifier = row.get("id")
+        if not event or identifier is None:
+            continue
+        cache.setdefault(event, set()).add(str(identifier))
+
+
 def filter_new(events, collection):
     """Return the subset of *events* not already cached in *collection*."""
```

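To make the new helpers in `src/api.py` concrete, here is a small illustrative sketch of how one raw event document flows through them; the sample document, tag, and paths are made up, but the field names follow what `_normalize_event` and `_event_date` look for:

```python
from pathlib import Path

from src import api

# Hypothetical raw document shaped like the stored Sentry events.
doc = {
    "_id": "mongo-object-id",
    "id": 123456789,
    "dateCreated": "2024-05-01T12:34:56Z",
    "tags": [{"key": "fmriprep.version", "value": "24.0.0"}],
}

record = api._normalize_event(doc)  # drops "_id", flattens tags, casts "id" to str
day = api._event_date(record)       # parsed as UTC -> datetime.date(2024, 5, 1)
part_dir = api._partition_path(Path("/data/events"), "success", day)
print(part_dir)                     # /data/events/success/date=2024-05-01
```

Note also that `_write_manifest` writes to a temporary `.tmp` file and then replaces `_manifest.parquet`, so an interrupted flush cannot leave a half-written manifest behind.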