Improving packed sequences SFT for large datasets#2395
cuichenx merged 6 commits into NVIDIA-NeMo:main from
Conversation
Adds support for creating and using packed sequence datasets in `mmap` format for faster loading and reduced memory usage. This includes changes to:

- Allow specifying `packed_sequence_format` as `mmap` in `PackedSequenceSpecs`
- Modify the data preparation to save `mmap`-compatible data files with `.idx.npy` and `.bin` extensions
- Update dataset loading logic to use `numpy.memmap` for zero-copy access
- Add a number-of-tokenizer-workers option to `prepare_packed_sequence_data` to speed up tokenization
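For illustration, zero-copy loading of such a dataset might look like the following sketch. The `.idx.npy`/`.bin` layout (an `(N, 2)` array of offset/length pairs plus a flat int32 token file) and the helper name `load_packed_mmap` are assumptions for this sketch, not the PR's actual code:

```python
import numpy as np


def load_packed_mmap(prefix: str):
    """Open a packed dataset without reading the token file into RAM.

    Assumed layout: `<prefix>.idx.npy` holds an (N, 2) array of
    (offset, length) pairs; `<prefix>.bin` holds int32 token ids back to back.
    """
    index = np.load(prefix + ".idx.npy")  # small index, loaded eagerly
    tokens = np.memmap(prefix + ".bin", dtype=np.int32, mode="r")  # zero-copy

    def get_sequence(i: int) -> np.ndarray:
        offset, length = index[i]
        return tokens[offset:offset + length]  # a view into the mapped file

    return get_sequence
```

Because `np.memmap` only maps pages on access, random row reads touch just the bytes of the requested sequence rather than pulling a whole row group or the full array into memory.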
Hi @shaltielshmid thanks for your contribution. We decided to go with parquet format rather than numpy memmap due to the flexibility and file size advantages. Let me know if you have any questions!

Hi @cuichenx Thanks for referring me - that PR looks great! Two follow-ups:

1. I added a few improvements to the packing process itself - the parallelization, dynamic sum calculation for first_fit, and the more verbose tqdm output. I think those should still be put in - what do you think? Should I create a separate PR just with those? These few changes sped up the process from >10 hours to ~30 minutes (don't quote me on the numbers).
2. I browsed through the parquet PR - very impressive work! I understand why you chose that, but I would argue that it may not be ideal. Parquet files are optimized for data scanning, but not so much for random row access. The PR has a pretty elegant implementation here, but that seems like unnecessary complexity. It means that every row access requires reading the whole row group into memory, and pulling it from there. I built the

Of course, you are significantly more experienced in this than I am, so I trust that you know what's best - just offering up my two cents. Let me know how you'd like me to proceed here :)

Sorry for the late reply. We did consider memmap as a strong candidate, but ultimately chose parquet because

For your first point, yes we can definitely merge those other changes. Feel free to create this PR or modify this one, whichever is easier for you.

Point taken - I'm convinced. Will update this PR.

Thanks @cuichenx for the comments - PR is ready for review.
📝 Walkthrough

The changes introduce multiprocessing-based tokenization support to the dataset preparation pipeline and optimize the bin packing algorithm to use per-bin sums instead of tracking full bin contents, improving performance during dataset preparation and sequence packing operations.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Dataset Preparation
    participant Tokenizer as Tokenization Layer
    participant Pool as Worker Pool
    participant Workers as Tokenizer Workers
    Client->>Tokenizer: retrieve_tokenized(dataset, num_workers)
    alt num_workers > 0
        Tokenizer->>Pool: Create multiprocessing Pool(num_workers)
        Tokenizer->>Workers: map_async(_tokenize_get_item, dataset items)
        Note over Workers: Parallel tokenization
        Workers-->>Pool: Return tokenized results
        Pool-->>Tokenizer: Collect results
    else Sequential (num_workers <= 0)
        Tokenizer->>Tokenizer: Sequential tokenization loop
    end
    Tokenizer-->>Client: Return tokenized dataset
```
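The flow in the diagram corresponds roughly to this sketch. The whitespace "tokenizer" is a stand-in for the real Megatron tokenizer call, `Pool.map` is used for simplicity where the real code uses `imap`/`map_async`, and the sequential-fallback threshold here (treating one worker as sequential) may differ from the actual implementation:

```python
from multiprocessing import Pool

_shared_dataset = None  # populated once per worker via the Pool initializer


def _init_worker(dataset):
    global _shared_dataset
    _shared_dataset = dataset


def _tokenize_item(i):
    # Stand-in for real tokenization: split the i-th example on whitespace.
    return _shared_dataset[i].split()


def retrieve_tokenized(dataset, num_workers):
    if num_workers > 1:
        # Parallel path: each worker receives the dataset once through the
        # initializer, and map() preserves input order.
        with Pool(num_workers, initializer=_init_worker, initargs=(dataset,)) as pool:
            return pool.map(_tokenize_item, range(len(dataset)))
    # Sequential fallback
    _init_worker(dataset)
    return [_tokenize_item(i) for i in range(len(dataset))]
```

Passing the dataset via `initializer`/`initargs` avoids re-pickling it for every work item, which matters when the dataset is large.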
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks: ✅ 2 | ❌ 2

❌ Failed checks (2 warnings)
✅ Passed checks (2 passed)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/megatron/bridge/data/datasets/packing_utils.py (1)
28-43: ⚠️ Potential issue | 🟡 Minor

Update docstring and fix indentation in function definition.

The docstring parameter description is outdated. The `bin_sums` parameter is now a list of sums (integers), not a list of lists. Additionally, the `return i` statement has inconsistent indentation (5 spaces instead of 4).

Docstring and indentation fixes:

```diff
 def find_first_bin_that_fits(bin_sums: List[int], s: int, bin_size: int) -> int:
     """
     Finds the first bin in a list of bins that has enough space to fit a sequence of size 's'.

     Args:
-        bins: A list of lists, where each inner list represents a bin and contains the current elements in that bin.
+        bin_sums: A list of integers, where each integer represents the current total size of items in that bin.
         s: The size of the sequence to be placed in a bin.
         bin_size: The maximum capacity of each bin.

     Returns:
         The index of the first bin that can fit the sequence 's', or -1 if no such bin exists.
     """
     for i,cur_sum in enumerate(bin_sums):
         if cur_sum + s <= bin_size:
-             return i
+            return i
     return -1
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/megatron/bridge/data/datasets/packing_utils.py` around lines 28 - 43, Update the docstring for find_first_bin_that_fits to correctly describe bin_sums as a List[int] of current bin sums (not a list of lists) and adjust the parameter descriptions for s and bin_size accordingly; also fix the indentation of the return statement in the function body (ensure "return i" uses 4 spaces like the other lines) so the function is consistently indented and the docstring matches the signature (bin_sums, s, bin_size).

src/megatron/bridge/data/datasets/packed_sequence.py (1)
142-185: ⚠️ Potential issue | 🟠 Major

Extract `num_tokenizer_workers` from `PackedSequenceSpecs` and pass it to `prepare_packed_sequence_data` calls.

The `FinetuningDatasetBuilder.__init__` extracts several fields from `packed_sequence_specs` (e.g., `packed_sequence_size`, `pad_seq_to_mult`, `pad_cu_seqlens`) but does not extract `num_tokenizer_workers`. Both calls to `prepare_packed_sequence_data` at lines 100-110 and 114-124 omit this parameter, causing any user configuration of `PackedSequenceSpecs.num_tokenizer_workers` to be ignored.

Add extraction in `__init__`:

```python
self.num_tokenizer_workers = -1 if not packed_sequence_specs else packed_sequence_specs.num_tokenizer_workers
```

Then pass `num_tokenizer_workers=self.num_tokenizer_workers` to both `prepare_packed_sequence_data` calls.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/megatron/bridge/data/datasets/packed_sequence.py` around lines 142 - 185, FinetuningDatasetBuilder is not reading PackedSequenceSpecs.num_tokenizer_workers and so prepare_packed_sequence_data never receives the configured tokenizer worker count; in FinetuningDatasetBuilder.__init__ add self.num_tokenizer_workers = -1 if not packed_sequence_specs else packed_sequence_specs.num_tokenizer_workers, and update both calls to prepare_packed_sequence_data to include num_tokenizer_workers=self.num_tokenizer_workers so the value from PackedSequenceSpecs.num_tokenizer_workers is propagated to prepare_packed_sequence_data.
🧹 Nitpick comments (4)
src/megatron/bridge/data/datasets/packed_sequence.py (3)
243-247: Consider using `1` as the default instead of `-1` for backward compatibility.

The docstring says `-1` means "use all CPU cores," but setting this as the default changes behavior for existing users who expect sequential tokenization. A default of `1` would maintain backward compatibility, and users could explicitly opt into parallelism.

Additionally, the docstring could clarify what happens when `num_tokenizer_workers=1` (sequential mode).

Suggested docstring improvement:

```diff
 num_tokenizer_workers: int = -1
 """
 The number of worker processes to use for tokenization when preparing the packed sequence dataset.
-If -1, the number of workers will be set to the number of CPU cores available
+If 1, tokenization runs sequentially (single process).
+If -1, the number of workers will be set to the number of CPU cores available.
+If greater than 1, uses the specified number of worker processes.
 """
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/megatron/bridge/data/datasets/packed_sequence.py` around lines 243 - 247, Change the default value of the num_tokenizer_workers parameter in packed_sequence dataset from -1 to 1 and update its docstring: set num_tokenizer_workers: int = 1, clarify that 1 means sequential tokenization, -1 means use all CPU cores and any value >1 sets that many worker processes (retain existing logic that maps -1 to os.cpu_count()), and ensure any code paths or tests that relied on the old default still behave by keeping the -1-to-cpu_count mapping in the tokenization setup (refer to the num_tokenizer_workers variable and the packed sequence dataset/tokenization setup functions to locate the changes).
33-46: Multiprocessing pattern looks reasonable, but consider adding a type annotation.

The global `_shared_dataset` pattern is a standard approach for sharing read-only data with worker processes. Consider adding type hints for clarity.

One minor note: `pool.imap` preserves order, which is correct here. The implementation looks sound for the use case.

Optional type annotation:

```diff
-_shared_dataset = None
+_shared_dataset: np.ndarray | None = None
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/megatron/bridge/data/datasets/packed_sequence.py` around lines 33 - 46, Add explicit type annotations for the shared global and the helper functions: annotate _shared_dataset (e.g., Optional[Any] or a specific dataset type) instead of untyped None, add parameter and return type hints on _tokenize_init_worker(dataset: DatasetType) -> None, _tokenize_get_item(i: int) -> TokenizedType, and _retrieve_tokenized(dataset: DatasetType, num_workers: int) -> np.ndarray; also import any needed typing names (Optional, Any, TypeVar) to make the types clear and help linters while keeping the existing multiprocessing logic intact.
55: Document the new `num_tokenizer_workers` parameter in the docstring.

The parameter is added but not documented in the function's docstring. The docstring should describe its behavior, especially the special values (-1 for auto-detect, 1 for sequential).

Proposed docstring addition:

```diff
 Args:
     path (Path): Path to the dataset file.
     tokenizer (MegatronTokenizer): The tokenizer to use for tokenization.
     max_seq_length (int): Maximum sequence length for the tokens.
     seed (int): Random seed for shuffling the dataset.
     dataset_kwargs (dict | None): Additional keyword arguments to pass to create_sft_dataset. Can include 'chat', 'use_hf_tokenizer_chat_template', 'tool_schemas', etc.
     pad_seq_to_mult (int | None): Optional multiple to pad each sequence to during packing preparation (e.g., set to 2 * context_parallel_size for THD CP).
+    num_tokenizer_workers (int): Number of worker processes for tokenization.
+        If 1, tokenization runs sequentially. If -1, uses all available CPU cores.
+        Default is -1.
```

Also applies to: 68-73
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/megatron/bridge/data/datasets/packed_sequence.py` at line 55, The new parameter num_tokenizer_workers added to the function/constructor signature (the routine that defines packed sequences in packed_sequence.py) is missing from its docstring—update the docstring for that function/constructor to document num_tokenizer_workers, describing its purpose (controls number of workers used by the tokenizer), default value, and special semantics (-1 = auto-detect/choose based on CPU, 1 = run tokenization sequentially/no multiprocessing, >1 = number of worker processes); apply this addition to both docstring locations where the parameter appears (the two function/constructor docstrings around the added signature lines).

src/megatron/bridge/data/datasets/packing_utils.py (1)
60: Consider adding a description to the tqdm progress bar.

Adding a description helps users understand what's happening during long-running operations.

Proposed improvement:

```diff
-    for s in tqdm(seqlens):
+    for s in tqdm(seqlens, desc="Packing sequences"):
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/megatron/bridge/data/datasets/packing_utils.py` at line 60, The progress loop `for s in tqdm(seqlens):` lacks a descriptive label; update it to include a meaningful description (e.g., `for s in tqdm(seqlens, desc="Packing sequences")`) so users know what is being processed during long runs—modify the `tqdm` call in `packing_utils.py` (the `for s in tqdm(seqlens)` loop) to add a `desc` string (and optionally `unit` or `total` if available) to improve observability.
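Taken together, the packing_utils suggestions (per-bin sums, corrected docstring, labeled progress bar) can be illustrated with a small runnable sketch. `find_first_bin_that_fits` mirrors the helper discussed above; the `first_fit` driver is a simplified stand-in for the repository's packing loop, not its exact code:

```python
from typing import List


def find_first_bin_that_fits(bin_sums: List[int], s: int, bin_size: int) -> int:
    """Return the index of the first bin whose current sum leaves room for a
    sequence of length `s`, or -1 if no bin fits."""
    for i, cur_sum in enumerate(bin_sums):
        if cur_sum + s <= bin_size:
            return i
    return -1


def first_fit(seqlens: List[int], bin_size: int) -> List[List[int]]:
    """Greedy first-fit packing that tracks only a running sum per bin
    (the optimization this PR introduces) instead of summing full bin
    contents on every lookup."""
    bins: List[List[int]] = []
    bin_sums: List[int] = []
    for s in seqlens:
        i = find_first_bin_that_fits(bin_sums, s, bin_size)
        if i == -1:
            bins.append([s])       # open a new bin
            bin_sums.append(s)
        else:
            bins[i].append(s)      # place into an existing bin
            bin_sums[i] += s
    return bins
```

Keeping `bin_sums` alongside `bins` turns each fit check into a single integer comparison per bin, which is what makes first-fit tractable on large datasets.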
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/megatron/bridge/data/datasets/packed_sequence.py`:
- Line 112: The condition `if pad_seq_to_mult > 1:` can raise TypeError when
pad_seq_to_mult is None; change the guard to explicitly handle None (e.g. check
`pad_seq_to_mult is not None and pad_seq_to_mult > 1`) or normalize
pad_seq_to_mult to an int before use; update the check wherever
`pad_seq_to_mult` is evaluated in packed_sequence.py (the variable
`pad_seq_to_mult` and any function/method that uses it) and ensure you also
validate/coerce its type so comparisons always occur against an int.
- Around line 18-21: Remove the unused typing imports Literal and List by
deleting them from the "from typing import ..." line, and reorder imports to
follow stdlib → third-party → first-party convention: keep "from multiprocessing
import Pool" and "import multiprocessing as mp" (stdlib) first, then "from
typing import ..." (stdlib) if any other typing names remain, and place "from
tqdm import tqdm" after them as a third-party import; ensure no other unused
imports remain and run linter to confirm.
In `@src/megatron/bridge/data/datasets/packing_utils.py`:
- Around line 40-42: The loop has inconsistent indentation and a missing space
after the comma: change the line to use PEP8-compliant spacing and 4-space
indentation so it reads with a space after the comma in the enumerate call (use
"for i, cur_sum in enumerate(bin_sums):") and ensure the subsequent return line
("return i") is indented with 4 spaces to match surrounding code in
packing_utils.py.
---
Outside diff comments:
In `@src/megatron/bridge/data/datasets/packed_sequence.py`:
- Around line 142-185: FinetuningDatasetBuilder is not reading
PackedSequenceSpecs.num_tokenizer_workers and so prepare_packed_sequence_data
never receives the configured tokenizer worker count; in
FinetuningDatasetBuilder.__init__ add self.num_tokenizer_workers = -1 if not
packed_sequence_specs else packed_sequence_specs.num_tokenizer_workers, and
update both calls to prepare_packed_sequence_data to include
num_tokenizer_workers=self.num_tokenizer_workers so the value from
PackedSequenceSpecs.num_tokenizer_workers is propagated to
prepare_packed_sequence_data.
In `@src/megatron/bridge/data/datasets/packing_utils.py`:
- Around line 28-43: Update the docstring for find_first_bin_that_fits to
correctly describe bin_sums as a List[int] of current bin sums (not a list of
lists) and adjust the parameter descriptions for s and bin_size accordingly;
also fix the indentation of the return statement in the function body (ensure
"return i" uses 4 spaces like the other lines) so the function is consistently
indented and the docstring matches the signature (bin_sums, s, bin_size).
---
Nitpick comments:
In `@src/megatron/bridge/data/datasets/packed_sequence.py`:
- Around line 243-247: Change the default value of the num_tokenizer_workers
parameter in packed_sequence dataset from -1 to 1 and update its docstring: set
num_tokenizer_workers: int = 1, clarify that 1 means sequential tokenization, -1
means use all CPU cores and any value >1 sets that many worker processes (retain
existing logic that maps -1 to os.cpu_count()), and ensure any code paths or
tests that relied on the old default still behave by keeping the -1-to-cpu_count
mapping in the tokenization setup (refer to the num_tokenizer_workers variable
and the packed sequence dataset/tokenization setup functions to locate the
changes).
- Around line 33-46: Add explicit type annotations for the shared global and the
helper functions: annotate _shared_dataset (e.g., Optional[Any] or a specific
dataset type) instead of untyped None, add parameter and return type hints on
_tokenize_init_worker(dataset: DatasetType) -> None, _tokenize_get_item(i: int)
-> TokenizedType, and _retrieve_tokenized(dataset: DatasetType, num_workers:
int) -> np.ndarray; also import any needed typing names (Optional, Any, TypeVar)
to make the types clear and help linters while keeping the existing
multiprocessing logic intact.
- Line 55: The new parameter num_tokenizer_workers added to the
function/constructor signature (the routine that defines packed sequences in
packed_sequence.py) is missing from its docstring—update the docstring for that
function/constructor to document num_tokenizer_workers, describing its purpose
(controls number of workers used by the tokenizer), default value, and special
semantics (-1 = auto-detect/choose based on CPU, 1 = run tokenization
sequentially/no multiprocessing, >1 = number of worker processes); apply this
addition to both docstring locations where the parameter appears (the two
function/constructor docstrings around the added signature lines).
In `@src/megatron/bridge/data/datasets/packing_utils.py`:
- Line 60: The progress loop `for s in tqdm(seqlens):` lacks a descriptive
label; update it to include a meaningful description (e.g., `for s in
tqdm(seqlens, desc="Packing sequences")`) so users know what is being processed
during long runs—modify the `tqdm` call in `packing_utils.py` (the `for s in
tqdm(seqlens)` loop) to add a `desc` string (and optionally `unit` or `total` if
available) to improve observability.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: becc4ee0-629c-4139-bc2d-151d8e06391c
📒 Files selected for processing (2)
src/megatron/bridge/data/datasets/packed_sequence.py
src/megatron/bridge/data/datasets/packing_utils.py
```python
dataset = np.array([dataset[i] for i in range(len(dataset))])
dataset = _retrieve_tokenized(dataset, num_tokenizer_workers)
```

```python
if pad_seq_to_mult > 1:
```
Potential TypeError when pad_seq_to_mult is None.
If pad_seq_to_mult is None, the comparison pad_seq_to_mult > 1 will raise a TypeError in Python 3. The parameter has a default of 1 but can explicitly be passed as None.
Proposed fix:

```diff
-    if pad_seq_to_mult > 1:
+    if pad_seq_to_mult is not None and pad_seq_to_mult > 1:
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/megatron/bridge/data/datasets/packed_sequence.py` at line 112, The
condition `if pad_seq_to_mult > 1:` can raise TypeError when pad_seq_to_mult is
None; change the guard to explicitly handle None (e.g. check `pad_seq_to_mult is
not None and pad_seq_to_mult > 1`) or normalize pad_seq_to_mult to an int before
use; update the check wherever `pad_seq_to_mult` is evaluated in
packed_sequence.py (the variable `pad_seq_to_mult` and any function/method that
uses it) and ensure you also validate/coerce its type so comparisons always
occur against an int.
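A minimal illustration of the None-safe guard follows. The helper name `pad_to_multiple` is hypothetical; the actual fix only hardens the `if` condition so it never evaluates `None > 1`:

```python
def pad_to_multiple(seqlen: int, pad_seq_to_mult) -> int:
    """Round seqlen up to the next multiple of pad_seq_to_mult.

    Treats None like 1 (no padding), so the comparison never evaluates
    `None > 1`, which raises TypeError in Python 3.
    """
    if pad_seq_to_mult is not None and pad_seq_to_mult > 1:
        return -(-seqlen // pad_seq_to_mult) * pad_seq_to_mult  # ceiling division
    return seqlen
```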
Signed-off-by: Chen Cui <chcui@nvidia.com>
Made-with: Cursor
/ok to test 625107d

@cuichenx, there was an error processing your request: See the following link for more information: https://docs.gha-runners.nvidia.com/cpr/e/2/

cuichenx left a comment:

Thanks, changes look good to me

/ok to test 625107d

@cuichenx, there was an error processing your request: See the following link for more information: https://docs.gha-runners.nvidia.com/cpr/e/2/

Could you run

/ok to test f3dd874
What does this PR do?
Improved packed sequence handling for larger SFT datasets by optimizing the packing process and adding memmap dataset format.
Changelog
GitHub Actions CI
See the CI section in the Contributing doc for how to trigger the CI. An NVIDIA developer will need to approve and trigger the CI for external contributors.
Before your PR is "Ready for review"
Pre checks:
If you haven't finished some of the above items, you can still open a "Draft" PR.
Additional Information
I didn't update any of the tests yet - would like to hear whether this PR is relevant to the team, and if so I'd be happy to update the tests.
I made sure to maintain full backwards compatibility for previous packed datasets.
The main motivation for this PR: while adapting the Nemotron model to our low-resource language, we discovered that the packing mechanism as-is isn't feasible when working on larger datasets (e.g., ~1B tokens).
The first improvement optimizes the packing process: one small adjustment in first_fit, parallelization of the tokenization, and progress bars. This was crucial - when we tried to pack our dataset, we would wait for hours without seeing any update.
The second addresses memory constraints - the npy file held in memory would take >400 GB of RAM, and when we ran distributed across 8 GPUs the machine would OOM very quickly. This uses the same format as the Megatron-Core indexed dataset used in pretraining, adjusted to match the SFT format.
This patch is actually what we ended up using - we would run this patch when launching the docker container (using Nemo-Run's pre_launch_commands), before running the training, and it worked great across ~14 nodes.
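To make the memory point concrete, a writer that streams tokens to a flat binary file plus a small index keeps RAM usage bounded by the index alone. The `.bin`/`.idx.npy` layout and the `write_packed_mmap` name here are illustrative assumptions, not the exact Megatron-Core indexed-dataset format:

```python
import numpy as np


def write_packed_mmap(sequences, prefix: str, dtype=np.int32) -> None:
    """Stream token sequences to `<prefix>.bin` and record (offset, length)
    pairs in `<prefix>.idx.npy`, so training can memory-map the tokens
    instead of materializing one giant in-memory array."""
    offsets, lengths, pos = [], [], 0
    with open(prefix + ".bin", "wb") as f:
        for seq in sequences:
            arr = np.asarray(seq, dtype=dtype)
            arr.tofile(f)            # tokens go straight to disk
            offsets.append(pos)
            lengths.append(len(arr))
            pos += len(arr)
    # Only this small (N, 2) index is ever held in memory at once.
    np.save(prefix + ".idx.npy", np.stack([offsets, lengths], axis=1))
```

A reader can then `np.memmap` the `.bin` file and slice out any sequence by its recorded offset and length.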
Would love to hear any feedback.