
Commit 28c1268

feat: Allow Ray 2.5 & downgrade tox (#2338)
* Allow Ray 2.5, upgrade modin & downgrade tox due to dependency version conflicts
* [skip ci] Remove generics
* Force ray 2.5
* Downgrade Modin
* [skip ci] Typing fixes
* Allow ray 2+
1 parent af2ed51 commit 28c1268


11 files changed: +2518 −2577 lines

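Most of the diff is mechanical: as of Ray 2.5 the Ray Data typing primitives (Block, BlockAccessor, Dataset, Reader) are no longer generic, so subscripted annotations such as Block[Any] are dropped, and a few # type: ignore[attr-defined] comments are added where mypy cannot resolve attributes from Ray's stubs. A minimal before/after sketch of the annotation pattern, assuming Ray 2.5 is installed; the helper function below is illustrative only and not part of awswrangler:

from ray.data.block import Block, BlockAccessor  # non-generic aliases as of Ray 2.5


# Hypothetical helper, shown only to illustrate the annotation change.
def rows_in_block(block: Block) -> int:  # previously annotated as Block[Any]
    # BlockAccessor wraps a concrete block (e.g. a pyarrow.Table or a pandas
    # DataFrame) and exposes format-agnostic helpers such as num_rows().
    accessor: BlockAccessor = BlockAccessor.for_block(block)  # previously BlockAccessor[Any]
    return accessor.num_rows()

The per-file hunks below apply exactly this pattern to the write paths of the Ray datasources.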

awswrangler/distributed/ray/_executor.py

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ def map(self, func: Callable[..., MapOutputType], _: Optional["BaseClient"], *ar
         return list(func(*arg) for arg in zip(itertools.repeat(None), *args))


-@ray.remote
+@ray.remote  # type: ignore[attr-defined]
 class AsyncActor:
     async def run_concurrent(self, func: Callable[..., MapOutputType], *args: Any) -> MapOutputType:
         return func(*args)
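For context, @ray.remote turns AsyncActor into a Ray actor whose async methods are scheduled on the actor's asyncio event loop, so several calls can be in flight concurrently; the added # type: ignore[attr-defined] only silences mypy against Ray's type stubs. A minimal usage sketch, assuming a local ray.init() and the standard Ray actor API:

from typing import Any, Callable

import ray

ray.init(ignore_reinit_error=True)  # local cluster, for the sketch only


@ray.remote  # type: ignore[attr-defined]
class AsyncActor:
    async def run_concurrent(self, func: Callable[..., Any], *args: Any) -> Any:
        # Each invocation runs inside the actor's event loop.
        return func(*args)


actor = AsyncActor.remote()
refs = [actor.run_concurrent.remote(lambda x: x * x, i) for i in range(4)]
print(ray.get(refs))  # [0, 1, 4, 9]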

awswrangler/distributed/ray/_utils.py

Lines changed: 2 additions & 2 deletions
@@ -18,8 +18,8 @@ def _estimate_avail_cpus(cur_pg: Optional[PlacementGroup]) -> int:
     Args:
         cur_pg: The current placement group, if any.
     """
-    cluster_cpus = int(ray.cluster_resources().get("CPU", 1))
-    cluster_gpus = int(ray.cluster_resources().get("GPU", 0))
+    cluster_cpus = int(ray.cluster_resources().get("CPU", 1))  # type: ignore[attr-defined]
+    cluster_gpus = int(ray.cluster_resources().get("GPU", 0))  # type: ignore[attr-defined]

     # If we're in a placement group, we shouldn't assume the entire cluster's
     # resources are available for us to use. Estimate an upper bound on what's
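The same # type: ignore[attr-defined] treatment is used here because mypy cannot resolve ray.cluster_resources() from Ray's stubs; the call itself is the standard API for inspecting aggregate cluster capacity. A small standalone sketch, assuming a locally initialized Ray runtime:

import ray

ray.init(ignore_reinit_error=True)

# cluster_resources() reports aggregate capacity, e.g. {"CPU": 8.0, "memory": ...}.
resources = ray.cluster_resources()  # type: ignore[attr-defined]
cluster_cpus = int(resources.get("CPU", 1))
cluster_gpus = int(resources.get("GPU", 0))
print(f"CPUs={cluster_cpus} GPUs={cluster_gpus}")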

awswrangler/distributed/ray/datasources/arrow_csv_datasource.py

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ def _read_stream(  # type: ignore[override]  # pylint: disable=arguments-differ
     def _write_block(  # type: ignore[override]  # pylint: disable=arguments-differ
         self,
         f: pa.NativeFile,
-        block: BlockAccessor[Any],
+        block: BlockAccessor,
         **writer_args: Any,
     ) -> None:
         write_options_dict = writer_args.get("write_options", {})

awswrangler/distributed/ray/datasources/arrow_orc_datasource.py

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ def _open_input_source(
     def _write_block(  # type: ignore[override]
         self,
         f: pa.NativeFile,
-        block: BlockAccessor[Any],
+        block: BlockAccessor,
         pandas_kwargs: Optional[Dict[str, Any]],
         **writer_args: Any,
     ) -> None:

awswrangler/distributed/ray/datasources/arrow_parquet_base_datasource.py

Lines changed: 1 addition & 1 deletion
@@ -70,7 +70,7 @@ def _open_input_source(
     def _write_block(  # type: ignore[override]
         self,
         f: pa.NativeFile,
-        block: BlockAccessor[Any],
+        block: BlockAccessor,
         **writer_args: Any,
     ) -> None:
         schema: Optional[pa.schema] = writer_args.get("schema", None)

awswrangler/distributed/ray/datasources/arrow_parquet_datasource.py

Lines changed: 6 additions & 6 deletions
@@ -82,14 +82,14 @@ class ArrowParquetDatasource(ArrowParquetBaseDatasource):  # pylint: disable=abs
     relative to the root S3 prefix.
     """

-    def create_reader(self, **kwargs: Dict[str, Any]) -> Reader[Any]:
+    def create_reader(self, **kwargs: Dict[str, Any]) -> Reader:
         """Return a Reader for the given read arguments."""
         return _ArrowParquetDatasourceReader(**kwargs)  # type: ignore[arg-type]

     def _write_block(  # type: ignore[override]  # pylint: disable=arguments-differ, arguments-renamed, unused-argument
         self,
         f: "pyarrow.NativeFile",
-        block: BlockAccessor[Any],
+        block: BlockAccessor,
         pandas_kwargs: Optional[Dict[str, Any]],
         **writer_args: Any,
     ) -> None:

@@ -185,7 +185,7 @@ def _deserialize_pieces_with_retry(
     raise final_exception  # type: ignore[misc]


-class _ArrowParquetDatasourceReader(Reader[Any]):  # pylint: disable=too-many-instance-attributes
+class _ArrowParquetDatasourceReader(Reader):  # pylint: disable=too-many-instance-attributes
     def __init__(
         self,
         paths: Union[str, List[str]],

@@ -194,7 +194,7 @@ def __init__(
         columns: Optional[List[str]] = None,
         schema: Optional[Schema] = None,
         meta_provider: ParquetMetadataProvider = DefaultParquetMetadataProvider(),
-        _block_udf: Optional[Callable[[Block[Any]], Block[Any]]] = None,
+        _block_udf: Optional[Callable[[Block], Block]] = None,
         **reader_args: Any,
     ):
         import pyarrow as pa

@@ -209,7 +209,7 @@ def __init__(
         import ray
         from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

-        self._local_scheduling = NodeAffinitySchedulingStrategy(ray.get_runtime_context().get_node_id(), soft=False)
+        self._local_scheduling = NodeAffinitySchedulingStrategy(ray.get_runtime_context().get_node_id(), soft=False)  # type: ignore[attr-defined]

         dataset_kwargs = reader_args.pop("dataset_kwargs", {})
         try:

@@ -361,7 +361,7 @@ def _estimate_files_encoding_ratio(self) -> float:
 # 1. Use _add_table_partitions to add partition columns. The behavior is controlled by Pandas SDK
 # native `dataset` parameter. The partitions are loaded relative to the `path_root` prefix.
 def _read_pieces(
-    block_udf: Optional[Callable[[Block[Any]], Block[Any]]],
+    block_udf: Optional[Callable[[Block], Block]],
     reader_args: Any,
     columns: Optional[List[str]],
     schema: Optional[Union[type, "pyarrow.lib.Schema"]],

awswrangler/distributed/ray/datasources/pandas_file_based_datasource.py

Lines changed: 7 additions & 7 deletions
@@ -35,7 +35,7 @@ def _get_write_path_for_block(
     *,
     filesystem: Optional["pyarrow.fs.FileSystem"] = None,
     dataset_uuid: Optional[str] = None,
-    block: Optional[Block[Any]] = None,
+    block: Optional[Block] = None,
     block_index: Optional[int] = None,
     file_format: Optional[str] = None,
 ) -> str:

@@ -64,7 +64,7 @@ def __init__(self) -> None:
     def _read_file(self, f: pyarrow.NativeFile, path: str, **reader_args: Any) -> pd.DataFrame:
         raise NotImplementedError()

-    def do_write(  # type: ignore[override]  # pylint: disable=arguments-differ
+    def do_write(  # pylint: disable=arguments-differ
         self,
         blocks: List[ObjectRef[pd.DataFrame]],
         metadata: List[BlockMetadata],

@@ -141,9 +141,9 @@ def write_block(write_path: str, block: pd.DataFrame) -> str:

         return write_tasks

-    def write(
+    def write(  # type: ignore[override]
         self,
-        blocks: Iterable[Union[Block[pd.DataFrame], ObjectRef[pd.DataFrame]]],
+        blocks: Iterable[Union[Block, ObjectRef[pd.DataFrame]]],
         ctx: TaskContext,
         path: str,
         dataset_uuid: str,

@@ -188,7 +188,7 @@ def write_block(write_path: str, block: pd.DataFrame) -> str:

         file_suffix = self._get_file_suffix(self._FILE_EXTENSION, compression)

-        builder = DelegatingBlockBuilder()  # type: ignore[no-untyped-call,var-annotated]
+        builder = DelegatingBlockBuilder()  # type: ignore[no-untyped-call]
         for block in blocks:
             # Dereference the block if ObjectRef is passed
             builder.add_block(ray_get(block) if isinstance(block, ray.ObjectRef) else block)  # type: ignore[arg-type]

@@ -198,7 +198,7 @@ def write_block(write_path: str, block: pd.DataFrame) -> str:
             path,
             filesystem=filesystem,
             dataset_uuid=dataset_uuid,
-            block=block,  # type: ignore[arg-type]
+            block=block,
             block_index=ctx.task_idx,
             file_format=file_suffix,
         )

@@ -211,7 +211,7 @@ def _get_file_suffix(self, file_format: str, compression: Optional[str]) -> str:
     def _write_block(
         self,
         f: "pyarrow.NativeFile",
-        block: BlockAccessor[Any],
+        block: BlockAccessor,
         writer_args_fn: Callable[[], Dict[str, Any]] = lambda: {},
         **writer_args: Any,
     ) -> None:

awswrangler/distributed/ray/datasources/pandas_text_datasource.py

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ def _read_file(self, f: pyarrow.NativeFile, path: str, **reader_args: Any) -> pd
     def _write_block(  # type: ignore[override]  # pylint: disable=arguments-differ, arguments-renamed
         self,
         f: io.TextIOWrapper,
-        block: BlockAccessor[Any],
+        block: BlockAccessor,
         pandas_kwargs: Optional[Dict[str, Any]],
         **writer_args: Any,
     ) -> None:

awswrangler/distributed/ray/modin/_utils.py

Lines changed: 2 additions & 2 deletions
@@ -29,7 +29,7 @@ def _block_to_df(
     return _table_to_df(table=block._table, kwargs=to_pandas_kwargs)  # pylint: disable=protected-access


-def _ray_dataset_from_df(df: Union[pd.DataFrame, modin_pd.DataFrame]) -> Dataset[Any]:
+def _ray_dataset_from_df(df: Union[pd.DataFrame, modin_pd.DataFrame]) -> Dataset:
     """Create Ray dataset from supported types of data frames."""
     if isinstance(df, modin_pd.DataFrame):
         return from_modin(df)  # type: ignore[no-any-return]

@@ -39,7 +39,7 @@ def _ray_dataset_from_df(df: Union[pd.DataFrame, modin_pd.DataFrame]) -> Dataset


 def _to_modin(
-    dataset: Union[ray.data.Dataset[Any], ray.data.Dataset[pd.DataFrame]],
+    dataset: Dataset,
     to_pandas_kwargs: Optional[Dict[str, Any]] = None,
     ignore_index: Optional[bool] = True,
 ) -> modin_pd.DataFrame:
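Dropping the Dataset[Any] subscript follows the same Ray 2.5 change: ray.data.from_pandas and ray.data.from_modin now return the non-generic Dataset. A short sketch of the round-trip these private helpers wrap, using only public Ray Data APIs (the column name and values are made up for illustration):

import pandas as pd
import ray
from ray.data import Dataset, from_pandas

ray.init(ignore_reinit_error=True)

df = pd.DataFrame({"a": [1, 2, 3]})
ds: Dataset = from_pandas(df)  # previously annotated as Dataset[Any]
print(ds.count())              # 3
print(ds.to_pandas())          # back to a pandas DataFrame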
