Skip to content

Commit 71c7bd0

Browse files
authored
[Data] Add exception handling for invalid URIs in download operation (#58464)
1 parent d74c157 commit 71c7bd0

File tree

2 files changed

+121
-4
lines changed

2 files changed

+121
-4
lines changed

python/ray/data/_internal/planner/plan_download_op.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,14 @@ def download_bytes_threaded(
189189
def load_uri_bytes(uri_path_iterator):
190190
"""Function that takes an iterator of URI paths and yields downloaded bytes for each."""
191191
for uri_path in uri_path_iterator:
192-
with fs.open_input_file(uri_path) as f:
193-
yield f.read()
192+
try:
193+
with fs.open_input_file(uri_path) as f:
194+
yield f.read()
195+
except OSError as e:
196+
logger.debug(
197+
f"Failed to download URI '{uri_path}' from column '{uri_column_name}' with error: {e}"
198+
)
199+
yield None
194200

195201
# Use make_async_gen to download URI bytes concurrently
196202
# This preserves the order of results to match the input URIs
@@ -322,9 +328,9 @@ def get_file_size(uri_path, fs):
322328
for future in as_completed(futures):
323329
try:
324330
size = future.result()
325-
if size is not None:
326-
file_sizes.append(size)
331+
file_sizes.append(size if size is not None else 0)
327332
except Exception as e:
328333
logger.warning(f"Error fetching file size for download: {e}")
334+
file_sizes.append(0)
329335

330336
return file_sizes

python/ray/data/tests/test_download_expression.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,117 @@ def test_download_expression_with_null_uris(self):
294294
# If it fails, should be a reasonable error (not a crash)
295295
assert isinstance(e, (ValueError, KeyError, RuntimeError))
296296

297+
def test_download_expression_with_invalid_uris(self, tmp_path):
298+
"""Test download expression with URIs that fail to download.
299+
300+
This tests the exception handling in load_uri_bytes
301+
where OSError is caught and None is yielded for failed downloads.
302+
"""
303+
# Create one valid file
304+
valid_file = tmp_path / "valid.txt"
305+
valid_file.write_bytes(b"valid content")
306+
307+
# Create URIs: one valid, one non-existent file, one invalid path
308+
table = pa.Table.from_arrays(
309+
[
310+
pa.array(
311+
[
312+
f"local://{valid_file}",
313+
f"local://{tmp_path}/nonexistent.txt", # File doesn't exist
314+
"local:///this/path/does/not/exist/file.txt", # Invalid path
315+
]
316+
),
317+
],
318+
names=["uri"],
319+
)
320+
321+
ds = ray.data.from_arrow(table)
322+
ds_with_downloads = ds.with_column("bytes", download("uri"))
323+
324+
# Should not crash - failed downloads return None
325+
results = ds_with_downloads.take_all()
326+
assert len(results) == 3
327+
328+
# First URI should succeed
329+
assert results[0]["bytes"] == b"valid content"
330+
331+
# Second and third URIs should fail gracefully (return None)
332+
assert results[1]["bytes"] is None
333+
assert results[2]["bytes"] is None
334+
335+
def test_download_expression_all_size_estimations_fail(self):
336+
"""Test download expression when all URI size estimations fail.
337+
338+
This tests that failed downloads do not cause a division-by-zero error.
339+
"""
340+
# Create URIs that will fail size estimation (non-existent files)
341+
# Using enough URIs to trigger size estimation sampling
342+
invalid_uris = [
343+
f"local:///nonexistent/path/file_{i}.txt"
344+
for i in range(30) # More than INIT_SAMPLE_BATCH_SIZE (25)
345+
]
346+
347+
table = pa.Table.from_arrays(
348+
[pa.array(invalid_uris)],
349+
names=["uri"],
350+
)
351+
352+
ds = ray.data.from_arrow(table)
353+
ds_with_downloads = ds.with_column("bytes", download("uri"))
354+
355+
# Should not crash with divide-by-zero error
356+
# The PartitionActor should handle all failed size estimations gracefully
357+
# and fall back to using the number of rows in the block as partition size
358+
results = ds_with_downloads.take_all()
359+
360+
# All downloads should fail gracefully (return None)
361+
assert len(results) == 30
362+
for result in results:
363+
assert result["bytes"] is None
364+
365+
def test_download_expression_mixed_valid_and_invalid_size_estimation(
366+
self, tmp_path
367+
):
368+
"""Test download expression with mix of valid and invalid URIs for size estimation.
369+
370+
This tests that size estimation handles partial failures correctly.
371+
"""
372+
# Create some valid files
373+
valid_files = []
374+
for i in range(10):
375+
file_path = tmp_path / f"valid_{i}.txt"
376+
file_path.write_bytes(b"x" * 100) # 100 bytes each
377+
valid_files.append(str(file_path))
378+
379+
# Mix valid and invalid URIs
380+
mixed_uris = []
381+
for i in range(30):
382+
if i % 3 == 0 and i // 3 < len(valid_files):
383+
# Every 3rd URI (i = 0, 3, ..., 27) is valid, giving 10 valid URIs
384+
mixed_uris.append(f"local://{valid_files[i // 3]}")
385+
else:
386+
# Others are invalid
387+
mixed_uris.append(f"local:///nonexistent/file_{i}.txt")
388+
389+
table = pa.Table.from_arrays(
390+
[pa.array(mixed_uris)],
391+
names=["uri"],
392+
)
393+
394+
ds = ray.data.from_arrow(table)
395+
ds_with_downloads = ds.with_column("bytes", download("uri"))
396+
397+
# Should not crash - should handle mixed valid/invalid gracefully
398+
results = ds_with_downloads.take_all()
399+
assert len(results) == 30
400+
401+
# Verify valid URIs downloaded successfully
402+
for i, result in enumerate(results):
403+
if i % 3 == 0 and i // 3 < len(valid_files):
404+
assert result["bytes"] == b"x" * 100
405+
else:
406+
assert result["bytes"] is None
407+
297408

298409
class TestDownloadExpressionIntegration:
299410
"""Integration tests combining download expressions with other Ray Data operations."""

0 commit comments

Comments
 (0)