Skip to content

Commit 4028c08

Browse files
authored by [author name not captured]
(enhancement): Apply modin repartitioning where required only (#1701)
* (enhancement): Apply modin repartitioning where required only
1 parent 847f43a commit 4028c08

File tree

5 files changed

+37
-10
lines changed

5 files changed

+37
-10
lines changed

awswrangler/distributed/ray/_register.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,16 @@
33
from awswrangler._data_types import pyarrow_types_from_pandas
44
from awswrangler._distributed import MemoryFormatEnum, engine, memory_format
55
from awswrangler._utils import is_pandas_frame, table_refs_to_df
6-
from awswrangler.distributed.ray._core import ray_remote
6+
from awswrangler.distributed.ray import ray_remote
77
from awswrangler.lakeformation._read import _get_work_unit_results
88
from awswrangler.s3._delete import _delete_objects
99
from awswrangler.s3._read_parquet import _read_parquet, _read_parquet_metadata_file
1010
from awswrangler.s3._read_text import _read_text
1111
from awswrangler.s3._select import _select_object_content, _select_query
1212
from awswrangler.s3._wait import _wait_object_batch
1313
from awswrangler.s3._write_dataset import _to_buckets, _to_partitions
14-
from awswrangler.s3._write_parquet import _to_parquet, to_parquet
15-
from awswrangler.s3._write_text import _to_text, to_csv, to_json
14+
from awswrangler.s3._write_parquet import _to_parquet
15+
from awswrangler.s3._write_text import _to_text
1616

1717

1818
def register_ray() -> None:
@@ -28,7 +28,6 @@ def register_ray() -> None:
2828
engine.register_func(func, ray_remote(func))
2929

3030
if memory_format.get() == MemoryFormatEnum.MODIN:
31-
from awswrangler.distributed.ray.modin._core import modin_repartition
3231
from awswrangler.distributed.ray.modin._data_types import pyarrow_types_from_pandas_distributed
3332
from awswrangler.distributed.ray.modin._utils import _arrow_refs_to_df, _is_pandas_or_modin_frame
3433
from awswrangler.distributed.ray.modin.s3._read_parquet import _read_parquet_distributed
@@ -48,9 +47,6 @@ def register_ray() -> None:
4847
_to_parquet: _to_parquet_distributed,
4948
_to_partitions: _to_partitions_distributed,
5049
_to_text: _to_text_distributed,
51-
to_csv: modin_repartition(to_csv),
52-
to_json: modin_repartition(to_json),
53-
to_parquet: modin_repartition(to_parquet),
5450
table_refs_to_df: _arrow_refs_to_df,
5551
is_pandas_frame: _is_pandas_or_modin_frame,
5652
}.items():

awswrangler/distributed/ray/modin/_data_types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import ray
77

88
from awswrangler._data_types import pyarrow_types_from_pandas
9-
from awswrangler.distributed.ray._core import ray_get, ray_remote
9+
from awswrangler.distributed.ray import ray_get, ray_remote
1010

1111

1212
def pyarrow_types_from_pandas_distributed(

awswrangler/distributed/ray/modin/s3/_write_dataset.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from awswrangler._distributed import engine
1111
from awswrangler.distributed.ray import ray_get, ray_remote
12+
from awswrangler.distributed.ray.modin import modin_repartition
1213
from awswrangler.s3._write_concurrent import _WriteProxy
1314
from awswrangler.s3._write_dataset import _delete_objects, _get_bucketing_series, _to_partitions
1415

@@ -22,6 +23,7 @@ def _retrieve_paths(values: Union[str, List[Any]]) -> Iterator[str]:
2223
yield values
2324

2425

26+
@modin_repartition
2527
def _to_buckets_distributed( # pylint: disable=unused-argument
2628
df: pd.DataFrame,
2729
func: Callable[..., List[str]],
@@ -109,6 +111,7 @@ def _write_partitions_distributed(
109111
return prefix, df_group.name, paths
110112

111113

114+
@modin_repartition
112115
def _to_partitions_distributed( # pylint: disable=unused-argument
113116
df: pd.DataFrame,
114117
func: Callable[..., List[str]],

tests/load/test_s3.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1+
import math
2+
3+
import modin.config as cfg
14
import modin.pandas as pd
5+
import numpy as np
26
import pytest
37
import ray
8+
from modin.distributed.dataframe.pandas import unwrap_partitions
49

510
import awswrangler as wr
611

@@ -192,3 +197,27 @@ def test_wait_object_not_exists(path: str, benchmark_time: int) -> None:
192197
wr.s3.wait_objects_not_exist(file_paths, parallelism=16)
193198

194199
assert timer.elapsed_time < benchmark_time
200+
201+
202+
@pytest.mark.parametrize("size", [(5000, 5000), (1, 5000), (5000, 1), (1, 1)])
203+
def test_wide_df(size, path) -> None:
204+
df = pd.DataFrame(np.random.randint(0, 100, size=size))
205+
df.columns = df.columns.map(str)
206+
207+
num_cols = size[0]
208+
df["int"] = np.random.choice(["1", "2", None], num_cols)
209+
df["decimal"] = np.random.choice(["1.0", "2.0", None], num_cols)
210+
df["date"] = np.random.choice(["2020-01-01", "2020-01-02", None], num_cols)
211+
df["par0"] = np.random.choice(["A", "B"], num_cols)
212+
213+
partitions_shape = np.array(unwrap_partitions(df)).shape
214+
assert partitions_shape[1] == min(math.ceil(len(df.columns) / cfg.MinPartitionSize.get()), cfg.NPartitions.get())
215+
216+
dtype = {
217+
"int": "tinyint",
218+
"decimal": "double",
219+
"date": "date",
220+
}
221+
222+
result = wr.s3.to_csv(df=df, path=path, dataset=True, dtype=dtype, partition_cols=["par0"])
223+
assert len(result["paths"]) == partitions_shape[0] * len(df["par0"].unique())

tests/unit/test_s3_text.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,10 @@ def test_csv_dataset_header_modes(path, mode, glue_database, glue_table):
163163
index=False,
164164
header=True,
165165
)
166-
dfs_conc = pd.concat(dfs)
167166
df_res = wr.s3.read_csv(path=path0)
168167

169168
if mode == "append":
170-
assert len(df_res) == len(dfs_conc)
169+
assert len(df_res) == sum([len(df) for df in dfs])
171170
else:
172171
assert df_res.equals(dfs[-1])
173172

0 commit comments

Comments (0)