
Commit 9d3cb4d

LBWhite55HaiHui886 authored and committed

Fix half of bug #98: control whether mdToJson generates meta.
1 parent 73e42a8 commit 9d3cb4d

File tree

1 file changed: +172 −137 lines


data_engine/tools/preprocess/md_to_jsonl_preprocess.py

Lines changed: 172 additions & 137 deletions
@@ -147,6 +147,24 @@ def __init__(self, tool_defination: Tool_def, params: ExecutedParams):
         else:
             self.min_sentences_per_chunk = 1  # Not used for token-based chunking
 
+        # Get generate_meta_log parameter (default: False)
+        # This controls whether to generate meta.log file
+        generate_meta_log_value = next(
+            (item.value for item in self.tool_def.params if item.name == "generate_meta_log"),
+            False)
+        try:
+            # Handle boolean values (may come as string "true"/"false" or boolean)
+            if isinstance(generate_meta_log_value, str):
+                self.generate_meta_log = generate_meta_log_value.lower() in ('true', '1', 'yes')
+            elif isinstance(generate_meta_log_value, bool):
+                self.generate_meta_log = generate_meta_log_value
+            else:
+                self.generate_meta_log = bool(generate_meta_log_value) if generate_meta_log_value is not None else False
+        except (ValueError, TypeError):
+            self.generate_meta_log = False
+
+        logger.info(f'[META_LOG] generate_meta_log parameter: {self.generate_meta_log} (will {"generate" if self.generate_meta_log else "NOT generate"} meta.log)')
+
         # text_key is fixed as 'text' since MD files are converted to this format before passing to tool
         self.text_key = "text"
 
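For reference, the coercion above accepts real booleans as well as common string spellings: 'true', '1', and 'yes' (case-insensitive) become True, any other string becomes False, and None falls back to False. A minimal standalone sketch of the same rule (the helper name and test values are illustrative, not part of the tool):

    def coerce_flag(value) -> bool:
        # Mirrors the generate_meta_log handling in __init__ above.
        try:
            if isinstance(value, str):
                return value.lower() in ('true', '1', 'yes')
            if isinstance(value, bool):
                return value
            return bool(value) if value is not None else False
        except (ValueError, TypeError):
            return False

    assert coerce_flag("True") and coerce_flag("1") and coerce_flag("yes")
    assert not coerce_flag("false") and not coerce_flag(None) and not coerce_flag(0)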
@@ -289,20 +307,23 @@ def process(self):
         if not os.path.exists(self.tool_def.export_path):
             os.makedirs(self.tool_def.export_path, exist_ok=True)
 
-        # Create meta directory for meta.json
-        meta_dir = Path(self.tool_def.export_path) / "meta"
-        meta_dir.mkdir(parents=True, exist_ok=True)
-        meta_file_path = meta_dir / "meta.json"
+        # Create meta directory and file path only if generate_meta_log is True
+        meta_file_path = None
+        if self.generate_meta_log:
+            meta_dir = Path(self.tool_def.export_path) / "meta"
+            meta_dir.mkdir(parents=True, exist_ok=True)
+            meta_file_path = meta_dir / "meta.log"
 
-        # Try to extract job_name from work_dir path
+        # Try to extract job_name from work_dir path (only needed if generating meta.log)
         job_name = self.tool_def.name
-        work_dir_path = Path(self.executed_params.work_dir)
-        if work_dir_path.name and '_' in work_dir_path.name:
-            parts = work_dir_path.name.split('_')
-            if len(parts) >= 2:
-                potential_job_name = '_'.join(parts[:-1])
-                if potential_job_name:
-                    job_name = potential_job_name
+        if self.generate_meta_log:
+            work_dir_path = Path(self.executed_params.work_dir)
+            if work_dir_path.name and '_' in work_dir_path.name:
+                parts = work_dir_path.name.split('_')
+                if len(parts) >= 2:
+                    potential_job_name = '_'.join(parts[:-1])
+                    if potential_job_name:
+                        job_name = potential_job_name
 
         formatter = load_formatter(
             dataset_path=self.tool_def.dataset_path,
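The job_name heuristic above assumes the work directory follows a job_name_uuid naming convention and drops the last underscore-delimited token. A quick sketch of that behavior (the helper and paths are illustrative):

    from pathlib import Path

    def job_name_from(work_dir: str, default: str) -> str:
        # Same heuristic as the diff: strip the trailing '_uuid' part.
        name = Path(work_dir).name
        if name and '_' in name:
            parts = name.split('_')
            if len(parts) >= 2 and '_'.join(parts[:-1]):
                return '_'.join(parts[:-1])
        return default

    print(job_name_from("/work/md_to_jsonl_8f2e41", "tool"))  # -> md_to_jsonl
    print(job_name_from("/work/plainname", "tool"))           # -> tool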
@@ -334,29 +355,92 @@ def process(self):
             dataset.to_json(output_file, force_ascii=False, num_proc=self.tool_def.np)
             logger.info(f'Dataset saved to {output_file} ({num_chunks} chunks)')
 
-            # Generate meta.json for fallback case
-            # Keep original dataset_path (including Chinese chars if any)
-            dataset_path_for_meta = self.tool_def.dataset_path or "unknown"
-            if dataset_path_for_meta != "unknown":
-                dataset_path_for_meta = os.path.basename(dataset_path_for_meta)
+            # Generate meta.log for fallback case (only if generate_meta_log is True)
+            if self.generate_meta_log and meta_file_path:
+                # Keep original dataset_path (including Chinese chars if any)
+                dataset_path_for_meta = self.tool_def.dataset_path or "unknown"
+                if dataset_path_for_meta != "unknown":
+                    dataset_path_for_meta = os.path.basename(dataset_path_for_meta)
+
+                meta_data = {
+                    "job_name": job_name,
+                    "tool_name": TOOL_NAME,
+                    "source_repo": self.tool_def.repo_id or "",
+                    "source_branch": self.tool_def.branch or "main",
+                    "target_repo": self.tool_def.repo_id or "",
+                    "files": [
+                        {
+                            "from": dataset_path_for_meta,
+                            "to": "x.jsonl",
+                            "status": "success",
+                            "chunks": num_chunks
+                        }
+                    ],
+                    "result": {
+                        "total": 1,
+                        "success": 1,
+                        "failure": 0
+                    },
+                    "parameters": {
+                        "chunk_method": self.chunk_method,
+                        "chunk_size": self.chunk_size,
+                        "overlap": self.overlap,
+                        "hf_tokenizer": self.hf_tokenizer,
+                        "min_sentences_per_chunk": self.min_sentences_per_chunk
+                    },
+                    "statistics": {
+                        "total_chunks": num_chunks,
+                        "avg_chunks_per_file": float(num_chunks)
+                    },
+                    "note": "Fallback mode: processed as single dataset (no MD files found)"
+                }
+
+                with open(meta_file_path, 'w', encoding='utf-8') as f:
+                    json.dump(meta_data, f, indent=2, ensure_ascii=False)
+                logger.info(f'Generated meta.log for fallback mode (total_chunks: {num_chunks})')
+
+            return Path(self.tool_def.export_path)
+
+        logger.info(f'Found {len(md_files)} MD file(s) to process')
+
+        # Ensure the export directory exists
+        if not os.path.exists(self.tool_def.export_path):
+            os.makedirs(self.tool_def.export_path, exist_ok=True)
+
+        # Create meta directory and file path only if generate_meta_log is True
+        meta_file_path = None
+        meta_data = None
+        if self.generate_meta_log:
+            meta_dir = Path(self.tool_def.export_path) / "meta"
+            meta_dir.mkdir(parents=True, exist_ok=True)
+            meta_file_path = meta_dir / "meta.log"
+
+            # Initialize meta.log data structure
+            total_count = len(md_files)
+            base_path = Path(self.tool_def.dataset_path)
+
+            # Try to extract job_name from work_dir path (format: job_name_uuid)
+            job_name = self.tool_def.name  # Use tool name as default
+            work_dir_path = Path(self.executed_params.work_dir)
+            if work_dir_path.name and '_' in work_dir_path.name:
+                # Try to extract job_name from path like "job_name_uuid"
+                parts = work_dir_path.name.split('_')
+                if len(parts) >= 2:
+                    # Assume last part is uuid, rest is job_name
+                    potential_job_name = '_'.join(parts[:-1])
+                    if potential_job_name:
+                        job_name = potential_job_name
 
             meta_data = {
                 "job_name": job_name,
                 "tool_name": TOOL_NAME,
                 "source_repo": self.tool_def.repo_id or "",
                 "source_branch": self.tool_def.branch or "main",
                 "target_repo": self.tool_def.repo_id or "",
-                "files": [
-                    {
-                        "from": dataset_path_for_meta,
-                        "to": "x.jsonl",
-                        "status": "success",
-                        "chunks": num_chunks
-                    }
-                ],
+                "files": [],
                 "result": {
-                    "total": 1,
-                    "success": 1,
+                    "total": total_count,
+                    "success": 0,
                     "failure": 0
                 },
                 "parameters": {
@@ -367,75 +451,19 @@ def process(self):
                     "min_sentences_per_chunk": self.min_sentences_per_chunk
                 },
                 "statistics": {
-                    "total_chunks": num_chunks,
-                    "avg_chunks_per_file": float(num_chunks)
-                },
-                "note": "Fallback mode: processed as single dataset (no MD files found)"
+                    "total_chunks": 0,
+                    "avg_chunks_per_file": 0.0
+                }
             }
 
+            # Save initial meta.log
             with open(meta_file_path, 'w', encoding='utf-8') as f:
                 json.dump(meta_data, f, indent=2, ensure_ascii=False)
-            logger.info(f'Generated meta.json for fallback mode (total_chunks: {num_chunks})')
-
-            return Path(self.tool_def.export_path)
-
-        logger.info(f'Found {len(md_files)} MD file(s) to process')
+            logger.info(f'Generated initial meta.log file with total: {total_count} files')
 
-        # Ensure the export directory exists
-        if not os.path.exists(self.tool_def.export_path):
-            os.makedirs(self.tool_def.export_path, exist_ok=True)
-
-        # Create meta directory for meta.json
-        meta_dir = Path(self.tool_def.export_path) / "meta"
-        meta_dir.mkdir(parents=True, exist_ok=True)
-        meta_file_path = meta_dir / "meta.json"
-
-        # Initialize meta.json data structure
         total_count = len(md_files)
        base_path = Path(self.tool_def.dataset_path)
 
-        # Try to extract job_name from work_dir path (format: job_name_uuid)
-        job_name = self.tool_def.name  # Use tool name as default
-        work_dir_path = Path(self.executed_params.work_dir)
-        if work_dir_path.name and '_' in work_dir_path.name:
-            # Try to extract job_name from path like "job_name_uuid"
-            parts = work_dir_path.name.split('_')
-            if len(parts) >= 2:
-                # Assume last part is uuid, rest is job_name
-                potential_job_name = '_'.join(parts[:-1])
-                if potential_job_name:
-                    job_name = potential_job_name
-
-        meta_data = {
-            "job_name": job_name,
-            "tool_name": TOOL_NAME,
-            "source_repo": self.tool_def.repo_id or "",
-            "source_branch": self.tool_def.branch or "main",
-            "target_repo": self.tool_def.repo_id or "",
-            "files": [],
-            "result": {
-                "total": total_count,
-                "success": 0,
-                "failure": 0
-            },
-            "parameters": {
-                "chunk_method": self.chunk_method,
-                "chunk_size": self.chunk_size,
-                "overlap": self.overlap,
-                "hf_tokenizer": self.hf_tokenizer,
-                "min_sentences_per_chunk": self.min_sentences_per_chunk
-            },
-            "statistics": {
-                "total_chunks": 0,
-                "avg_chunks_per_file": 0.0
-            }
-        }
-
-        # Save initial meta.json
-        with open(meta_file_path, 'w', encoding='utf-8') as f:
-            json.dump(meta_data, f, indent=2, ensure_ascii=False)
-        logger.info(f'Generated initial meta.json file with total: {total_count} files')
-
         # Create mapper operator (can be reused for all files, using downloaded model path)
         if self.chunk_method == "sentence":
             # Use sentence-based chunking (supports mixed Chinese-English by default)
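Despite the .log extension, the file written above is a single JSON document, so downstream code can load it with the json module. A hedged consumer sketch (the export path is illustrative):

    import json
    from pathlib import Path

    # Read the meta.log produced by the tool and summarize it.
    meta = json.loads((Path("export") / "meta" / "meta.log").read_text(encoding="utf-8"))
    print(meta["result"])  # e.g. {'total': 12, 'success': 11, 'failure': 1}
    for rec in meta["files"]:
        print(rec["status"], rec["from"], "->", rec.get("to"))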
@@ -550,25 +578,26 @@ def process(self):
                 processed_files.append(output_file)
                 total_chunks += num_chunks
 
-                # Record success in meta.json
-                to_rel_path = output_filename.replace('\\', '/')
-                file_record = {
-                    "from": original_rel_path,  # Keep original relative path with Chinese chars
-                    "to": to_rel_path,  # Output filename with Chinese chars preserved
-                    "status": "success",
-                    "chunks": num_chunks
-                }
-
-                # Add note if numeric suffix was added due to duplicate filename
-                if suffix > 0:
-                    file_record["note"] = f"Duplicate filename, added suffix {suffix}"
-
-                meta_data["files"].append(file_record)
-                meta_data["result"]["success"] += 1
-
-                # Update meta.json immediately with UTF-8 encoding
-                with open(meta_file_path, 'w', encoding='utf-8') as f:
-                    json.dump(meta_data, f, indent=2, ensure_ascii=False)
+                # Record success in meta.log (only if generate_meta_log is True)
+                if self.generate_meta_log and meta_data is not None:
+                    to_rel_path = output_filename.replace('\\', '/')
+                    file_record = {
+                        "from": original_rel_path,  # Keep original relative path with Chinese chars
+                        "to": to_rel_path,  # Output filename with Chinese chars preserved
+                        "status": "success",
+                        "chunks": num_chunks
+                    }
+
+                    # Add note if numeric suffix was added due to duplicate filename
+                    if suffix > 0:
+                        file_record["note"] = f"Duplicate filename, added suffix {suffix}"
+
+                    meta_data["files"].append(file_record)
+                    meta_data["result"]["success"] += 1
+
+                    # Update meta.log immediately with UTF-8 encoding
+                    with open(meta_file_path, 'w', encoding='utf-8') as f:
+                        json.dump(meta_data, f, indent=2, ensure_ascii=False)
 
                 logger.info(f'Successfully saved {os.path.basename(md_file_path)} -> {output_filename} ({num_chunks} chunks)')
 
@@ -577,39 +606,44 @@ def process(self):
                 logger.error(error_msg)
                 failed_files.append((md_file_path, str(e)))
 
-                # Record failure in meta.json
-                failure_record = {
-                    "from": original_rel_path,  # Keep original path with Chinese chars
-                    "to": None,
-                    "status": "failure",
-                    "error": str(e)
-                }
-
-                meta_data["files"].append(failure_record)
-                meta_data["result"]["failure"] += 1
-
-                # Update meta.json immediately with UTF-8 encoding
-                with open(meta_file_path, 'w', encoding='utf-8') as f:
-                    json.dump(meta_data, f, indent=2, ensure_ascii=False)
+                # Record failure in meta.log (only if generate_meta_log is True)
+                if self.generate_meta_log and meta_data is not None:
+                    failure_record = {
+                        "from": original_rel_path,  # Keep original path with Chinese chars
+                        "to": None,
+                        "status": "failure",
+                        "error": str(e)
+                    }
+
+                    meta_data["files"].append(failure_record)
+                    meta_data["result"]["failure"] += 1
+
+                    # Update meta.log immediately with UTF-8 encoding
+                    with open(meta_file_path, 'w', encoding='utf-8') as f:
+                        json.dump(meta_data, f, indent=2, ensure_ascii=False)
 
                 # Continue processing other files even if one fails
 
-        # Calculate final statistics
-        success_count = meta_data["result"]["success"]
-        failure_count = meta_data["result"]["failure"]
-        avg_chunks = total_chunks / success_count if success_count > 0 else 0.0
-
-        meta_data["statistics"]["total_chunks"] = total_chunks
-        meta_data["statistics"]["avg_chunks_per_file"] = round(avg_chunks, 2)
-
-        # Ensure total remains correct
-        assert meta_data["result"]["total"] == total_count, \
-            f"Total should remain {total_count}, but got {meta_data['result']['total']}"
-
-        # Save final meta.json with statistics
-        with open(meta_file_path, 'w', encoding='utf-8') as f:
-            json.dump(meta_data, f, indent=2, ensure_ascii=False)
-        logger.info(f'Updated final meta.json (total: {total_count}, success: {success_count}, failure: {failure_count}, total_chunks: {total_chunks})')
+        # Calculate final statistics and save final meta.log (only if generate_meta_log is True)
+        if self.generate_meta_log and meta_data is not None:
+            success_count = meta_data["result"]["success"]
+            failure_count = meta_data["result"]["failure"]
+            avg_chunks = total_chunks / success_count if success_count > 0 else 0.0
+
+            meta_data["statistics"]["total_chunks"] = total_chunks
+            meta_data["statistics"]["avg_chunks_per_file"] = round(avg_chunks, 2)
+
+            # Ensure total remains correct
+            assert meta_data["result"]["total"] == total_count, \
+                f"Total should remain {total_count}, but got {meta_data['result']['total']}"
+
+            # Save final meta.log with statistics
+            with open(meta_file_path, 'w', encoding='utf-8') as f:
+                json.dump(meta_data, f, indent=2, ensure_ascii=False)
+            logger.info(f'Updated final meta.log (total: {total_count}, success: {success_count}, failure: {failure_count}, total_chunks: {total_chunks})')
+        else:
+            success_count = len(processed_files)
+            failure_count = len(failed_files)
 
         # Summary
         logger.info(f'Processing completed. Successfully processed {len(processed_files)} file(s)')
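As a worked example of the statistics math above (values illustrative): 3 successful files yielding 7 chunks total give avg_chunks_per_file = round(7 / 3, 2) = 2.33, while zero successes short-circuit to 0.0. When the flag is off, the else branch derives success_count and failure_count from the in-memory lists instead, so the summary logging below works either way.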
@@ -708,5 +742,6 @@ def init_params(cls, userid: str = None, isadmin: bool = False):
             Param("chunk_size", DataType.PositiveFloat, None, 512),
             Param("overlap", DataType.PositiveFloat, None, 0),
             Param("min_sentences_per_chunk", DataType.PositiveFloat, None, 1),
+            Param("generate_meta_log", DataType.BOOLEAN, None, False),
         ]
 
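With the parameter registered, meta.log generation is opt-in: the default stays False, and thanks to the coercion in __init__ a job may supply the value either as a real boolean or as the strings "true", "1", or "yes" (case-insensitive); anything else, including "false" or an absent parameter, leaves meta.log generation off.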