diff --git a/docs/hub/datasets-spark.md b/docs/hub/datasets-spark.md
index c771d046d..ba2a14b8f 100644
--- a/docs/hub/datasets-spark.md
+++ b/docs/hub/datasets-spark.md
@@ -2,23 +2,25 @@
 Spark enables real-time, large-scale data processing in a distributed environment.
 
-In particular you can use `huggingface_hub` to access Hugging Face datasets repositories in PySpark
+You can use `pyspark_huggingface` to access Hugging Face datasets repositories in PySpark via the "huggingface" Data Source.
 
-## Installation
+Try out [Spark Notebooks](https://huggingface.co/spaces/Dataset-Tools/Spark-Notebooks) on Hugging Face Spaces to get Notebooks with PySpark and `pyspark_huggingface` pre-installed.
 
-To be able to read and write to Hugging Face URLs (e.g. `hf://datasets/username/dataset/data.parquet`), you need to install the `huggingface_hub` library:
+
-```
-pip install huggingface_hub
-```
+## Set up
+
+### Installation
 
-You also need to install `pyarrow` to read/write Parquet / JSON / CSV / etc. files using the filesystem API provided by `huggingFace_hub`:
+To be able to read and write Hugging Face Datasets, you need to install the `pyspark_huggingface` library:
 
 ```
-pip install pyarrow
+pip install pyspark_huggingface
 ```
 
-## Authentication
+This will also install required dependencies like `huggingface_hub` for authentication and `pyarrow` for reading and writing datasets.
+
+### Authentication
 
 You need to authenticate to Hugging Face to read private/gated dataset repositories or to write to your dataset repositories.
 
@@ -28,127 +30,39 @@ You can use the CLI for example:
 huggingface-cli login
 ```
 
-It's also possible to provide your Hugging Face token with the `HF_TOKEN` environment variable or passing the `storage_options` parameter to helper functions below:
+It's also possible to provide your Hugging Face token with the `HF_TOKEN` environment variable or by passing the `token` option to the reader.
+For more details about authentication, check out [this guide](https://huggingface.co/docs/huggingface_hub/quick-start#authentication).
+
+### Enable the "huggingface" Data Source
+
+PySpark 4 introduced a new Data Source API which allows loading datasets from custom sources.
+If `pyspark_huggingface` is installed, PySpark auto-imports it and enables the "huggingface" Data Source.
+
+The library also backports the Data Source API for the "huggingface" Data Source to PySpark 3.5, 3.4 and 3.3.
+However, in this case `pyspark_huggingface` must be imported explicitly to activate the backport and enable the "huggingface" Data Source:
 
 ```python
-storage_options = {"token": "hf_xxx"}
+>>> import pyspark_huggingface
+huggingface datasource enabled for pyspark 3.x.x (backport from pyspark 4)
 ```
 
-For more details about authentication, check out [this guide](https://huggingface.co/docs/huggingface_hub/quick-start#authentication).
-
 ## Read
 
-PySpark doesn't have an official support for Hugging Face paths, so we provide a helper function to read datasets in a distributed manner.
+The "huggingface" Data Source allows reading datasets from Hugging Face, using `pyarrow` under the hood to stream Arrow data.
+This is compatible with all the datasets in a [supported format](https://huggingface.co/docs/hub/datasets-adding#file-formats) on Hugging Face, like Parquet datasets.
-For example you can read Parquet files from Hugging Face in an optimized way using PyArrow by defining this `read_parquet` helper function:
+For example, here is how to load the [stanfordnlp/imdb](https://huggingface.co/datasets/stanfordnlp/imdb) dataset:
 
 ```python
-from functools import partial
-from typing import Iterator, Optional, Union
-
-import pyarrow as pa
-import pyarrow.parquet as pq
-from huggingface_hub import HfFileSystem
-from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.pandas.types import from_arrow_schema
-
-
-def _read(iterator: Iterator[pa.RecordBatch], columns: Optional[list[str]], filters: Optional[Union[list[tuple], list[list[tuple]]]], **kwargs) -> Iterator[pa.RecordBatch]:
-    for batch in iterator:
-        paths = batch[0].to_pylist()
-        ds = pq.ParquetDataset(paths, **kwargs)
-        yield from ds._dataset.to_batches(columns=columns, filter=pq.filters_to_expression(filters) if filters else None)
-
-
-def read_parquet(
-    path: str,
-    columns: Optional[list[str]] = None,
-    filters: Optional[Union[list[tuple], list[list[tuple]]]] = None,
-    **kwargs,
-) -> DataFrame:
-    """
-    Loads Parquet files from Hugging Face using PyArrow, returning a PySPark `DataFrame`.
-
-    It reads Parquet files in a distributed manner.
-
-    Access private or gated repositories using `huggingface-cli login` or passing a token
-    using the `storage_options` argument: `storage_options={"token": "hf_xxx"}`
-
-    Parameters
-    ----------
-    path : str
-        Path to the file. Prefix with a protocol like `hf://` to read from Hugging Face.
-        You can read from multiple files if you pass a globstring.
-    columns : list, default None
-        If not None, only these columns will be read from the file.
-    filters : List[Tuple] or List[List[Tuple]], default None
-        To filter out data.
-        Filter syntax: [[(column, op, val), ...],...]
-        where op is [==, =, >, >=, <, <=, !=, in, not in]
-        The innermost tuples are transposed into a set of filters applied
-        through an `AND` operation.
-        The outer list combines these sets of filters through an `OR`
-        operation.
-        A single list of tuples can also be used, meaning that no `OR`
-        operation between set of filters is to be conducted.
-
-    **kwargs
-        Any additional kwargs are passed to pyarrow.parquet.ParquetDataset.
-
-    Returns
-    -------
-    DataFrame
-        DataFrame based on parquet file.
- - Examples - -------- - >>> path = "hf://datasets/username/dataset/data.parquet" - >>> pd.DataFrame({"foo": range(5), "bar": range(5, 10)}).to_parquet(path) - >>> read_parquet(path).show() - +---+---+ - |foo|bar| - +---+---+ - | 0| 5| - | 1| 6| - | 2| 7| - | 3| 8| - | 4| 9| - +---+---+ - >>> read_parquet(path, columns=["bar"]).show() - +---+ - |bar| - +---+ - | 5| - | 6| - | 7| - | 8| - | 9| - +---+ - >>> sel = [("foo", ">", 2)] - >>> read_parquet(path, filters=sel).show() - +---+---+ - |foo|bar| - +---+---+ - | 3| 8| - | 4| 9| - +---+---+ - """ - filesystem: HfFileSystem = kwargs.pop("filesystem") if "filesystem" in kwargs else HfFileSystem(**kwargs.pop("storage_options", {})) - paths = filesystem.glob(path) - if not paths: - raise FileNotFoundError(f"Counldn't find any file at {path}") - rdd = spark.sparkContext.parallelize([{"path": path} for path in paths], len(paths)) - df = spark.createDataFrame(rdd) - arrow_schema = pq.read_schema(filesystem.open(paths[0])) - schema = pa.schema([field for field in arrow_schema if (columns is None or field.name in columns)], metadata=arrow_schema.metadata) - return df.mapInArrow( - partial(_read, columns=columns, filters=filters, filesystem=filesystem, schema=arrow_schema, **kwargs), - from_arrow_schema(schema), - ) +>>> import pyspark_huggingface +>>> from pyspark.sql import SparkSession +>>> spark = SparkSession.builder.appName("demo").getOrCreate() +>>> df = spark.read.format("huggingface").load("stanfordnlp/imdb") ``` -Here is how we can use this on the [BAAI/Infinity-Instruct](https://huggingface.co/datasets/BAAI/Infinity-Instruct) dataset. +Here is another example with the [BAAI/Infinity-Instruct](https://huggingface.co/datasets/BAAI/Infinity-Instruct) dataset. It is a gated repository, users have to accept the terms of use before accessing it. +It also has multiple subsets, namely, "3M" and "7M". So we need to specify which one to load.
@@ -156,14 +70,15 @@ It is a gated repository, users have to accept the terms of use before accessing
-We use the `read_parquet` function to read data from the dataset, compute the number of dialogue per language and filter the dataset.
+We use the `.format()` function to select the "huggingface" Data Source, and `.load()` to load the dataset (more precisely, the config or subset named "7M", which contains 7M samples). Then we compute the number of dialogues per language and filter the dataset.
 
 After logging-in to access the gated repository, we can run:
 
 ```python
+>>> import pyspark_huggingface
 >>> from pyspark.sql import SparkSession
 >>> spark = SparkSession.builder.appName("demo").getOrCreate()
->>> df = read_parquet("hf://datasets/BAAI/Infinity-Instruct/7M/*.parquet")
+>>> df = spark.read.format("huggingface").option("config", "7M").load("BAAI/Infinity-Instruct")
 >>> df.show()
 +---+----------------------------+-----+----------+--------------------+
 | id|               conversations|label|langdetect|              source|
 +---+----------------------------+-----+----------+--------------------+
@@ -191,12 +106,19 @@ After logging-in to access the gated repository, we can run:
 +---+----------------------------+-----+----------+--------------------+
 ```
 
-To compute the number of dialogues per language we run this code.
-The `columns` argument is useful to only load the data we need, since PySpark doesn't enable predicate push-down in this case.
-There is also a `filters` argument to only load data with values within a certain range.
+This loads the dataset in a streaming fashion, and the output DataFrame has one partition per data file in the dataset to enable efficient distributed processing.
+
+To compute the number of dialogues per language, we run the following code, which uses the `columns` option and a `groupBy()` operation.
+The `columns` option is useful to only load the data we need, since PySpark doesn't enable predicate push-down with the Data Source API.
+There is also a `filters` option to only load data with values within a certain range.
 
 ```python
->>> df_langdetect_only = read_parquet("hf://datasets/BAAI/Infinity-Instruct/7M/*.parquet", columns=["langdetect"])
+>>> df_langdetect_only = (
+...     spark.read.format("huggingface")
+...     .option("config", "7M")
+...     .option("columns", '["langdetect"]')
+...     .load("BAAI/Infinity-Instruct")
+... )
 >>> df_langdetect_only.groupBy("langdetect").count().show()
 +----------+-------+
 |langdetect|  count|
 +----------+-------+
@@ -209,8 +131,12 @@ There is also a `filters` argument to only load data with values within a certai
 To filter the dataset and only keep dialogues in Chinese:
 
 ```python
->>> criteria = [("langdetect", "=", "zh-cn")]
->>> df_chinese_only = read_parquet("hf://datasets/BAAI/Infinity-Instruct/7M/*.parquet", filters=criteria)
+>>> df_chinese_only = (
+...     spark.read.format("huggingface")
+...     .option("config", "7M")
+...     .option("filters", '[("langdetect", "=", "zh-cn")]')
+...     .load("BAAI/Infinity-Instruct")
+... )
 >>> df_chinese_only.show()
 +---+----------------------------+-----+----------+----------+
 | id|               conversations|label|langdetect|    source|
 +---+----------------------------+-----+----------+----------+
@@ -238,14 +164,37 @@ To filter the dataset and only keep dialogues in Chinese:
 +---+----------------------------+-----+----------+----------+
 ```
 
+It is also possible to apply filters or remove columns on the loaded DataFrame, but it is more efficient to do it while loading, especially on Parquet datasets.
+Indeed, Parquet contains metadata at the file and row group level, which allows skipping entire parts of the dataset that don't contain samples that satisfy the criteria. Columns in Parquet can also be loaded independently, which makes it possible to skip the excluded columns and avoid loading unnecessary data.
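+
+For comparison, here is a minimal sketch (reusing the repository and options from the examples above) of what the post-load approach could look like; it gives the same results, but Spark has to read all the columns and row groups before filtering:
+
+```python
+>>> # Plain DataFrame transformations applied after loading: no Parquet-level
+>>> # skipping happens here, unlike with the `filters` and `columns` options above.
+>>> df_full = (
+...     spark.read.format("huggingface")
+...     .option("config", "7M")
+...     .load("BAAI/Infinity-Instruct")
+... )
+>>> df_chinese_only_after_load = df_full.filter(df_full.langdetect == "zh-cn")
+>>> df_langdetect_only_after_load = df_full.select("langdetect")
+```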
+
+### Options
+
+Here is the list of available options you can pass to `read.option()`:
+
+* `config` (string): select a dataset subset/config
+* `split` (string): select a dataset split (default is "train")
+* `token` (string): your Hugging Face token
+
+For Parquet datasets:
+* `columns` (string): select a subset of columns to load, e.g. `'["id"]'`
+* `filters` (string): skip files and row groups that don't match a criterion, e.g. `'[("source", "=", "code_exercises")]'`. Filters are passed to [pyarrow.parquet.ParquetDataset](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html).
+
+Any other option is passed as an argument to [datasets.load_dataset](https://huggingface.co/docs/datasets/en/package_reference/loading_methods#datasets.load_dataset).
+
 ### Run SQL queries
 
 Once you have your PySpark Dataframe ready, you can run SQL queries using `spark.sql`:
 
 ```python
+>>> import pyspark_huggingface
 >>> from pyspark.sql import SparkSession
 >>> spark = SparkSession.builder.appName("demo").getOrCreate()
->>> df = read_parquet("hf://datasets/BAAI/Infinity-Instruct/7M/*.parquet", columns=["source"])
+>>> df = (
+...     spark.read.format("huggingface")
+...     .option("config", "7M")
+...     .option("columns", '["source"]')
+...     .load("BAAI/Infinity-Instruct")
+... )
 >>> spark.sql("SELECT source, count(*) AS total FROM {df} GROUP BY source ORDER BY total DESC", df=df).show()
 +--------------------+-------+
 |              source|  total|
 +--------------------+-------+
@@ -272,120 +221,26 @@ Once you have your PySpark Dataframe ready, you can run SQL queries using `spark
 +--------------------+-------+
 ```
 
+Again, specifying the `columns` option is not necessary, but it is useful to avoid loading unnecessary data and to make the query faster.
+
 ## Write
 
-We also provide a helper function to write datasets in a distributed manner to a Hugging Face repository.
-
-You can write a PySpark Dataframe to Hugging Face using this `write_parquet` helper function based on the `huggingface_hub` API.
-In particular it uses the `preupload_lfs_files` utility to upload Parquet files in parallel in a distributed manner, and only commits the files once they're all uploaded:
-
+You can write a PySpark Dataframe to Hugging Face with the "huggingface" Data Source.
+It uploads Parquet files in parallel in a distributed manner, and only commits the files once they're all uploaded.
+It works like this: ```python -import math -import pickle -import tempfile -from functools import partial -from typing import Iterator, Optional - -import pyarrow as pa -import pyarrow.parquet as pq -from huggingface_hub import CommitOperationAdd, HfFileSystem -from pyspark.sql.dataframe import DataFrame -from pyspark.sql.pandas.types import from_arrow_schema, to_arrow_schema - - -def _preupload(iterator: Iterator[pa.RecordBatch], path: str, schema: pa.Schema, filesystem: HfFileSystem, row_group_size: Optional[int] = None, **kwargs) -> Iterator[pa.RecordBatch]: - resolved_path = filesystem.resolve_path(path) - with tempfile.NamedTemporaryFile(suffix=".parquet") as temp_file: - with pq.ParquetWriter(temp_file.name, schema=schema, **kwargs) as writer: - for batch in iterator: - writer.write_batch(batch, row_group_size=row_group_size) - addition = CommitOperationAdd(path_in_repo=temp_file.name, path_or_fileobj=temp_file.name) - filesystem._api.preupload_lfs_files(repo_id=resolved_path.repo_id, additions=[addition], repo_type=resolved_path.repo_type, revision=resolved_path.revision) - yield pa.record_batch({"addition": [pickle.dumps(addition)]}, schema=pa.schema({"addition": pa.binary()})) - - -def _commit(iterator: Iterator[pa.RecordBatch], path: str, filesystem: HfFileSystem, max_operations_per_commit=50) -> Iterator[pa.RecordBatch]: - resolved_path = filesystem.resolve_path(path) - additions: list[CommitOperationAdd] = [pickle.loads(addition) for addition in pa.Table.from_batches(iterator, schema=pa.schema({"addition": pa.binary()}))[0].to_pylist()] - num_commits = math.ceil(len(additions) / max_operations_per_commit) - for shard_idx, addition in enumerate(additions): - addition.path_in_repo = resolved_path.path_in_repo.replace("{shard_idx:05d}", f"{shard_idx:05d}") - for i in range(0, num_commits): - operations = additions[i * max_operations_per_commit : (i + 1) * max_operations_per_commit] - commit_message = "Upload using PySpark" + (f" (part {i:05d}-of-{num_commits:05d})" if num_commits > 1 else "") - filesystem._api.create_commit(repo_id=resolved_path.repo_id, repo_type=resolved_path.repo_type, revision=resolved_path.revision, operations=operations, commit_message=commit_message) - yield pa.record_batch({"path": [addition.path_in_repo for addition in operations]}, schema=pa.schema({"path": pa.string()})) - - -def write_parquet(df: DataFrame, path: str, **kwargs) -> None: - """ - Write Parquet files to Hugging Face using PyArrow. - - It uploads Parquet files in a distributed manner in two steps: - - 1. Preupload the Parquet files in parallel in a distributed banner - 2. Commit the preuploaded files - - Authenticate using `huggingface-cli login` or passing a token - using the `storage_options` argument: `storage_options={"token": "hf_xxx"}` - - Parameters - ---------- - path : str - Path of the file or directory. Prefix with a protocol like `hf://` to read from Hugging Face. - It writes Parquet files in the form "part-xxxxx.parquet", or to a single file if `path ends with ".parquet". - - **kwargs - Any additional kwargs are passed to pyarrow.parquet.ParquetWriter. - - Returns - ------- - DataFrame - DataFrame based on parquet file. 
-
-    Examples
-    --------
-    >>> spark.createDataFrame(pd.DataFrame({"foo": range(5), "bar": range(5, 10)}))
-    >>> # Save to one file
-    >>> write_parquet(df, "hf://datasets/username/dataset/data.parquet")
-    >>> # OR save to a directory (possibly in many files)
-    >>> write_parquet(df, "hf://datasets/username/dataset")
-    """
-    filesystem: HfFileSystem = kwargs.pop("filesystem", HfFileSystem(**kwargs.pop("storage_options", {})))
-    if path.endswith(".parquet") or path.endswith(".pq"):
-        df = df.coalesce(1)
-    else:
-        path += "/part-{shard_idx:05d}.parquet"
-    df.mapInArrow(
-        partial(_preupload, path=path, schema=to_arrow_schema(df.schema), filesystem=filesystem, **kwargs),
-        from_arrow_schema(pa.schema({"addition": pa.binary()})),
-    ).repartition(1).mapInArrow(
-        partial(_commit, path=path, filesystem=filesystem),
-        from_arrow_schema(pa.schema({"path": pa.string()})),
-    ).collect()
+>>> import pyspark_huggingface
+>>> df.write.format("huggingface").save("username/dataset_name")
 ```
 
-Here is how we can use this function to write the filtered version of the [BAAI/Infinity-Instruct](https://huggingface.co/datasets/BAAI/Infinity-Instruct) dataset back to Hugging Face.
+Here is how we can use the "huggingface" Data Source to write the filtered version of the [BAAI/Infinity-Instruct](https://huggingface.co/datasets/BAAI/Infinity-Instruct) dataset back to Hugging Face.
 
 First you need to [create a dataset repository](https://huggingface.co/new-dataset), e.g. `username/Infinity-Instruct-Chinese-Only` (you can set it to private if you want).
-Then, make sure you are authenticated and you can run:
+Then, once you are authenticated, you can use the "huggingface" Data Source, set the `mode` to "overwrite" (or "append" if you want to extend an existing dataset), and push to Hugging Face with `.save()`:
 
 ```python
->>> write_parquet(df_chinese_only, "hf://datasets/username/Infinity-Instruct-Chinese-Only")
-tmph9jwu9py.parquet: 100%|██████████| 50.5M/50.5M [00:03<00:00, 14.6MB/s]
-tmp0oqt99nc.parquet: 100%|██████████| 50.8M/50.8M [00:02<00:00, 17.9MB/s]
-tmpgnizkwqp.parquet: 100%|██████████| 50.5M/50.5M [00:02<00:00, 19.6MB/s]
-tmpanm04k4n.parquet: 100%|██████████| 51.4M/51.4M [00:02<00:00, 22.9MB/s]
-tmp14uy9oqb.parquet: 100%|██████████| 50.4M/50.4M [00:02<00:00, 23.0MB/s]
-tmpcp8t_qdl.parquet: 100%|██████████| 50.4M/50.4M [00:02<00:00, 23.5MB/s]
-tmpjui5mns8.parquet: 100%|██████████| 50.3M/50.3M [00:02<00:00, 24.1MB/s]
-tmpydqh6od1.parquet: 100%|██████████| 50.9M/50.9M [00:02<00:00, 23.8MB/s]
-tmp52f2t8tu.parquet: 100%|██████████| 50.5M/50.5M [00:02<00:00, 23.7MB/s]
-tmpg7egv3ye.parquet: 100%|██████████| 50.1M/50.1M [00:06<00:00, 7.68MB/s]
-tmp2s0fq2hm.parquet: 100%|██████████| 50.8M/50.8M [00:02<00:00, 18.1MB/s]
-tmpmj97ab30.parquet: 100%|██████████| 71.3M/71.3M [00:02<00:00, 23.9MB/s]
+>>> df_chinese_only.write.format("huggingface").mode("overwrite").save("username/Infinity-Instruct-Chinese-Only")
 ```
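+
+If you want to check the result, one option is to read the repository back with the same Data Source (a minimal sketch, assuming the example repository name used above):
+
+```python
+>>> # Load the freshly pushed dataset and preview a few rows.
+>>> df_check = spark.read.format("huggingface").load("username/Infinity-Instruct-Chinese-Only")
+>>> df_check.show(5)
+```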
@@ -393,10 +248,17 @@ tmpmj97ab30.parquet: 100%|██████████| 71.3M/71.3M [00:02<00:
-## Run in JupyterLab on Hugging Face Spaces +### Mode + +Two modes are available when pushing a dataset to Hugging Face: + +* "overwrite": overwrite the dataset if it already exists +* "append": append the dataset to an existing dataset + +### Options -You can duplicate the [Spark on HF JupyterLab](https://huggingface.co/spaces/lhoestq/Spark-on-HF-JupyterLab) Space to get a Notebook with PySpark and those helper functions pre-installed. +Here is the list of available options you can pass to `write.option()`: -Click on "Duplicate Space", choose a name for your Space, select your hardware and you are ready: +* `token` (string): your Hugging Face token - +Contributions are welcome to add more options here, in particular `subset` and `split`.
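+
+To illustrate the `token` option, here is a minimal sketch of passing a token explicitly to the writer instead of relying on `huggingface-cli login` or the `HF_TOKEN` environment variable (the `hf_xxx` value is a placeholder):
+
+```python
+>>> # The token must grant write access to the target dataset repository.
+>>> (
+...     df_chinese_only.write.format("huggingface")
+...     .option("token", "hf_xxx")
+...     .mode("overwrite")
+...     .save("username/Infinity-Instruct-Chinese-Only")
+... )
+```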