diff --git a/docs/source/_toctree.yml b/docs/source/_toctree.yml
index 861925a7d99..862708a6b29 100644
--- a/docs/source/_toctree.yml
+++ b/docs/source/_toctree.yml
@@ -30,6 +30,8 @@
     title: Process
   - local: stream
     title: Stream
+  - local: stream_best_practices
+    title: Streaming best practices
   - local: use_with_pytorch
     title: Use with PyTorch
   - local: use_with_tensorflow
diff --git a/docs/source/stream_best_practices.mdx b/docs/source/stream_best_practices.mdx
new file mode 100644
index 00000000000..9989464ba6e
--- /dev/null
+++ b/docs/source/stream_best_practices.mdx
@@ -0,0 +1,103 @@
+# Streaming best practices
+
+This page complements the [Streaming guide](./stream) with practical patterns and pitfalls for large-scale, production-grade usage.
+
+## TL;DR patterns
+
+- Prefer local streaming (`to_iterable_dataset`) when the data is already on disk.
+- Use batched `map` with `remove_columns` to keep memory bounded.
+- Shuffle with a large buffer and call `set_epoch` at every epoch for determinism.
+- For multi-worker loaders, shard at the source: `to_iterable_dataset(num_shards=...)`.
+- Persist results incrementally: `push_to_hub` with `num_proc`, or `to_parquet` per shard.
+
+## Local vs HTTP streaming
+
+- Local streaming avoids network flakiness and maximizes throughput. Download and prepare a Hub dataset once, then stream it from the local cache:
+
+```py
+from datasets import load_dataset
+
+base = load_dataset("allenai/c4", "en", split="train")
+it = base.to_iterable_dataset(num_shards=64)
+```
+
+- HTTP streaming is ideal for huge corpora you won't store locally. Use resumable endpoints and retry-aware filesystems (fsspec, the Hugging Face Hub), and keep transformations simple.
+
+## Buffered transforms that stay memory-safe
+
+```py
+from datasets import load_dataset
+from transformers import AutoTokenizer
+
+ds = load_dataset("HuggingFaceFW/fineweb", split="train", streaming=True)
+tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
+
+# Avoid keeping the original text around after tokenization
+batched = ds.map(
+    lambda ex: tokenizer(ex["text"], truncation=True, padding="max_length"),
+    batched=True,
+    remove_columns=["text", "timestamp", "url"],  # adjust to the columns your dataset actually has
+    batch_size=1024,
+)
+```
+
+- Use `batched=True` to amortize the tokenizer overhead.
+- Use `remove_columns` to drop large raw fields as early as possible.
+- Tune `batch_size` to the smallest value that saturates your CPU/GPU.
+
+## Deterministic shuffling and epoch control
+
+```py
+# `it` is the sharded IterableDataset from the example above
+num_epochs = 3
+seed, buffer_size = 42, 10_000
+
+shuffled = it.shuffle(seed=seed, buffer_size=buffer_size)
+for epoch in range(num_epochs):
+    shuffled.set_epoch(epoch)
+    for example in shuffled:
+        ...
+```
+
+- `shuffle` reorders the shards and samples from a buffer; larger buffers get closer to a global shuffle.
+- `set_epoch(epoch)` changes the RNG stream so that each epoch differs but remains reproducible.
+
+## Multi-worker data loading
+
+```py
+from torch.utils.data import DataLoader
+
+it = base.to_iterable_dataset(num_shards=64).shuffle(seed=42, buffer_size=10_000)
+loader = DataLoader(it, num_workers=4)
+```
+
+- Sharding at creation time lets each worker read a disjoint subset of shards without contention.
+- Prefer `num_shards` much larger than `num_workers` (ideally a multiple of it) to keep every worker busy.
+
+## Checkpoint and resume
+
+- Use `IterableDataset.state_dict()` and `load_state_dict()` to save and restore progress.
+- With `shuffle()`, the buffer contents aren't preserved, so expect slight differences after resuming.
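+
+A minimal sketch of the save/restore flow (the tiny in-memory dataset is only for illustration, and resume support requires a recent `datasets` release):
+
+```py
+from datasets import Dataset
+
+ds = Dataset.from_dict({"n": list(range(6))}).to_iterable_dataset(num_shards=3)
+
+# Iterate part of the way through, then capture the current position
+for idx, example in enumerate(ds):
+    if idx == 2:
+        state = ds.state_dict()
+        break
+
+# Later (e.g. after a crash or preemption): restore the state and keep iterating
+ds.load_state_dict(state)
+for example in ds:
+    ...
+```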
+
+## Persisting results
+
+- Save one Parquet file per shard to keep memory bounded and enable parallelism:
+
+```py
+num = it.num_shards
+for i in range(num):
+    shard = it.shard(num_shards=num, index=i)
+    shard.to_parquet(f"out/data-{i:05d}.parquet")
+```
+
+- Or call `push_to_hub("username/dataset", num_proc=8)` to upload shards in parallel.
+
+## Common pitfalls
+
+- Indexing by absolute position: an `IterableDataset` only supports iteration, not random access.
+- Mapping with functions that capture large globals, or that return Python objects Arrow cannot store.
+- Calling `take`/`skip` before `shuffle`, which locks the shard order and prevents later shuffling.
+- Using a small shuffle buffer, which mixes the data poorly; increase `buffer_size`.
+
+The first and third pitfalls are illustrated in the short sketch at the end of this page.
+
+## See also
+
+- [Streaming guide](./stream)
+- [Dataset vs. IterableDataset](./about_mapstyle_vs_iterable)
+- [Batch mapping](./about_map_batch)
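+
+A short sketch of the first and third pitfalls above (the dataset and the numbers are placeholders):
+
+```py
+from datasets import load_dataset
+
+ds = load_dataset("HuggingFaceFW/fineweb", split="train", streaming=True)
+
+# Pitfall 1: integer indexing like `ds[0]` is not supported on an IterableDataset.
+# Peek at a few examples by iterating instead, e.g. with take():
+first_examples = list(ds.take(3))
+
+# Pitfall 3: take() and skip() fix the shard order, so shuffle first;
+# shuffling afterwards can no longer reorder the shards.
+subset = ds.shuffle(seed=42, buffer_size=10_000).take(100_000)
+```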