From b5d026e702452bb1e52ad28cf98df54b4a811d32 Mon Sep 17 00:00:00 2001 From: chenzihong-gavin Date: Mon, 29 Dec 2025 16:38:11 +0800 Subject: [PATCH 1/2] feat: add param read_nums --- graphgen/operators/read/read.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/graphgen/operators/read/read.py b/graphgen/operators/read/read.py index c55f3d3d..40e22c8d 100644 --- a/graphgen/operators/read/read.py +++ b/graphgen/operators/read/read.py @@ -53,6 +53,7 @@ def read( working_dir: Optional[str] = "cache", parallelism: int = 4, recursive: bool = True, + read_num: Optional[int] = None, **reader_kwargs: Any, ) -> ray.data.Dataset: """ @@ -120,6 +121,9 @@ def read( } ) + if read_num is not None: + combined_ds = combined_ds.limit(read_num) + logger.info("[READ] Successfully read files from %s", input_path) return combined_ds From ecb617372d2b98b173722b5d09b4b935bc23ae1b Mon Sep 17 00:00:00 2001 From: chenzihong-gavin Date: Mon, 29 Dec 2025 16:38:45 +0800 Subject: [PATCH 2/2] feat: add param read_nums --- graphgen/operators/read/read.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/graphgen/operators/read/read.py b/graphgen/operators/read/read.py index 40e22c8d..3ff60c15 100644 --- a/graphgen/operators/read/read.py +++ b/graphgen/operators/read/read.py @@ -53,7 +53,7 @@ def read( working_dir: Optional[str] = "cache", parallelism: int = 4, recursive: bool = True, - read_num: Optional[int] = None, + read_nums: Optional[int] = None, **reader_kwargs: Any, ) -> ray.data.Dataset: """ @@ -64,6 +64,7 @@ def read( :param working_dir: Directory to cache intermediate files (PDF processing) :param parallelism: Number of parallel workers :param recursive: Whether to scan directories recursively + :param read_nums: Limit the number of documents to read :param reader_kwargs: Additional kwargs passed to readers :return: Ray Dataset containing all documents """ @@ -121,8 +122,8 @@ def read( } ) - if read_num is not None: - combined_ds = combined_ds.limit(read_num) + if read_nums is not None: + combined_ds = combined_ds.limit(read_nums) logger.info("[READ] Successfully read files from %s", input_path) return combined_ds