Skip to content

Commit c49b666

Browse files
authored
[Data] Fix checkpoint filter PyArrow zero-copy conversion error (#59839)
# Fix `ArrowInvalid` error in checkpoint filter when converting PyArrow chunks to NumPy arrays

## Issue

Fixes an `ArrowInvalid` error raised when checkpoint filtering converts PyArrow chunks to NumPy arrays with `zero_copy_only=True`:

```
  File "/usr/local/lib/python3.10/dist-packages/ray/data/checkpoint/checkpoint_filter.py", line 249, in filter_rows_for_block
    masks = list(executor.map(filter_with_ckpt_chunk, ckpt_chunks))
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 621, in result_iterator
    yield _result_or_cancel(fs.pop())
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 319, in _result_or_cancel
    return fut.result(timeout)
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/checkpoint/checkpoint_filter.py", line 229, in filter_with_ckpt_chunk
    ckpt_ids = ckpt_chunk.to_numpy(zero_copy_only=True)
  File "pyarrow/array.pxi", line 1789, in pyarrow.lib.Array.to_numpy
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Needed to copy 1 chunks with 0 nulls, but zero_copy_only was True
```

This error occurs when checkpoint data is loaded from Ray's object store, where PyArrow buffers may reside in shared memory and cannot be zero-copied to NumPy.
## Reproduction

```python
#!/usr/bin/env python3
import ray
from ray.data import DataContext
from ray.data.checkpoint import CheckpointConfig
import tempfile

ray.init()

with tempfile.TemporaryDirectory() as ckpt_dir, \
        tempfile.TemporaryDirectory() as data_dir, \
        tempfile.TemporaryDirectory() as output_dir:
    # Step 1: Create data
    ray.data.range(10).map(lambda x: {"id": f"id_{x['id']}"}).write_parquet(data_dir)

    # Step 2: Enable checkpoint and write
    ctx = DataContext.get_current()
    ctx.checkpoint_config = CheckpointConfig(
        checkpoint_path=ckpt_dir,
        id_column="id",
        delete_checkpoint_on_success=False
    )
    ray.data.read_parquet(data_dir).filter(lambda x: x["id"] != 'id_0').write_parquet(output_dir)

    # Step 3: Second write triggers checkpoint filtering
    ray.data.read_parquet(data_dir).write_parquet(output_dir)

ray.shutdown()
```

## Solution

Replace `ckpt_chunk.to_numpy(zero_copy_only=True)` with `transform_pyarrow.to_numpy(ckpt_chunk, zero_copy_only=False)` in `BatchBasedCheckpointFilter.filter_rows_for_block()`. This allows PyArrow to copy data when necessary, and routes the conversion through Ray Data's internal helper.

### Changes

**File**: `ray/python/ray/data/checkpoint/checkpoint_filter.py`

- Added the import `from ray.data._internal.arrow_ops import transform_pyarrow`
- Line 229: Changed `ckpt_chunk.to_numpy(zero_copy_only=True)` to `transform_pyarrow.to_numpy(ckpt_chunk, zero_copy_only=False)`

### Performance Impact

No performance regression expected. PyArrow will only perform a copy when zero-copy is not possible.

Signed-off-by: dragongu <andrewgu@vip.qq.com>
1 parent 2ac0f3f commit c49b666

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

python/ray/data/checkpoint/checkpoint_filter.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
import pyarrow
88

99
import ray
10+
from ray.data._internal.arrow_ops import transform_pyarrow
1011
from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
1112
from ray.data.block import Block, BlockAccessor, BlockMetadata, DataBatch, Schema
1213
from ray.data.checkpoint import CheckpointConfig
@@ -226,7 +227,8 @@ def filter_rows_for_block(
226227

227228
def filter_with_ckpt_chunk(ckpt_chunk: pyarrow.ChunkedArray) -> numpy.ndarray:
228229
# Convert checkpoint chunk to numpy for fast search.
229-
ckpt_ids = ckpt_chunk.to_numpy(zero_copy_only=True)
230+
# Use internal helper function for consistency and robustness (handles null-typed arrays, etc.)
231+
ckpt_ids = transform_pyarrow.to_numpy(ckpt_chunk, zero_copy_only=False)
230232
# Start with a mask of all True (keep all rows).
231233
mask = numpy.ones(len(block_ids), dtype=bool)
232234
# Use binary search to find where block_ids would be in ckpt_ids.

0 commit comments

Comments
 (0)