Update dask docs #1532

# Dask

[Dask](https://github.com/dask/dask) is a parallel and distributed computing library that scales the existing Python and PyData ecosystem.

In particular, we can use Dask DataFrame to scale up pandas workflows. Dask DataFrame parallelizes pandas to handle large tabular data. It closely mirrors the pandas API, making it simple to transition from testing on a single dataset to processing the full dataset. Dask is particularly effective with Parquet, the default format on Hugging Face Datasets, as it supports rich data types, efficient columnar filtering, and compression.

A good practical use case for Dask is running data processing or model inference on a dataset in a distributed manner. See, for example, Coiled's excellent blog post on [Scaling AI-Based Data Processing with Hugging Face + Dask](https://huggingface.co/blog/dask-scaling).

# Read and Write

Since Dask uses [fsspec](https://filesystem-spec.readthedocs.io) to read and write remote data, you can use the Hugging Face paths ([`hf://`](/docs/huggingface_hub/guides/hf_file_system#integrations)) to read and write data on the Hub:

First you need to [Login with your Hugging Face account](/docs/huggingface_hub/quick-start#login), for example using:

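A minimal sketch of the login step, assuming the standard `huggingface_hub` login flow (the CLI equivalent is `huggingface-cli login`):

```python
# Authenticate so that hf:// paths can read and write with your account.
from huggingface_hub import login

login()  # prompts for a token created at https://huggingface.co/settings/tokens
```
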
@@ -17,7 +24,8 @@ from huggingface_hub import HfApi | |
| HfApi().create_repo(repo_id="username/my_dataset", repo_type="dataset") | ||
| ``` | ||
|
|
||
Finally, you can use [Hugging Face paths](/docs/huggingface_hub/guides/hf_file_system#integrations) in Dask.
Dask DataFrame supports distributed writing to Parquet on Hugging Face, which uses commits to track dataset changes:

```python
import dask.dataframe as dd

df_valid.to_parquet("hf://datasets/username/my_dataset/validation")
df_test.to_parquet("hf://datasets/username/my_dataset/test")
```

Since this creates one commit per file, it is recommended to squash the history after the upload:

```python
from huggingface_hub import HfApi

HfApi().super_squash_history(repo_id="username/my_dataset", repo_type="dataset")
```

This creates a dataset repository `username/my_dataset` containing your Dask dataset in Parquet format.
You can reload it later:

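A minimal sketch that reloads the splits written above as Dask DataFrames:

```python
import dask.dataframe as dd

df_valid = dd.read_parquet("hf://datasets/username/my_dataset/validation")
df_test = dd.read_parquet("hf://datasets/username/my_dataset/test")
```
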
@@ -45,3 +61,56 @@ df_test = dd.read_parquet("hf://datasets/username/my_dataset/test") | |
| ``` | ||
|
|
||
| For more information on the Hugging Face paths and how they are implemented, please refer to the [the client library's documentation on the HfFileSystem](/docs/huggingface_hub/guides/hf_file_system). | ||
|
|
||
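As a quick illustration of what these paths map to, a minimal sketch using the `HfFileSystem` class from `huggingface_hub` (with the placeholder repository from above):

```python
from huggingface_hub import HfFileSystem

fs = HfFileSystem()

# hf:// paths resolve to files in Hub repositories; list the Parquet files written above
fs.ls("datasets/username/my_dataset/validation", detail=False)
```
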
# Process data

To process a dataset in parallel using Dask, you can first define your data processing function for a pandas DataFrame or Series, and then use the Dask `map_partitions` function to apply this function to all the partitions of a dataset in parallel:

```python
import pandas as pd

def dummy_count_words(texts):
    return pd.Series([len(text.split(" ")) for text in texts])
```

In pandas you can use this function on a text column:

```python
# pandas API
df["num_words"] = dummy_count_words(df.text)
```

And in Dask you can run this function on every partition:

```python
# Dask API: run the function on every partition
df["num_words"] = df.text.map_partitions(dummy_count_words, meta=int)
```

Note that you also need to provide `meta`, which is the type of the pandas Series or DataFrame returned by your function.
This is needed because Dask DataFrame uses a lazy API: since Dask only runs the data processing once `.compute()` is called, it needs the `meta` argument to know the type of the new column in the meantime.

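To make the laziness concrete, a minimal sketch that materializes the new column (assuming the `df` from the snippet above):

```python
# `df["num_words"]` is still a lazy Dask Series at this point; no partitions
# have been processed yet. Calling .compute() triggers the work and returns
# a regular pandas Series.
num_words = df["num_words"].compute()
```
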
# Predicate and Projection Pushdown

When reading Parquet data from Hugging Face, Dask automatically leverages the metadata in Parquet files to skip entire files or row groups if they are not needed. For example, if you apply a filter (predicate) on a Hugging Face dataset in Parquet format, or if you select a subset of the columns (projection), Dask will read the metadata of the Parquet files to discard the parts that are not needed without downloading them.

This is possible thanks to the `dask-expr` package, which is generally installed by default with Dask.

For example, this subset of FineWeb-Edu contains many Parquet files. If you filter the dataset to keep only the text from recent CC dumps, Dask will skip most of the files and only download the data that matches the filter:

```python
import dask.dataframe as dd

df = dd.read_parquet("hf://datasets/HuggingFaceFW/fineweb-edu/sample/10BT/*.parquet")

# Dask will skip the files or row groups that don't
# match the query without downloading them.
df = df[df.dump >= "CC-MAIN-2023"]
```

**Member:** Does Dask not still need to download the data to check whether the values in this column match the filter? From what I understood, in the Polars case predicate pushdown is usually used for skipping the reading of a column, i.e. if you drop it later it doesn't bother to load it, and/or doing a filtering step early on. Is Dask directly able to do this before loading?

**Member (Author):** It will skip the row groups which don't have any row that matches the query, using the row group metadata. Then, on the remaining row groups, it will download the column used for filtering to apply the filter. The other columns will be downloaded or not based on the other operations done on the dataset.

**Member (Author):** Yes, correct! Would be cool to explain that here as well.

**Member:** That's super cool!! For some datasets the download time does seem to end up becoming a blocker, so this is very neat!

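For illustration, a minimal sketch of the same filter pushed down explicitly at read time, using the `filters` argument of `dd.read_parquet` (pyarrow-style predicates):

```python
import dask.dataframe as dd

# Row groups whose 'dump' statistics cannot satisfy the filter
# are skipped without being downloaded.
df = dd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb-edu/sample/10BT/*.parquet",
    filters=[("dump", ">=", "CC-MAIN-2023")],
)
```
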
Dask will also read only the required columns for your computation and skip the rest. This is useful when you want to manipulate a subset of the columns or for analytics:

```python
# Dask will download only the 'dump' and 'token_count' columns needed
# for the computation and skip the other columns.
df.token_count.mean().compute()
```