Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions graphgen/operators/read/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def read(
working_dir: Optional[str] = "cache",
parallelism: int = 4,
recursive: bool = True,
read_nums: Optional[int] = None,
**reader_kwargs: Any,
) -> ray.data.Dataset:
"""
Expand All @@ -63,6 +64,7 @@ def read(
:param working_dir: Directory to cache intermediate files (PDF processing)
:param parallelism: Number of parallel workers
:param recursive: Whether to scan directories recursively
:param read_nums: Limit the number of documents to read
:param reader_kwargs: Additional kwargs passed to readers
:return: Ray Dataset containing all documents
"""
Expand Down Expand Up @@ -120,6 +122,9 @@ def read(
}
)

if read_nums is not None:
combined_ds = combined_ds.limit(read_nums)
Comment on lines +125 to +126
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This implementation has a couple of issues:

  1. Non-determinism (High Severity): The use of limit() here can lead to non-deterministic results. combined_ds is created using ray.data.Dataset.union() on line 116, which does not guarantee the order of records between runs. Consequently, limit(read_nums) will yield different subsets of data each time, which is problematic for reproducibility. To improve determinism, you should ensure the file processing order is stable (e.g., by sorting file lists). For true determinism, a more significant refactor to avoid union() might be necessary.

  2. Missing Input Validation (Medium Severity): The read_nums parameter is not validated. A negative value will cause limit() to raise a ValueError, but it's better to validate inputs explicitly in this function to provide a clearer error message.

Here is a suggestion to address the input validation:

Suggested change
if read_nums is not None:
combined_ds = combined_ds.limit(read_nums)
if read_nums is not None:
if read_nums < 0:
raise ValueError("read_nums cannot be negative")
combined_ds = combined_ds.limit(read_nums)


logger.info("[READ] Successfully read files from %s", input_path)
return combined_ds

Expand Down
Loading