|
771 | 771 | "\n", |
772 | 772 | "You can open an issue on the [Dask issue tracker](https://github.com/dask/dask/issues) to check how feasible the function could be to implement, and you can consider contributing this function to Dask.\n", |
773 | 773 | "\n", |
774 | | - "If it's a custom function or tricky to implement, `dask.dataframe` provides a few methods to make applying custom functions to Dask DataFrames easier:\n", |
| 774 | + "In case it's a custom function or tricky to implement, `dask.dataframe` provides a few methods to make applying custom functions to Dask DataFrames easier:\n", |
775 | 775 | "\n", |
776 | 776 | "- [`map_partitions`](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.map_partitions.html): to run a function on each partition (each pandas DataFrame) of the Dask DataFrame\n", |
777 | 777 | "- [`map_overlap`](https://docs.dask.org/en/latest/generated/dask.dataframe.rolling.map_overlap.html): to run a function on each partition (each pandas DataFrame) of the Dask DataFrame, with some rows shared between neighboring partitions\n", |
|
796 | 796 | "help(ddf.map_partitions)" |
797 | 797 | ] |
798 | 798 | }, |
| 799 | + { |
| 800 | + "cell_type": "markdown", |
| 801 | + "metadata": {}, |
| 802 | + "source": [ |
| 803 | + "The \"Distance\" column in `ddf` is currently in miles. Let's say we want to convert the units to kilometers and we have a general helper function as shown below. In this case, we can use `map_partitions` to apply this function across each of the internal pandas `DataFrame`s in parallel. " |
| 804 | + ] |
| 805 | + }, |
799 | 806 | { |
800 | 807 | "cell_type": "code", |
801 | 808 | "execution_count": null, |
802 | 809 | "metadata": {}, |
803 | 810 | "outputs": [], |
804 | 811 | "source": [ |
805 | | - "# TODO: update with df.a + df.b - df.c\n", |
| 812 | + "def my_custom_converter(df, multiplier=1):\n", |
| 813 | + " return df * multiplier\n", |
806 | 814 | "\n", |
807 | | - "def my_custom_function(df, parameter_a=True):\n", |
808 | | - " # toy function just for demonstration\n", |
809 | | - " if parameter_a:\n", |
810 | | - " # do something with df\n", |
811 | | - " return df\n", |
812 | | - " return df\n", |
| 815 | + "meta = pd.Series(name=\"Distance\", dtype=\"float64\")\n", |
813 | 816 | "\n", |
814 | | - "meta = ddf.head()\n", |
815 | | - "\n", |
816 | | - "ddf = ddf.map_partitions(my_custom_function, parameter_a=True, meta=meta)" |
| 817 | + "distance_km = ddf.Distance.map_partitions(my_custom_converter, multiplier=0.6, meta=meta)" |
| 818 | + ] |
| 819 | + }, |
| 820 | + { |
| 821 | + "cell_type": "code", |
| 822 | + "execution_count": null, |
| 823 | + "metadata": {}, |
| 824 | + "outputs": [], |
| 825 | + "source": [ |
| 826 | + "distance_km.visualize()" |
| 827 | + ] |
| 828 | + }, |
| 829 | + { |
| 830 | + "cell_type": "code", |
| 831 | + "execution_count": null, |
| 832 | + "metadata": {}, |
| 833 | + "outputs": [], |
| 834 | + "source": [ |
| 835 | + "distance_km.head()" |
817 | 836 | ] |
818 | 837 | }, |
819 | 838 | { |
820 | 839 | "cell_type": "markdown", |
821 | 840 | "metadata": {}, |
822 | 841 | "source": [ |
823 | | - "We suggest using Dask's `apply` and `map` functions when you can because they already use `map_partitions` internally.\n", |
| 842 | + "### What is `meta`?\n", |
| 843 | + "\n", |
| 844 | + "Since Dask operates lazily, it doesn't always have enough information to infer the output structure (which includes datatypes) of certain operations.\n", |
824 | 845 | "\n", |
825 | | - "Using the correct `meta` is important here, because your output will depend on it. A few notes about `meta`:\n", |
826 | | - "* Think of `meta` as a suggestion that Dask uses while it is operating lazily. Importantly `meta` _never infers with the output structure_.\n", |
827 | | - "* The best way to specify `meta` is using a small pandas DataFrame or Series that match the structure of your final output." |
| 846 | + "`meta` is a _suggestion_ to Dask about the output of your computation. Importantly, `meta` _never infers with the output structure_. Dask uses this `meta` until it can determine the actual output structure.\n", |
| 847 | + "\n", |
| 848 | + "Even though there are many ways to define `meta`, we suggest using a small pandas Series or DataFrame that matches the structure of your final output." |
828 | 849 | ] |
829 | 850 | }, |
830 | 851 | { |
|
840 | 861 | "cell_type": "markdown", |
841 | 862 | "metadata": {}, |
842 | 863 | "source": [ |
843 | | - "It's good practice to close any Dask cluster you create:" |
| 864 | + "It's good practice to always close any Dask cluster you create:" |
844 | 865 | ] |
845 | 866 | }, |
846 | 867 | { |
|
851 | 872 | "source": [ |
852 | 873 | "client.shutdown()" |
853 | 874 | ] |
854 | | - }, |
855 | | - { |
856 | | - "cell_type": "markdown", |
857 | | - "metadata": {}, |
858 | | - "source": [ |
859 | | - "## Final thoughts" |
860 | | - ] |
861 | | - }, |
862 | | - { |
863 | | - "cell_type": "markdown", |
864 | | - "metadata": {}, |
865 | | - "source": [ |
866 | | - "`dask.dataframe` operations use `pandas` operations internally. Generally, they run at about the same speed except in the following two cases:\n", |
867 | | - "\n", |
868 | | - "1. Dask introduces a bit of overhead, around 1ms per task. This is usually negligible.\n", |
869 | | - "2. When pandas releases the GIL `dask.dataframe` can call several pandas operations in parallel within a process, increasing speed somewhat proportional to the number of cores. For operations which don't release the GIL, multiple processes would be needed to get the same speedup.\n", |
870 | | - "\n", |
871 | | - "**To reiterate**, in this tutorial you used a small dataset consisting of a few CSV files. This dataset is small enough that you would normally use pandas. We've chosen this size so that exercises finish quickly. `dask.dataframe` only really becomes meaningful for problems significantly larger than this, when pandas breaks with the dreaded \n", |
872 | | - "\n", |
873 | | - " MemoryError: ...\n", |
874 | | - " \n", |
875 | | - "Furthermore, the distributed scheduler (you will learn about it later) allows the same `dask.dataframe` expressions to be executed across a cluster. To enable massive \"big data\" processing, one could execute data ingestion functions such as `read_parquet`, where the data is held on storage accessible to every worker node (e.g., amazon's S3), and because most operations begin by selecting only some columns, transforming and filtering the data, only relatively small amounts of data need to be communicated between the machines." |
876 | | - ] |
877 | 875 | } |
878 | 876 | ], |
879 | 877 | "metadata": { |
|
0 commit comments