Commit 666246c

Merge pull request #320 from shijinpjlab/dev_parquet
feat: support parquet file
2 parents: df0f9e3 + 5ff278c

File tree

9 files changed (+1072, -2 lines)


dingo/config/__init__.py

Lines changed: 2 additions & 2 deletions
@@ -1,2 +1,2 @@
-from dingo.config.input_args import (DatasetArgs, DatasetCsvArgs, DatasetExcelArgs, DatasetFieldArgs, DatasetHFConfigArgs, DatasetS3ConfigArgs, DatasetSqlArgs, EvalPipline, # noqa E402.
-                                     EvalPiplineConfig, EvaluatorLLMArgs, EvaluatorRuleArgs, ExecutorArgs, ExecutorResultSaveArgs, InputArgs)
+from dingo.config.input_args import (DatasetArgs, DatasetCsvArgs, DatasetExcelArgs, DatasetFieldArgs, DatasetHFConfigArgs, DatasetParquetArgs, DatasetS3ConfigArgs, DatasetSqlArgs, # noqa E402.
+                                     EvalPipline, EvalPiplineConfig, EvaluatorLLMArgs, EvaluatorRuleArgs, ExecutorArgs, ExecutorResultSaveArgs, InputArgs)

dingo/config/input_args.py

Lines changed: 6 additions & 0 deletions
@@ -40,6 +40,11 @@ class DatasetCsvArgs(BaseModel):
     quotechar: str = '"'  # quote character; defaults to double quote
 
 
+class DatasetParquetArgs(BaseModel):
+    batch_size: int = 10000  # rows to read per batch, for streaming large files
+    columns: Optional[List[str]] = None  # columns to read; None means all columns
+
+
 class DatasetFieldArgs(BaseModel):
     id: str = ''
     prompt: str = ''
@@ -58,6 +63,7 @@ class DatasetArgs(BaseModel):
     sql_config: DatasetSqlArgs = DatasetSqlArgs()
     excel_config: DatasetExcelArgs = DatasetExcelArgs()
     csv_config: DatasetCsvArgs = DatasetCsvArgs()
+    parquet_config: DatasetParquetArgs = DatasetParquetArgs()
 
 
 class ExecutorResultSaveArgs(BaseModel):
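The new `DatasetParquetArgs` block nests under `DatasetArgs` as `parquet_config`, alongside the existing sql/excel/csv configs. A minimal sketch of how the model behaves, assuming only the two fields shown in this diff (the column names below are hypothetical):

```python
from dingo.config.input_args import DatasetArgs, DatasetParquetArgs

# Defaults from the diff: 10,000-row batches, all columns.
cfg = DatasetParquetArgs()
assert cfg.batch_size == 10000 and cfg.columns is None

# Override to read two hypothetical columns in smaller batches.
ds = DatasetArgs(parquet_config=DatasetParquetArgs(batch_size=2000,
                                                   columns=["id", "content"]))
assert ds.parquet_config.columns == ["id", "content"]
```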

dingo/data/converter/base.py

Lines changed: 19 additions & 0 deletions
@@ -299,6 +299,25 @@ def _convert(raw: Union[str, Dict]):
         return _convert
 
 
+@BaseConverter.register("parquet")
+class ParquetConverter(BaseConverter):
+    """Parquet file converter."""
+
+    def __init__(self):
+        super().__init__()
+
+    @classmethod
+    def convertor(cls, input_args: InputArgs) -> Callable:
+        def _convert(raw: Union[str, Dict]):
+            j = raw
+            if isinstance(raw, str):
+                j = json.loads(raw)
+            data_dict = j
+            return Data(**data_dict)
+
+        return _convert
+
+
 @BaseConverter.register("listjson")
 class ListJsonConverter(BaseConverter):
     """List json file converter."""

dingo/data/datasource/local.py

Lines changed: 74 additions & 0 deletions
@@ -142,6 +142,75 @@ def _load_excel_file_xlsx(self, path: str) -> Generator[str, None, None]:
         if wb:
             wb.close()
 
+    def _load_parquet_file(self, path: str) -> Generator[str, None, None]:
+        """
+        Load a Parquet file and return its contents row by row as JSON strings.
+        Supports streaming for large files to avoid memory overflow.
+
+        Args:
+            path (str): The path to the Parquet file.
+
+        Returns:
+            Generator[str]: Each row as a JSON string with column keys.
+        """
+        try:
+            import pyarrow.parquet as pq
+        except ImportError:
+            raise RuntimeError(
+                "pyarrow is required to read Parquet files. "
+                "Please install it using: pip install pyarrow"
+            )
+
+        # Fetch the Parquet config
+        batch_size = self.input_args.dataset.parquet_config.batch_size
+        columns = self.input_args.dataset.parquet_config.columns
+
+        try:
+            # Open the Parquet file
+            parquet_file = pq.ParquetFile(path)
+
+            # Stream the file, processing one batch at a time
+            for batch in parquet_file.iter_batches(batch_size=batch_size, columns=columns):
+                # Convert the batch into a dict of columns
+                batch_dict = batch.to_pydict()
+
+                # Number of rows in this batch
+                num_rows = len(next(iter(batch_dict.values()))) if batch_dict else 0
+
+                # Process row by row
+                for i in range(num_rows):
+                    # Build a dict for this row
+                    row_dict = {col: batch_dict[col][i] for col in batch_dict}
+
+                    # Sanitize values of special types
+                    for key, value in row_dict.items():
+                        # Handle None values
+                        if value is None:
+                            row_dict[key] = ""
+                        # Handle bytes values
+                        elif isinstance(value, bytes):
+                            try:
+                                row_dict[key] = value.decode('utf-8')
+                            except UnicodeDecodeError:
+                                row_dict[key] = str(value)
+                        # Handle other values that are not JSON-serializable
+                        elif not isinstance(value, (str, int, float, bool, list, dict)):
+                            row_dict[key] = str(value)
+
+                    # Serialize to a JSON string and yield
+                    yield json.dumps(row_dict, ensure_ascii=False) + '\n'
+
+        except ImportError as ie:
+            raise RuntimeError(
+                f'Failed to load required library for Parquet: {str(ie)}. '
+                f'Please install pyarrow using: pip install pyarrow'
+            )
+        except Exception as e:
+            raise RuntimeError(
+                f'Failed to read Parquet file "{path}": {str(e)}. '
+                f'Please ensure the file is a valid Parquet file.'
+            )
+
     def _load_csv_file(self, path: str) -> Generator[str, None, None]:
         """
         Load a CSV file and return its contents row by row as JSON strings.
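The loader never materializes the whole file: `pq.ParquetFile(...).iter_batches()` yields record batches of at most `batch_size` rows, and each batch is flattened column by column into per-row dicts. A self-contained sketch of the same pattern, writing a throwaway file first (the `id`/`text`/`blob` columns are made up for the demo):

```python
import json

import pyarrow as pa
import pyarrow.parquet as pq

# Build a tiny Parquet file to stream; includes a null and a bytes column
# to exercise the same value sanitization the loader performs.
pq.write_table(
    pa.table({"id": [1, 2, 3], "text": ["a", None, "c"], "blob": [b"x", b"y", b"z"]}),
    "demo.parquet",
)

# Same pattern as _load_parquet_file: batch -> column dict -> row dicts.
for batch in pq.ParquetFile("demo.parquet").iter_batches(batch_size=2):
    batch_dict = batch.to_pydict()
    num_rows = len(next(iter(batch_dict.values()))) if batch_dict else 0
    for i in range(num_rows):
        row = {col: batch_dict[col][i] for col in batch_dict}
        for key, value in row.items():
            if value is None:               # None -> empty string
                row[key] = ""
            elif isinstance(value, bytes):  # bytes -> utf-8, else repr
                try:
                    row[key] = value.decode("utf-8")
                except UnicodeDecodeError:
                    row[key] = str(value)
        print(json.dumps(row, ensure_ascii=False))
```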
@@ -334,6 +403,11 @@ def _load_local_file(self) -> Generator[str, None, None]:
             if self.input_args.dataset.format != 'csv':
                 raise RuntimeError(f'CSV file "{f}" is not supported. Please set dataset.format to "csv" to read CSV files.')
             yield from self._load_csv_file(f)
+        # Check if file is Parquet
+        elif f.endswith('.parquet'):
+            if self.input_args.dataset.format != 'parquet':
+                raise RuntimeError(f'Parquet file "{f}" is not supported. Please set dataset.format to "parquet" to read Parquet files.')
+            yield from self._load_parquet_file(f)
         # Check if file is Excel
         elif f.endswith('.xlsx'):
             if self.input_args.dataset.format != 'excel':
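The dispatch ties everything together: a `.parquet` file is only routed to `_load_parquet_file` when `dataset.format` is explicitly `'parquet'`, matching the existing CSV/Excel guards. A hedged sketch of the matching configuration; apart from `parquet_config`, which this diff adds, the `format` field is assumed from the dispatch code above:

```python
from dingo.config.input_args import DatasetArgs

# format='parquet' is required, or _load_local_file raises RuntimeError
# for .parquet inputs; parquet_config tunes the streaming read.
ds = DatasetArgs(
    format="parquet",  # assumed field, per self.input_args.dataset.format above
    parquet_config={"batch_size": 5000, "columns": None},
)
```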
