Commit 1c02580: Aggregate Parquet partitions by period

1 parent a0c4f0e; commit 1c02580

File tree: 2 files changed (+139, -21 lines)

README.md

Lines changed: 10 additions & 4 deletions
@@ -100,9 +100,15 @@ python scripts/migrate_mongo_to_parquet.py \
   /path/to/dataset
 ```
 
-The script buffers up to 1,000 events (one Parquet file) at a time by default.
-Reduce `--batch-size` if you are memory-constrained or increase it on beefier
-machines to reduce the number of tiny Parquet files. Re-running the migration
-is safe: the manifest tracks event IDs and stops duplicates—we tested a
+Events are grouped by calendar day (the default) so that each
+`date=YYYY-MM-DD` directory contains a single part file named
+`part-YYYY-MM-DD_<hash>.parquet`. Pass `--partition-frequency week` if you
+prefer larger weekly files such as `week=2024-03-04/part-2024-W10_<hash>.parquet`.
+
+The script buffers up to 1,000 events before spilling to a temporary directory
+inside the dataset root. At the end of the run it rewrites each affected Parquet
+file once, so choose a smaller `--batch-size` if you are memory-constrained or a
+larger size when working with SSD-backed storage. Re-running the migration is
+safe: the manifest tracks event IDs and stops duplicates—we tested a
 double-run and the second invocation reported no new rows.
 
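
For illustration only, a minimal sketch of what the day-partitioned layout described above implies when read back; the dataset path and the `clicks` event name are placeholders, not names from this repository:

```python
# Hedged sketch: after a migration run, each date= directory should contain
# exactly one part file. Path and event name below are made up.
from pathlib import Path

import pandas as pd

partition = Path("/path/to/dataset") / "clicks" / "date=2024-03-06"
parts = sorted(partition.glob("part-*.parquet"))
assert len(parts) == 1, "each affected partition is rewritten into a single part file"
print(pd.read_parquet(parts[0]).shape)
```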

scripts/migrate_mongo_to_parquet.py

Lines changed: 129 additions & 17 deletions
@@ -3,9 +3,13 @@
 
 from __future__ import annotations
 
+import datetime
+import hashlib
+import shutil
 from collections import defaultdict
+from dataclasses import dataclass
 from pathlib import Path
-from typing import Dict, Iterable, List, Tuple
+from typing import DefaultDict, Dict, Iterable, List
 
 import click
 import pandas as pd
@@ -15,6 +19,25 @@
 from src import api
 
 DEFAULT_BATCH_SIZE = 1000
+TMP_SUBDIR = ".mongo_migrate_tmp"
+
+
+@dataclass(frozen=True)
+class PartitionTarget:
+    """Describe the output Parquet file for a batch of events."""
+
+    event: str
+    partition_dir: Path
+    label: str
+    digest: str
+
+    @property
+    def filename(self) -> str:
+        return f"part-{self.label}_{self.digest}.parquet"
+
+    @property
+    def path(self) -> Path:
+        return self.partition_dir / self.filename
 
 
 def _normalize_records(records: Iterable[Dict]) -> pd.DataFrame:
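
A hedged illustration of how the dataclass above composes its output path, assuming `PartitionTarget` from this diff is in scope; the event name, directory, and digest are invented for the example:

```python
# Hedged sketch of PartitionTarget's derived properties; all values are made up.
from pathlib import Path

target = PartitionTarget(
    event="clicks",
    partition_dir=Path("/data/clicks/date=2024-03-06"),
    label="2024-03-06",
    digest="1a2b3c4d",
)
assert target.filename == "part-2024-03-06_1a2b3c4d.parquet"
assert target.path == Path("/data/clicks/date=2024-03-06") / target.filename
```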
@@ -23,6 +46,29 @@ def _normalize_records(records: Iterable[Dict]) -> pd.DataFrame:
     return pd.json_normalize(list(records), sep=".")
 
 
+def _partition_target(
+    dataset_root: Path,
+    event_name: str,
+    event_date: datetime.date,
+    frequency: str,
+) -> PartitionTarget:
+    """Return the output location for *event_name* at *event_date*."""
+
+    if frequency == "week":
+        iso = event_date.isocalendar()
+        week_start = event_date - datetime.timedelta(days=event_date.weekday())
+        label = f"{iso.year}-W{iso.week:02d}"
+        partition_dir = Path(dataset_root) / event_name / f"week={week_start:%Y-%m-%d}"
+    else:
+        label = f"{event_date:%Y-%m-%d}"
+        partition_dir = api._partition_path(dataset_root, event_name, event_date)
+
+    digest_input = f"{event_name}|{label}".encode("utf-8")
+    digest = hashlib.sha1(digest_input).hexdigest()[:8]
+
+    return PartitionTarget(event_name, partition_dir, label, digest)
+
+
 @click.command()
 @click.option(
     "--mongo-uri",
@@ -43,12 +89,29 @@ def _normalize_records(records: Iterable[Dict]) -> pd.DataFrame:
     show_default=True,
     help="Number of events to buffer before writing Parquet partitions.",
 )
+@click.option(
+    "--partition-frequency",
+    type=click.Choice(["day", "week"], case_sensitive=False),
+    default="day",
+    show_default=True,
+    help=(
+        "Granularity of the Parquet part files. Use 'week' to aggregate larger files "
+        "per ISO week."
+    ),
+)
 @click.argument("dataset_root", type=click.Path(path_type=Path))
-def main(mongo_uri: str, db_name: str, batch_size: int, dataset_root: Path) -> None:
+def main(
+    mongo_uri: str,
+    db_name: str,
+    batch_size: int,
+    partition_frequency: str,
+    dataset_root: Path,
+) -> None:
     """Stream MongoDB events into a partitioned Parquet dataset."""
 
     dataset_root = dataset_root.resolve()
     dataset_root.mkdir(parents=True, exist_ok=True)
+    partition_frequency = partition_frequency.lower()
 
     manifest_path = api._manifest_path(dataset_root)
     manifest = api._load_manifest(manifest_path)
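
A hedged smoke test of the new option, assuming the script can be imported as a module so that `main` is in scope; `--help` exits before any MongoDB connection is made:

```python
# Hedged sketch: the Choice option should surface in --help as [day|week].
from click.testing import CliRunner

result = CliRunner().invoke(main, ["--help"])
assert result.exit_code == 0
assert "--partition-frequency" in result.output
assert "[day|week]" in result.output
```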
@@ -57,52 +120,97 @@ def main(mongo_uri: str, db_name: str, batch_size: int, dataset_root: Path) -> N
     client = MongoClient(mongo_uri)
     db = client[db_name]
 
-    buffers: Dict[Tuple[str, Path], List[Dict]] = defaultdict(list)
+    buffers: DefaultDict[PartitionTarget, List[Dict]] = defaultdict(list)
+    manifest_rows: DefaultDict[PartitionTarget, List[Dict]] = defaultdict(list)
+    partial_files: DefaultDict[PartitionTarget, List[Path]] = defaultdict(list)
     pending_records = 0
     totals = {event: 0 for event in api.ISSUES}
 
+    tmp_root = dataset_root / TMP_SUBDIR
+
     def flush_buffers() -> None:
         nonlocal pending_records, manifest
 
         if pending_records == 0:
             return
 
-        new_manifest_rows: List[Dict] = []
-
-        for (event_name, partition_dir), entries in list(buffers.items()):
+        for target, entries in list(buffers.items()):
             if not entries:
                 continue
 
-            partition_dir.mkdir(parents=True, exist_ok=True)
             records = [entry["record"] for entry in entries]
             df = _normalize_records(records)
             if df.empty:
                 continue
 
-            part_path = partition_dir / f"part-{uuid4().hex}.parquet"
-            df.to_parquet(part_path, index=False)
-            relative = str(part_path.relative_to(dataset_root))
+            tmp_root.mkdir(parents=True, exist_ok=True)
+            tmp_dir = tmp_root / target.event
+            tmp_dir.mkdir(parents=True, exist_ok=True)
+            tmp_path = tmp_dir / f"{target.label}-{uuid4().hex}-{len(partial_files[target])}.parquet"
+            df.to_parquet(tmp_path, index=False)
+            partial_files[target].append(tmp_path)
 
-            totals[event_name] += len(entries)
+            relative = str(target.path.relative_to(dataset_root))
             for entry in entries:
-                new_manifest_rows.append(
+                manifest_rows[target].append(
                     {
-                        "event": event_name,
+                        "event": target.event,
                         "id": entry["record"]["id"],
                         "date": entry["date"].isoformat(),
                         "path": relative,
                     }
                 )
 
+        buffers.clear()
+        pending_records = 0
+
+    def finalize_partitions() -> None:
+        nonlocal manifest
+
+        if not partial_files:
+            return
+
+        new_manifest_rows: List[Dict] = []
+
+        for target, temp_paths in list(partial_files.items()):
+            if not temp_paths:
+                continue
+
+            frames: List[pd.DataFrame] = []
+            final_path = target.path
+            if final_path.exists():
+                frames.append(pd.read_parquet(final_path))
+
+            for tmp_path in temp_paths:
+                frames.append(pd.read_parquet(tmp_path))
+
+            if not frames:
+                continue
+
+            combined = pd.concat(frames, ignore_index=True)
+            final_path.parent.mkdir(parents=True, exist_ok=True)
+            tmp_output = final_path.with_suffix(final_path.suffix + ".tmp")
+            combined.to_parquet(tmp_output, index=False)
+            tmp_output.replace(final_path)
+
+            for tmp_path in temp_paths:
+                if tmp_path.exists():
+                    tmp_path.unlink()
+
+            new_manifest_rows.extend(manifest_rows.get(target, []))
+
         if new_manifest_rows:
             manifest = pd.concat(
                 [manifest, pd.DataFrame(new_manifest_rows)], ignore_index=True
             )
             api._write_manifest(manifest_path, manifest)
             api._update_manifest_cache(manifest_cache, new_manifest_rows)
 
-        buffers.clear()
-        pending_records = 0
+        partial_files.clear()
+        manifest_rows.clear()
+
+        if tmp_root.exists():
+            shutil.rmtree(tmp_root)
 
     try:
         for event_name in api.ISSUES:
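
The rewrite step in `finalize_partitions` above boils down to: read any existing part file plus the spilled temp files, concatenate, write to a `.tmp` sibling, then atomically swap it in. A hedged, self-contained sketch of that pattern using throwaway files (not the repository's own paths):

```python
# Hedged sketch of the merge-and-swap pattern; all files live in a throwaway temp dir.
import tempfile
from pathlib import Path

import pandas as pd

root = Path(tempfile.mkdtemp())
final_path = root / "part-2024-03-06_1a2b3c4d.parquet"
pd.DataFrame({"id": [1, 2]}).to_parquet(final_path, index=False)   # rows from an earlier run
spill = root / "spill-0.parquet"
pd.DataFrame({"id": [3]}).to_parquet(spill, index=False)           # freshly buffered rows

combined = pd.concat(
    [pd.read_parquet(final_path), pd.read_parquet(spill)], ignore_index=True
)
tmp_output = final_path.with_suffix(final_path.suffix + ".tmp")
combined.to_parquet(tmp_output, index=False)
tmp_output.replace(final_path)                                     # atomic on one filesystem
spill.unlink()

assert len(pd.read_parquet(final_path)) == 3
```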
@@ -125,18 +233,22 @@ def flush_buffers() -> None:
                 if event_date is None:
                     continue
 
-                partition_dir = api._partition_path(dataset_root, event_name, event_date)
-                buffers[(event_name, partition_dir)].append(
+                target = _partition_target(
+                    dataset_root, event_name, event_date, partition_frequency
+                )
+                buffers[target].append(
                     {"record": record, "date": event_date}
                 )
                 cache.add(event_id)
                 pending_records += 1
+                totals[event_name] += 1
 
                 if pending_records >= batch_size:
                     flush_buffers()
 
     finally:
         flush_buffers()
+        finalize_partitions()
         client.close()
 
     click.echo("Migration summary:")

0 commit comments