Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 4 additions & 21 deletions .github/workflows/run_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,15 @@ jobs:
matrix:
os: ["ubuntu-20.04", "macos-latest", "windows-latest"]
# Remove Python 3.7.9 on 27 Jun 2023: https://endoflife.date/python
python-version: ["3.7.9", "3.8.10", "3.9.13", "3.10.8"]
pandas-version: ["1.0.5", "1.1.5", "1.2.5", "1.3.5", "1.4.3", "2.0", ""]
python-version: ["3.9.20", "3.10.15","3.11.11","3.12.8","3.13.1"]
pandas-version: ["1.2.5", "1.3.5", "1.4.3", "2.0.3","2.1.4" , ""]

exclude:
# see https://github.com/nalepae/pandarallel/pull/211#issuecomment-1362647674
- python-version: "3.8.10"
pandas-version: "1.0.5"
# Pandas 2.0 requires Python >= 3.8
- python-version: "3.7.9"
pandas-version: "2.0"
# Pandas 1.4.3 requires Python >= 3.8
- python-version: "3.7.9"
pandas-version: "1.4.3"
# Pandas 1.0.5 has to be fully rebuilt with Python >= 3.9.13 (taking > 10 min)
- python-version: "3.9.13"
pandas-version: "1.0.5"
# Pandas 1.0.5 has to be fully rebuilt with Python >= 3.9.13 (taking > 10 min)
- python-version: "3.10.8"
pandas-version: "1.0.5"
# Pandas 1.1.5 has to be fully rebuilt with Python >= 3.10.5 (taking > 10 min)
- python-version: "3.10.8"
pandas-version: "1.1.5"
# Pandas 1.2.5 has to be fully rebuilt with Python >= 3.10.5 (taking > 10 min)
- python-version: "3.10.8"
pandas-version: "1.2.5"



steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
Expand Down
111 changes: 109 additions & 2 deletions docs/examples_mac_linux.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,66 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# DataFrame.applymap"
"# DataFrame.map\n",
"map was introduced with pandas 2.1. Use applymap for earlier versions instead"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df_size = int(1e7)\n",
"df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),\n",
" b=np.random.rand(df_size)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def func(x):\n",
" return math.sin(x**2) - math.cos(x**2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"res = df.map(func)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"res_parallel = df.parallel_map(func)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"res.equals(res_parallel)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# DataFrame.applymap\n",
"applymap was deprecated in pandas 2.1; use map instead."
]
},
{
Expand Down Expand Up @@ -157,6 +216,54 @@
"res.equals(res_parallel)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# DataFrame.agg"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df_size = int(5e4)\n",
"df = pd.DataFrame(dict(a=np.random.rand(df_size),\n",
" b=np.random.rand(df_size),\n",
" c=np.random.rand(df_size)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"res = df.agg(['min','max'],axis=1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"res_parallel = df.parallel_agg(['min','max'],axis=1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"res.equals(res_parallel)"
]
},
{
"cell_type": "markdown",
"metadata": {},
Expand Down Expand Up @@ -539,7 +646,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.6"
"version": "3.12.3"
},
"mimetype": "text/x-python",
"name": "python",
Expand Down
110 changes: 109 additions & 1 deletion docs/examples_windows.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,67 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# DataFrame.applymap"
"# DataFrame.map\n",
"map was introduced with pandas 2.1. Use applymap for earlier versions instead"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df_size = int(1e7)\n",
"df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),\n",
" b=np.random.rand(df_size)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def func(x):\n",
" import math\n",
" return math.sin(x**2) - math.cos(x**2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"res = df.map(func)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"res_parallel = df.parallel_map(func)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"res.equals(res_parallel)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# DataFrame.applymap\n",
"applymap was deprecated in pandas 2.1; use map instead."
]
},
{
Expand Down Expand Up @@ -188,6 +248,54 @@
"res.equals(res_parallel)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# DataFrame.agg"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df_size = int(5e4)\n",
"df = pd.DataFrame(dict(a=np.random.rand(df_size),\n",
" b=np.random.rand(df_size),\n",
" c=np.random.rand(df_size)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"res = df.agg(['min','max'],axis=1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"res_parallel = df.parallel_agg(['min','max'],axis=1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"res.equals(res_parallel)"
]
},
{
"cell_type": "markdown",
"metadata": {},
Expand Down
9 changes: 9 additions & 0 deletions pandarallel/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,15 @@ def initialize(
pd.DataFrame.parallel_apply = parallelize(
nb_workers, DataFrame.Apply, progress_bars_in_user_defined_function
)
pd.DataFrame.parallel_map = parallelize(
nb_workers,
DataFrame.Map,
progress_bars_in_user_defined_function_multiply_by_number_of_columns,
)
pd.DataFrame.parallel_agg = parallelize(
nb_workers, DataFrame.Agg, progress_bars_in_user_defined_function
)
# applymap is deprecated since pandas 2.1 and may be removed in a future release
pd.DataFrame.parallel_applymap = parallelize(
nb_workers,
DataFrame.ApplyMap,
Expand Down
70 changes: 69 additions & 1 deletion pandarallel/data_types/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Callable, Dict, Iterable, Iterator
from typing import Any, Callable, Dict, List, Iterable, Iterator
from types import GeneratorType

import pandas as pd
Expand Down Expand Up @@ -50,6 +50,7 @@ def reduce(
axis = 0 if isinstance(datas[0], pd.Series) else 1 - extra["axis"]
return pd.concat(datas, copy=False, axis=axis)

# applymap is deprecated since pandas 2.1 and may be removed in a future release
class ApplyMap(DataType):
@staticmethod
def get_chunks(
Expand All @@ -73,3 +74,70 @@ def reduce(
datas: Iterable[pd.DataFrame], extra: Dict[str, Any]
) -> pd.DataFrame:
return pd.concat(datas, copy=False)


class Map(DataType):
    """Chunked, element-wise mapping over a DataFrame via ``DataFrame.map``.

    The frame is split along its rows, ``DataFrame.map`` is applied to each
    piece, and the mapped pieces are concatenated again along the rows.

    ``DataFrame.map`` only exists in pandas >= 2.1; on older versions the
    call in ``work`` raises ``AttributeError``.
    """

    @staticmethod
    def get_chunks(
        nb_workers: int, data: pd.DataFrame, **kwargs
    ) -> Iterator[pd.DataFrame]:
        """Yield positional row selections of ``data``.

        The selectors come from ``chunk(number_of_rows, nb_workers)``
        (presumably one slice per worker -- confirm against ``chunk``).
        ``kwargs`` is accepted for interface compatibility and is unused.
        """
        for chunk_ in chunk(data.shape[0], nb_workers):
            yield data.iloc[chunk_]

    @staticmethod
    def work(
        data: pd.DataFrame,
        user_defined_function: Callable,
        user_defined_function_args: tuple,
        user_defined_function_kwargs: Dict[str, Any],
        extra: Dict[str, Any],
    ) -> pd.DataFrame:
        """Apply ``user_defined_function`` to every element of one chunk.

        Extra positional and keyword arguments supplied by the caller are
        forwarded to ``DataFrame.map`` so that options such as ``na_action``
        and keyword arguments intended for the function behave the same as
        in a plain ``df.map(...)`` call. ``extra`` is unused.
        """
        # Previously these arguments were silently dropped, which made the
        # parallel result diverge from ``df.map`` whenever they were given.
        return data.map(
            user_defined_function,
            *user_defined_function_args,
            **user_defined_function_kwargs,
        )

    @staticmethod
    def reduce(
        datas: Iterable[pd.DataFrame], extra: Dict[str, Any]
    ) -> pd.DataFrame:
        """Concatenate the mapped chunks along the row axis."""
        return pd.concat(datas, copy=False)

class Agg(DataType):
    """Chunked aggregation of a DataFrame via ``DataFrame.agg``.

    The frame is split along the axis opposite to the aggregation axis,
    ``DataFrame.agg`` is run on each piece, and the partial results are
    concatenated.
    """

    @staticmethod
    def get_chunks(
        nb_workers: int, data: pd.DataFrame, **kwargs
    ) -> Iterator[pd.DataFrame]:
        """Yield pieces of ``data`` that can be aggregated independently.

        The aggregation axis is derived from
        ``kwargs["user_defined_function_kwargs"]`` only, so an ``axis`` given
        positionally is not seen here (NOTE(review): confirm how
        ``get_axis_int`` treats a missing ``axis`` key).
        """
        axis_int = get_axis_int(kwargs["user_defined_function_kwargs"])
        split_by_rows = axis_int == 1
        # Length of the axis that gets split: rows when aggregating with
        # axis=1, columns otherwise.
        split_length = data.shape[1 - axis_int]

        for selector in chunk(split_length, nb_workers):
            if split_by_rows:
                yield data.iloc[selector]
            else:
                yield data.iloc[:, selector]

    @staticmethod
    def work(
        data: pd.DataFrame,
        user_defined_functions: List[Callable],
        user_defined_function_args: tuple,
        user_defined_function_kwargs: Dict[str, Any],
        extra: Dict[str, Any],
    ) -> pd.DataFrame:
        """Aggregate one chunk with ``DataFrame.agg``.

        ``user_defined_functions`` is handed to ``agg`` unchanged, followed
        by the caller's positional and keyword arguments. ``extra`` is unused.
        """
        partial_result = data.agg(
            user_defined_functions,
            *user_defined_function_args,
            **user_defined_function_kwargs,
        )
        return partial_result

    @staticmethod
    def get_reduce_extra(
        data: Any, user_defined_function_kwargs: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Record the aggregation axis so ``reduce`` can pick the concat axis."""
        axis_int = get_axis_int(user_defined_function_kwargs)
        return {"axis": axis_int}

    @staticmethod
    def reduce(
        datas: Iterable[pd.DataFrame], extra: Dict[str, Any]
    ) -> pd.DataFrame:
        """Concatenate the per-chunk aggregation results.

        Series results are stacked along axis 0; DataFrame results are
        joined along the axis opposite to the aggregation axis.
        """
        # A generator cannot be indexed, so materialize it first.
        results = list(datas) if isinstance(datas, GeneratorType) else datas

        if isinstance(results[0], pd.Series):
            concat_axis = 0
        else:
            concat_axis = 1 - extra["axis"]

        return pd.concat(results, copy=False, axis=concat_axis)
Loading
Loading