Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion awswrangler/distributed/ray/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, max_concurrency: int) -> None:
super().__init__()

_logger.debug("Initializing Ray Actor with maximum concurrency %d", max_concurrency)
self._actor: ray.actor.ActorHandle = AsyncActor.options(max_concurrency=max_concurrency).remote() # type: ignore[attr-defined]
self._actor: "ray.actor.ActorHandle[AsyncActor]" = AsyncActor.options(max_concurrency=max_concurrency).remote() # type: ignore[attr-defined]

def map(self, func: Callable[..., MapOutputType], _: "BaseClient" | None, *args: Any) -> list[MapOutputType]:
"""Map func and return ray futures."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ def get_read_tasks(self, parallelism: int) -> list[ReadTask]:

meta = self._meta_provider(
paths, # type: ignore[arg-type]
self._inferred_schema,
num_fragments=len(fragments),
prefetched_metadata=metadata,
)
Expand Down Expand Up @@ -566,7 +565,7 @@ def compute_batch_size_rows(sample_info: _SampleInfo) -> int:
if sample_info.actual_bytes_per_row is None:
return PARQUET_READER_ROW_BATCH_SIZE
else:
max_parquet_reader_row_batch_size_bytes = DataContext.get_current().target_max_block_size // 10
max_parquet_reader_row_batch_size_bytes = DataContext.get_current().target_max_block_size // 10 # type: ignore[operator]
return max(
1,
min(
Expand Down
2 changes: 1 addition & 1 deletion awswrangler/distributed/ray/datasources/file_datasink.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def _write_block(write_path: str, block: pd.DataFrame) -> str:
write_path = self.path

if write_path.endswith("/"):
filename = self.filename_provider.get_filename_for_block(block, ctx.task_idx, 0)
filename = self.filename_provider.get_filename_for_block(block, "", ctx.task_idx, 0)
write_path = posixpath.join(self.path, filename)

return _write_block(write_path, block)
Expand Down
5 changes: 4 additions & 1 deletion awswrangler/distributed/ray/datasources/filename_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ def __init__(
def get_filename_for_block(
self,
block: Block,
write_uuid: str,
task_index: int,
block_index: int,
) -> str:
file_id = f"{task_index:06}_{block_index:06}"
return self._generate_filename(file_id)

def get_filename_for_row(self, row: dict[str, Any], task_index: int, block_index: int, row_index: int) -> str:
def get_filename_for_row(
self, row: dict[str, Any], write_uuid: str, task_index: int, block_index: int, row_index: int
) -> str:
file_id = f"{task_index:06}_{block_index:06}_{row_index:06}"
return self._generate_filename(file_id)

Expand Down
11 changes: 4 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ dependencies = [
"botocore>=1.23.32,<2",
"pandas>=1.2.0,<3.0.0",
"numpy>=1.26,<3.0",
"pyarrow>=18.0.0,<21.0.0 ; sys_platform != 'darwin' or platform_machine != 'x86_64'",
# pyarrow 18 causes macos build failures
# https://github.com/ray-project/ray/pull/48446
"pyarrow>=8.0.0,<18.0.0 ; sys_platform == 'darwin' and platform_machine == 'x86_64'",
"pyarrow>=8.0.0,<21.0.0",
"typing-extensions>=4.4.0,<5",
"packaging>=21.1,<26.0",
"setuptools ; python_version >= '3.12'",
Expand All @@ -41,7 +38,7 @@ oracle = ["oracledb>=1,<4"]
gremlin = [
"gremlinpython>=3.7.1,<4",
"requests>=2.0.0,<3",
"aiohttp>=3.9.0,<4",
"aiohttp>=3.12.14,<4",
"async-timeout>=4.0.3,<6.0.0",
]
sparql = [
Expand All @@ -58,8 +55,8 @@ openpyxl = ["openpyxl>=3.0.0,<4"]
progressbar = ["progressbar2>=4.0.0,<5"]
deltalake = ["deltalake>=0.18.0,<1.2.0"]
geopandas = ["geopandas>=1.0.0,<2"]
modin = ["modin>=0.31,<0.35"]
ray = ["ray[default, data]>=2.45.0,<3"]
modin = ["modin>=0.31,<0.36"]
ray = ["ray[default, data]>=2.49.0,<3"]

[project.urls]
Homepage = "https://aws-sdk-pandas.readthedocs.io/"
Expand Down
Loading
Loading