Proposal for more scalable seq-pack data format #2019

marcromeyn wants to merge 13 commits into main from romeyn/parquet-sequence-pack
Conversation
Signed-off-by: Marc Romeyn <marcromeyn@gmail.com>
```diff
-        if path.suffix == ".npy":
+        # Check for .npy packed dataset (legacy format)
+        if path_str.lower().endswith(".npy"):
```
Let's add a deprecation message here for `.npy` and instruct people to re-run data generation.
/ok to test
@marcromeyn, there was an error processing your request: See the following link for more information: https://docs.gha-runners.nvidia.com/cpr/e/1/

/ok to test e6e40ec
Signed-off-by: Marc Romeyn <marcromeyn@gmail.com>
…/Megatron-Bridge into romeyn/parquet-sequence-pack
/ok to test 3a8d112
Signed-off-by: Marc Romeyn <marcromeyn@gmail.com>
📝 Walkthrough

The changes add comprehensive support for packed Parquet datasets in the supervised fine-tuning module. This includes introducing pyarrow as an optional dependency, implementing a new GPTSFTPackedParquetDataset class with path resolution and validation utilities, updating the dataset builder to handle Parquet formats alongside legacy .npy paths, and deprecating .npy packed sequence handling in favor of the Parquet format.

Changes
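The routing described above hinges on detecting packed Parquet specs by suffix. A minimal sketch of that check, assuming only the suffix rule discussed later in this review (the real `is_packed_parquet_spec()` in `packed_parquet.py` also handles directories and globs):

```python
# Illustrative sketch; the real is_packed_parquet_spec() in packed_parquet.py
# also accepts directory and glob specs.
def is_packed_parquet_spec(path_str: str) -> bool:
    """Treat any path ending in .parquet or .pq as a packed Parquet spec."""
    return str(path_str).lower().endswith((".parquet", ".pq"))

# Routing decisions under this rule:
assert is_packed_parquet_spec("data/train.idx.parquet")
assert is_packed_parquet_spec("data/shard_*.idx.pq")       # glob, still matched
assert not is_packed_parquet_spec("data/train.npy")        # legacy path
```

Note that under this rule a plain `test.parquet` is also treated as packed, which is exactly the behavior the test-routing comment below calls out.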
Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor Client
    participant Factory as SFT Factory<br/>(create_sft_dataset)
    participant PathDetector as Path Detection<br/>(is_packed_parquet_spec)
    participant Dataset as GPTSFTPackedParquetDataset
    participant Resolver as Path Resolver<br/>(_resolve_parquet_paths)
    participant FileSystem as File System
    participant PyArrow as PyArrow Reader

    Client->>Factory: create_sft_dataset(path, ...)
    Factory->>PathDetector: is_packed_parquet_spec(path)?
    PathDetector->>FileSystem: glob/stat path
    FileSystem-->>PathDetector: matches .parquet/.pq
    PathDetector-->>Factory: true
    Factory->>Dataset: __init__(file_path, tokenizer, ...)
    Dataset->>Resolver: resolve_packed_parquet_paths(spec)
    Resolver->>FileSystem: glob/list files
    FileSystem-->>Resolver: sorted file list
    Resolver-->>Dataset: [file1.parquet, file2.parquet, ...]
    Dataset->>Dataset: _load_dataset()
    Dataset->>PyArrow: lazy initialize readers
    Dataset->>Dataset: build cumulative row/rowgroup offsets
    Dataset-->>Factory: initialized dataset
    Client->>Dataset: __getitem__(idx)
    Dataset->>Dataset: _locate_row(idx) via offsets
    Dataset->>PyArrow: _ensure_reader(file_idx)
    PyArrow-->>Dataset: reader handle
    Dataset->>PyArrow: read row_group
    PyArrow-->>Dataset: {input_ids, loss_mask, seq_start_id}
    Dataset->>Dataset: compute seq_boundaries
    Dataset-->>Client: {input_ids, seq_boundaries, loss_mask}
```
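The offset bookkeeping in the diagram ("build cumulative row/rowgroup offsets", "_locate_row(idx) via offsets") amounts to a bisect over cumulative row counts. A sketch under that assumption; the helper names are illustrative, not the real implementation:

```python
# Sketch of cumulative-offset lookup across sharded files, assuming each shard
# reports its row count up front. Names (build_offsets, locate_row) are
# illustrative stand-ins for the dataset's private helpers.
import bisect
from itertools import accumulate

def build_offsets(rows_per_file):
    # Cumulative row counts: file i owns global rows [offsets[i], offsets[i+1]).
    return [0] + list(accumulate(rows_per_file))

def locate_row(offsets, idx):
    # Find which file a global index falls into, plus the local row index.
    file_idx = bisect.bisect_right(offsets, idx) - 1
    return file_idx, idx - offsets[file_idx]

offsets = build_offsets([3, 5, 2])      # three shards
assert locate_row(offsets, 0) == (0, 0)  # first row of first shard
assert locate_row(offsets, 4) == (1, 1)  # second row of second shard
assert locate_row(offsets, 9) == (2, 1)  # last row of last shard
```

The same pattern extends to row-group offsets, so a `__getitem__` only ever opens the one reader that owns the requested index.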
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels
Suggested reviewers
🚥 Pre-merge checks: ✅ 2 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (2 passed)
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/megatron/bridge/data/datasets/packed_sequence.py (1)
77-98: ⚠️ Potential issue | 🟠 Major — Handle `pad_seq_to_mult=None` before comparing it.

This function explicitly supports `None` and even normalizes it on line 81, but line 98 still does `pad_seq_to_mult > 1` unconditionally. A caller using `None` will get a `TypeError` during packing instead of disabling extra pre-padding.

🔧 Minimal fix

```diff
-    if pad_seq_to_mult > 1:
+    if pad_seq_to_mult is not None and pad_seq_to_mult > 1:
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/megatron/bridge/data/datasets/packed_sequence.py` around lines 77 - 98, The code currently compares pad_seq_to_mult > 1 unguarded which raises TypeError when pad_seq_to_mult is None; fix by using the normalized value or an explicit None check—replace the unconditional check with either "if pad_seq_to_mult is not None and pad_seq_to_mult > 1" or "if pad_seq_length_to_mult > 1" (since pad_seq_length_to_mult is set to 1 when pad_seq_to_mult is None) in packed_sequence.py so extra pre-padding is only applied when a valid multiplier was provided.
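A minimal reproduction of the flagged failure mode, with a hypothetical `pad_len` helper standing in for the packing code:

```python
# Hypothetical helper demonstrating the None-vs-int comparison bug described
# above; pad_len is illustrative, not the real packed_sequence.py function.
def pad_len(seq_len, pad_seq_to_mult=None):
    # Guarded check: only apply extra pre-padding for a valid multiplier.
    if pad_seq_to_mult is not None and pad_seq_to_mult > 1:
        # Round seq_len up to the nearest multiple.
        return -(-seq_len // pad_seq_to_mult) * pad_seq_to_mult
    return seq_len

assert pad_len(10) == 10        # None disables extra pre-padding
assert pad_len(10, 8) == 16     # rounded up to a multiple of 8

try:
    None > 1                    # the unguarded comparison from line 98
except TypeError:
    pass                        # this is what callers hit before the fix
```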
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/megatron/bridge/data/builders/finetuning_dataset.py`:
- Around line 142-150: The current check in the builder where
packed_path_str.lower().endswith(".npy") only emits a DeprecationWarning and
returns, which lets the build proceed and later fail with a None-packed path;
change this to fail fast by raising an explicit error (e.g., ValueError or
RuntimeError) in the same branch inside the method handling packed paths (the
block referencing packed_path_str and .endswith(".npy")), replacing the
warnings.warn + return with a raised exception that includes the deprecation
message and actionable guidance to use packed parquet format so callers
immediately see the unsupported input.
In `@src/megatron/bridge/data/datasets/packed_sequence.py`:
- Around line 272-320: _validate_packed_path currently requires packed parquet
specs to resolve to existing files which prevents using explicit output targets;
change the logic inside the is_packed_parquet_spec branch to skip resolution
when the spec is a literal single-file target (no glob characters like '*?[]'
and not a directory spec), i.e. treat plain file paths (e.g.
"custom/train.idx.parquet") as valid write targets and simply setattr(self,
attr_name, path_str) without calling resolve_packed_parquet_paths; continue to
call resolve_packed_parquet_paths and raise FileNotFoundError for patterns/globs
or directory-like specs, referencing _validate_packed_path,
is_packed_parquet_spec, and resolve_packed_parquet_paths to locate where to
change the behavior.
In `@src/megatron/bridge/data/datasets/sft.py`:
- Around line 210-216: The packed-Parquet branch instantiates
GPTSFTPackedParquetDataset without forwarding the pad_seq_to_mult argument,
causing _pad_seq_to_mult to remain 1 in GPTSFTPackedDataset.collate_fn; modify
the GPTSFTPackedParquetDataset constructor call to pass
pad_seq_to_mult=pad_seq_to_mult (same name used by the .npy branch) so the
collate_fn receives the intended padding multiplier and emits
cu_seqlens_unpadded correctly.
In `@tests/unit_tests/data/datasets/test_chat_template.py`:
- Around line 452-535: Update the tests to patch the actual lazy import location
and fix the expectation for regular .parquet routing: change patch targets from
"megatron.bridge.data.datasets.sft.GPTSFTPackedParquetDataset" to
"megatron.bridge.data.datasets.packed_parquet.GPTSFTPackedParquetDataset" for
tests that expect packed routing, and in
test_regular_parquet_not_routed_to_packed either assert
GPTSFTPackedParquetDataset is called (since is_packed_parquet_spec() treats
*.parquet as packed) or change the test path to a non-parquet extension (e.g.,
.json) if you want to assert routing to GPTSFTChatDataset; reference
create_sft_dataset, GPTSFTPackedParquetDataset, GPTSFTChatDataset, and
is_packed_parquet_spec when making the edits.
---
Outside diff comments:
In `@src/megatron/bridge/data/datasets/packed_sequence.py`:
- Around line 77-98: The code currently compares pad_seq_to_mult > 1 unguarded
which raises TypeError when pad_seq_to_mult is None; fix by using the normalized
value or an explicit None check—replace the unconditional check with either "if
pad_seq_to_mult is not None and pad_seq_to_mult > 1" or "if
pad_seq_length_to_mult > 1" (since pad_seq_length_to_mult is set to 1 when
pad_seq_to_mult is None) in packed_sequence.py so extra pre-padding is only
applied when a valid multiplier was provided.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 70b3ae17-3591-4d4c-9dfe-cecade391318
📒 Files selected for processing (7)
- pyproject.toml
- src/megatron/bridge/data/builders/finetuning_dataset.py
- src/megatron/bridge/data/datasets/packed_parquet.py
- src/megatron/bridge/data/datasets/packed_sequence.py
- src/megatron/bridge/data/datasets/sft.py
- tests/unit_tests/data/datasets/test_chat_template.py
- tests/unit_tests/data/datasets/test_packed_parquet.py
```python
packed_path_str = str(packed_path)
if packed_path_str.lower().endswith(".npy"):
    warnings.warn(
        "Automatic .npy packed sequence preparation is deprecated and will be removed in the next release. "
        "Please use packed parquet format instead.",
        DeprecationWarning,
        stacklevel=3,
    )
    return
```
Fail fast when automatic .npy preparation is no longer supported.
This branch only warns and returns, so the build can continue until _create_dataset() later hands back None for the missing packed path. Raising a clear error here would make the deprecation actionable.
🔧 Suggested change

```diff
 if packed_path_str.lower().endswith(".npy"):
-    warnings.warn(
-        "Automatic .npy packed sequence preparation is deprecated and will be removed in the next release. "
-        "Please use packed parquet format instead.",
-        DeprecationWarning,
-        stacklevel=3,
-    )
-    return
+    raise NotImplementedError(
+        "Automatic .npy packed sequence preparation is deprecated and no longer supported. "
+        "Please switch the packed output path to `.parquet` or `.pq`."
+    )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/megatron/bridge/data/builders/finetuning_dataset.py` around lines 142 -
150, The current check in the builder where
packed_path_str.lower().endswith(".npy") only emits a DeprecationWarning and
returns, which lets the build proceed and later fail with a None-packed path;
change this to fail fast by raising an explicit error (e.g., ValueError or
RuntimeError) in the same branch inside the method handling packed paths (the
block referencing packed_path_str and .endswith(".npy")), replacing the
warnings.warn + return with a raised exception that includes the deprecation
message and actionable guidance to use packed parquet format so callers
immediately see the unsupported input.
```diff
 def _validate_packed_path(self, attr_name: str, path_value: str) -> None:
     """Validate a packed data path and store it appropriately.

     For .npy files: strict validation with Path.exists()
     For packed parquet specs: validate via resolution (supports dirs/globs)

     Args:
         attr_name: The attribute name being validated (for error messages)
         path_value: The path value to validate

     Raises:
         FileNotFoundError: If the path does not exist or resolves to no files
         ValueError: If the path format is invalid
     """
     path_str = str(path_value)

     # Check if it's an .npy file (legacy format)
     if path_str.lower().endswith(".npy"):
         warnings.warn(
             f"The .npy packed sequence format is deprecated and will be removed in the next release. "
             f"Please use packed parquet format instead. Path: {path_str}",
             DeprecationWarning,
             stacklevel=2,
         )
         if MultiStorageClientFeature.is_enabled():
             msc = MultiStorageClientFeature.import_package()
-            self.packed_val_data_path = msc.Path(self.packed_val_data_path)
+            path_obj = msc.Path(path_str)
         else:
-            self.packed_val_data_path = Path(self.packed_val_data_path)
-            assert self.packed_val_data_path.suffix == ".npy", (
-                f"packed validation data file must be a .npy file: {self.packed_val_data_path}"
-            )
-            assert self.packed_val_data_path.exists(), (
-                f"packed validation data file does not exist: {self.packed_val_data_path}"
-            )
+            path_obj = Path(path_str)

-        if self.pad_seq_to_mult is not None and self.pad_seq_to_mult <= 0:
-            raise ValueError("pad_seq_to_mult must be a positive integer when provided.")
         if not path_obj.exists():
             raise FileNotFoundError(f"{attr_name} file does not exist: {path_str}")
         setattr(self, attr_name, path_obj)
         return

     # Check if it's a packed parquet spec (file/dir/glob)
     if is_packed_parquet_spec(path_str):
         # Validate by resolving - this checks that files actually exist
         try:
             resolved_paths = resolve_packed_parquet_paths(path_str)
             if len(resolved_paths) == 0:
                 raise FileNotFoundError(f"{attr_name} resolved to no files: {path_str}")
         except ValueError as e:
             raise FileNotFoundError(f"{attr_name} could not be resolved: {path_str}. Error: {e}") from e

         # Store the original string spec (not Path) to preserve globs
         # The dataset loader will handle resolution
         setattr(self, attr_name, path_str)
         return
```
Don't require custom packed output files to pre-exist.
FinetuningDatasetBuilder._prepare_packed_split() uses packed_train_data_path and packed_val_data_path as write targets when packed data is missing. Requiring something like custom/train.idx.parquet to resolve here makes explicit output paths unusable unless the file is already there.
🧰 Tools
🪛 Ruff (0.15.4)
[warning] 303-303: Avoid specifying long messages outside the exception class
(TRY003)
[warning] 313-313: Avoid specifying long messages outside the exception class
(TRY003)
[warning] 315-315: Avoid specifying long messages outside the exception class
(TRY003)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/megatron/bridge/data/datasets/packed_sequence.py` around lines 272 - 320,
_validate_packed_path currently requires packed parquet specs to resolve to
existing files which prevents using explicit output targets; change the logic
inside the is_packed_parquet_spec branch to skip resolution when the spec is a
literal single-file target (no glob characters like '*?[]' and not a directory
spec), i.e. treat plain file paths (e.g. "custom/train.idx.parquet") as valid
write targets and simply setattr(self, attr_name, path_str) without calling
resolve_packed_parquet_paths; continue to call resolve_packed_parquet_paths and
raise FileNotFoundError for patterns/globs or directory-like specs, referencing
_validate_packed_path, is_packed_parquet_spec, and resolve_packed_parquet_paths
to locate where to change the behavior.
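The suggested literal-vs-glob distinction can be sketched as follows; the helper name and the exact glob-character set are assumptions, not the repository's code:

```python
# Hypothetical helper for the suggested fix: accept a plain single-file
# parquet path as a write target, while globs and directory specs must still
# resolve to existing files. The name is_literal_file_target is illustrative.
import os

GLOB_CHARS = set("*?[]")

def is_literal_file_target(path_str: str) -> bool:
    """True for a plain single-file parquet path usable as a write target."""
    has_glob = any(c in GLOB_CHARS for c in path_str)
    looks_like_file = path_str.lower().endswith((".parquet", ".pq"))
    is_dir_spec = path_str.endswith(("/", os.sep))
    return looks_like_file and not has_glob and not is_dir_spec

assert is_literal_file_target("custom/train.idx.parquet")      # valid write target
assert not is_literal_file_target("data/shard_*.idx.parquet")  # glob: must resolve
assert not is_literal_file_target("data/packed/")              # directory spec
```

Under this split, `_validate_packed_path` would `setattr` the literal string directly and only call `resolve_packed_parquet_paths` for the other cases.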
```python
if is_packed_parquet_spec(path_str):
    return GPTSFTPackedParquetDataset(
        pack_metadata_file_path=pack_metadata_file_path,
        pad_cu_seqlens=pad_cu_seqlens,
        **gpt_sft_dataset_kwargs,
        **kwargs,
    )
```
Forward pad_seq_to_mult in the packed-Parquet branch.
The .npy path passes this through, but the Parquet path drops it. That leaves _pad_seq_to_mult at 1 inside GPTSFTPackedDataset.collate_fn(), so packed Parquet runs never emit cu_seqlens_unpadded for CP/THD-prepared data.
🔧 Minimal fix

```diff
 if is_packed_parquet_spec(path_str):
     return GPTSFTPackedParquetDataset(
         pack_metadata_file_path=pack_metadata_file_path,
         pad_cu_seqlens=pad_cu_seqlens,
+        pad_seq_to_mult=pad_seq_to_mult,
         **gpt_sft_dataset_kwargs,
         **kwargs,
     )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/megatron/bridge/data/datasets/sft.py` around lines 210 - 216, The
packed-Parquet branch instantiates GPTSFTPackedParquetDataset without forwarding
the pad_seq_to_mult argument, causing _pad_seq_to_mult to remain 1 in
GPTSFTPackedDataset.collate_fn; modify the GPTSFTPackedParquetDataset
constructor call to pass pad_seq_to_mult=pad_seq_to_mult (same name used by the
.npy branch) so the collate_fn receives the intended padding multiplier and
emits cu_seqlens_unpadded correctly.
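Why the multiplier matters can be sketched as follows, assuming collate pads each sequence length up to the multiple while keeping the unpadded cumulative lengths alongside; the function below is illustrative, not the real `collate_fn` API:

```python
# Illustrative sketch of padded vs unpadded cumulative sequence lengths.
# cu_seqlens here stands in for the bookkeeping inside collate_fn.
from itertools import accumulate

def cu_seqlens(seq_lens, pad_seq_to_mult=1):
    # Each length rounded up to the multiple, plus the raw lengths.
    padded = [-(-l // pad_seq_to_mult) * pad_seq_to_mult for l in seq_lens]
    cu = [0] + list(accumulate(padded))
    cu_unpadded = [0] + list(accumulate(seq_lens))
    return cu, cu_unpadded

cu, cu_unpadded = cu_seqlens([5, 7], pad_seq_to_mult=8)
assert cu == [0, 8, 16]            # padded boundaries
assert cu_unpadded == [0, 5, 12]   # the unpadded view CP/THD consumers need

# With the multiplier silently left at 1 (the bug), the two views coincide
# and no distinct cu_seqlens_unpadded information survives.
cu1, cu1_un = cu_seqlens([5, 7], pad_seq_to_mult=1)
assert cu1 == cu1_un == [0, 5, 12]
```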
```python
@patch("megatron.bridge.data.datasets.sft.GPTSFTPackedParquetDataset")
def test_create_packed_parquet_dataset_idx_parquet(self, mock_parquet_class):
    """Test that .idx.parquet files create GPTSFTPackedParquetDataset."""
    from pathlib import Path

    mock_tokenizer = MagicMock()
    mock_parquet_class.return_value = MagicMock()

    create_sft_dataset(
        path=Path("test.idx.parquet"),
        tokenizer=mock_tokenizer,
    )

    # Verify GPTSFTPackedParquetDataset was called
    mock_parquet_class.assert_called_once()

@patch("megatron.bridge.data.datasets.sft.GPTSFTPackedParquetDataset")
def test_create_packed_parquet_dataset_idx_pq(self, mock_parquet_class):
    """Test that .idx.pq files create GPTSFTPackedParquetDataset."""
    from pathlib import Path

    mock_tokenizer = MagicMock()
    mock_parquet_class.return_value = MagicMock()

    create_sft_dataset(
        path=Path("test.idx.pq"),
        tokenizer=mock_tokenizer,
    )

    # Verify GPTSFTPackedParquetDataset was called
    mock_parquet_class.assert_called_once()

@patch("megatron.bridge.data.datasets.sft.GPTSFTPackedParquetDataset")
def test_create_packed_parquet_dataset_priority_over_chat(self, mock_parquet_class):
    """Test that packed Parquet files take precedence over chat=True."""
    from pathlib import Path

    mock_tokenizer = MagicMock()
    mock_parquet_class.return_value = MagicMock()

    create_sft_dataset(
        path=Path("test.idx.parquet"),
        tokenizer=mock_tokenizer,
        chat=True,  # Should be ignored for packed Parquet files
        use_hf_tokenizer_chat_template=True,
    )

    # Verify GPTSFTPackedParquetDataset was called (not GPTSFTChatDataset)
    mock_parquet_class.assert_called_once()

@patch("megatron.bridge.data.datasets.sft.GPTSFTChatDataset")
def test_regular_parquet_not_routed_to_packed(self, mock_chat_class):
    """Test that regular .parquet files (without .idx.) are NOT routed to packed dataset."""
    from pathlib import Path

    mock_tokenizer = MagicMock()
    mock_chat_class.return_value = MagicMock()

    create_sft_dataset(
        path=Path("test.parquet"),  # No .idx. prefix
        tokenizer=mock_tokenizer,
        chat=True,
        use_hf_tokenizer_chat_template=True,
    )

    # Verify GPTSFTChatDataset was called (regular parquet goes to chat/default)
    mock_chat_class.assert_called_once()

@patch("megatron.bridge.data.datasets.sft.GPTSFTPackedParquetDataset")
def test_create_packed_parquet_glob_pattern(self, mock_parquet_class):
    """Test that glob patterns like data*.idx.parquet route to GPTSFTPackedParquetDataset."""
    from pathlib import Path

    mock_tokenizer = MagicMock()
    mock_parquet_class.return_value = MagicMock()

    create_sft_dataset(
        path=Path("data/shard_*.idx.parquet"),  # Glob pattern
        tokenizer=mock_tokenizer,
    )

    # Verify GPTSFTPackedParquetDataset was called
    mock_parquet_class.assert_called_once()
```
🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
# Check where GPTSFTPackedParquetDataset is imported/used in sft.py
echo "=== GPTSFTPackedParquetDataset usage in sft.py ==="
rg -n -C3 'GPTSFTPackedParquetDataset' \
  src/megatron/bridge/data/datasets/sft.py
echo -e "\n=== is_packed_parquet_spec and suffix handling in packed_parquet.py ==="
rg -n -C3 'def is_packed_parquet_spec|endswith.*parquet|endswith.*\.pq' \
  src/megatron/bridge/data/datasets/packed_parquet.py
echo -e "\n=== Test patches in test_chat_template.py (lines 452-535) ==="
sed -n '452,535p' tests/unit_tests/data/datasets/test_chat_template.py | grep -n '@patch\|def test_'
```

Repository: NVIDIA-NeMo/Megatron-Bridge
Length of output: 3450
Fix patch targets and correct the .parquet routing expectation.
The patch targets must reference the actual import location. Since GPTSFTPackedParquetDataset is imported lazily inside create_sft_dataset() from megatron.bridge.data.datasets.packed_parquet, patch megatron.bridge.data.datasets.packed_parquet.GPTSFTPackedParquetDataset instead of sft.GPTSFTPackedParquetDataset.
Additionally, test_regular_parquet_not_routed_to_packed has an incorrect expectation. The is_packed_parquet_spec() function in packed_parquet.py returns True for any path ending with .parquet or .pq (line 87), so a regular .parquet file will be routed to GPTSFTPackedParquetDataset, not GPTSFTChatDataset. Either adjust the test to expect packed routing, or change the path to a non-parquet extension if testing non-packed behavior is intended.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit_tests/data/datasets/test_chat_template.py` around lines 452 - 535,
Update the tests to patch the actual lazy import location and fix the
expectation for regular .parquet routing: change patch targets from
"megatron.bridge.data.datasets.sft.GPTSFTPackedParquetDataset" to
"megatron.bridge.data.datasets.packed_parquet.GPTSFTPackedParquetDataset" for
tests that expect packed routing, and in
test_regular_parquet_not_routed_to_packed either assert
GPTSFTPackedParquetDataset is called (since is_packed_parquet_spec() treats
*.parquet as packed) or change the test path to a non-parquet extension (e.g.,
.json) if you want to assert routing to GPTSFTChatDataset; reference
create_sft_dataset, GPTSFTPackedParquetDataset, GPTSFTChatDataset, and
is_packed_parquet_spec when making the edits.
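The "patch where the name is looked up" rule behind this fix can be demonstrated with a toy module; everything here (`fake_packed_parquet`, the stub class, the simplified factory) is hypothetical and only mirrors the lazy-import structure described above:

```python
# Toy reproduction: when a factory imports a class lazily at call time,
# patching must target the module the import resolves from.
import sys
import types
from unittest.mock import patch

# Build a fake "packed_parquet" module holding the class to be patched.
pkg = types.ModuleType("fake_packed_parquet")

class GPTSFTPackedParquetDataset:  # stand-in for the real class
    pass

pkg.GPTSFTPackedParquetDataset = GPTSFTPackedParquetDataset
sys.modules["fake_packed_parquet"] = pkg

def create_sft_dataset(path):
    # Lazy import inside the factory, mirroring sft.py's structure.
    from fake_packed_parquet import GPTSFTPackedParquetDataset
    return GPTSFTPackedParquetDataset()

# Patching the source module intercepts the lazy import at call time.
with patch("fake_packed_parquet.GPTSFTPackedParquetDataset") as mock_cls:
    create_sft_dataset("test.idx.parquet")
    mock_cls.assert_called_once()
```

Patching `some_test_module.GPTSFTPackedParquetDataset` instead would leave the factory's own lookup untouched, which is why the patch targets in the tests above need to move.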
@marcromeyn can you resolve the CodeRabbit comments if not applicable? I do see some of these are reasonable comments.
@marcromeyn ^^
What does this PR do?
Add a one line overview of what this PR aims to accomplish.
Changelog
GitHub Actions CI
See the CI section in the Contributing doc for how to trigger the CI. An NVIDIA developer will need to approve and trigger the CI for external contributors.
Before your PR is "Ready for review"
Pre checks:
If you haven't finished some of the above items you can still open "Draft" PR.
Additional Information
Summary by CodeRabbit
Release Notes
New Features
Deprecations
Dependencies
Added `pyarrow>=14.0.0` dependency for Parquet support.

Tests