Skip to content

Commit 31c4966

Browse files
authored
feat(synthesis): add functionality to archive synthesis tasks to existing datasets (#132)
1 parent 7a9530c commit 31c4966

File tree

5 files changed

+251
-3
lines changed

5 files changed

+251
-3
lines changed

frontend/src/pages/SynthesisTask/components/SynthesisTaskTab.tsx

Lines changed: 51 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,9 @@ import { formatDateTime } from "@/utils/unit";
1616
import {
1717
querySynthesisTasksUsingGet,
1818
deleteSynthesisTaskByIdUsingDelete,
19+
archiveSynthesisTaskToDatasetUsingPost,
1920
} from "@/pages/SynthesisTask/synthesis-api";
21+
import { createDatasetUsingPost } from "@/pages/DataManagement/dataset.api";
2022

2123
interface SynthesisTask {
2224
id: string;
@@ -183,6 +185,23 @@ export default function SynthesisTaskTab() {
183185
icon={<EyeOutlined />}
184186
/>
185187
</Tooltip>
188+
<Tooltip title="归档到数据集">
189+
<Button
190+
type="text"
191+
className="hover:bg-green-50 p-1 h-7 w-7"
192+
onClick={() => {
193+
Modal.confirm({
194+
title: "确认归档该合成任务?",
195+
content: `任务名称:${task.name}`,
196+
okText: "归档",
197+
cancelText: "取消",
198+
onOk: () => handleArchiveTask(task),
199+
});
200+
}}
201+
>
202+
归档
203+
</Button>
204+
</Tooltip>
186205
<Tooltip title="删除任务">
187206
<Button
188207
danger
@@ -191,7 +210,7 @@ export default function SynthesisTaskTab() {
191210
icon={<DeleteOutlined />}
192211
onClick={() => {
193212
Modal.confirm({
194-
title: `确认删除任务`,
213+
title: `确认删除任务?`,
195214
content: `任务名:${task.name}`,
196215
okText: "删除",
197216
okType: "danger",
@@ -214,6 +233,37 @@ export default function SynthesisTaskTab() {
214233
},
215234
];
216235

236+
const handleArchiveTask = async (task: SynthesisTask) => {
237+
try {
238+
// 1. 创建目标数据集(使用简单的默认命名 + 随机后缀,可后续扩展为弹窗自定义)
239+
const randomSuffix = Math.random().toString(36).slice(2, 8);
240+
const datasetReq = {
241+
name: `${task.name}-合成数据留用${randomSuffix}`,
242+
description: `由合成任务 ${task.id} 留用生成`,
243+
datasetType: "TEXT",
244+
category: "SYNTHESIS",
245+
format: "JSONL",
246+
status: "DRAFT",
247+
} as any;
248+
const datasetRes = await createDatasetUsingPost(datasetReq);
249+
const datasetId = datasetRes?.data?.id;
250+
if (!datasetId) {
251+
message.error("创建数据集失败");
252+
return;
253+
}
254+
255+
// 2. 调用后端归档接口,将合成数据写入该数据集
256+
await archiveSynthesisTaskToDatasetUsingPost(task.id, datasetId);
257+
258+
message.success("归档成功");
259+
// 3. 可选:跳转到数据集详情页
260+
navigate(`/data/management/detail/${datasetId}`);
261+
} catch (e) {
262+
console.error(e);
263+
message.error("归档失败");
264+
}
265+
};
266+
217267
return (
218268
<div className="space-y-4">
219269
{/* 搜索和筛选 */}

frontend/src/pages/SynthesisTask/synthesis-api.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,8 @@ export function querySynthesisDataByChunkUsingGet(chunkId: string) {
4545
// Fetch the generation prompt template for the given synthesis type.
export function getPromptByTypeUsingGet(synthType: string) {
  const query = { synth_type: synthType };
  return get(`/api/synthesis/gen/prompt`, query);
}
48+
49+
// 将合成任务数据归档到已存在的数据集中
50+
export function archiveSynthesisTaskToDatasetUsingPost(taskId: string, datasetId: string) {
51+
return post(`/api/synthesis/gen/task/${taskId}/export-dataset/${datasetId}`);
52+
}

runtime/datamate-python/app/module/generation/interface/generation_api.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
)
2929
from app.module.generation.service.generation_service import GenerationService
3030
from app.module.generation.service.prompt import get_prompt
31+
from app.module.generation.service.export_service import SynthesisDatasetExporter, SynthesisExportError
3132
from app.module.shared.schema import StandardResponse
3233

3334
router = APIRouter(
@@ -443,3 +444,35 @@ async def list_synthesis_data_by_chunk(
443444
message="Success",
444445
data=items,
445446
)
447+
448+
449+
@router.post("/task/{task_id}/export-dataset/{dataset_id}", response_model=StandardResponse[str])
async def export_synthesis_task_to_dataset(
    task_id: str,
    dataset_id: str,
    db: AsyncSession = Depends(get_db),
):
    """Archive all synthesis data of a task into an existing dataset.

    Rules:
    - One JSONL file is produced per original source file;
    - Each JSONL file keeps the original file's base name (with a
      ``.jsonl`` extension);
    - Only files are written into the dataset; no new dataset is created.

    Raises:
        HTTPException: 400 when the export fails (missing task/dataset,
            no file instances, empty dataset path, ...).
    """
    exporter = SynthesisDatasetExporter(db)
    try:
        dataset = await exporter.export_task_to_dataset(task_id, dataset_id)
    except SynthesisExportError as e:
        logger.error(
            "Failed to export synthesis task %s to dataset %s: %s",
            task_id,
            dataset_id,
            e,
        )
        # Chain the original exception so tracebacks keep the root cause.
        raise HTTPException(status_code=400, detail=str(e)) from e

    return StandardResponse(
        code=200,
        message="success",
        data=dataset.id,
    )
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import datetime
2+
import json
3+
import os
4+
import time
5+
from typing import Iterable, List, Sequence, cast
6+
7+
from sqlalchemy import select
8+
from sqlalchemy.ext.asyncio import AsyncSession
9+
10+
from app.core.logging import get_logger
11+
from app.db.models.data_synthesis import (
12+
DataSynthesisInstance,
13+
DataSynthesisFileInstance,
14+
SynthesisData,
15+
)
16+
from app.db.models.dataset_management import Dataset, DatasetFiles
17+
18+
logger = get_logger(__name__)
19+
20+
21+
class SynthesisExportError(Exception):
    """Raised when exporting synthesis data to a dataset fails."""
24+
25+
class SynthesisDatasetExporter:
26+
"""Export synthesis data of a task into an existing dataset.
27+
28+
Export rules:
29+
- Dimension: original file (DatasetFiles)
30+
- One JSONL file per original file
31+
- JSONL file name is exactly the same as the original file name
32+
"""
33+
34+
def __init__(self, db: AsyncSession):
35+
self._db = db
36+
37+
async def export_task_to_dataset(
38+
self,
39+
task_id: str,
40+
dataset_id: str,
41+
) -> Dataset:
42+
"""Export the full synthesis data of the given task into an existing dataset.
43+
44+
Optimized to process one file at a time to reduce memory usage.
45+
"""
46+
task = await self._db.get(DataSynthesisInstance, task_id)
47+
if not task:
48+
raise SynthesisExportError(f"Synthesis task {task_id} not found")
49+
50+
dataset = await self._db.get(Dataset, dataset_id)
51+
if not dataset:
52+
raise SynthesisExportError(f"Dataset {dataset_id} not found")
53+
54+
file_instances = await self._load_file_instances(task_id)
55+
if not file_instances:
56+
raise SynthesisExportError("No synthesis file instances found for task")
57+
58+
base_path = self._ensure_dataset_path(dataset)
59+
60+
created_files: list[DatasetFiles] = []
61+
total_size = 0
62+
63+
# 一个文件一个文件处理,避免一次性加载所有合成数据
64+
for file_instance in file_instances:
65+
records = await self._load_synthesis_data_for_file(file_instance.id)
66+
if not records:
67+
continue
68+
69+
# 归档文件名称:原始文件名称.xxx -> 原始文件名称.jsonl
70+
original_name = file_instance.file_name or "unknown"
71+
base_name, _ = os.path.splitext(original_name)
72+
archived_file_name = f"{base_name}.jsonl"
73+
74+
file_path = os.path.join(base_path, archived_file_name)
75+
os.makedirs(os.path.dirname(file_path), exist_ok=True)
76+
self._write_jsonl(file_path, records)
77+
78+
# 计算文件大小
79+
try:
80+
file_size = os.path.getsize(file_path)
81+
except OSError:
82+
file_size = 0
83+
84+
df = DatasetFiles(
85+
dataset_id=dataset.id,
86+
file_name=archived_file_name,
87+
file_path=file_path,
88+
file_type="jsonl",
89+
file_size=file_size,
90+
last_access_time=datetime.datetime.now(datetime.UTC),
91+
)
92+
self._db.add(df)
93+
created_files.append(df)
94+
total_size += file_size
95+
96+
# 更新数据集的文件数、总大小和状态
97+
if created_files:
98+
dataset.file_count = (dataset.file_count or 0) + len(created_files)
99+
dataset.size_bytes = (dataset.size_bytes or 0) + total_size
100+
dataset.status = "ACTIVE"
101+
102+
await self._db.commit()
103+
104+
logger.info(
105+
"Exported synthesis task %s to dataset %s with %d files (total %d bytes)",
106+
task_id,
107+
dataset.id,
108+
len(created_files),
109+
total_size,
110+
)
111+
112+
return dataset
113+
114+
async def _load_file_instances(self, task_id: str) -> Sequence[DataSynthesisFileInstance]:
115+
result = await self._db.execute(
116+
select(DataSynthesisFileInstance).where(
117+
DataSynthesisFileInstance.synthesis_instance_id == task_id
118+
)
119+
)
120+
return result.scalars().all()
121+
122+
async def _load_synthesis_data_for_file(
123+
self, file_instance_id: str
124+
) -> List[dict]:
125+
"""Load all synthesis data records for a single file instance.
126+
127+
Each returned item is a plain JSON-serialisable dict based on SynthesisData.data.
128+
"""
129+
result = await self._db.execute(
130+
select(SynthesisData).where(
131+
SynthesisData.synthesis_file_instance_id == file_instance_id
132+
)
133+
)
134+
rows: Sequence[SynthesisData] = result.scalars().all()
135+
136+
records: List[dict] = []
137+
for row in rows:
138+
payload = row.data or {}
139+
records.append(payload)
140+
return records
141+
142+
@staticmethod
143+
def _write_jsonl(path: str, records: Iterable[dict]) -> None:
144+
with open(path, "w", encoding="utf-8") as f:
145+
for record in records:
146+
f.write(json.dumps(record, ensure_ascii=False))
147+
f.write("\n")
148+
149+
@staticmethod
150+
def _ensure_dataset_path(dataset: Dataset) -> str:
151+
"""Ensure dataset.path is available and the directory exists.
152+
153+
The actual value of dataset.path should come from Dataset's default
154+
path generation logic or external configuration, not from the
155+
synthesis task's result_data_location.
156+
"""
157+
if not dataset.path:
158+
raise SynthesisExportError("Dataset path is empty")
159+
os.makedirs(dataset.path, exist_ok=True)
160+
return dataset.path

runtime/datamate-python/app/module/generation/service/prompt.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
5. **答案质量**:答案应准确、简洁、完整。
2121
2222
# 输出格式
23-
请严格按照以下JSON格式输出,确保没有额外的解释或标记:
23+
请严格按照以下JSON格式输出,保持字段顺序,确保没有额外的解释或标记:
2424
[
2525
{{"instruction": "问题1","input": "参考内容1","output": "答案1"}},
2626
{{"instruction": "问题2","input": "参考内容1","output": "答案2"}},
@@ -53,7 +53,7 @@
5353
* 请根据输入文档的主要语言进行提问和回答。
5454
5555
# 输出格式
56-
请严格按照以下 JSON 格式输出,确保没有额外的解释或标记,每条 COT 数据独立成项:
56+
请严格按照以下 JSON 格式输出,保持字段顺序,确保没有额外的解释或标记,每条 COT 数据独立成项:
5757
[
5858
{{"question": "具体问题","chain_of_thought": "步骤 1:明确问题核心,定位文档中相关信息范围;步骤 2:提取文档中与问题相关的关键信息 1;步骤 3:结合关键信息 1 推导中间结论 1;步骤 4:提取文档中与问题相关的关键信息 2;步骤 5:结合中间结论 1 和关键信息 2 推导中间结论 2;...(逐步推进);步骤 N:汇总所有中间结论,得出最终结论","conclusion": "简洁准确的最终结论"}},
5959

0 commit comments

Comments
 (0)