tolgacangoz commented on Jun 11, 2025

This PR proposes to fix #394

Previous (each of the 3 workers iterates the full dataset, so every batch is yielded three times):

2025-06-11 10:49:55 - [INFO] - Starting IterableDatasetPreprocessingWrapper for the dataset
2025-06-11 10:49:55 - [INFO] - Starting IterableDatasetPreprocessingWrapper for the dataset
2025-06-11 10:49:55 - [INFO] - Starting IterableDatasetPreprocessingWrapper for the dataset
[0, 1, 2, 3]
[0, 1, 2, 3]
[0, 1, 2, 3]
[4, 5, 6, 7]
[4, 5, 6, 7]
[4, 5, 6, 7]
...
[96, 97, 98, 99]
[96, 97, 98, 99]
[96, 97, 98, 99]

Current (with interleaved sharding across the 3 workers, every item appears exactly once):

2025-06-11 10:55:08 - [INFO] - Starting IterableDatasetPreprocessingWrapper for the dataset
2025-06-11 10:55:08 - [INFO] - Starting IterableDatasetPreprocessingWrapper for the dataset
2025-06-11 10:55:08 - [INFO] - Starting IterableDatasetPreprocessingWrapper for the dataset
[0, 3, 6, 9]
[1, 4, 7, 10]
[2, 5, 8, 11]
[12, 15, 18, 21]
[13, 16, 19, 22]
[14, 17, 20, 23]
[24, 27, 30, 33]
[25, 28, 31, 34]
[26, 29, 32, 35]
[36, 39, 42, 45]
[37, 40, 43, 46]
[38, 41, 44, 47]
[48, 51, 54, 57]
[49, 52, 55, 58]
[50, 53, 56, 59]
[60, 63, 66, 69]
[61, 64, 67, 70]
[62, 65, 68, 71]
[72, 75, 78, 81]
[73, 76, 79, 82]
[74, 77, 80, 83]
[84, 87, 90, 93]
[85, 88, 91, 94]
[86, 89, 92, 95]

The example implementation in the PyTorch `IterableDataset` documentation distributes the data between workers as sequential chunks, which would produce batches like
[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9, 10, 11] ...
I thought the interleaved way seemed more "balanced" across workers; a minimal sketch of that approach is below. WDYT?
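For reference, a minimal sketch of interleaved per-worker sharding using `torch.utils.data.get_worker_info()` (the class here is illustrative; per the commit notes below, the PR applies the same idea inside `IterableDatasetPreprocessingWrapper.__iter__`):

import itertools
import torch

class InterleavedShardedDataset(torch.utils.data.IterableDataset):
    def __init__(self, num_samples=100):
        super().__init__()
        self.num_samples = num_samples

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        if worker_info is None:
            worker_id, num_workers = 0, 1  # single-process data loading
        else:
            worker_id, num_workers = worker_info.id, worker_info.num_workers
        # Worker k yields items k, k + num_workers, k + 2 * num_workers, ...
        yield from itertools.islice(range(self.num_samples), worker_id, None, num_workers)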

Reproducer
import torch
from finetrainers.data.dataset import IterableDatasetPreprocessingWrapper

class DummyIterableDataset(torch.utils.data.IterableDataset):
    def __init__(self, num_samples=100):
        super().__init__()
        self.num_samples = num_samples

    def __iter__(self):
        for i in range(self.num_samples):
            yield {"caption": f"caption_{i}", "image": i}

num_samples = 101  # Not perfectly divisible by batch_size to test drop_last
batch_size = 4
drop_last = True
num_workers = 3

original_dataset = DummyIterableDataset(num_samples)
wrapped_dataset = IterableDatasetPreprocessingWrapper(
    dataset=original_dataset,
    dataset_type="image",
)
# With num_workers > 1, each worker process receives its own replica of the
# iterable dataset; without per-worker sharding, every batch is duplicated.
dataloader = torch.utils.data.DataLoader(
    wrapped_dataset,
    batch_size=batch_size,
    num_workers=num_workers,
    drop_last=drop_last,
)

for batch in dataloader:
    print(batch['image'].tolist())
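
As a sanity check on the numbers (plain arithmetic, independent of finetrainers): with 101 samples, 3 workers, and interleaved sharding, the worker shards hold 34, 34, and 33 items; `drop_last=True` then discards each worker's trailing partial batch, leaving 96 items (0-95), i.e. the 24 batches shown in the "Current" output.

num_samples, batch_size, num_workers = 101, 4, 3

total = 0
for worker_id in range(num_workers):
    shard = range(worker_id, num_samples, num_workers)  # shard sizes: 34, 34, 33
    total += (len(shard) // batch_size) * batch_size    # keep only full batches
print(total)  # 96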

@a-r-r-o-w @sayakpaul @jzhang38

- Added `drop_last` parameter to `DPDataLoader` to manage batch sizes.
- Updated `IterableDatasetPreprocessingWrapper` to split dataset across workers when using multiple workers.
- Adjusted `AccelerateParallelBackend` and `PytorchDTensorParallelBackend` to set `drop_last` for `IterableDataset` when `num_workers > 1`, ensuring consistent batch sizes.
These changes improve data loading efficiency and consistency in parallel processing scenarios.
…workers

- Introduced a new test file to validate that the IterableDatasetPreprocessingWrapper correctly handles data loading with multiple workers.
- Ensured no duplicate data is yielded and that the last batch is dropped when necessary.
- The tests cover various worker configurations to confirm expected behavior in parallel processing scenarios.
- Updated the __iter__ method to utilize worker information for slicing the dataset, ensuring that each worker processes a distinct subset of data.
- This change improves data loading efficiency when using multiple workers in the validation dataset.
…dation of multi-worker data loading

- Updated the test to directly compare loaded items against a manually simulated expected set, ensuring correct sharding and drop_last behavior.
- Simplified the logic for asserting no duplicates and matching expected item counts across different worker configurations (a standalone sketch of this test logic follows).
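
A minimal standalone sketch of the test logic described above (the function name and structure are illustrative, not the PR's actual test file; `DummyIterableDataset` is the one from the reproducer):

import torch
from finetrainers.data.dataset import IterableDatasetPreprocessingWrapper

def test_no_duplication_across_workers(num_samples=101, batch_size=4, num_workers=3):
    wrapped = IterableDatasetPreprocessingWrapper(
        dataset=DummyIterableDataset(num_samples), dataset_type="image"
    )
    dataloader = torch.utils.data.DataLoader(
        wrapped, batch_size=batch_size, num_workers=num_workers, drop_last=True
    )
    loaded = [i for batch in dataloader for i in batch["image"].tolist()]

    # Manually simulate the expected sharding + drop_last behavior.
    expected = []
    for worker_id in range(num_workers):
        shard = list(range(worker_id, num_samples, num_workers))
        expected += shard[: len(shard) // batch_size * batch_size]

    assert len(loaded) == len(set(loaded)), "duplicate items across workers"
    assert sorted(loaded) == sorted(expected)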
tolgacangoz changed the title from "Fix-data-duplication-in-multi-processing" to "Fix data duplication in multi-processing for iterable datasets" on Jun 11, 2025


Development

Successfully merging this pull request may close these issues.

Multiprocess dataloader for IterableDataset bug
