Skip to content

Commit 246348f

Browse files
fix: remove the `parallelism` parameter from readers
1 parent 8bcbe51 commit 246348f

File tree

12 files changed

+42
-42
lines changed

12 files changed

+42
-42
lines changed

graphgen/models/reader/csv_reader.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,15 @@ class CSVReader(BaseReader):
1414
- if type is "text", "content" column must be present.
1515
"""
1616

17-
def read(
18-
self,
19-
input_path: Union[str, List[str]],
20-
parallelism: int = None,
21-
) -> Dataset:
17+
def read(self, input_path: Union[str, List[str]]) -> Dataset:
2218
"""
2319
Read CSV files and return Ray Dataset.
2420
2521
:param input_path: Path to CSV file or list of CSV files.
26-
:param parallelism: Number of blocks for Ray Dataset reading.
2722
:return: Ray Dataset containing validated and filtered data.
2823
"""
2924

30-
ds = ray.data.read_csv(input_path, override_num_blocks=parallelism)
25+
ds = ray.data.read_csv(input_path)
3126
ds = ds.map_batches(self._validate_batch, batch_format="pandas")
3227
ds = ds.filter(self._should_keep_item)
3328
return ds

graphgen/models/reader/json_reader.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,14 @@ class JSONReader(BaseReader):
1414
- if type is "text", "content" column must be present.
1515
"""
1616

17-
def read(
18-
self,
19-
input_path: Union[str, List[str]],
20-
parallelism: int = 4,
21-
) -> Dataset:
17+
def read(self, input_path: Union[str, List[str]]) -> Dataset:
2218
"""
2319
Read JSON file and return Ray Dataset.
2420
:param input_path: Path to JSON/JSONL file or list of JSON/JSONL files.
25-
:param parallelism: Number of parallel workers for reading files.
2621
:return: Ray Dataset containing validated and filtered data.
2722
"""
2823

29-
ds = ray.data.read_json(input_path, override_num_blocks=parallelism)
24+
ds = ray.data.read_json(input_path)
3025
ds = ds.map_batches(self._validate_batch, batch_format="pandas")
3126
ds = ds.filter(self._should_keep_item)
3227
return ds

graphgen/models/reader/parquet_reader.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,17 @@ class ParquetReader(BaseReader):
1414
- if type is "text", "content" column must be present.
1515
"""
1616

17-
def read(
18-
self,
19-
input_path: Union[str, List[str]],
20-
parallelism: int = None,
21-
) -> Dataset:
17+
def read(self, input_path: Union[str, List[str]]) -> Dataset:
2218
"""
2319
Read Parquet files using Ray Data.
2420
2521
:param input_path: Path to Parquet file or list of Parquet files.
26-
:param parallelism: Number of blocks for Ray Dataset reading.
2722
:return: Ray Dataset containing validated documents.
2823
"""
2924
if not ray.is_initialized():
3025
ray.init()
3126

32-
ds = ray.data.read_parquet(input_path, override_num_blocks=parallelism)
27+
ds = ray.data.read_parquet(input_path)
3328
ds = ds.map_batches(self._validate_batch, batch_format="pandas")
3429
ds = ds.filter(self._should_keep_item)
3530
return ds

graphgen/models/reader/pdf_reader.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ def __init__(
6868
def read(
6969
self,
7070
input_path: Union[str, List[str]],
71-
parallelism: int = 4,
7271
**override,
7372
) -> Dataset:
7473

graphgen/models/reader/pickle_reader.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,18 @@ class PickleReader(BaseReader):
2323
def read(
2424
self,
2525
input_path: Union[str, List[str]],
26-
parallelism: int = None,
2726
) -> Dataset:
2827
"""
2928
Read Pickle files using Ray Data.
3029
3130
:param input_path: Path to pickle file or list of pickle files.
32-
:param parallelism: Number of blocks for Ray Dataset reading.
3331
:return: Ray Dataset containing validated documents.
3432
"""
3533
if not ray.is_initialized():
3634
ray.init()
3735

3836
# Use read_binary_files as a reliable alternative to read_pickle
39-
ds = ray.data.read_binary_files(
40-
input_path, override_num_blocks=parallelism, include_paths=True
41-
)
37+
ds = ray.data.read_binary_files(input_path, include_paths=True)
4238

4339
# Deserialize pickle files and flatten into individual records
4440
def deserialize_batch(batch: pd.DataFrame) -> pd.DataFrame:

graphgen/models/reader/rdf_reader.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,11 @@ def __init__(self, *, text_column: str = "content", **kwargs):
3030
def read(
3131
self,
3232
input_path: Union[str, List[str]],
33-
parallelism: int = 4,
3433
) -> Dataset:
3534
"""
3635
Read RDF file(s) using Ray Data.
3736
3837
:param input_path: Path to RDF file or list of RDF files.
39-
:param parallelism: Number of parallel workers for processing.
4038
:return: Ray Dataset containing extracted documents.
4139
"""
4240
if not ray.is_initialized():

graphgen/models/reader/txt_reader.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,13 @@ class TXTReader(BaseReader):
1010
def read(
1111
self,
1212
input_path: Union[str, List[str]],
13-
parallelism: int = 4,
1413
) -> Dataset:
1514
"""
1615
Read text files from the specified input path.
1716
:param input_path: Path to the input text file or list of text files.
18-
:param parallelism: Number of blocks to override for Ray Dataset reading.
1917
:return: Ray Dataset containing the read text data.
2018
"""
21-
docs_ds = ray.data.read_text(
22-
input_path, encoding="utf-8", override_num_blocks=parallelism
23-
)
19+
docs_ds = ray.data.read_text(input_path, encoding="utf-8")
2420

2521
docs_ds = docs_ds.map(
2622
lambda row: {

graphgen/operators/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44
from .init import init_llm
55
from .partition import partition_kg
66
from .quiz_and_judge import judge_statement, quiz
7-
from .read import read_files
7+
from .read import read
88
from .search import search_all
99
from .split import chunk_documents

graphgen/operators/evaluate.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,13 @@
99
from dotenv import load_dotenv
1010

1111
from graphgen.bases.datatypes import QAPair
12-
13-
from .models import LengthEvaluator, MTLDEvaluator, RewardEvaluator, UniEvaluator
14-
from .utils import logger, set_logger
12+
from graphgen.models import (
13+
LengthEvaluator,
14+
MTLDEvaluator,
15+
RewardEvaluator,
16+
UniEvaluator,
17+
)
18+
from graphgen.utils import logger, set_logger
1519

1620
sys_path = os.path.abspath(os.path.dirname(__file__))
1721
set_logger(os.path.join(sys_path, "cache", "logs", "evaluate.log"))
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
from .read_files import read_files
1+
from .read import read

0 commit comments

Comments (0)