from dataflow.utils.registry import OPERATOR_REGISTRY
from dataflow.utils.storage import DataFlowStorage
from dataflow import get_logger
-from dataflow.utils.pdf2vqa.format_utils import merge_qa_pair, jsonl_to_md

-@OPERATOR_REGISTRY.register()
-class MinerU2LLMInputOperator(OperatorABC):
-    def __init__(self):
-        pass
-
-    @staticmethod
-    def get_desc(lang: str = "zh") -> str:
-        if lang == 'zh':
-            return (
-                "MinerU格式转换为LLM输入格式算子。"
-                "将MinerU生成的内容列表JSON文件转换为适合LLM处理的格式,"
-                "包括展平列表项并重新编号。"
-            )
-        else:
-            return (
-                "Convert MinerU format to LLM input format operator."
-                "Transforms the content list JSON file generated by MinerU into a format suitable for LLM processing,"
-                "including flattening list items and re-indexing."
-            )
-
-    def _convert_json(self, input_file, output_file):
-        with open(input_file, 'r') as infile:
-            data = list(json.load(infile))
-
-        new_data = []
-        id = 0
-        for item in data:
-            item['id'] = id
-            item.pop('bbox', None)
-            item.pop('page_idx', None)
-            if item.get('type', '') == 'list':
-                if item['sub_type'] == 'text':
-                    for idx, list_item in enumerate(item.get('list_items', [])):
-                        new_item = {
-                            'type': 'text',
-                            'text': list_item,
-                            'id': id + idx,
-                        }
-                        new_data.append(new_item)
-                    id += len(item.get('list_items', []))
-            else:
-                new_data.append(item)
-                id += 1
-
-        with open(output_file, 'w') as outfile:
-            json.dump(new_data, outfile, ensure_ascii=False)
-
-    def run(self, storage: DataFlowStorage,
-            input_markdown_path_key,
-            output_converted_layout_key,
-            ):
-        dataframe = storage.read("dataframe")
-
-        for index, row in dataframe.iterrows():
-            input_json_path = row[input_markdown_path_key].replace('.md', '_content_list.json')
-            converted_path = input_json_path.replace('.json', '_converted.json')
-            self._convert_json(input_json_path, converted_path)
-            dataframe.at[index, output_converted_layout_key] = converted_path
-
-            with open(converted_path, 'r') as infile:
-                data = json.load(infile)
-                assert isinstance(data, list), f"Expected list, got {type(data)} for {input_json_path}"
-
-        storage.write(dataframe)
-
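Note (not part of the commit): the removed MinerU2LLMInputOperator flattened MinerU "list" blocks into individual "text" entries with consecutive ids, dropping layout fields. A minimal illustration of the transformation its _convert_json applied, using a made-up content-list item:

    # One MinerU content-list item of type "list" with two bullet points ...
    item = {
        "type": "list", "sub_type": "text",
        "list_items": ["first bullet", "second bullet"],
        "bbox": [50, 80, 400, 120], "page_idx": 0,
    }
    # ... becomes two flat "text" entries with sequential ids; "bbox" and
    # "page_idx" are dropped, and list items with a non-"text" sub_type are
    # skipped entirely by the code above:
    flattened = [
        {"type": "text", "text": "first bullet", "id": 0},
        {"type": "text", "text": "second bullet", "id": 1},
    ]
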
@OPERATOR_REGISTRY.register()
class LLMOutputParser(OperatorABC):
    def __init__(self,
@@ -204,64 +138,4 @@ def run(self, storage: DataFlowStorage,

            dataframe.loc[idx, output_qalist_path_key] = output_qalist_path

-        storage.write(dataframe)
-
-@OPERATOR_REGISTRY.register()
-class QA_Merger(OperatorABC):
-    def __init__(self, output_dir, strict_title_match=False):
-        self.output_dir = output_dir
-        self.strict_title_match = strict_title_match
-
-    @staticmethod
-    def get_desc(lang: str = "zh") -> str:
-        if lang == 'zh':
-            return (
-                "QA对合并算子。"
-                "将问题和答案的QA列表进行合并,生成最终的QA对文件,"
-                "并转换为Markdown格式。"
-            )
-        else:
-            return (
-                "QA pair merging operator."
-                "Merges question and answer QA lists to generate final QA pair files,"
-                "and converts them to Markdown format."
-            )
-
-    def run(self, storage: DataFlowStorage,
-            input_question_qalist_path_key,
-            input_answer_qalist_path_key,
-            input_name_key,
-            output_merged_qalist_path_key,
-            output_merged_md_path_key,
-            output_qa_item_key="qa_item"  # new: column name for the exploded QA content
-            ):
-        dataframe = storage.read("dataframe")
-
-        # Initialize the column as object dtype so it can store list objects
-        dataframe[output_qa_item_key] = None
-        dataframe[output_qa_item_key] = dataframe[output_qa_item_key].astype(object)
-
-        for idx, row in dataframe.iterrows():
-            question_qalist_path = row[input_question_qalist_path_key]
-            answer_qalist_path = row[input_answer_qalist_path_key]
-            name = row[input_name_key]
-
-            output_merged_qalist_path = os.path.join(self.output_dir, name, "merged_qa_pairs.jsonl")
-            merge_qa_pair(question_qalist_path, answer_qalist_path, output_merged_qalist_path, strict_title_match=self.strict_title_match)
-
-            output_merged_md_path = os.path.join(self.output_dir, name, "merged_qa_pairs.md")
-            jsonl_to_md(output_merged_qalist_path, output_merged_md_path)
-
-            qa_pairs = []
-            if os.path.exists(output_merged_qalist_path):
-                with open(output_merged_qalist_path, 'r', encoding='utf-8') as f:
-                    qa_pairs = [json.loads(line) for line in f]
-
-            dataframe.at[idx, output_qa_item_key] = qa_pairs
-
-            dataframe.loc[idx, output_merged_qalist_path_key] = output_merged_qalist_path
-            dataframe.loc[idx, output_merged_md_path_key] = output_merged_md_path
-
-        dataframe = dataframe.explode(output_qa_item_key).reset_index(drop=True)
-
        storage.write(dataframe)
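Note (not part of the commit): the removed QA_Merger relied on a pandas idiom that explains the astype(object) and explode() calls in the code above: a cell must hold a Python list before DataFrame.explode() can turn each list element into its own row. A minimal, self-contained sketch with made-up values:

    import pandas as pd

    df = pd.DataFrame({"name": ["doc1", "doc2"]})
    # The column must be object dtype so each cell can store a Python list.
    df["qa_item"] = None
    df["qa_item"] = df["qa_item"].astype(object)
    df.at[0, "qa_item"] = [{"q": "Q1", "a": "A1"}, {"q": "Q2", "a": "A2"}]
    df.at[1, "qa_item"] = [{"q": "Q3", "a": "A3"}]

    # explode() emits one row per list element, repeating the other columns.
    df = df.explode("qa_item").reset_index(drop=True)
    print(len(df))  # 3 rows, one per QA pair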