Merged
52 changes: 51 additions & 1 deletion frontend/src/pages/SynthesisTask/components/SynthesisTaskTab.tsx
@@ -16,7 +16,9 @@ import { formatDateTime } from "@/utils/unit";
import {
  querySynthesisTasksUsingGet,
  deleteSynthesisTaskByIdUsingDelete,
  archiveSynthesisTaskToDatasetUsingPost,
} from "@/pages/SynthesisTask/synthesis-api";
import { createDatasetUsingPost } from "@/pages/DataManagement/dataset.api";

interface SynthesisTask {
  id: string;
@@ -183,6 +185,23 @@ export default function SynthesisTaskTab() {
    icon={<EyeOutlined />}
  />
</Tooltip>
<Tooltip title="归档到数据集">
  <Button
    type="text"
    className="hover:bg-green-50 p-1 h-7 w-7"
    onClick={() => {
      Modal.confirm({
        title: "确认归档该合成任务?",
        content: `任务名称:${task.name}`,
        okText: "归档",
        cancelText: "取消",
        onOk: () => handleArchiveTask(task),
      });
    }}
  >
    归档
  </Button>
</Tooltip>
<Tooltip title="删除任务">
  <Button
    danger
@@ -191,7 +210,7 @@
    icon={<DeleteOutlined />}
    onClick={() => {
      Modal.confirm({
        title: `确认删除任务`,
        title: `确认删除任务?`,
        content: `任务名:${task.name}`,
        okText: "删除",
        okType: "danger",
@@ -214,6 +233,37 @@
  },
];

const handleArchiveTask = async (task: SynthesisTask) => {
  try {
    // 1. Create the target dataset (simple default name plus a random suffix; could later be extended to a custom-name dialog).
    const randomSuffix = Math.random().toString(36).slice(2, 8);
    const datasetReq = {
      name: `${task.name}-合成数据留用${randomSuffix}`,
      description: `由合成任务 ${task.id} 留用生成`,
      datasetType: "TEXT",
      category: "SYNTHESIS",
      format: "JSONL",
      status: "DRAFT",
    } as any;
    const datasetRes = await createDatasetUsingPost(datasetReq);
    const datasetId = datasetRes?.data?.id;
    if (!datasetId) {
      message.error("创建数据集失败");
      return;
    }

    // 2. Call the backend archive endpoint to write the synthesis data into that dataset.
    await archiveSynthesisTaskToDatasetUsingPost(task.id, datasetId);

    message.success("归档成功");
    // 3. Optional: navigate to the dataset detail page.
    navigate(`/data/management/detail/${datasetId}`);
  } catch (e) {
    console.error(e);
    message.error("归档失败");
  }
};

return (
  <div className="space-y-4">
    {/* Search and filters */}
5 changes: 5 additions & 0 deletions frontend/src/pages/SynthesisTask/synthesis-api.ts
@@ -45,3 +45,8 @@ export function querySynthesisDataByChunkUsingGet(chunkId: string) {
export function getPromptByTypeUsingGet(synthType: string) {
  return get(`/api/synthesis/gen/prompt`, { synth_type: synthType });
}

// Archive the data of a synthesis task into an existing dataset.
export function archiveSynthesisTaskToDatasetUsingPost(taskId: string, datasetId: string) {
  return post(`/api/synthesis/gen/task/${taskId}/export-dataset/${datasetId}`);
}
(backend synthesis generation router — file path not shown)
@@ -28,6 +28,7 @@
)
from app.module.generation.service.generation_service import GenerationService
from app.module.generation.service.prompt import get_prompt
from app.module.generation.service.export_service import SynthesisDatasetExporter, SynthesisExportError
from app.module.shared.schema import StandardResponse

router = APIRouter(
@@ -443,3 +444,35 @@ async def list_synthesis_data_by_chunk(
message="Success",
data=items,
)


@router.post("/task/{task_id}/export-dataset/{dataset_id}", response_model=StandardResponse[str])
async def export_synthesis_task_to_dataset(
    task_id: str,
    dataset_id: str,
    db: AsyncSession = Depends(get_db),
):
    """Archive all synthesis data of the given synthesis task into an existing dataset.

    Rules:
    - Export is keyed on the original file: each original file yields one JSONL file;
    - The JSONL file name is identical to the original file name;
    - Only files are written; no new dataset is created here.
    """
    exporter = SynthesisDatasetExporter(db)
    try:
        dataset = await exporter.export_task_to_dataset(task_id, dataset_id)
    except SynthesisExportError as e:
        logger.error(
            "Failed to export synthesis task %s to dataset %s: %s",
            task_id,
            dataset_id,
            e,
        )
        raise HTTPException(status_code=400, detail=str(e))

    return StandardResponse(
        code=200,
        message="success",
        data=dataset.id,
    )
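For reference, a minimal sketch of exercising this endpoint from a script. Everything here is illustrative: the base URL and IDs are hypothetical, and httpx is just one possible HTTP client; only the route path itself comes from the code above.

import httpx

# Hypothetical base URL and IDs, for illustration only.
BASE = "http://localhost:8000"
task_id = "task-123"
dataset_id = "ds-456"

# The endpoint takes no request body; both identifiers travel in the path.
resp = httpx.post(f"{BASE}/api/synthesis/gen/task/{task_id}/export-dataset/{dataset_id}")
body = resp.json()
# Expected StandardResponse[str] shape: {"code": 200, "message": "success", "data": "<dataset id>"}
print(body["code"], body["data"])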
160 changes: 160 additions & 0 deletions app/module/generation/service/export_service.py
@@ -0,0 +1,160 @@
import datetime
import json
import os
from typing import Iterable, List, Sequence

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.logging import get_logger
from app.db.models.data_synthesis import (
    DataSynthesisInstance,
    DataSynthesisFileInstance,
    SynthesisData,
)
from app.db.models.dataset_management import Dataset, DatasetFiles

logger = get_logger(__name__)


class SynthesisExportError(Exception):
    """Raised when exporting synthesis data to a dataset fails."""


class SynthesisDatasetExporter:
    """Export synthesis data of a task into an existing dataset.

    Export rules:
    - Dimension: original file (DatasetFiles)
    - One JSONL file per original file
    - JSONL file name is exactly the same as the original file name
    """

    def __init__(self, db: AsyncSession):
        self._db = db

    async def export_task_to_dataset(
        self,
        task_id: str,
        dataset_id: str,
    ) -> Dataset:
        """Export the full synthesis data of the given task into an existing dataset.

        Optimized to process one file at a time to reduce memory usage.
        """
        task = await self._db.get(DataSynthesisInstance, task_id)
        if not task:
            raise SynthesisExportError(f"Synthesis task {task_id} not found")

        dataset = await self._db.get(Dataset, dataset_id)
        if not dataset:
            raise SynthesisExportError(f"Dataset {dataset_id} not found")

        file_instances = await self._load_file_instances(task_id)
        if not file_instances:
            raise SynthesisExportError("No synthesis file instances found for task")

        base_path = self._ensure_dataset_path(dataset)

        created_files: list[DatasetFiles] = []
        total_size = 0

        # Process one file at a time to avoid loading all synthesis data at once.
        for file_instance in file_instances:
            records = await self._load_synthesis_data_for_file(file_instance.id)
            if not records:
                continue

            # Archived file name: <original name>.xxx -> <original name>.jsonl
            original_name = file_instance.file_name or "unknown"
            base_name, _ = os.path.splitext(original_name)
            archived_file_name = f"{base_name}.jsonl"

            file_path = os.path.join(base_path, archived_file_name)
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            self._write_jsonl(file_path, records)

            # Determine the size of the written file.
            try:
                file_size = os.path.getsize(file_path)
            except OSError:
                file_size = 0

            df = DatasetFiles(
                dataset_id=dataset.id,
                file_name=archived_file_name,
                file_path=file_path,
                file_type="jsonl",
                file_size=file_size,
                last_access_time=datetime.datetime.now(datetime.UTC),
            )
            self._db.add(df)
            created_files.append(df)
            total_size += file_size

        # Update the dataset's file count, total size, and status.
        if created_files:
            dataset.file_count = (dataset.file_count or 0) + len(created_files)
            dataset.size_bytes = (dataset.size_bytes or 0) + total_size
            dataset.status = "ACTIVE"

        await self._db.commit()

        logger.info(
            "Exported synthesis task %s to dataset %s with %d files (total %d bytes)",
            task_id,
            dataset.id,
            len(created_files),
            total_size,
        )

        return dataset

    async def _load_file_instances(self, task_id: str) -> Sequence[DataSynthesisFileInstance]:
        result = await self._db.execute(
            select(DataSynthesisFileInstance).where(
                DataSynthesisFileInstance.synthesis_instance_id == task_id
            )
        )
        return result.scalars().all()

    async def _load_synthesis_data_for_file(
        self, file_instance_id: str
    ) -> List[dict]:
        """Load all synthesis data records for a single file instance.

        Each returned item is a plain JSON-serialisable dict based on SynthesisData.data.
        """
        result = await self._db.execute(
            select(SynthesisData).where(
                SynthesisData.synthesis_file_instance_id == file_instance_id
            )
        )
        rows: Sequence[SynthesisData] = result.scalars().all()

        records: List[dict] = []
        for row in rows:
            payload = row.data or {}
            records.append(payload)
        return records

    @staticmethod
    def _write_jsonl(path: str, records: Iterable[dict]) -> None:
        with open(path, "w", encoding="utf-8") as f:
            for record in records:
                f.write(json.dumps(record, ensure_ascii=False))
                f.write("\n")

    @staticmethod
    def _ensure_dataset_path(dataset: Dataset) -> str:
        """Ensure dataset.path is available and the directory exists.

        The actual value of dataset.path should come from Dataset's default
        path generation logic or external configuration, not from the
        synthesis task's result_data_location.
        """
        if not dataset.path:
            raise SynthesisExportError("Dataset path is empty")
        os.makedirs(dataset.path, exist_ok=True)
        return dataset.path
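To make the export rules concrete, here is a small sketch of the name mapping and the JSONL line format. The file names and the record below are hypothetical; the record shape mirrors the instruction/input/output items produced by the synthesis prompts.

import json
import os

# Name mapping rule: each original file yields one JSONL file with the same base name.
for original in ["contract_2024.pdf", "faq.md"]:  # hypothetical file names
    base, _ = os.path.splitext(original)
    print(f"{original} -> {base}.jsonl")
# contract_2024.pdf -> contract_2024.jsonl
# faq.md -> faq.jsonl

# Each JSONL line is one SynthesisData.data payload, serialised without ASCII escaping:
record = {"instruction": "问题1", "input": "参考内容1", "output": "答案1"}
print(json.dumps(record, ensure_ascii=False))
# {"instruction": "问题1", "input": "参考内容1", "output": "答案1"}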
4 changes: 2 additions & 2 deletions app/module/generation/service/prompt.py
@@ -20,7 +20,7 @@
5. **答案质量**:答案应准确、简洁、完整。

# 输出格式
请严格按照以下JSON格式输出,确保没有额外的解释或标记:
请严格按照以下JSON格式输出,保持字段顺序,确保没有额外的解释或标记:
[
{{"instruction": "问题1","input": "参考内容1","output": "答案1"}},
{{"instruction": "问题2","input": "参考内容1","output": "答案2"}},
@@ -53,7 +53,7 @@
* 请根据输入文档的主要语言进行提问和回答。

# 输出格式
请严格按照以下 JSON 格式输出,确保没有额外的解释或标记,每条 COT 数据独立成项:
请严格按照以下 JSON 格式输出,保持字段顺序,确保没有额外的解释或标记,每条 COT 数据独立成项:
[
{{"question": "具体问题","chain_of_thought": "步骤 1:明确问题核心,定位文档中相关信息范围;步骤 2:提取文档中与问题相关的关键信息 1;步骤 3:结合关键信息 1 推导中间结论 1;步骤 4:提取文档中与问题相关的关键信息 2;步骤 5:结合中间结论 1 和关键信息 2 推导中间结论 2;...(逐步推进);步骤 N:汇总所有中间结论,得出最终结论","conclusion": "简洁准确的最终结论"}},
