
Commit 9a244dc

write
1 parent c93b1bf commit 9a244dc

File tree: 1 file changed, +14 -106 lines


docs/hub/datasets-spark.md

Lines changed: 14 additions & 106 deletions
@@ -49,6 +49,7 @@ This is compatible with all the dataset in [supported format](https://huggingfac
 For example here is how to load the [stanfordnlp/imdb](https://huggingface.co/stanfordnlp/imdb) dataset:

 ```python
+>>> import pyspark_huggingface
 >>> from pyspark.sql import SparkSession
 >>> spark = SparkSession.builder.appName("demo").getOrCreate()
 >>> df = spark.read.format("huggingface").load("stanfordnlp/imdb")
@@ -69,6 +70,7 @@ We use the `.format()` function to use the "huggingface" Data Source, and `.load
 After logging-in to access the gated repository, we can run:

 ```python
+>>> import pyspark_huggingface
 >>> from pyspark.sql import SparkSession
 >>> spark = SparkSession.builder.appName("demo").getOrCreate()
 >>> df = spark.read.format("huggingface").option("config", "7M").load("BAAI/Infinity-Instruct")
@@ -165,6 +167,7 @@ Indeed, Parquet contains metadata at the file and row group level, which allows
 Once you have your PySpark Dataframe ready, you can run SQL queries using `spark.sql`:

 ```python
+>>> import pyspark_huggingface
 >>> from pyspark.sql import SparkSession
 >>> spark = SparkSession.builder.appName("demo").getOrCreate()
 >>> df = (
@@ -203,128 +206,33 @@ Again, specifying the `columns` option is not necessary, but is useful to avoid

 ## Write

-We also provide a helper function to write datasets in a distributed manner to a Hugging Face repository.
-
-You can write a PySpark Dataframe to Hugging Face using this `write_parquet` helper function based on the `huggingface_hub` API.
-In particular it uses the `preupload_lfs_files` utility to upload Parquet files in parallel in a distributed manner, and only commits the files once they're all uploaded:
-
+You can write a PySpark Dataframe to Hugging Face with the "huggingface" Data Source.
+It uploads Parquet files in parallel in a distributed manner, and only commits the files once they're all uploaded.
+It works like this:

 ```python
-import math
-import pickle
-import tempfile
-from functools import partial
-from typing import Iterator, Optional
-
-import pyarrow as pa
-import pyarrow.parquet as pq
-from huggingface_hub import CommitOperationAdd, HfFileSystem
-from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.pandas.types import from_arrow_schema, to_arrow_schema
-
-
-def _preupload(iterator: Iterator[pa.RecordBatch], path: str, schema: pa.Schema, filesystem: HfFileSystem, row_group_size: Optional[int] = None, **kwargs) -> Iterator[pa.RecordBatch]:
-    resolved_path = filesystem.resolve_path(path)
-    with tempfile.NamedTemporaryFile(suffix=".parquet") as temp_file:
-        with pq.ParquetWriter(temp_file.name, schema=schema, **kwargs) as writer:
-            for batch in iterator:
-                writer.write_batch(batch, row_group_size=row_group_size)
-        addition = CommitOperationAdd(path_in_repo=temp_file.name, path_or_fileobj=temp_file.name)
-        filesystem._api.preupload_lfs_files(repo_id=resolved_path.repo_id, additions=[addition], repo_type=resolved_path.repo_type, revision=resolved_path.revision)
-        yield pa.record_batch({"addition": [pickle.dumps(addition)]}, schema=pa.schema({"addition": pa.binary()}))
-
-
-def _commit(iterator: Iterator[pa.RecordBatch], path: str, filesystem: HfFileSystem, max_operations_per_commit=50) -> Iterator[pa.RecordBatch]:
-    resolved_path = filesystem.resolve_path(path)
-    additions: list[CommitOperationAdd] = [pickle.loads(addition) for addition in pa.Table.from_batches(iterator, schema=pa.schema({"addition": pa.binary()}))[0].to_pylist()]
-    num_commits = math.ceil(len(additions) / max_operations_per_commit)
-    for shard_idx, addition in enumerate(additions):
-        addition.path_in_repo = resolved_path.path_in_repo.replace("{shard_idx:05d}", f"{shard_idx:05d}")
-    for i in range(0, num_commits):
-        operations = additions[i * max_operations_per_commit : (i + 1) * max_operations_per_commit]
-        commit_message = "Upload using PySpark" + (f" (part {i:05d}-of-{num_commits:05d})" if num_commits > 1 else "")
-        filesystem._api.create_commit(repo_id=resolved_path.repo_id, repo_type=resolved_path.repo_type, revision=resolved_path.revision, operations=operations, commit_message=commit_message)
-        yield pa.record_batch({"path": [addition.path_in_repo for addition in operations]}, schema=pa.schema({"path": pa.string()}))
-
-
-def write_parquet(df: DataFrame, path: str, **kwargs) -> None:
-    """
-    Write Parquet files to Hugging Face using PyArrow.
-
-    It uploads Parquet files in a distributed manner in two steps:
-
-    1. Preupload the Parquet files in parallel in a distributed banner
-    2. Commit the preuploaded files
-
-    Authenticate using `huggingface-cli login` or passing a token
-    using the `storage_options` argument: `storage_options={"token": "hf_xxx"}`
-
-    Parameters
-    ----------
-    path : str
-        Path of the file or directory. Prefix with a protocol like `hf://` to read from Hugging Face.
-        It writes Parquet files in the form "part-xxxxx.parquet", or to a single file if `path ends with ".parquet".
-
-    **kwargs
-        Any additional kwargs are passed to pyarrow.parquet.ParquetWriter.
-
-    Returns
-    -------
-    DataFrame
-        DataFrame based on parquet file.
-
-    Examples
-    --------
-    >>> spark.createDataFrame(pd.DataFrame({"foo": range(5), "bar": range(5, 10)}))
-    >>> # Save to one file
-    >>> write_parquet(df, "hf://datasets/username/dataset/data.parquet")
-    >>> # OR save to a directory (possibly in many files)
-    >>> write_parquet(df, "hf://datasets/username/dataset")
-    """
-    filesystem: HfFileSystem = kwargs.pop("filesystem", HfFileSystem(**kwargs.pop("storage_options", {})))
-    if path.endswith(".parquet") or path.endswith(".pq"):
-        df = df.coalesce(1)
-    else:
-        path += "/part-{shard_idx:05d}.parquet"
-    df.mapInArrow(
-        partial(_preupload, path=path, schema=to_arrow_schema(df.schema), filesystem=filesystem, **kwargs),
-        from_arrow_schema(pa.schema({"addition": pa.binary()})),
-    ).repartition(1).mapInArrow(
-        partial(_commit, path=path, filesystem=filesystem),
-        from_arrow_schema(pa.schema({"path": pa.string()})),
-    ).collect()
+>>> import pyspark_huggingface
+>>> df.write.format("huggingface").save("username/dataset_name")
 ```
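
For reference (not part of this commit's diff), a minimal sketch of reading back a dataset written this way, using the same "huggingface" Data Source shown earlier in the doc; the `username/dataset_name` repository is the hypothetical one from the snippet above and you are assumed to be authenticated:

```python
>>> import pyspark_huggingface
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> # hypothetical repository name, taken from the write snippet above
>>> df = spark.read.format("huggingface").load("username/dataset_name")
```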

 Here is how we can use this function to write the filtered version of the [BAAI/Infinity-Instruct](https://huggingface.co/datasets/BAAI/Infinity-Instruct) dataset back to Hugging Face.

 First you need to [create a dataset repository](https://huggingface.co/new-dataset), e.g. `username/Infinity-Instruct-Chinese-Only` (you can set it to private if you want).
-Then, make sure you are authenticated and you can run:
+Then, make sure you are authenticated, you can use the "huggingface" Data Source, set the `mode` to "overwrite" (or "append" if you want to extend an existing dataset), and push to Hugging Face with `.save()`:

 ```python
->>> write_parquet(df_chinese_only, "hf://datasets/username/Infinity-Instruct-Chinese-Only")
-tmph9jwu9py.parquet: 100%|██████████| 50.5M/50.5M [00:03<00:00, 14.6MB/s]
-tmp0oqt99nc.parquet: 100%|██████████| 50.8M/50.8M [00:02<00:00, 17.9MB/s]
-tmpgnizkwqp.parquet: 100%|██████████| 50.5M/50.5M [00:02<00:00, 19.6MB/s]
-tmpanm04k4n.parquet: 100%|██████████| 51.4M/51.4M [00:02<00:00, 22.9MB/s]
-tmp14uy9oqb.parquet: 100%|██████████| 50.4M/50.4M [00:02<00:00, 23.0MB/s]
-tmpcp8t_qdl.parquet: 100%|██████████| 50.4M/50.4M [00:02<00:00, 23.5MB/s]
-tmpjui5mns8.parquet: 100%|██████████| 50.3M/50.3M [00:02<00:00, 24.1MB/s]
-tmpydqh6od1.parquet: 100%|██████████| 50.9M/50.9M [00:02<00:00, 23.8MB/s]
-tmp52f2t8tu.parquet: 100%|██████████| 50.5M/50.5M [00:02<00:00, 23.7MB/s]
-tmpg7egv3ye.parquet: 100%|██████████| 50.1M/50.1M [00:06<00:00, 7.68MB/s]
-tmp2s0fq2hm.parquet: 100%|██████████| 50.8M/50.8M [00:02<00:00, 18.1MB/s]
-tmpmj97ab30.parquet: 100%|██████████| 71.3M/71.3M [00:02<00:00, 23.9MB/s]
+>>> df_chinese_only.write.format("huggingface").mode("overwrite").save("username/Infinity-Instruct-Chinese-Only2")
 ```
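
The added line above mentions that `mode` also accepts "append" for extending an existing dataset. A minimal sketch of that variant (not part of this commit; `df_more_chinese_only` is a hypothetical DataFrame with the same schema):

```python
>>> # hypothetical: append additional rows to the same dataset repository
>>> df_more_chinese_only.write.format("huggingface").mode("append").save("username/Infinity-Instruct-Chinese-Only2")
```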

 <div class="flex justify-center">
 <img class="block dark:hidden" src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/hub/datasets-spark-infinity-instruct-chinese-only-min.png"/>
 <img class="hidden dark:block" src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/hub/datasets-spark-infinity-instruct-chinese-only-dark-min.png"/>
 </div>

-## Run in JupyterLab on Hugging Face Spaces
+## Try Spark Notebooks on Hugging Face Spaces

-You can duplicate the [Spark on HF JupyterLab](https://huggingface.co/spaces/lhoestq/Spark-on-HF-JupyterLab) Space to get a Notebook with PySpark and those helper functions pre-installed.
+You can launch the [Spark Notebooks](https://huggingface.co/spaces/Dataset-Tools/Spark-Notebooks) in Spaces to get Notebooks with PySpark and `pyspark_huggingface` pre-installed.

-Click on "Duplicate Space", choose a name for your Space, select your hardware and you are ready:
+Click on "Launch Spark Notebooks", choose a name for your Space, select your hardware and you are ready !

-<img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/hub/spark-on-hf-jupyterlab-screenshot-min.png">
+<img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/hub/spark-notebooks-min.png">
