          }
          onClick={() => {
            Modal.confirm({
-             title: `确认删除任务?`,
+             title: `确认删除任务?`,
              content: `任务名:${task.name}`,
              okText: "删除",
              okType: "danger",
@@ -214,6 +233,37 @@ export default function SynthesisTaskTab() {
    },
  ];
+  const handleArchiveTask = async (task: SynthesisTask) => {
+    try {
+      // 1. Create the target dataset (simple default name plus a random suffix; could later be extended to a dialog for custom naming)
+      const randomSuffix = Math.random().toString(36).slice(2, 8);
+      const datasetReq = {
+        name: `${task.name}-合成数据留用${randomSuffix}`,
+        description: `由合成任务 ${task.id} 留用生成`,
+        datasetType: "TEXT",
+        category: "SYNTHESIS",
+        format: "JSONL",
+        status: "DRAFT",
+      } as any;
+      const datasetRes = await createDatasetUsingPost(datasetReq);
+      const datasetId = datasetRes?.data?.id;
+      if (!datasetId) {
+        message.error("创建数据集失败");
+        return;
+      }
+
+      // 2. Call the backend archive endpoint to write the synthesis data into that dataset
+      await archiveSynthesisTaskToDatasetUsingPost(task.id, datasetId);
+
+      message.success("归档成功");
+      // 3. Optional: navigate to the dataset detail page
+      navigate(`/data/management/detail/${datasetId}`);
+    } catch (e) {
+      console.error(e);
+      message.error("归档失败");
+    }
+  };
+
  return (
      {/* Search and filter */}
diff --git a/frontend/src/pages/SynthesisTask/synthesis-api.ts b/frontend/src/pages/SynthesisTask/synthesis-api.ts
index 6ee3c228..1b07898e 100644
--- a/frontend/src/pages/SynthesisTask/synthesis-api.ts
+++ b/frontend/src/pages/SynthesisTask/synthesis-api.ts
@@ -45,3 +45,8 @@ export function querySynthesisDataByChunkUsingGet(chunkId: string) {
export function getPromptByTypeUsingGet(synthType: string) {
  return get(`/api/synthesis/gen/prompt`, { synth_type: synthType });
}
+
+// Archive the synthesis data of a task into an existing dataset
+export function archiveSynthesisTaskToDatasetUsingPost(taskId: string, datasetId: string) {
+  return post(`/api/synthesis/gen/task/${taskId}/export-dataset/${datasetId}`);
+}
diff --git a/runtime/datamate-python/app/module/generation/interface/generation_api.py b/runtime/datamate-python/app/module/generation/interface/generation_api.py
index 4018af37..05865f5a 100644
--- a/runtime/datamate-python/app/module/generation/interface/generation_api.py
+++ b/runtime/datamate-python/app/module/generation/interface/generation_api.py
@@ -28,6 +28,7 @@
)
from app.module.generation.service.generation_service import GenerationService
from app.module.generation.service.prompt import get_prompt
+from app.module.generation.service.export_service import SynthesisDatasetExporter, SynthesisExportError
from app.module.shared.schema import StandardResponse
router = APIRouter(
@@ -443,3 +444,35 @@ async def list_synthesis_data_by_chunk(
        message="Success",
        data=items,
    )
+
+
+@router.post("/task/{task_id}/export-dataset/{dataset_id}", response_model=StandardResponse[str])
+async def export_synthesis_task_to_dataset(
+    task_id: str,
+    dataset_id: str,
+    db: AsyncSession = Depends(get_db),
+):
+    """Archive all synthesis data of the given task into an existing dataset.
+
+    Rules:
+    - export is keyed on the original files: each original file yields one JSONL file;
+    - each JSONL file is named after its original file, with the extension replaced by .jsonl;
+    - only files are written; no new dataset is created.
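+
+    Example request (hypothetical IDs)::
+
+        POST /api/synthesis/gen/task/task-123/export-dataset/ds-456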
+ """
+ exporter = SynthesisDatasetExporter(db)
+ try:
+ dataset = await exporter.export_task_to_dataset(task_id, dataset_id)
+ except SynthesisExportError as e:
+ logger.error(
+ "Failed to export synthesis task %s to dataset %s: %s",
+ task_id,
+ dataset_id,
+ e,
+ )
+ raise HTTPException(status_code=400, detail=str(e))
+
+ return StandardResponse(
+ code=200,
+ message="success",
+ data=dataset.id,
+ )
diff --git a/runtime/datamate-python/app/module/generation/service/export_service.py b/runtime/datamate-python/app/module/generation/service/export_service.py
new file mode 100644
index 00000000..2f2dde61
--- /dev/null
+++ b/runtime/datamate-python/app/module/generation/service/export_service.py
@@ -0,0 +1,160 @@
+import datetime
+import json
+import os
+from typing import Iterable, List, Sequence
+
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.core.logging import get_logger
+from app.db.models.data_synthesis import (
+    DataSynthesisInstance,
+    DataSynthesisFileInstance,
+    SynthesisData,
+)
+from app.db.models.dataset_management import Dataset, DatasetFiles
+
+logger = get_logger(__name__)
+
+
+class SynthesisExportError(Exception):
+ """Raised when exporting synthesis data to dataset fails."""
+
+
+class SynthesisDatasetExporter:
+ """Export synthesis data of a task into an existing dataset.
+
+ Export rules:
+ - Dimension: original file (DatasetFiles)
+ - One JSONL file per original file
+ - JSONL file name is exactly the same as the original file name
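+
+    Example (hypothetical IDs)::
+
+        exporter = SynthesisDatasetExporter(db)
+        dataset = await exporter.export_task_to_dataset("task-123", "ds-456")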
+ """
+
+    def __init__(self, db: AsyncSession):
+        self._db = db
+
+    async def export_task_to_dataset(
+        self,
+        task_id: str,
+        dataset_id: str,
+    ) -> Dataset:
+        """Export the full synthesis data of the given task into an existing dataset.
+
+        Optimized to process one file at a time to reduce memory usage.
+        """
+        task = await self._db.get(DataSynthesisInstance, task_id)
+        if not task:
+            raise SynthesisExportError(f"Synthesis task {task_id} not found")
+
+        dataset = await self._db.get(Dataset, dataset_id)
+        if not dataset:
+            raise SynthesisExportError(f"Dataset {dataset_id} not found")
+
+        file_instances = await self._load_file_instances(task_id)
+        if not file_instances:
+            raise SynthesisExportError("No synthesis file instances found for task")
+
+        base_path = self._ensure_dataset_path(dataset)
+
+        created_files: list[DatasetFiles] = []
+        total_size = 0
+
+        # Process files one at a time to avoid loading all synthesis data at once
+        for file_instance in file_instances:
+            records = await self._load_synthesis_data_for_file(file_instance.id)
+            if not records:
+                continue
+
+            # Archived file name: <original name>.<ext> -> <original name>.jsonl
+            original_name = file_instance.file_name or "unknown"
+            base_name, _ = os.path.splitext(original_name)
+            archived_file_name = f"{base_name}.jsonl"
+
+            file_path = os.path.join(base_path, archived_file_name)
+            os.makedirs(os.path.dirname(file_path), exist_ok=True)
+            self._write_jsonl(file_path, records)
+
+            # Compute the size of the file just written
+            try:
+                file_size = os.path.getsize(file_path)
+            except OSError:
+                file_size = 0
+
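+            # Register the archived JSONL as a file record on the target dataset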
+            df = DatasetFiles(
+                dataset_id=dataset.id,
+                file_name=archived_file_name,
+                file_path=file_path,
+                file_type="jsonl",
+                file_size=file_size,
+                last_access_time=datetime.datetime.now(datetime.UTC),
+            )
+            self._db.add(df)
+            created_files.append(df)
+            total_size += file_size
+
+        # Update the dataset's file count, total size, and status
+        if created_files:
+            dataset.file_count = (dataset.file_count or 0) + len(created_files)
+            dataset.size_bytes = (dataset.size_bytes or 0) + total_size
+            dataset.status = "ACTIVE"
+
+        await self._db.commit()
+
+        logger.info(
+            "Exported synthesis task %s to dataset %s with %d files (total %d bytes)",
+            task_id,
+            dataset.id,
+            len(created_files),
+            total_size,
+        )
+
+        return dataset
+
+    async def _load_file_instances(self, task_id: str) -> Sequence[DataSynthesisFileInstance]:
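+        """Return all synthesis file instances belonging to the given task."""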
+        result = await self._db.execute(
+            select(DataSynthesisFileInstance).where(
+                DataSynthesisFileInstance.synthesis_instance_id == task_id
+            )
+        )
+        return result.scalars().all()
+
+    async def _load_synthesis_data_for_file(
+        self, file_instance_id: str
+    ) -> List[dict]:
+        """Load all synthesis data records for a single file instance.
+
+        Each returned item is a plain JSON-serialisable dict based on SynthesisData.data.
+        """
+        result = await self._db.execute(
+            select(SynthesisData).where(
+                SynthesisData.synthesis_file_instance_id == file_instance_id
+            )
+        )
+        rows: Sequence[SynthesisData] = result.scalars().all()
+
+        records: List[dict] = []
+        for row in rows:
+            payload = row.data or {}
+            records.append(payload)
+        return records
+
+    @staticmethod
+    def _write_jsonl(path: str, records: Iterable[dict]) -> None:
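+        """Write records as one JSON object per line (UTF-8; non-ASCII kept verbatim)."""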
+        with open(path, "w", encoding="utf-8") as f:
+            for record in records:
+                f.write(json.dumps(record, ensure_ascii=False))
+                f.write("\n")
+
+    @staticmethod
+    def _ensure_dataset_path(dataset: Dataset) -> str:
+        """Ensure dataset.path is set and that the directory exists.
+
+        The actual value of dataset.path should come from Dataset's default
+        path generation logic or external configuration, not from the
+        synthesis task's result_data_location.
+        """
+        if not dataset.path:
+            raise SynthesisExportError("Dataset path is empty")
+        os.makedirs(dataset.path, exist_ok=True)
+        return dataset.path
diff --git a/runtime/datamate-python/app/module/generation/service/prompt.py b/runtime/datamate-python/app/module/generation/service/prompt.py
index a5d699ce..42ea87f8 100644
--- a/runtime/datamate-python/app/module/generation/service/prompt.py
+++ b/runtime/datamate-python/app/module/generation/service/prompt.py
@@ -20,7 +20,7 @@
5. **答案质量**:答案应准确、简洁、完整。
# 输出格式
-请严格按照以下JSON格式输出,确保没有额外的解释或标记:
+请严格按照以下JSON格式输出,保持字段顺序,确保没有额外的解释或标记:
[
{{"instruction": "问题1","input": "参考内容1","output": "答案1"}},
{{"instruction": "问题2","input": "参考内容1","output": "答案2"}},
@@ -53,7 +53,7 @@
* 请根据输入文档的主要语言进行提问和回答。
# 输出格式
-请严格按照以下 JSON 格式输出,确保没有额外的解释或标记,每条 COT 数据独立成项:
+请严格按照以下 JSON 格式输出,保持字段顺序,确保没有额外的解释或标记,每条 COT 数据独立成项:
[
{{"question": "具体问题","chain_of_thought": "步骤 1:明确问题核心,定位文档中相关信息范围;步骤 2:提取文档中与问题相关的关键信息 1;步骤 3:结合关键信息 1 推导中间结论 1;步骤 4:提取文档中与问题相关的关键信息 2;步骤 5:结合中间结论 1 和关键信息 2 推导中间结论 2;...(逐步推进);步骤 N:汇总所有中间结论,得出最终结论","conclusion": "简洁准确的最终结论"}},