94 changes: 69 additions & 25 deletions README.md
https://github.com/user-attachments/assets/3ba9e2ef-bf6b-41fc-a578-e4b4113a0e72

</details>

**Prerequisites:**

Install the required dependencies to stream Hugging Face datasets:
```sh
pip install "litdata[extra]" huggingface_hub

# Optional: To speed up downloads on high-bandwidth networks
pip install hf_transfer
export HF_HUB_ENABLE_HF_TRANSFER=1
```

**Stream a Hugging Face dataset:**

```python
import litdata as ld

# Define the Hugging Face dataset URI
hf_dataset_uri = "hf://datasets/leonardPKU/clevr_cogen_a_train/data"

# Create a streaming dataset
dataset = ld.StreamingDataset(hf_dataset_uri)

# Print the first sample
print("Sample", dataset[0])

# Stream the dataset using StreamingDataLoader
dataloader = ld.StreamingDataLoader(dataset, batch_size=4)
for sample in dataloader:
    pass
```

You don’t need to worry about indexing the dataset or any other setup. **LitData** **handles all the necessary steps automatically** and caches the `index.json` file, so you won’t have to index it again.
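
To see the cache in action, you can simply create the dataset a second time; this sketch just re-runs the API from the example above, and the cached `index.json` is reused:

```python
import litdata as ld

# Second run: LitData picks up the cached index.json instead of re-indexing
dataset = ld.StreamingDataset("hf://datasets/leonardPKU/clevr_cogen_a_train/data")
print("Rows available:", len(dataset))
```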
If the Hugging Face dataset hasn’t been indexed yet, you can index it first using `ld.index_hf_dataset`:
```python
import litdata as ld

hf_dataset_uri = "hf://datasets/leonardPKU/clevr_cogen_a_train/data"

ld.index_hf_dataset(hf_dataset_uri)
```

- Indexing the Hugging Face dataset ahead of time makes streaming a bit faster, as it avoids real-time indexing during streaming.

- To use a gated Hugging Face dataset, ensure the `HF_TOKEN` environment variable is set, as in the sketch below.
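
  A minimal sketch (the token value is a placeholder; set it before creating the dataset):

  ```python
  import os

  # Placeholder value; substitute your real Hugging Face access token
  os.environ["HF_TOKEN"] = "hf_xxx"
  ```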

For full control over the cache path (where the `index.json` file will be stored), you can index the dataset ahead of time and point to the index while streaming:

1. Index the Parquet dataset and store the `index.json` file in a directory of your choice:
```python
import litdata as ld

hf_dataset_uri = "hf://datasets/open-thoughts/OpenThoughts-114k/data"

ld.index_parquet_dataset(hf_dataset_uri, "hf-index-dir")
```

2. To stream the HF dataset, pass the HF dataset URI, the path where the `index.json` file is stored, and `ParquetLoader` as the `item_loader` to **`StreamingDataset`**:
```python
import litdata as ld
from litdata.streaming.item_loader import ParquetLoader

hf_dataset_uri = "hf://datasets/open-thoughts/OpenThoughts-114k/data"

dataset = ld.StreamingDataset(hf_dataset_uri, item_loader=ParquetLoader(), index_path="hf-index-dir")

for batch in ld.StreamingDataLoader(dataset, batch_size=4):
    pass
```

&nbsp;

### LitData `Optimize` vs `Parquet`

<!-- TODO: Update benchmark -->
Below is a benchmark for the **ImageNet dataset (155 GB)**, demonstrating that **optimizing the dataset with LitData is faster and produces a smaller output than raw Parquet files**.

| **Operation** | **Size (GB)** | **Time (seconds)** | **Throughput (images/sec)** |
The `overwrite` mode will delete the existing data and start fresh.

<details>
<summary> ✅ Stream Parquet datasets</summary>
&nbsp;

Stream Parquet datasets directly with LitData, without converting them into LitData’s optimized binary format. If your dataset is already in Parquet format, you can efficiently index and stream it using `StreamingDataset` and `StreamingDataLoader`.

**Assumption:**

Your dataset directory contains one or more Parquet files.
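
For illustration, the indexer expects something like the following layout (bucket and file names are hypothetical):

```
s3://my-bucket/my-parquet-data/
├── part-0000.parquet
├── part-0001.parquet
└── part-0002.parquet
```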

**Prerequisites:**

Install the required dependencies to stream Parquet datasets from cloud storage like **Amazon S3** or **Google Cloud Storage**:

```bash
# For Amazon S3
pip install "litdata[extra]" s3fs

# For Google Cloud Storage
pip install "litdata[extra]" gcsfs
```

**Index Your Dataset**:

Index your Parquet dataset to create an index file that LitData can use to stream the dataset.

```python
import litdata as ld

# Point to your data stored in the cloud
pq_dataset_uri = "s3://my-bucket/my-parquet-data"  # or "gs://my-bucket/my-parquet-data"

ld.index_parquet_dataset(pq_dataset_uri)
```

**Stream the Dataset**

Use `StreamingDataset` with `ParquetLoader` to load and stream the dataset efficiently:

```python
import litdata as ld
from litdata.streaming.item_loader import ParquetLoader

# Specify your dataset location in the cloud
pq_dataset_uri = "s3://my-bucket/my-parquet-data"  # or "gs://my-bucket/my-parquet-data"

# Set up the streaming dataset
dataset = ld.StreamingDataset(pq_dataset_uri, item_loader=ParquetLoader())

# Print the first sample
print("Sample", dataset[0])

# Stream the dataset using StreamingDataLoader
dataloader = ld.StreamingDataLoader(dataset, batch_size=4)
for sample in dataloader:
    pass
```
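
Since `StreamingDataset` supports `len()` and index-based access (as `dataset[0]` above shows), you can also grab individual rows without iterating the whole dataset; a minimal sketch reusing `dataset` from the example above:

```python
# Random access: fetch a single row by position without streaming everything
num_rows = len(dataset)
print("Total rows:", num_rows)
print("Middle row:", dataset[num_rows // 2])
```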

</details>
1 change: 0 additions & 1 deletion tests/streaming/test_parquet.py
def test_stream_hf_parquet_dataset(monkeypatch, huggingface_hub_fs_mock, pq_data
assert _ds["height"] == pq_data["height"][idx]

# Test case 3: Streaming with passing item_loader
print("pre_load_chunk", pre_load_chunk, "low_memory", low_memory)
ds = StreamingDataset(hf_url, item_loader=ParquetLoader(pre_load_chunk, low_memory))
assert len(ds) == 25
for i, _ds in enumerate(ds):