Commit c93b1bf

committed
read
1 parent dbac625 commit c93b1bf

1 file changed

docs/hub/datasets-spark.md

Lines changed: 52 additions & 124 deletions
@@ -2,21 +2,17 @@

Spark enables real-time, large-scale data processing in a distributed environment.

-In particular you can use `huggingface_hub` to access Hugging Face datasets repositories in PySpark
+In particular you can use `pyspark_huggingface` to access Hugging Face datasets repositories in PySpark.

## Installation

-To be able to read and write to Hugging Face URLs (e.g. `hf://datasets/username/dataset/data.parquet`), you need to install the `huggingface_hub` library:
+To be able to read and write to Hugging Face Datasets, you need to install the `pyspark_huggingface` library:

```
-pip install huggingface_hub
+pip install pyspark_huggingface
```

-You also need to install `pyarrow` to read/write Parquet / JSON / CSV / etc. files using the filesystem API provided by `huggingFace_hub`:
-
-```
-pip install pyarrow
-```
+This will also install required dependencies like `huggingface_hub` for authentication, and `pyarrow` for reading and writing datasets.

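
As a quick sanity check, here is a minimal sketch confirming the installed packages are importable:

```python
# Minimal sketch: check that the packages installed above are importable
import pyspark_huggingface  # enables the "huggingface" Data Source (see below)
import huggingface_hub      # used for authentication
import pyarrow              # used for reading and writing datasets

print(huggingface_hub.__version__, pyarrow.__version__)
```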

## Authentication

@@ -28,142 +24,54 @@ You can use the CLI for example:
huggingface-cli login
```

-It's also possible to provide your Hugging Face token with the `HF_TOKEN` environment variable or passing the `storage_options` parameter to helper functions below:
-
-```python
-storage_options = {"token": "hf_xxx"}
-```
+It's also possible to provide your Hugging Face token with the `HF_TOKEN` environment variable or by passing the `token` option to the Spark context builder.

For more details about authentication, check out [this guide](https://huggingface.co/docs/huggingface_hub/quick-start#authentication).

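For reference, a minimal sketch of the environment-variable and programmatic approaches (the token value is a placeholder):

```python
import os
from huggingface_hub import login

# Set the token through the environment before starting Spark...
os.environ["HF_TOKEN"] = "hf_xxx"  # placeholder, never hardcode a real token

# ...or log in programmatically, which is equivalent to `huggingface-cli login`
login(token=os.environ["HF_TOKEN"])
```
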
-## Read
+## Enable the "huggingface" Data Source

-PySpark doesn't have an official support for Hugging Face paths, so we provide a helper function to read datasets in a distributed manner.
+PySpark 4 comes with a new Data Source API which allows using datasets from custom sources.
+If `pyspark_huggingface` is installed, PySpark auto-imports it and enables the "huggingface" Data Source.

-For example you can read Parquet files from Hugging Face in an optimized way using PyArrow by defining this `read_parquet` helper function:
+The library also backports the Data Source API for the "huggingface" Data Source to PySpark 3.5, 3.4 and 3.3.
+However, in this case `pyspark_huggingface` should be imported explicitly to activate the backport and enable the "huggingface" Data Source:

```python
-from functools import partial
-from typing import Iterator, Optional, Union
-
-import pyarrow as pa
-import pyarrow.parquet as pq
-from huggingface_hub import HfFileSystem
-from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.pandas.types import from_arrow_schema
-
-
-def _read(iterator: Iterator[pa.RecordBatch], columns: Optional[list[str]], filters: Optional[Union[list[tuple], list[list[tuple]]]], **kwargs) -> Iterator[pa.RecordBatch]:
-    for batch in iterator:
-        paths = batch[0].to_pylist()
-        ds = pq.ParquetDataset(paths, **kwargs)
-        yield from ds._dataset.to_batches(columns=columns, filter=pq.filters_to_expression(filters) if filters else None)
-
-
-def read_parquet(
-    path: str,
-    columns: Optional[list[str]] = None,
-    filters: Optional[Union[list[tuple], list[list[tuple]]]] = None,
-    **kwargs,
-) -> DataFrame:
-    """
-    Loads Parquet files from Hugging Face using PyArrow, returning a PySPark `DataFrame`.
-
-    It reads Parquet files in a distributed manner.
-
-    Access private or gated repositories using `huggingface-cli login` or passing a token
-    using the `storage_options` argument: `storage_options={"token": "hf_xxx"}`
+>>> import pyspark_huggingface
+huggingface datasource enabled for pyspark 3.x.x (backport from pyspark 4)
+```

-    Parameters
-    ----------
-    path : str
-        Path to the file. Prefix with a protocol like `hf://` to read from Hugging Face.
-        You can read from multiple files if you pass a globstring.
-    columns : list, default None
-        If not None, only these columns will be read from the file.
-    filters : List[Tuple] or List[List[Tuple]], default None
-        To filter out data.
-        Filter syntax: [[(column, op, val), ...],...]
-        where op is [==, =, >, >=, <, <=, !=, in, not in]
-        The innermost tuples are transposed into a set of filters applied
-        through an `AND` operation.
-        The outer list combines these sets of filters through an `OR`
-        operation.
-        A single list of tuples can also be used, meaning that no `OR`
-        operation between set of filters is to be conducted.
+## Read

-    **kwargs
-        Any additional kwargs are passed to pyarrow.parquet.ParquetDataset.
+The "huggingface" Data Source allows reading datasets from Hugging Face, using `pyarrow` under the hood to stream Arrow data.
+This is compatible with all the datasets in a [supported format](https://huggingface.co/docs/hub/datasets-adding#file-formats) on Hugging Face, like Parquet datasets.

-    Returns
-    -------
-    DataFrame
-        DataFrame based on parquet file.
+For example, here is how to load the [stanfordnlp/imdb](https://huggingface.co/datasets/stanfordnlp/imdb) dataset:

-    Examples
-    --------
-    >>> path = "hf://datasets/username/dataset/data.parquet"
-    >>> pd.DataFrame({"foo": range(5), "bar": range(5, 10)}).to_parquet(path)
-    >>> read_parquet(path).show()
-    +---+---+
-    |foo|bar|
-    +---+---+
-    |  0|  5|
-    |  1|  6|
-    |  2|  7|
-    |  3|  8|
-    |  4|  9|
-    +---+---+
-    >>> read_parquet(path, columns=["bar"]).show()
-    +---+
-    |bar|
-    +---+
-    |  5|
-    |  6|
-    |  7|
-    |  8|
-    |  9|
-    +---+
-    >>> sel = [("foo", ">", 2)]
-    >>> read_parquet(path, filters=sel).show()
-    +---+---+
-    |foo|bar|
-    +---+---+
-    |  3|  8|
-    |  4|  9|
-    +---+---+
-    """
-    filesystem: HfFileSystem = kwargs.pop("filesystem") if "filesystem" in kwargs else HfFileSystem(**kwargs.pop("storage_options", {}))
-    paths = filesystem.glob(path)
-    if not paths:
-        raise FileNotFoundError(f"Counldn't find any file at {path}")
-    rdd = spark.sparkContext.parallelize([{"path": path} for path in paths], len(paths))
-    df = spark.createDataFrame(rdd)
-    arrow_schema = pq.read_schema(filesystem.open(paths[0]))
-    schema = pa.schema([field for field in arrow_schema if (columns is None or field.name in columns)], metadata=arrow_schema.metadata)
-    return df.mapInArrow(
-        partial(_read, columns=columns, filters=filters, filesystem=filesystem, schema=arrow_schema, **kwargs),
-        from_arrow_schema(schema),
-    )
+```python
+>>> from pyspark.sql import SparkSession
+>>> spark = SparkSession.builder.appName("demo").getOrCreate()
+>>> df = spark.read.format("huggingface").load("stanfordnlp/imdb")
```

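As a quick follow-up, the loaded DataFrame can be inspected with standard PySpark calls (a small sketch, reusing the `df` from the snippet above):

```python
# Inspect what was loaded from stanfordnlp/imdb (standard PySpark DataFrame API)
df.printSchema()  # column names and types
df.show(5)        # preview the first rows without materializing the full dataset
```
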
-Here is how we can use this on the [BAAI/Infinity-Instruct](https://huggingface.co/datasets/BAAI/Infinity-Instruct) dataset.
+Here is another example with the [BAAI/Infinity-Instruct](https://huggingface.co/datasets/BAAI/Infinity-Instruct) dataset.
It is a gated repository; users have to accept the terms of use before accessing it.
+It also has multiple subsets, named "3M", "7M", etc., so we need to specify which one to load.


<div class="flex justify-center">
<img class="block dark:hidden" src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/hub/datasets-spark-infinity-instruct-7M-min.png"/>
<img class="hidden dark:block" src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/hub/datasets-spark-infinity-instruct-7M-dark-min.png"/>
</div>

-We use the `read_parquet` function to read data from the dataset, compute the number of dialogue per language and filter the dataset.
+We use `.format()` to select the "huggingface" Data Source, and `.load()` to load the dataset (more precisely, the config or subset named "7M", which contains 7M samples). Then we compute the number of dialogues per language and filter the dataset.

After logging in to access the gated repository, we can run:

```python
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
->>> df = read_parquet("hf://datasets/BAAI/Infinity-Instruct/7M/*.parquet")
+>>> df = spark.read.format("huggingface").option("config", "7M").load("BAAI/Infinity-Instruct")
>>> df.show()
+---+----------------------------+-----+----------+--------------------+
| id| conversations|label|langdetect| source|
@@ -191,12 +99,19 @@ After logging-in to access the gated repository, we can run:
+---+----------------------------+-----+----------+--------------------+
```

-To compute the number of dialogues per language we run this code.
-The `columns` argument is useful to only load the data we need, since PySpark doesn't enable predicate push-down in this case.
-There is also a `filters` argument to only load data with values within a certain range.
+This loads the dataset in a streaming fashion, and the output DataFrame has one partition per data file in the dataset to enable efficient distributed processing.
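
For instance, the partitioning can be checked with standard PySpark calls (a small sketch, reusing `df` from above):

```python
# Sketch: one partition per data file of the "7M" subset, as described above
print(df.rdd.getNumPartitions())
```
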
+
+To compute the number of dialogues per language, we run the following code, which uses the `columns` option and a `groupBy()` operation.
+The `columns` option is useful to only load the data we need, since PySpark doesn't enable predicate push-down with the Data Source API.
+There is also a `filters` option to only load data with values within a certain range.

```python
->>> df_langdetect_only = read_parquet("hf://datasets/BAAI/Infinity-Instruct/7M/*.parquet", columns=["langdetect"])
+>>> df_langdetect_only = (
+...     spark.read.format("huggingface")
+...     .option("config", "7M")
+...     .option("columns", '["langdetect"]')
+...     .load("BAAI/Infinity-Instruct")
+... )
>>> df_langdetect_only.groupBy("langdetect").count().show()
+----------+-------+
|langdetect| count|
@@ -209,8 +124,12 @@ There is also a `filters` argument to only load data with values within a certai
To filter the dataset and only keep dialogues in Chinese:

```python
->>> criteria = [("langdetect", "=", "zh-cn")]
->>> df_chinese_only = read_parquet("hf://datasets/BAAI/Infinity-Instruct/7M/*.parquet", filters=criteria)
+>>> df_chinese_only = (
+...     spark.read.format("huggingface")
+...     .option("config", "7M")
+...     .option("filters", '[("langdetect", "=", "zh-cn")]')
+...     .load("BAAI/Infinity-Instruct")
+... )
>>> df_chinese_only.show()
+---+----------------------------+-----+----------+----------+
| id| conversations|label|langdetect| source|
@@ -238,14 +157,22 @@ To filter the dataset and only keep dialogues in Chinese:
+---+----------------------------+-----+----------+----------+
```

+It is also possible to apply filters or remove columns on the loaded DataFrame, but it is more efficient to do it while loading, especially on Parquet datasets.
+Indeed, Parquet files contain metadata at the file and row-group level, which makes it possible to skip entire parts of the dataset that don't contain samples satisfying the criteria. Columns in Parquet can also be loaded independently, which makes it possible to skip the excluded columns and avoid loading unnecessary data.
+
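
To illustrate the difference, here is a sketch comparing a load-time filter (pushed down to the Parquet files, using the `filters` option from the example above) with the same condition applied after loading (standard PySpark `filter`):

```python
# Filter applied while loading: Parquet file and row-group metadata can be used to skip data
df_pushed = (
    spark.read.format("huggingface")
    .option("config", "7M")
    .option("filters", '[("langdetect", "=", "zh-cn")]')
    .load("BAAI/Infinity-Instruct")
)

# Same condition applied after loading: the data is read first, then filtered in Spark
df_post = (
    spark.read.format("huggingface")
    .option("config", "7M")
    .load("BAAI/Infinity-Instruct")
    .filter("langdetect = 'zh-cn'")
)
```
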
### Run SQL queries

Once you have your PySpark DataFrame ready, you can run SQL queries using `spark.sql`:

```python
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
->>> df = read_parquet("hf://datasets/BAAI/Infinity-Instruct/7M/*.parquet", columns=["source"])
+>>> df = (
+...     spark.read.format("huggingface")
+...     .option("config", "7M")
+...     .option("columns", '["source"]')
+...     .load("BAAI/Infinity-Instruct")
+... )
>>> spark.sql("SELECT source, count(*) AS total FROM {df} GROUP BY source ORDER BY total DESC", df=df).show()
+--------------------+-------+
| source| total|
@@ -272,6 +199,7 @@ Once you have your PySpark Dataframe ready, you can run SQL queries using `spark
+--------------------+-------+
```

+Again, specifying the `columns` option is not necessary, but it is useful to avoid loading unnecessary data and to make the query faster.
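
Alternatively, the DataFrame can be registered as a temporary view and queried by name (a sketch using standard PySpark; the view name is hypothetical):

```python
# Register the DataFrame under a hypothetical view name, then query it with plain SQL
df.createOrReplaceTempView("infinity_instruct")
spark.sql("SELECT source, count(*) AS total FROM infinity_instruct GROUP BY source ORDER BY total DESC").show()
```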

## Write