+ ),
+ },
+ ];
+
+ return (
+ <div>
+ {/* Filter Controls */}
+ <Card>
+ <SearchControls
+ onClearFilters={() =>
+ setSearchParams((prev) => ({
+ ...prev,
+ filter: { ...prev.filter, status: [] },
+ current: 1,
+ }))
+ }
+ showDatePicker
+ dateRange={dateRange as any}
+ onDateChange={(date) => {
+ setDateRange(date as any);
+ const start = (date?.[0] as any)?.toISOString?.() || undefined;
+ const end = (date?.[1] as any)?.toISOString?.() || undefined;
+ setSearchParams((prev) => ({
+ ...prev,
+ current: 1,
+ start_time: start,
+ end_time: end,
+ }));
+ }}
+ onReload={handleReset}
+ searchPlaceholder="搜索任务名称..."
+ className="flex-1"
+ />
+ </Card>
+
+ <Card>
+ <Table columns={columns} dataSource={tableData} loading={loading} pagination={pagination} />
+ </Card>
+
+ <Modal
+ open={logOpen}
+ onCancel={() => {
+ setLogOpen(false);
+ if (logBlobUrl) {
+ URL.revokeObjectURL(logBlobUrl);
+ setLogBlobUrl("");
+ }
+ }}
+ footer={
+ <div>
+ <span>{logFilename || ""}</span>
+ {logBlobUrl ? (
+ {
+ const a = document.createElement("a");
+ a.href = logBlobUrl;
+ a.download = logFilename || "execution.log";
+ document.body.appendChild(a);
+ a.click();
+ document.body.removeChild(a);
+ }}
+ >
+ 下载日志
+ </Button>
+ ) : null}
+ {
+ setLogOpen(false);
+ if (logBlobUrl) {
+ URL.revokeObjectURL(logBlobUrl);
+ setLogBlobUrl("");
+ }
+ }}
+ >
+ 关闭
+ </Button>
+ </div>
+ }
+ width={900}
+ >
+ <pre>
+ {logLoading ? "Loading..." : (logContent || "(empty)")}
+ </pre>
+ </Modal>
+ </div>
+ );
+}
diff --git a/frontend/src/pages/DataCollection/Home/ExecutionLog.tsx b/frontend/src/pages/DataCollection/Home/ExecutionLog.tsx
deleted file mode 100644
index f840b6362..000000000
--- a/frontend/src/pages/DataCollection/Home/ExecutionLog.tsx
+++ /dev/null
@@ -1,149 +0,0 @@
-import { Card, Badge, Table } from "antd";
-import type { ColumnsType } from "antd/es/table";
-import { SearchControls } from "@/components/SearchControls";
-import type { CollectionLog } from "@/pages/DataCollection/collection.model";
-import { queryExecutionLogUsingPost } from "../collection.apis";
-import { LogStatusMap, LogTriggerTypeMap } from "../collection.const";
-import useFetchData from "@/hooks/useFetchData";
-
-const filterOptions = [
- {
- key: "status",
- label: "状态筛选",
- options: Object.values(LogStatusMap),
- },
- {
- key: "triggerType",
- label: "触发类型",
- options: Object.values(LogTriggerTypeMap),
- },
-];
-
-export default function ExecutionLog() {
- const handleReset = () => {
- setSearchParams({
- keyword: "",
- filters: {},
- current: 1,
- pageSize: 10,
- dateRange: null,
- });
- };
-
- const {
- loading,
- tableData,
- pagination,
- searchParams,
- setSearchParams,
- handleFiltersChange,
- handleKeywordChange,
- } = useFetchData(queryExecutionLogUsingPost);
-
- const columns: ColumnsType<CollectionLog> = [
- {
- title: "任务名称",
- dataIndex: "taskName",
- key: "taskName",
- fixed: "left",
- render: (text: string) => <span>{text}</span>,
- },
- {
- title: "状态",
- dataIndex: "status",
- key: "status",
- render: (status: string) => (
- <Badge color={LogStatusMap[status]?.color} text={LogStatusMap[status]?.label} />
- ),
- },
- {
- title: "触发类型",
- dataIndex: "triggerType",
- key: "triggerType",
- render: (type: string) => LogTriggerTypeMap[type].label,
- },
- {
- title: "开始时间",
- dataIndex: "startTime",
- key: "startTime",
- },
- {
- title: "结束时间",
- dataIndex: "endTime",
- key: "endTime",
- },
- {
- title: "执行时长",
- dataIndex: "duration",
- key: "duration",
- },
- {
- title: "重试次数",
- dataIndex: "retryCount",
- key: "retryCount",
- },
- {
- title: "进程ID",
- dataIndex: "processId",
- key: "processId",
- render: (text: string) => (
- <code>{text}</code>
- ),
- },
- {
- title: "错误信息",
- dataIndex: "errorMessage",
- key: "errorMessage",
- render: (msg?: string) =>
- msg ? (
- <span>{msg}</span>
- ) : (
- <span>-</span>
- ),
- },
- ];
-
- return (
- <div>
- {/* Filter Controls */}
- <Card>
- <SearchControls
- onClearFilters={() =>
- setSearchParams((prev) => ({
- ...prev,
- filters: {},
- }))
- }
- showDatePicker
- dateRange={searchParams.dateRange || [null, null]}
- onDateChange={(date) =>
- setSearchParams((prev) => ({ ...prev, dateRange: date }))
- }
- onReload={handleReset}
- searchPlaceholder="搜索任务名称、进程ID或错误信息..."
- className="flex-1"
- />
- </Card>
-
- <Card>
- <Table columns={columns} dataSource={tableData} loading={loading} pagination={pagination} />
- </Card>
- </div>
- );
-}
diff --git a/frontend/src/pages/DataCollection/Home/TaskManagement.tsx b/frontend/src/pages/DataCollection/Home/TaskManagement.tsx
index f67e21718..5482d45bd 100644
--- a/frontend/src/pages/DataCollection/Home/TaskManagement.tsx
+++ b/frontend/src/pages/DataCollection/Home/TaskManagement.tsx
@@ -1,34 +1,16 @@
-import {
- Card,
- Button,
- Badge,
- Table,
- Dropdown,
- App,
- Tooltip,
- Popconfirm,
-} from "antd";
-import {
- DeleteOutlined,
- EditOutlined,
- EllipsisOutlined,
- PauseCircleOutlined,
- PauseOutlined,
- PlayCircleOutlined,
- StopOutlined,
-} from "@ant-design/icons";
-import { SearchControls } from "@/components/SearchControls";
+import { App, Button, Card, Popconfirm, Table, Tag, Tooltip } from "antd";
+import { DeleteOutlined, PauseCircleOutlined, PlayCircleOutlined, ProfileOutlined } from "@ant-design/icons";
+import { SearchControls } from "@/components/SearchControls";
import {
deleteTaskByIdUsingDelete,
executeTaskByIdUsingPost,
queryTasksUsingGet,
stopTaskByIdUsingPost,
} from "../collection.apis";
-import { TaskStatus, type CollectionTask } from "../collection.model";
-import { StatusMap, SyncModeMap } from "../collection.const";
+import { type CollectionTask, TaskStatus } from "../collection.model";
+import { mapCollectionTask, StatusMap } from "../collection.const";
import useFetchData from "@/hooks/useFetchData";
-import { useNavigate } from "react-router";
-import { mapCollectionTask } from "../collection.const";
+import { useNavigate } from "react-router";
export default function TaskManagement() {
const { message } = App.useApp();
@@ -51,8 +33,20 @@ export default function TaskManagement() {
searchParams,
setSearchParams,
fetchData,
- handleFiltersChange,
- } = useFetchData(queryTasksUsingGet, mapCollectionTask);
+ } = useFetchData(
+ (params) => {
+ const { keyword, ...rest } = params || {};
+ return queryTasksUsingGet({
+ ...rest,
+ name: keyword || undefined,
+ });
+ },
+ mapCollectionTask,
+ 30000,
+ false,
+ [],
+ 0
+ );
const handleStartTask = async (taskId: string) => {
await executeTaskByIdUsingPost(taskId);
@@ -86,21 +80,21 @@ export default function TaskManagement() {
icon: <PauseCircleOutlined />,
onClick: () => handleStopTask(record.id),
};
- const items = [
- // isStopped ? startButton : stopButton,
- // {
- // key: "edit",
- // label: "编辑",
- // icon: <EditOutlined />,
- // onClick: () => {
- // showEditTaskModal(record);
- // },
- // },
+ return [
+ {
+ key: "executions",
+ label: "执行记录",
+ icon: <ProfileOutlined />,
+ onClick: () =>
+ navigate(
+ `/data/collection?tab=task-execution&taskId=${encodeURIComponent(record.id)}`
+ ),
+ },
{
key: "delete",
label: "删除",
danger: true,
- icon: <DeleteOutlined />,
+ icon: <DeleteOutlined />,
confirm: {
title: "确定要删除该任务吗?此操作不可撤销。",
okText: "删除",
@@ -110,7 +104,6 @@ export default function TaskManagement() {
onClick: () => handleDeleteTask(record.id),
},
];
- return items;
};
const columns = [
@@ -128,17 +121,49 @@ export default function TaskManagement() {
key: "status",
width: 150,
ellipsis: true,
- render: (status: string) => (
- <Badge color={StatusMap[status]?.color} text={StatusMap[status]?.label} />
+ render: (status: any) => (
+ <Tag color={status.color}>{status.label}</Tag>
),
},
+ {
+ title: "所用模板",
+ dataIndex: "templateName",
+ key: "templateName",
+ width: 180,
+ ellipsis: true,
+ render: (v?: string) => v || "-",
+ },
{
title: "同步方式",
dataIndex: "syncMode",
key: "syncMode",
width: 150,
ellipsis: true,
- render: (text: string) => <span>{SyncModeMap[text]?.label}</span>,
+ render: (text: any) => (
+ <Tag color={text.color}>{text.label}</Tag>
+ ),
+ },
+ {
+ title: "Cron调度表达式",
+ dataIndex: "scheduleExpression",
+ key: "scheduleExpression",
+ width: 200,
+ ellipsis: true,
+ },
+ {
+ title: "超时时间",
+ dataIndex: "timeoutSeconds",
+ key: "timeoutSeconds",
+ width: 140,
+ ellipsis: true,
+ render: (v?: number) => (v === undefined || v === null ? "-" : `${v}s`),
+ },
+ {
+ title: "描述",
+ dataIndex: "description",
+ key: "description",
+ ellipsis: true,
+ width: 200,
},
{
title: "创建时间",
@@ -154,20 +179,6 @@ export default function TaskManagement() {
width: 150,
ellipsis: true,
},
- {
- title: "最近执行ID",
- dataIndex: "lastExecutionId",
- key: "lastExecutionId",
- width: 150,
- ellipsis: true,
- },
- {
- title: "描述",
- dataIndex: "description",
- key: "description",
- ellipsis: true,
- width: 200,
- },
{
title: "操作",
key: "action",
@@ -180,7 +191,7 @@ export default function TaskManagement() {
type="text"
icon={op.icon}
danger={op?.danger}
- onClick={() => op.onClick(record)}
+ onClick={() => op.onClick()}
/>
);
@@ -192,7 +203,7 @@ export default function TaskManagement() {
okText={op.confirm.okText}
cancelText={op.confirm.cancelText}
okType={op.danger ? "danger" : "primary"}
- onConfirm={() => op.onClick(record)}
+ onConfirm={() => op.onClick()}
>
@@ -218,14 +229,15 @@ export default function TaskManagement() {
current: 1,
}))
}
- searchPlaceholder="搜索任务名称或描述..."
+ searchPlaceholder="搜索任务名称..."
filters={filters}
- onFiltersChange={handleFiltersChange}
+ onFiltersChange={() => {}}
showViewToggle={false}
onClearFilters={() =>
setSearchParams((prev) => ({
...prev,
- filters: {},
+ filter: { ...prev.filter, status: [] },
+ current: 1,
}))
}
onReload={fetchData}
diff --git a/frontend/src/pages/DataCollection/Home/TemplateManagement.tsx b/frontend/src/pages/DataCollection/Home/TemplateManagement.tsx
new file mode 100644
index 000000000..35db8d4cd
--- /dev/null
+++ b/frontend/src/pages/DataCollection/Home/TemplateManagement.tsx
@@ -0,0 +1,173 @@
+import { App, Card, Table, Tag } from "antd";
+import type { ColumnsType } from "antd/es/table";
+import { SearchControls } from "@/components/SearchControls";
+import useFetchData from "@/hooks/useFetchData";
+import { queryDataXTemplatesUsingGet } from "../collection.apis";
+import { formatDateTime } from "@/utils/unit";
+
+type CollectionTemplate = {
+ id: string;
+ name: string;
+ description?: string;
+ sourceType: string;
+ sourceName: string;
+ targetType: string;
+ targetName: string;
+ builtIn?: boolean;
+ createdAt?: string;
+ updatedAt?: string;
+};
+
+export default function TemplateManagement() {
+ const { message } = App.useApp();
+
+ const filters = [
+ {
+ key: "builtIn",
+ label: "模板类型",
+ options: [
+ { value: "all", label: "全部" },
+ { value: "true", label: "内置" },
+ { value: "false", label: "自定义" },
+ ],
+ },
+ ];
+
+ const {
+ loading,
+ tableData,
+ pagination,
+ searchParams,
+ setSearchParams,
+ fetchData,
+ handleFiltersChange,
+ } = useFetchData(
+ (params) => {
+ const { keyword, builtIn, ...rest } = params || {};
+ const builtInValue = Array.isArray(builtIn)
+ ? builtIn?.[0]
+ : builtIn;
+
+ return queryDataXTemplatesUsingGet({
+ ...rest,
+ name: keyword || undefined,
+ built_in:
+ builtInValue && builtInValue !== "all"
+ ? builtInValue === "true"
+ : undefined,
+ });
+ },
+ (tpl) => ({
+ ...tpl,
+ createdAt: tpl.createdAt ? formatDateTime(tpl.createdAt) : "-",
+ updatedAt: tpl.updatedAt ? formatDateTime(tpl.updatedAt) : "-",
+ }),
+ 30000,
+ false,
+ [],
+ 0
+ );
+
+ const columns: ColumnsType<CollectionTemplate> = [
+ {
+ title: "模板名称",
+ dataIndex: "name",
+ key: "name",
+ fixed: "left",
+ width: 200,
+ ellipsis: true,
+ },
+ {
+ title: "模板类型",
+ dataIndex: "builtIn",
+ key: "builtIn",
+ width: 120,
+ render: (v?: boolean) => (
+ {v ? "内置" : "自定义"}
+ ),
+ },
+ {
+ title: "源端",
+ key: "source",
+ width: 220,
+ ellipsis: true,
+ render: (_: any, record: CollectionTemplate) => (
+ <span>{`${record.sourceType} / ${record.sourceName}`}</span>
+ ),
+ },
+ {
+ title: "目标端",
+ key: "target",
+ width: 220,
+ ellipsis: true,
+ render: (_: any, record: CollectionTemplate) => (
+ <span>{`${record.targetType} / ${record.targetName}`}</span>
+ ),
+ },
+ {
+ title: "描述",
+ dataIndex: "description",
+ key: "description",
+ width: 260,
+ ellipsis: true,
+ render: (v?: string) => v || "-",
+ },
+ {
+ title: "创建时间",
+ dataIndex: "createdAt",
+ key: "createdAt",
+ width: 160,
+ },
+ {
+ title: "更新时间",
+ dataIndex: "updatedAt",
+ key: "updatedAt",
+ width: 160,
+ },
+ ];
+
+ return (
+ <div>
+ <Card>
+ <SearchControls
+ onSearchChange={(newSearchTerm) =>
+ setSearchParams((prev) => ({
+ ...prev,
+ keyword: newSearchTerm,
+ current: 1,
+ }))
+ }
+ searchPlaceholder="搜索模板名称..."
+ filters={filters}
+ onFiltersChange={handleFiltersChange}
+ showViewToggle={false}
+ onClearFilters={() =>
+ setSearchParams((prev) => ({
+ ...prev,
+ filter: { ...prev.filter, builtIn: [] },
+ current: 1,
+ }))
+ }
+ onReload={() => {
+ fetchData().catch(() => message.error("刷新失败"));
+ }}
+ />
+ </Card>
+
+ <Card>
+ <Table columns={columns} dataSource={tableData} loading={loading} pagination={pagination} />
+ </Card>
+ </div>
+ );
+}
diff --git a/frontend/src/pages/DataCollection/collection.apis.ts b/frontend/src/pages/DataCollection/collection.apis.ts
index 940b6cb07..3e6bb9ee7 100644
--- a/frontend/src/pages/DataCollection/collection.apis.ts
+++ b/frontend/src/pages/DataCollection/collection.apis.ts
@@ -28,7 +28,7 @@ export function queryDataXTemplatesUsingGet(params?: any) {
return get("/api/data-collection/templates", params);
}
export function deleteTaskByIdUsingDelete(id: string | number) {
- return del(`/api/data-collection/tasks/${id}`);
+ return del("/api/data-collection/tasks", { ids: [id] });
}
export function executeTaskByIdUsingPost(
@@ -47,13 +47,47 @@ export function stopTaskByIdUsingPost(
// 执行日志相关接口
export function queryExecutionLogUsingPost(params?: any) {
- return post("/api/data-collection/executions", params);
+ return get("/api/data-collection/executions", params);
}
export function queryExecutionLogByIdUsingGet(id: string | number) {
return get(`/api/data-collection/executions/${id}`);
}
+export function queryExecutionLogContentByIdUsingGet(id: string | number) {
+ return get(`/api/data-collection/executions/${id}/log`);
+}
+
+export async function queryExecutionLogFileByIdUsingGet(id: string | number) {
+ const token = localStorage.getItem("token") || sessionStorage.getItem("token");
+ const resp = await fetch(`/api/data-collection/executions/${id}/log`, {
+ method: "GET",
+ headers: {
+ ...(token ? { Authorization: `Bearer ${token}` } : {}),
+ },
+ credentials: "include",
+ });
+
+ if (!resp.ok) {
+ let detail = "";
+ try {
+ detail = await resp.text();
+ } catch {
+ detail = resp.statusText;
+ }
+ const err: any = new Error(detail || `HTTP error ${resp.status}`);
+ err.status = resp.status;
+ err.data = detail;
+ throw err;
+ }
+
+ const contentDisposition = resp.headers.get("content-disposition") || "";
+ const filenameMatch = contentDisposition.match(/filename\*?=(?:UTF-8''|")?([^;"\n]+)/i);
+ const filename = filenameMatch?.[1] ? decodeURIComponent(filenameMatch[1].replace(/"/g, "").trim()) : `execution_${id}.log`;
+ const blob = await resp.blob();
+ return { blob, filename };
+}
+
// 监控统计相关接口
export function queryCollectionStatisticsUsingGet(params?: any) {
return get("/api/data-collection/monitor/statistics", params);
diff --git a/frontend/src/pages/DataCollection/collection.const.ts b/frontend/src/pages/DataCollection/collection.const.ts
index 544bfb74e..29ace71df 100644
--- a/frontend/src/pages/DataCollection/collection.const.ts
+++ b/frontend/src/pages/DataCollection/collection.const.ts
@@ -1,9 +1,11 @@
import {
+ CollectionTask,
LogStatus,
- SyncMode,
+ SyncMode,
+ TaskExecution,
TaskStatus,
TriggerType,
} from "./collection.model";
+import { formatDateTime } from "@/utils/unit";
export const StatusMap: Record<
TaskStatus,
@@ -24,23 +26,27 @@ export const StatusMap: Record<
color: "red",
value: TaskStatus.FAILED,
},
- [TaskStatus.SUCCESS]: {
+ [TaskStatus.COMPLETED]: {
label: "成功",
color: "green",
- value: TaskStatus.SUCCESS,
+ value: TaskStatus.COMPLETED,
},
[TaskStatus.DRAFT]: {
label: "草稿",
color: "orange",
value: TaskStatus.DRAFT,
},
- [TaskStatus.READY]: { label: "就绪", color: "cyan", value: TaskStatus.READY },
+ [TaskStatus.PENDING]: {
+ label: "就绪",
+ color: "cyan",
+ value: TaskStatus.PENDING
+ },
};
-export const SyncModeMap: Record<SyncMode, { label: string; value: SyncMode }> =
+export const SyncModeMap: Record<SyncMode, { label: string; value: SyncMode; color: string }> =
{
- [SyncMode.ONCE]: { label: "立即同步", value: SyncMode.ONCE },
- [SyncMode.SCHEDULED]: { label: "定时同步", value: SyncMode.SCHEDULED },
+ [SyncMode.ONCE]: { label: "立即同步", value: SyncMode.ONCE, color: "orange" },
+ [SyncMode.SCHEDULED]: { label: "定时同步", value: SyncMode.SCHEDULED, color: "blue" },
};
export const LogStatusMap: Record<
@@ -73,9 +79,21 @@ export const LogTriggerTypeMap: Record<
[TriggerType.API]: { label: "API", value: TriggerType.API },
};
-export function mapCollectionTask(task: CollectionTask): CollectionTask {
+export function mapCollectionTask(task: CollectionTask): any {
return {
...task,
status: StatusMap[task.status],
+ syncMode: SyncModeMap[task.syncMode],
+ createdAt: formatDateTime(task.createdAt),
+ updatedAt: formatDateTime(task.updatedAt)
+ };
+}
+
+export function mapTaskExecution(execution: TaskExecution): any {
+ return {
+ ...execution,
+ status: StatusMap[execution.status],
+ startedAt: formatDateTime(execution.startedAt),
+ completedAt: formatDateTime(execution.completedAt)
};
}
diff --git a/frontend/src/pages/DataCollection/collection.model.ts b/frontend/src/pages/DataCollection/collection.model.ts
index a6bdcfb9c..3f3f3b350 100644
--- a/frontend/src/pages/DataCollection/collection.model.ts
+++ b/frontend/src/pages/DataCollection/collection.model.ts
@@ -1,8 +1,8 @@
export enum TaskStatus {
DRAFT = "DRAFT",
- READY = "READY",
+ PENDING = "PENDING",
RUNNING = "RUNNING",
- SUCCESS = "SUCCESS",
+ COMPLETED = "COMPLETED",
FAILED = "FAILED",
STOPPED = "STOPPED",
}
@@ -19,12 +19,26 @@ export interface CollectionTask {
config: object; // 具体配置结构根据实际需求定义
status: TaskStatus;
syncMode: SyncMode;
+ templateName?: string;
scheduleExpression?: string; // 仅当 syncMode 为 SCHEDULED 时存在
+ timeoutSeconds?: number;
lastExecutionId: string;
createdAt: string; // ISO date string
updatedAt: string; // ISO date string
}
+export interface TaskExecution {
+ id: string;
+ taskId: string;
+ taskName: string;
+ status: string;
+ logPath: string;
+ startedAt: string;
+ completedAt: string;
+ durationSeconds: number;
+ errorMessage: string;
+}
+
export enum LogStatus {
RUNNING = "RUNNING",
SUCCESS = "SUCCESS",
diff --git a/runtime/datamate-python/app/db/models/data_collection.py b/runtime/datamate-python/app/db/models/data_collection.py
new file mode 100644
index 000000000..547e2ddfb
--- /dev/null
+++ b/runtime/datamate-python/app/db/models/data_collection.py
@@ -0,0 +1,66 @@
+import uuid
+from sqlalchemy import Column, String, Text, TIMESTAMP, Integer, JSON, Boolean
+from sqlalchemy.sql import func
+
+from app.db.session import Base
+
+class CollectionTemplate(Base):
+ """归集模板表(UUID 主键) -> t_dc_collection_templates"""
+
+ __tablename__ = "t_dc_collection_templates"
+
+ id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()), comment="模板ID(UUID)")
+ name = Column(String(255), nullable=False, comment="模板名称")
+ description = Column(Text, nullable=True, comment="模板描述")
+ source_type = Column(String(64), nullable=False, comment="源数据源类型")
+ source_name = Column(String(64), nullable=False, comment="源数据源名称")
+ target_type = Column(String(64), nullable=False, comment="目标数据源类型")
+ target_name = Column(String(64), nullable=False, comment="目标数据源名称")
+ template_content = Column(JSON, nullable=False, comment="模板内容")
+ built_in = Column(Boolean, default=False, comment="是否系统内置模板")
+ created_at = Column(TIMESTAMP, server_default=func.current_timestamp(), comment="创建时间")
+ updated_at = Column(TIMESTAMP, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), comment="更新时间")
+ created_by = Column(String(255), nullable=True, comment="创建者")
+ updated_by = Column(String(255), nullable=True, comment="更新者")
+
+class CollectionTask(Base):
+ """归集任务表(UUID 主键) -> t_dc_collection_tasks"""
+
+ __tablename__ = "t_dc_collection_tasks"
+
+ id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()), comment="UUID")
+ name = Column(String(255), nullable=False, comment="任务名称")
+ description = Column(Text, nullable=True, comment="任务描述")
+ sync_mode = Column(String(20), nullable=False, server_default="ONCE", comment="同步模式:ONCE/SCHEDULED")
+ template_id = Column(String(36), nullable=False, comment="归集模板ID")
+ template_name = Column(String(255), nullable=False, comment="归集模板名称")
+ target_path = Column(String(1000), nullable=True, server_default="", comment="目标存储路径")
+ config = Column(JSON, nullable=False, comment="归集配置(DataX配置),包含源端和目标端配置信息")
+ schedule_expression = Column(String(255), nullable=True, comment="Cron调度表达式")
+ status = Column(String(20), nullable=True, server_default="DRAFT", comment="任务状态:DRAFT/READY/RUNNING/SUCCESS/FAILED/STOPPED")
+ retry_count = Column(Integer, nullable=True, server_default="3", comment="重试次数")
+ timeout_seconds = Column(Integer, nullable=True, server_default="3600", comment="超时时间(秒)")
+ last_execution_id = Column(String(36), nullable=True, comment="最后执行ID(UUID)")
+ created_at = Column(TIMESTAMP, server_default=func.current_timestamp(), comment="创建时间")
+ updated_at = Column(TIMESTAMP, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), comment="更新时间")
+ created_by = Column(String(255), nullable=True, comment="创建者")
+ updated_by = Column(String(255), nullable=True, comment="更新者")
+
+class TaskExecution(Base):
+ """任务执行记录表(UUID 主键) -> t_dc_task_executions"""
+
+ __tablename__ = "t_dc_task_executions"
+
+ id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()), comment="执行记录ID(UUID)")
+ task_id = Column(String(36), nullable=False, comment="任务ID")
+ task_name = Column(String(255), nullable=False, comment="任务名称")
+ status = Column(String(20), nullable=True, server_default="RUNNING", comment="执行状态:RUNNING/SUCCESS/FAILED/STOPPED")
+ log_path = Column(String(1000), nullable=True, server_default="", comment="日志文件路径")
+ started_at = Column(TIMESTAMP, nullable=True, comment="开始时间")
+ completed_at = Column(TIMESTAMP, nullable=True, comment="完成时间")
+ duration_seconds = Column(Integer, nullable=True, server_default="0", comment="执行时长(秒)")
+ error_message = Column(Text, nullable=True, comment="错误信息")
+ created_at = Column(TIMESTAMP, server_default=func.current_timestamp(), comment="创建时间")
+ updated_at = Column(TIMESTAMP, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), comment="更新时间")
+ created_by = Column(String(255), nullable=True, comment="创建者")
+ updated_by = Column(String(255), nullable=True, comment="更新者")
diff --git a/runtime/datamate-python/app/module/__init__.py b/runtime/datamate-python/app/module/__init__.py
index 2eea538be..74ae7c2d3 100644
--- a/runtime/datamate-python/app/module/__init__.py
+++ b/runtime/datamate-python/app/module/__init__.py
@@ -5,6 +5,7 @@
from .ratio.interface import router as ratio_router
from .generation.interface import router as generation_router
from .evaluation.interface import router as evaluation_router
+from .collection.interface import router as collection_router
router = APIRouter(
prefix="/api"
@@ -15,5 +16,6 @@
router.include_router(ratio_router)
router.include_router(generation_router)
router.include_router(evaluation_router)
+router.include_router(collection_router)
__all__ = ["router"]
diff --git a/runtime/datamate-python/app/module/annotation/client/labelstudio/client.py b/runtime/datamate-python/app/module/annotation/client/labelstudio/client.py
index b56893bf0..a7404a3be 100644
--- a/runtime/datamate-python/app/module/annotation/client/labelstudio/client.py
+++ b/runtime/datamate-python/app/module/annotation/client/labelstudio/client.py
@@ -1,11 +1,12 @@
import httpx
+import re
from typing import Optional, Dict, Any, List
from app.core.config import settings
from app.core.logging import get_logger
from .schema import (
- LabelStudioProject,
+ LabelStudioProject,
LabelStudioCreateProjectRequest,
LabelStudioCreateTaskRequest
)
@@ -14,11 +15,11 @@
class Client:
"""Label Studio服务客户端
-
+
使用 HTTP REST API 直接与 Label Studio 交互
认证方式:使用 Authorization: Token {token} 头部进行认证
"""
-
+
# 默认标注配置模板
DEFAULT_LABEL_CONFIGS = {
"image": """
@@ -57,15 +58,15 @@ class Client:
"""
}
-
+
def __init__(
- self,
- base_url: Optional[str] = None,
+ self,
+ base_url: Optional[str] = None,
token: Optional[str] = None,
timeout: float = 30.0
):
"""初始化 Label Studio 客户端
-
+
Args:
base_url: Label Studio 服务地址
token: API Token(使用 Authorization: Token {token} 头部)
@@ -74,10 +75,10 @@ def __init__(
self.base_url = (base_url or settings.label_studio_base_url).rstrip("/")
self.token = token or settings.label_studio_user_token
self.timeout = timeout
-
+
if not self.token:
raise ValueError("Label Studio API token is required")
-
+
# 初始化 HTTP 客户端
self.client = httpx.AsyncClient(
base_url=self.base_url,
@@ -87,46 +88,80 @@ def __init__(
"Content-Type": "application/json"
}
)
-
+
logger.debug(f"Label Studio client initialized: {self.base_url}")
-
+
def get_label_config_by_type(self, data_type: str) -> str:
"""根据数据类型获取标注配置"""
return self.DEFAULT_LABEL_CONFIGS.get(data_type.lower(), self.DEFAULT_LABEL_CONFIGS["image"])
-
+
+ @staticmethod
+ def get_csrf_token(html: str) -> str:
+ m = re.search(r'name="csrfmiddlewaretoken"\s+value="([^"]+)"', html)
+ if not m:
+ raise IOError("CSRF Token not found")
+ return m.group(1)
+
+ async def login_label_studio(self):
+ try:
+ response = await self.client.get("/user/login/")
+ response.raise_for_status()
+ body = response.text
+ set_cookie_headers = response.headers.get_list("set-cookie")
+ cookie_header = "; ".join(set_cookie_headers)
+ form = {
+ "email": settings.label_studio_username,
+ "password": settings.label_studio_password,
+ "csrfmiddlewaretoken": self.get_csrf_token(body),
+ }
+ headers = {
+ "Content-Type": "application/x-www-form-urlencoded",
+ "Cookie": cookie_header,
+ }
+ login_response = await self.client.post("/user/login/", data=form, headers=headers)
+ logger.info(f"response is: {login_response}, {login_response.text}")
+ return login_response
+ except httpx.HTTPStatusError as e:
+ logger.error(f"Login failed HTTP {e.response.status_code}: {e.response.text}")
+ return None
+ except Exception as e:
+ logger.error(f"Error while login: {e}", e)
+ return None
+
+
async def create_project(
- self,
- title: str,
- description: str = "",
+ self,
+ title: str,
+ description: str = "",
label_config: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""创建Label Studio项目"""
try:
logger.debug(f"Creating Label Studio project: {title}")
logger.debug(f"Label Studio URL: {self.base_url}/api/projects")
-
+
project_data = {
"title": title,
"description": description,
"label_config": label_config or ""
}
-
+
# Log the request body for debugging
logger.debug(f"Request body: {project_data}")
logger.debug(f"Label config being sent:\n{project_data['label_config']}")
-
+
response = await self.client.post("/api/projects", json=project_data)
response.raise_for_status()
-
+
project = response.json()
project_id = project.get("id")
-
+
if not project_id:
raise Exception("Label Studio response does not contain project ID")
-
+
logger.debug(f"Project created successfully, ID: {project_id}")
return project
-
+
except httpx.HTTPStatusError as e:
logger.error(
f"Create project failed - HTTP {e.response.status_code}\n"
@@ -151,7 +186,7 @@ async def create_project(
except Exception as e:
logger.error(f"Error while creating Label Studio project: {str(e)}", exc_info=True)
return None
-
+
async def import_tasks(
self,
project_id: int,
@@ -162,7 +197,7 @@ async def import_tasks(
"""批量导入任务到Label Studio项目"""
try:
logger.debug(f"Importing {len(tasks)} tasks into project {project_id}")
-
+
response = await self.client.post(
f"/api/projects/{project_id}/import",
json=tasks,
@@ -172,20 +207,20 @@ async def import_tasks(
}
)
response.raise_for_status()
-
+
result = response.json()
task_count = result.get("task_count", len(tasks))
-
+
logger.debug(f"Tasks imported successfully: {task_count}")
return result
-
+
except httpx.HTTPStatusError as e:
logger.error(f"Import tasks failed HTTP {e.response.status_code}: {e.response.text}")
return None
except Exception as e:
logger.error(f"Error while importing tasks: {e}")
return None
-
+
async def create_tasks_batch(
self,
project_id: str,
@@ -201,7 +236,7 @@ async def create_tasks_batch(
except Exception as e:
logger.error(f"Error while creating tasks in batch: {e}")
return None
-
+
async def create_task(
self,
project_id: str,
@@ -213,13 +248,13 @@ async def create_task(
task = {"data": data}
if meta:
task["meta"] = meta
-
+
return await self.create_tasks_batch(project_id, [task])
-
+
except Exception as e:
logger.error(f"Error while creating single task: {e}")
return None
-
+
async def get_project_tasks(
self,
project_id: str,
@@ -227,12 +262,12 @@ async def get_project_tasks(
page_size: int = 1000
) -> Optional[Dict[str, Any]]:
"""获取项目任务信息
-
+
Args:
project_id: 项目ID
page: 页码(从1开始)。如果为None,则获取所有任务
page_size: 每页大小
-
+
Returns:
如果指定了page参数,返回包含分页信息的字典:
{
@@ -242,9 +277,9 @@ async def get_project_tasks(
"project_id": 项目ID,
"tasks": 当前页的任务列表
}
-
+
如果page为None,返回包含所有任务的字典:
-
+
"count": 总任务数,
"project_id": 项目ID,
"tasks": 所有任务列表
@@ -252,11 +287,11 @@ async def get_project_tasks(
"""
try:
pid = int(project_id)
-
+
# 如果指定了page,直接获取单页任务
if page is not None:
logger.debug(f"Fetching tasks for project {pid}, page {page} (page_size={page_size})")
-
+
response = await self.client.get(
f"/api/tasks",
params={
@@ -266,9 +301,9 @@ async def get_project_tasks(
}
)
response.raise_for_status()
-
+
result = response.json()
-
+
# 返回单页结果,包含分页信息
return {
"count": result.get("total", len(result.get("tasks", []))),
@@ -277,11 +312,11 @@ async def get_project_tasks(
"project_id": pid,
"tasks": result.get("tasks", [])
}
-
+
# 如果未指定page,获取所有任务
logger.debug(f"(page) not specified, fetching all tasks.")
all_tasks = []
-
+
response = await self.client.get(
f"/api/tasks",
params={
@@ -289,31 +324,31 @@ async def get_project_tasks(
}
)
response.raise_for_status()
-
+
result = response.json()
tasks = result.get("tasks", [])
-
+
if not tasks:
logger.debug(f"No tasks found for this project.")
-
+
all_tasks.extend(tasks)
logger.debug(f"Fetched {len(tasks)} tasks.")
-
+
# 返回所有任务,不包含分页信息
return {
"count": len(all_tasks),
"project_id": pid,
"tasks": all_tasks
}
-
+
except httpx.HTTPStatusError as e:
logger.error(f"获取项目任务失败 HTTP {e.response.status_code}: {e.response.text}")
return None
except Exception as e:
logger.error(f"获取项目任务时发生错误: {e}")
return None
-
+
async def delete_task(
self,
task_id: int
@@ -321,20 +356,20 @@ async def delete_task(
"""删除单个任务"""
try:
logger.debug(f"Deleting task: {task_id}")
-
+
response = await self.client.delete(f"/api/tasks/{task_id}")
response.raise_for_status()
-
+
logger.debug(f"Task deleted: {task_id}")
return True
-
+
except httpx.HTTPStatusError as e:
logger.error(f"Delete task {task_id} failed HTTP {e.response.status_code}: {e.response.text}")
return False
except Exception as e:
logger.error(f"Error while deleting task {task_id}: {e}")
return False
-
+
async def delete_tasks_batch(
self,
task_ids: List[int]
@@ -342,24 +377,24 @@ async def delete_tasks_batch(
"""批量删除任务"""
try:
logger.debug(f"Deleting {len(task_ids)} tasks in batch")
-
+
successful_deletions = 0
failed_deletions = 0
-
+
for task_id in task_ids:
if await self.delete_task(task_id):
successful_deletions += 1
else:
failed_deletions += 1
-
+
logger.debug(f"Batch deletion finished: success {successful_deletions}, failed {failed_deletions}")
-
+
return {
"successful": successful_deletions,
"failed": failed_deletions,
"total": len(task_ids)
}
-
+
except Exception as e:
logger.error(f"Error while deleting tasks in batch: {e}")
return {
@@ -367,72 +402,72 @@ async def delete_tasks_batch(
"failed": len(task_ids),
"total": len(task_ids)
}
-
+
async def get_project(self, project_id: int) -> Optional[Dict[str, Any]]:
"""获取项目信息"""
try:
logger.debug(f"Fetching project info: {project_id}")
-
+
response = await self.client.get(f"/api/projects/{project_id}")
response.raise_for_status()
-
+
return response.json()
-
+
except httpx.HTTPStatusError as e:
logger.error(f"Get project info failed HTTP {e.response.status_code}: {e.response.text}")
return None
except Exception as e:
logger.error(f"Error while getting project info: {e}")
return None
-
+
async def delete_project(self, project_id: int) -> bool:
"""删除项目"""
try:
logger.debug(f"Deleting project: {project_id}")
-
+
response = await self.client.delete(f"/api/projects/{project_id}")
response.raise_for_status()
-
+
logger.debug(f"Project deleted: {project_id}")
return True
-
+
except httpx.HTTPStatusError as e:
logger.error(f"Delete project {project_id} failed HTTP {e.response.status_code}: {e.response.text}")
return False
except Exception as e:
logger.error(f"Error while deleting project {project_id}: {e}")
return False
-
+
async def get_task_annotations(
self,
task_id: int
) -> Optional[List[Dict[str, Any]]]:
"""获取任务的标注结果
-
+
Args:
task_id: 任务ID
-
+
Returns:
标注结果列表,每个标注包含完整的annotation信息
"""
try:
logger.debug(f"Fetching annotations for task: {task_id}")
-
+
response = await self.client.get(f"/api/tasks/{task_id}/annotations")
response.raise_for_status()
-
+
annotations = response.json()
logger.debug(f"Fetched {len(annotations)} annotations for task {task_id}")
-
+
return annotations
-
+
except httpx.HTTPStatusError as e:
logger.error(f"Get task annotations failed HTTP {e.response.status_code}: {e.response.text}")
return None
except Exception as e:
logger.error(f"Error while getting task annotations: {e}")
return None
-
+
async def create_annotation(
self,
task_id: int,
@@ -440,111 +475,111 @@ async def create_annotation(
completed_by: Optional[int] = None
) -> Optional[Dict[str, Any]]:
"""为任务创建新的标注
-
+
Args:
task_id: 任务ID
result: 标注结果列表
completed_by: 完成标注的用户ID(可选)
-
+
Returns:
创建的标注信息,失败返回None
"""
try:
logger.debug(f"Creating annotation for task: {task_id}")
-
+
annotation_data = {
"result": result,
"task": task_id
}
-
+
if completed_by:
annotation_data["completed_by"] = completed_by
-
+
response = await self.client.post(
f"/api/tasks/{task_id}/annotations",
json=annotation_data
)
response.raise_for_status()
-
+
annotation = response.json()
logger.debug(f"Created annotation {annotation.get('id')} for task {task_id}")
-
+
return annotation
-
+
except httpx.HTTPStatusError as e:
logger.error(f"Create annotation failed HTTP {e.response.status_code}: {e.response.text}")
return None
except Exception as e:
logger.error(f"Error while creating annotation: {e}")
return None
-
+
async def update_annotation(
self,
annotation_id: int,
result: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""更新已存在的标注
-
+
Args:
annotation_id: 标注ID
result: 新的标注结果列表
-
+
Returns:
更新后的标注信息,失败返回None
"""
try:
logger.debug(f"Updating annotation: {annotation_id}")
-
+
annotation_data = {
"result": result
}
-
+
response = await self.client.patch(
f"/api/annotations/{annotation_id}",
json=annotation_data
)
response.raise_for_status()
-
+
annotation = response.json()
logger.debug(f"Updated annotation {annotation_id}")
-
+
return annotation
-
+
except httpx.HTTPStatusError as e:
logger.error(f"Update annotation failed HTTP {e.response.status_code}: {e.response.text}")
return None
except Exception as e:
logger.error(f"Error while updating annotation: {e}")
return None
-
+
async def delete_annotation(
self,
annotation_id: int
) -> bool:
"""删除标注
-
+
Args:
annotation_id: 标注ID
-
+
Returns:
成功返回True,失败返回False
"""
try:
logger.debug(f"Deleting annotation: {annotation_id}")
-
+
response = await self.client.delete(f"/api/annotations/{annotation_id}")
response.raise_for_status()
-
+
logger.debug(f"Deleted annotation {annotation_id}")
return True
-
+
except httpx.HTTPStatusError as e:
logger.error(f"Delete annotation failed HTTP {e.response.status_code}: {e.response.text}")
return False
except Exception as e:
logger.error(f"Error while deleting annotation: {e}")
return False
-
+
async def create_local_storage(
self,
project_id: int,
@@ -555,7 +590,7 @@ async def create_local_storage(
description: Optional[str] = None
) -> Optional[Dict[str, Any]]:
"""创建本地文件存储配置
-
+
Args:
project_id: Label Studio 项目 ID
path: 本地文件路径(在 Label Studio 容器中的路径)
@@ -563,37 +598,37 @@ async def create_local_storage(
use_blob_urls: 是否使用 blob URLs(建议 True)
regex_filter: 文件过滤正则表达式(可选)
description: 存储描述(可选)
-
+
Returns:
创建的存储配置信息,失败返回 None
"""
try:
logger.debug(f"Creating local storage for project {project_id}: {path}")
-
+
storage_data = {
"project": project_id,
"path": path,
"title": title,
"use_blob_urls": use_blob_urls
}
-
+
if regex_filter:
storage_data["regex_filter"] = regex_filter
if description:
storage_data["description"] = description
-
+
response = await self.client.post(
"/api/storages/localfiles/",
json=storage_data
)
response.raise_for_status()
-
+
storage = response.json()
storage_id = storage.get("id")
-
+
logger.debug(f"Local storage created successfully, ID: {storage_id}")
return storage
-
+
except httpx.HTTPStatusError as e:
logger.error(f"Create local storage failed HTTP {e.response.status_code}: {e.response.text}")
return None
diff --git a/runtime/datamate-python/app/module/annotation/interface/project.py b/runtime/datamate-python/app/module/annotation/interface/project.py
index 71a90c584..4da3727c3 100644
--- a/runtime/datamate-python/app/module/annotation/interface/project.py
+++ b/runtime/datamate-python/app/module/annotation/interface/project.py
@@ -2,7 +2,7 @@
import math
import uuid
-from fastapi import APIRouter, Depends, HTTPException, Query, Path
+from fastapi import APIRouter, Depends, HTTPException, Query, Path, Response
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import get_db
@@ -29,6 +29,39 @@
)
logger = get_logger(__name__)
+@router.get("/{mapping_id}/login")
+async def login_label_studio(
+ mapping_id: str = Path(..., description="映射ID"),
+ db: AsyncSession = Depends(get_db)
+):
+ try:
+ ls_client = LabelStudioClient(base_url=settings.label_studio_base_url,
+ token=settings.label_studio_user_token)
+ target_response = await ls_client.login_label_studio()
+ headers = dict(target_response.headers)
+ set_cookies = target_response.headers.get_list("set-cookie")
+
+ # 删除合并的 Set-Cookie
+ if "set-cookie" in headers:
+ del headers["set-cookie"]
+
+ # 创建新响应,添加多个 Set-Cookie
+ response = Response(
+ content=target_response.content,
+ status_code=target_response.status_code,
+ headers=headers
+ )
+
+ # 分别添加每个 Set-Cookie
+ for cookie in set_cookies:
+ response.headers.append("set-cookie", cookie)
+
+ return response
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error while logining in LabelStudio: {e}", e)
+ raise HTTPException(status_code=500, detail="Internal server error")
+
@router.post("", response_model=StandardResponse[DatasetMappingCreateResponse], status_code=201)
async def create_mapping(
request: DatasetMappingCreateRequest,
@@ -36,12 +69,12 @@ async def create_mapping(
):
"""
创建数据集映射
-
+
根据指定的DM程序中的数据集,创建Label Studio中的数据集,
在数据库中记录这一关联关系,返回Label Studio数据集的ID
-
+
注意:一个数据集可以创建多个标注项目
-
+
支持通过 template_id 指定标注模板,如果提供了模板ID,则使用模板的配置
"""
try:
@@ -51,9 +84,9 @@ async def create_mapping(
mapping_service = DatasetMappingService(db)
sync_service = SyncService(dm_client, ls_client, mapping_service)
template_service = AnnotationTemplateService()
-
+
logger.info(f"Create dataset mapping request: {request.dataset_id}")
-
+
# 从DM服务获取数据集信息
dataset_info = await dm_client.get_dataset(request.dataset_id)
if not dataset_info:
@@ -61,11 +94,11 @@ async def create_mapping(
status_code=404,
detail=f"Dataset not found in DM service: {request.dataset_id}"
)
-
+
project_name = request.name or \
dataset_info.name or \
"A new project from DataMate"
-
+
project_description = request.description or \
dataset_info.description or \
f"Imported from DM dataset {dataset_info.name} ({dataset_info.id})"
@@ -89,15 +122,15 @@ async def create_mapping(
description=project_description,
label_config=label_config # 传递模板配置
)
-
+
if not project_data:
raise HTTPException(
status_code=500,
detail="Fail to create Label Studio project."
)
-
+
project_id = project_data["id"]
-
+
# 配置本地存储:dataset/
local_storage_path = f"{settings.label_studio_local_document_root}/{request.dataset_id}"
storage_result = await ls_client.create_local_storage(
@@ -107,7 +140,7 @@ async def create_mapping(
use_blob_urls=True,
description=f"Local storage for dataset {dataset_info.name}"
)
-
+
if not storage_result:
# 本地存储配置失败,记录警告但不中断流程
logger.warning(f"Failed to configure local storage for project {project_id}")
@@ -124,28 +157,28 @@ async def create_mapping(
# 创建映射关系,包含项目名称(先持久化映射以获得 mapping.id)
mapping = await mapping_service.create_mapping(labeling_project)
-
+
# 进行一次同步,使用创建后的 mapping.id
await sync_service.sync_dataset_files(mapping.id, 100)
-
+
response_data = DatasetMappingCreateResponse(
id=mapping.id,
labeling_project_id=str(mapping.labeling_project_id),
labeling_project_name=mapping.name or project_name
)
-
+
return StandardResponse(
code=201,
message="success",
data=response_data
)
-
+
except HTTPException:
raise
except Exception as e:
logger.error(f"Error while creating dataset mapping: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
-
+
@router.get("", response_model=StandardResponse[PaginatedData[DatasetMappingResponse]])
async def list_mappings(
page: int = Query(1, ge=1, description="页码(从1开始)"),
@@ -155,10 +188,10 @@ async def list_mappings(
):
"""
查询所有映射关系(分页)
-
+
返回所有有效的数据集映射关系(未被软删除的),支持分页查询。
可选择是否包含完整的标注模板信息(默认不包含,以提高列表查询性能)。
-
+
参数:
- page: 页码(从1开始)
- pageSize: 每页记录数
@@ -166,12 +199,12 @@ async def list_mappings(
"""
try:
service = DatasetMappingService(db)
-
+
# 计算 skip
skip = (page - 1) * page_size
-
+
logger.info(f"List mappings: page={page}, page_size={page_size}, include_template={include_template}")
-
+
# 获取数据和总数
mappings, total = await service.get_all_mappings_with_count(
skip=skip,
@@ -179,10 +212,10 @@ async def list_mappings(
include_deleted=False,
include_template=include_template
)
-
+
# 计算总页数
total_pages = math.ceil(total / page_size) if total > 0 else 0
-
+
# 构造分页响应
paginated_data = PaginatedData(
page=page,
@@ -191,15 +224,15 @@ async def list_mappings(
total_pages=total_pages,
content=mappings
)
-
+
logger.info(f"List mappings: page={page}, returned {len(mappings)}/{total}, templates_included: {include_template}")
-
+
return StandardResponse(
code=200,
message="success",
data=paginated_data
)
-
+
except Exception as e:
logger.error(f"Error listing mappings: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@@ -211,7 +244,7 @@ async def get_mapping(
):
"""
根据 UUID 查询单个映射关系(包含关联的标注模板详情)
-
+
返回数据集映射关系以及关联的完整标注模板信息,包括:
- 映射基本信息
- 数据集信息
@@ -220,26 +253,26 @@ async def get_mapping(
"""
try:
service = DatasetMappingService(db)
-
+
logger.info(f"Get mapping with template details: {mapping_id}")
-
+
# 获取映射,并包含完整的模板信息
mapping = await service.get_mapping_by_uuid(mapping_id, include_template=True)
-
+
if not mapping:
raise HTTPException(
status_code=404,
detail=f"Mapping not found: {mapping_id}"
)
-
+
logger.info(f"Found mapping: {mapping.id}, template_included: {mapping.template is not None}")
-
+
return StandardResponse(
code=200,
message="success",
data=mapping
)
-
+
except HTTPException:
raise
except Exception as e:
@@ -256,10 +289,10 @@ async def get_mappings_by_source(
):
"""
根据源数据集 ID 查询所有映射关系(分页,包含模板详情)
-
+
返回该数据集创建的所有标注项目(不包括已删除的),支持分页查询。
默认包含关联的完整标注模板信息。
-
+
参数:
- dataset_id: 数据集ID
- page: 页码(从1开始)
@@ -268,12 +301,12 @@ async def get_mappings_by_source(
"""
try:
service = DatasetMappingService(db)
-
+
# 计算 skip
skip = (page - 1) * page_size
-
+
logger.info(f"Get mappings by source dataset id: {dataset_id}, page={page}, page_size={page_size}, include_template={include_template}")
-
+
# 获取数据和总数(包含模板信息)
mappings, total = await service.get_mappings_by_source_with_count(
dataset_id=dataset_id,
@@ -281,10 +314,10 @@ async def get_mappings_by_source(
limit=page_size,
include_template=include_template
)
-
+
# 计算总页数
total_pages = math.ceil(total / page_size) if total > 0 else 0
-
+
# 构造分页响应
paginated_data = PaginatedData(
page=page,
@@ -293,15 +326,15 @@ async def get_mappings_by_source(
total_pages=total_pages,
content=mappings
)
-
+
logger.info(f"Found {len(mappings)} mappings on page {page}, total: {total}, templates_included: {include_template}")
-
+
return StandardResponse(
code=200,
message="success",
data=paginated_data
)
-
+
except HTTPException:
raise
except Exception as e:
@@ -328,24 +361,24 @@ async def delete_mapping(
ls_client = LabelStudioClient(base_url=settings.label_studio_base_url,
token=settings.label_studio_user_token)
service = DatasetMappingService(db)
-
+
# 使用 mapping UUID 查询映射记录
logger.debug(f"Deleting by mapping UUID: {project_id}")
mapping = await service.get_mapping_by_uuid(project_id)
logger.debug(f"Mapping lookup result: {mapping}")
-
+
if not mapping:
raise HTTPException(
status_code=404,
detail=f"Mapping either not found or not specified."
)
-
+
id = mapping.id
labeling_project_id = mapping.labeling_project_id
logger.debug(f"Found mapping: {id}, Label Studio project ID: {labeling_project_id}")
-
+
# 1. 删除 Label Studio 项目
try:
logger.debug(f"Deleting Label Studio project: {labeling_project_id}")
@@ -357,11 +390,11 @@ async def delete_mapping(
except Exception as e:
logger.error(f"Error deleting Label Studio project: {e}")
# 继续执行,即使 Label Studio 项目删除失败也要删除映射记录
-
+
# 2. 软删除映射记录
soft_delete_success = await service.soft_delete_mapping(id)
logger.debug(f"Soft delete result for mapping {id}: {soft_delete_success}")
-
+
if not soft_delete_success:
raise HTTPException(
status_code=500,
@@ -378,7 +411,7 @@ async def delete_mapping(
status="success"
)
)
-
+
except HTTPException:
raise
except Exception as e:
diff --git a/runtime/datamate-python/app/module/collection/__init__.py b/runtime/datamate-python/app/module/collection/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/runtime/datamate-python/app/module/collection/client/__init__.py b/runtime/datamate-python/app/module/collection/client/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/runtime/datamate-python/app/module/collection/client/datax_client.py b/runtime/datamate-python/app/module/collection/client/datax_client.py
new file mode 100644
index 000000000..650673d6e
--- /dev/null
+++ b/runtime/datamate-python/app/module/collection/client/datax_client.py
@@ -0,0 +1,200 @@
+import json
+import threading
+import subprocess
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, Any
+
+from app.core.logging import get_logger
+from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate
+from app.module.collection.schema.collection import CollectionConfig, SyncMode
+from app.module.shared.schema import TaskStatus
+
+logger = get_logger(__name__)
+
+class DataxClient:
+ def __init__(self, task: CollectionTask, execution: TaskExecution):
+ self.execution = execution
+ self.task = task
+ self.config_file_path = f"/flow/data-collection/{task.id}/config.json"
+ self.python_path = "python"
+ self.datax_main = "/opt/datax/bin/datax.py"
+ Path(self.config_file_path).parent.mkdir(parents=True, exist_ok=True)
+
+ def validate_json_string(self) -> Dict[str, Any]:
+ """
+ 验证 JSON 字符串
+
+ Returns:
+ 解析后的配置字典
+ """
+ try:
+ raw = self.task.config
+ # config 列为 JSON 类型,从数据库读出时可能已是 dict;仅字符串需要解析
+ config = raw if isinstance(raw, dict) else json.loads(raw)
+
+ # 基本验证
+ if 'job' not in config:
+ raise ValueError("JSON 必须包含 'job' 字段")
+
+ if 'content' not in config.get('job', {}):
+ raise ValueError("job 必须包含 'content' 字段")
+
+ logger.info("JSON 配置验证通过")
+ return config
+
+ except json.JSONDecodeError as e:
+ raise ValueError(f"JSON 格式错误: {e}")
+ except Exception as e:
+ raise ValueError(f"配置验证失败: {e}")
+
+ @staticmethod
+ def generate_datax_config(task_config: CollectionConfig, template: CollectionTemplate, target_path: str):
+ # 校验参数
+ reader_parameter = {
+ **(task_config.parameter if task_config.parameter else {}),
+ **(task_config.reader if task_config.reader else {})
+ }
+ writer_parameter = {
+ **(task_config.parameter if task_config.parameter else {}),
+ **(task_config.writer if task_config.writer else {}),
+ "destPath": target_path
+ }
+ # 生成任务运行配置
+ job_config = {
+ "content": [
+ {
+ "reader": {
+ "name": template.source_type,
+ "parameter": reader_parameter
+ },
+ "writer": {
+ "name": template.target_type,
+ "parameter": writer_parameter
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 2
+ }
+ }
+ }
+ task_config.job = job_config
+
+ def create_config_file(self) -> str:
+ """
+ 创建配置文件
+
+ Returns:
+ 临时文件路径
+ """
+ # 验证 JSON
+ config = self.validate_json_string()
+
+ # 写入临时文件
+ with open(self.config_file_path, 'w', encoding='utf-8') as f:
+ json.dump(config, f, indent=2, ensure_ascii=False)
+
+ logger.debug(f"创建配置文件: {self.config_file_path}")
+ return self.config_file_path
+
+ def run_datax_job(self):
+ """
+ 启动 DataX 任务
+
+ Returns:
+ 执行结果字典
+ """
+ # 创建配置文件
+ self.create_config_file()
+ try:
+ # 构建命令
+ cmd = [self.python_path, str(self.datax_main), str(self.config_file_path)]
+ cmd_str = ' '.join(cmd)
+ logger.info(f"执行命令: {cmd_str}")
+ if not self.execution.started_at:
+ self.execution.started_at = datetime.now()
+ # 执行命令并写入日志
+ with open(self.execution.log_path, 'w', encoding='utf-8') as log_f:
+ # 写入头信息
+ self.write_header_log(cmd_str, log_f)
+ # 启动datax进程
+ exit_code = self._run_process(cmd, log_f)
+ # 记录结束时间
+ self.execution.completed_at = datetime.now()
+ self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
+ # 写入结束信息
+ self.write_tail_log(exit_code, log_f)
+ if exit_code == 0:
+ logger.info(f"DataX 任务执行成功: {self.execution.id}")
+ logger.info(f"执行耗时: {self.execution.duration_seconds:.2f} 秒")
+ self.execution.status = TaskStatus.COMPLETED.name
+ else:
+ self.execution.error_message = self.execution.error_message or f"DataX 任务执行失败,退出码: {exit_code}"
+ self.execution.status = TaskStatus.FAILED.name
+ logger.error(self.execution.error_message)
+ except Exception as e:
+ self.execution.completed_at = datetime.now()
+ self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
+ self.execution.error_message = f"执行异常: {e}"
+ self.execution.status = TaskStatus.FAILED.name
+ logger.error(f"执行异常: {e}", exc_info=True)
+ if self.task.sync_mode == SyncMode.ONCE:
+ self.task.status = self.execution.status
+
+ def _run_process(self, cmd: list[str], log_f) -> int:
+ # 启动进程
+ process = subprocess.Popen(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True,
+ encoding='utf-8',
+ bufsize=1
+ )
+
+ # 创建读取线程
+ stdout_thread = threading.Thread(target=lambda stream=process.stdout: self.read_stream(stream, log_f))
+ stderr_thread = threading.Thread(target=lambda stream=process.stderr: self.read_stream(stream, log_f))
+
+ stdout_thread.start()
+ stderr_thread.start()
+
+ # 等待进程完成
+ try:
+ exit_code = process.wait(timeout=self.task.timeout_seconds)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ exit_code = -1
+ self.execution.error_message = f"任务执行超时({self.task.timeout_seconds}秒)"
+ logger.error(f"任务执行超时({self.task.timeout_seconds}秒)")
+
+ # 等待线程完成
+ stdout_thread.join(timeout=5)
+ stderr_thread.join(timeout=5)
+ return exit_code
+
+ def write_tail_log(self, exit_code: int, log_f):
+ log_f.write("\n" + "=" * 100 + "\n")
+ log_f.write(f"End Time: {self.execution.completed_at}\n")
+ log_f.write(f"Execution Time: {self.execution.duration_seconds:.2f} seconds\n")
+ log_f.write(f"Exit Code: {exit_code}\n")
+ log_f.write(f"Status: {'SUCCESS' if exit_code == 0 else 'FAILED'}\n")
+
+ def write_header_log(self, cmd: str, log_f):
+ log_f.write(f"DataX Task Execution Log\n")
+ log_f.write(f"Job ID: {self.execution.id}\n")
+ log_f.write(f"Start Time: {self.execution.started_at}\n")
+ log_f.write(f"Config Source: JSON String\n")
+ log_f.write(f"Command: {cmd}\n")
+ log_f.write("=" * 100 + "\n\n")
+
+ @staticmethod
+ def read_stream(stream, log_f):
+ """读取输出流"""
+ for line in stream:
+ line = line.rstrip('\n')
+ if line:
+ # 写入日志文件
+ log_f.write(f"{line}\n")
+ log_f.flush()
diff --git a/runtime/datamate-python/app/module/collection/interface/__init__.py b/runtime/datamate-python/app/module/collection/interface/__init__.py
new file mode 100644
index 000000000..c8768c22a
--- /dev/null
+++ b/runtime/datamate-python/app/module/collection/interface/__init__.py
@@ -0,0 +1,15 @@
+from fastapi import APIRouter
+
+router = APIRouter(
+ prefix="/data-collection",
+ tags = ["data-collection"]
+)
+
+# Include sub-routers
+from .collection import router as collection_router
+from .execution import router as execution_router
+from .template import router as template_router
+
+router.include_router(collection_router)
+router.include_router(execution_router)
+router.include_router(template_router)
diff --git a/runtime/datamate-python/app/module/collection/interface/collection.py b/runtime/datamate-python/app/module/collection/interface/collection.py
new file mode 100644
index 000000000..ab6bed0c8
--- /dev/null
+++ b/runtime/datamate-python/app/module/collection/interface/collection.py
@@ -0,0 +1,157 @@
+import math
+import uuid
+from typing import Optional
+
+from fastapi import APIRouter, Depends, HTTPException, Query
+from sqlalchemy import select, func
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.core.logging import get_logger
+from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate
+from app.db.session import get_db
+from app.module.collection.client.datax_client import DataxClient
+from app.module.collection.schema.collection import CollectionTaskBase, CollectionTaskCreate, converter_to_response, \
+ convert_for_create
+from app.module.collection.service.collection import CollectionTaskService
+from app.module.shared.schema import StandardResponse, PaginatedData
+
+router = APIRouter(
+ prefix="/tasks",
+ tags=["data-collection/tasks"],
+)
+logger = get_logger(__name__)
+
+
+@router.post("", response_model=StandardResponse[CollectionTaskBase])
+async def create_task(
+ request: CollectionTaskCreate,
+ db: AsyncSession = Depends(get_db)
+):
+ """创建归集任务"""
+ try:
+ template = await db.execute(select(CollectionTemplate).where(CollectionTemplate.id == request.template_id))
+ template = template.scalar_one_or_none()
+ if not template:
+ raise HTTPException(status_code=400, detail="Template not found")
+
+ task_id = str(uuid.uuid4())
+ DataxClient.generate_datax_config(request.config, template, f"/dataset/local/{task_id}")
+ task = convert_for_create(request, task_id)
+ task.template_name = template.name
+
+ task_service = CollectionTaskService(db)
+ task = await task_service.create_task(task)
+
+ task = await db.execute(select(CollectionTask).where(CollectionTask.id == task.id))
+ task = task.scalar_one_or_none()
+ await db.commit()
+
+ return StandardResponse(
+ code=200,
+ message="Success",
+ data=converter_to_response(task)
+ )
+ except HTTPException:
+ await db.rollback()
+ raise
+ except Exception as e:
+ await db.rollback()
+ logger.error(f"Failed to create collection task: {str(e)}", e)
+ raise HTTPException(status_code=500, detail="Internal server error")
+
+
+@router.get("", response_model=StandardResponse[PaginatedData[CollectionTaskBase]])
+async def list_tasks(
+ page: int = 1,
+ size: int = 20,
+ name: Optional[str] = Query(None, description="任务名称模糊查询"),
+ db: AsyncSession = Depends(get_db)
+):
+ """分页查询归集任务"""
+ try:
+ # 构建查询条件
+ page = page if page > 0 else 1
+ size = size if size > 0 else 20
+ query = select(CollectionTask)
+
+ if name:
+ query = query.where(CollectionTask.name.ilike(f"%{name}%"))
+
+ # 获取总数
+ count_query = select(func.count()).select_from(query.subquery())
+ total = (await db.execute(count_query)).scalar_one()
+
+ # 分页查询
+ offset = (page - 1) * size
+ tasks = (await db.execute(
+ query.order_by(CollectionTask.created_at.desc())
+ .offset(offset)
+ .limit(size)
+ )).scalars().all()
+
+ # 转换为响应模型
+ items = [converter_to_response(task) for task in tasks]
+ total_pages = math.ceil(total / size) if total > 0 else 0
+
+ return StandardResponse(
+ code=200,
+ message="Success",
+ data=PaginatedData(
+ content=items,
+ total_elements=total,
+ total_pages=total_pages,
+ page=page,
+ size=size,
+ )
+ )
+
+ except Exception as e:
+ logger.error(f"Failed to list evaluation tasks: {str(e)}", e)
+ raise HTTPException(status_code=500, detail="Internal server error")
+
+
+@router.delete("", response_model=StandardResponse[str], status_code=200)
+async def delete_collection_tasks(
+ ids: list[str] = Query(..., description="要删除的任务ID列表"),
+ db: AsyncSession = Depends(get_db),
+):
+ """
+ 删除归集任务
+
+ Args:
+ ids: 任务ID列表
+ db: 数据库会话
+
+ Returns:
+ StandardResponse[str]: 删除结果
+ """
+ try:
+ # 检查任务是否存在(接口接收 ID 列表,逐个校验)
+ tasks = []
+ for task_id in ids:
+ task = await db.get(CollectionTask, task_id)
+ if not task:
+ raise HTTPException(status_code=404, detail="Collection task not found")
+ tasks.append(task)
+
+ # 删除任务执行记录
+ await db.execute(
+ TaskExecution.__table__.delete()
+ .where(TaskExecution.task_id.in_(ids))
+ )
+
+ # 删除任务
+ for task in tasks:
+ await db.delete(task)
+ await db.commit()
+
+ return StandardResponse(
+ code=200,
+ message="Collection task deleted successfully",
+ data="success"
+ )
+
+ except HTTPException:
+ await db.rollback()
+ raise
+ except Exception as e:
+ await db.rollback()
+ logger.error(f"Failed to delete collection task: {str(e)}")
+ raise HTTPException(status_code=500, detail="Internal server error")
diff --git a/runtime/datamate-python/app/module/collection/interface/execution.py b/runtime/datamate-python/app/module/collection/interface/execution.py
new file mode 100644
index 000000000..b012642bf
--- /dev/null
+++ b/runtime/datamate-python/app/module/collection/interface/execution.py
@@ -0,0 +1,120 @@
+
+import math
+import os
+from pathlib import Path
+from datetime import datetime
+from typing import Optional
+
+from fastapi import APIRouter, Depends, HTTPException, Query
+from fastapi.responses import FileResponse
+from sqlalchemy import select, func
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.core.logging import get_logger
+from app.db.models.data_collection import TaskExecution
+from app.db.session import get_db
+from app.module.collection.schema.collection import TaskExecutionBase, converter_execution_to_response
+from app.module.shared.schema import StandardResponse, PaginatedData
+
+router = APIRouter(
+ prefix="/executions",
+ tags=["data-collection/executions"],
+)
+logger = get_logger(__name__)
+
+
+@router.get("", response_model=StandardResponse[PaginatedData[TaskExecutionBase]])
+async def list_executions(
+ page: int = 1,
+ size: int = 20,
+    task_id: Optional[str] = Query(None, description="Task ID"),
+    task_name: Optional[str] = Query(None, description="Fuzzy match on task name"),
+    start_time: Optional[datetime] = Query(None, description="Lower bound of execution start time (started_at >= start_time)"),
+    end_time: Optional[datetime] = Query(None, description="Upper bound of execution start time (started_at <= end_time)"),
+ db: AsyncSession = Depends(get_db)
+):
+ """分页查询归集任务执行记录"""
+ try:
+        # Normalize paging inputs, mirroring list_tasks
+        page = page if page > 0 else 1
+        size = size if size > 0 else 20
+        query = select(TaskExecution)
+
+ if task_id:
+ query = query.where(TaskExecution.task_id == task_id)
+
+ if task_name:
+ query = query.where(TaskExecution.task_name.ilike(f"%{task_name}%"))
+
+ if start_time:
+ query = query.where(TaskExecution.started_at >= start_time)
+
+ if end_time:
+ query = query.where(TaskExecution.started_at <= end_time)
+
+ count_query = select(func.count()).select_from(query.subquery())
+ total = (await db.execute(count_query)).scalar_one()
+
+ offset = (page - 1) * size
+ executions = (await db.execute(
+ query.order_by(TaskExecution.created_at.desc())
+ .offset(offset)
+ .limit(size)
+ )).scalars().all()
+
+ items = [converter_execution_to_response(exe) for exe in executions]
+ total_pages = math.ceil(total / size) if total > 0 else 0
+
+ return StandardResponse(
+ code=200,
+ message="Success",
+ data=PaginatedData(
+ content=items,
+ total_elements=total,
+ total_pages=total_pages,
+ page=page,
+ size=size,
+ )
+ )
+
+ except Exception as e:
+ logger.error(f"Failed to list task executions: {str(e)}", e)
+ raise HTTPException(status_code=500, detail="Internal server error")
+
+
+@router.get("/{execution_id}/log")
+async def get_execution_log(
+ execution_id: str,
+ db: AsyncSession = Depends(get_db)
+):
+ """获取执行记录对应的日志文件内容"""
+ try:
+ execution = await db.get(TaskExecution, execution_id)
+ if not execution:
+ raise HTTPException(status_code=404, detail="Execution record not found")
+
+ log_path = getattr(execution, "log_path", None)
+ if not log_path:
+ raise HTTPException(status_code=404, detail="Log path not found")
+
+ path = Path(str(log_path))
+ if not path.is_absolute():
+ path = Path(os.getcwd()) / path
+ path = path.resolve()
+
+ if not path.exists() or not path.is_file():
+ raise HTTPException(status_code=404, detail="Log file not found")
+
+ filename = path.name
+ headers = {
+ "Content-Disposition": f'inline; filename="{filename}"'
+ }
+ return FileResponse(
+ path=str(path),
+ media_type="text/plain; charset=utf-8",
+ filename=filename,
+ headers=headers,
+ )
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Failed to get execution log: {str(e)}", e)
+ raise HTTPException(status_code=500, detail="Internal server error")
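Reviewer note: the log endpoint serves the file with an `inline` Content-Disposition, so browsers render it and programmatic clients can stream it to disk. A minimal sketch (the `/api/data-collection` mount point is an assumption):

```python
# Hypothetical client for GET /executions/{id}/log; the mount point is assumed.
import httpx

def download_execution_log(execution_id: str, dest: str) -> None:
    url = f"http://localhost:8000/api/data-collection/executions/{execution_id}/log"
    with httpx.stream("GET", url) as resp:
        resp.raise_for_status()
        with open(dest, "wb") as fh:
            for chunk in resp.iter_bytes():
                fh.write(chunk)
```

One thing worth a follow-up: `log_path` is read from the database and resolved against the working directory, but it is never checked against an allowed log root, so a bad row could expose arbitrary files.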
diff --git a/runtime/datamate-python/app/module/collection/interface/template.py b/runtime/datamate-python/app/module/collection/interface/template.py
new file mode 100644
index 000000000..83d6125c4
--- /dev/null
+++ b/runtime/datamate-python/app/module/collection/interface/template.py
@@ -0,0 +1,67 @@
+
+import math
+from typing import Optional
+
+from fastapi import APIRouter, Depends, HTTPException, Query
+from sqlalchemy import select, func
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.core.logging import get_logger
+from app.db.models.data_collection import CollectionTemplate
+from app.db.session import get_db
+from app.module.collection.schema.collection import CollectionTemplateBase, converter_template_to_response
+from app.module.shared.schema import StandardResponse, PaginatedData
+
+router = APIRouter(
+ prefix="/templates",
+ tags=["data-collection/templates"],
+)
+logger = get_logger(__name__)
+
+
+@router.get("", response_model=StandardResponse[PaginatedData[CollectionTemplateBase]])
+async def list_templates(
+ page: int = 1,
+ size: int = 20,
+ name: Optional[str] = Query(None, description="模板名称模糊查询"),
+ built_in: Optional[bool] = Query(None, description="是否系统内置模板"),
+ db: AsyncSession = Depends(get_db)
+):
+ """分页查询归集任务模板"""
+ try:
+        # Normalize paging inputs, mirroring list_tasks
+        page = page if page > 0 else 1
+        size = size if size > 0 else 20
+        query = select(CollectionTemplate)
+
+ if name:
+ query = query.where(CollectionTemplate.name.ilike(f"%{name}%"))
+
+ if built_in is not None:
+ query = query.where(CollectionTemplate.built_in == built_in)
+
+ count_query = select(func.count()).select_from(query.subquery())
+ total = (await db.execute(count_query)).scalar_one()
+
+ offset = (page - 1) * size
+ templates = (await db.execute(
+ query.order_by(CollectionTemplate.created_at.desc())
+ .offset(offset)
+ .limit(size)
+ )).scalars().all()
+
+ items = [converter_template_to_response(tpl) for tpl in templates]
+ total_pages = math.ceil(total / size) if total > 0 else 0
+
+ return StandardResponse(
+ code=200,
+ message="Success",
+ data=PaginatedData(
+ content=items,
+ total_elements=total,
+ total_pages=total_pages,
+ page=page,
+ size=size,
+ )
+ )
+
+ except Exception as e:
+ logger.error(f"Failed to list collection templates: {str(e)}", e)
+ raise HTTPException(status_code=500, detail="Internal server error")
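Reviewer note: the count + offset/limit boilerplate is now repeated in three routers (tasks, executions, templates). A shared helper along these lines would remove the duplication (sketch only; the `paginate` name is hypothetical):

```python
# Hypothetical pagination helper, not part of this diff.
import math
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

async def paginate(db: AsyncSession, query, page: int, size: int, order_by):
    """Shared count + offset/limit pattern used by the three list endpoints."""
    total = (await db.execute(
        select(func.count()).select_from(query.subquery())
    )).scalar_one()
    rows = (await db.execute(
        query.order_by(order_by).offset((page - 1) * size).limit(size)
    )).scalars().all()
    total_pages = math.ceil(total / size) if total > 0 else 0
    return rows, total, total_pages
```

Each endpoint would then only build its filters and call `paginate(db, query, page, size, Model.created_at.desc())`.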
diff --git a/runtime/datamate-python/app/module/collection/schema/__init__.py b/runtime/datamate-python/app/module/collection/schema/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/runtime/datamate-python/app/module/collection/schema/collection.py b/runtime/datamate-python/app/module/collection/schema/collection.py
new file mode 100644
index 000000000..3781ff88e
--- /dev/null
+++ b/runtime/datamate-python/app/module/collection/schema/collection.py
@@ -0,0 +1,182 @@
+import json
+import uuid
+from datetime import datetime
+from enum import Enum
+from typing import Optional
+
+from pydantic import BaseModel, Field, ConfigDict
+from pydantic.alias_generators import to_camel
+
+from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate
+from app.module.shared.schema import TaskStatus
+
+
+class SyncMode(str, Enum):
+ ONCE = "ONCE"
+ SCHEDULED = "SCHEDULED"
+
+class CollectionConfig(BaseModel):
+    parameter: Optional[dict] = Field(None, description="Template parameters")
+    reader: Optional[dict] = Field(None, description="Reader parameters")
+    writer: Optional[dict] = Field(None, description="Writer parameters")
+    job: Optional[dict] = Field(None, description="Job configuration")
+
+class CollectionTaskBase(BaseModel):
+ id: str = Field(..., description="任务id")
+ name: str = Field(..., description="任务名称")
+ description: Optional[str] = Field(None, description="任务描述")
+ target_path: str = Field(..., description="目标存放路径")
+ config: CollectionConfig = Field(..., description="任务配置")
+ template_id: str = Field(..., description="模板ID")
+ template_name: Optional[str] = Field(None, description="模板名称")
+ status: TaskStatus = Field(..., description="任务状态")
+ sync_mode: SyncMode = Field(default=SyncMode.ONCE, description="同步方式")
+ schedule_expression: Optional[str] = Field(None, description="调度表达式(cron)")
+ retry_count: int = Field(default=3, description="重试次数")
+ timeout_seconds: int = Field(default=3600, description="超时时间")
+ last_execution_id: Optional[str] = Field(None, description="最后执行id")
+ created_at: Optional[datetime] = Field(None, description="创建时间")
+ updated_at: Optional[datetime] = Field(None, description="更新时间")
+ created_by: Optional[str] = Field(None, description="创建人")
+ updated_by: Optional[str] = Field(None, description="更新人")
+
+ model_config = ConfigDict(
+ alias_generator=to_camel,
+ populate_by_name=True
+ )
+
+class CollectionTaskCreate(BaseModel):
+ name: str = Field(..., description="任务名称")
+ description: Optional[str] = Field(None, description="任务描述")
+ sync_mode: SyncMode = Field(default=SyncMode.ONCE, description="同步方式")
+ schedule_expression: Optional[str] = Field(None, description="调度表达式(cron)")
+ config: CollectionConfig = Field(..., description="任务配置")
+ template_id: str = Field(..., description="模板ID")
+
+ model_config = ConfigDict(
+ alias_generator=to_camel,
+ populate_by_name=True
+ )
+
+def converter_to_response(task: CollectionTask) -> CollectionTaskBase:
+ return CollectionTaskBase(
+ id=task.id,
+ name=task.name,
+ description=task.description,
+ sync_mode=task.sync_mode,
+ template_id=task.template_id,
+ template_name=task.template_name,
+ target_path=task.target_path,
+ config=json.loads(task.config),
+ schedule_expression=task.schedule_expression,
+ status=task.status,
+ retry_count=task.retry_count,
+ timeout_seconds=task.timeout_seconds,
+ last_execution_id=task.last_execution_id,
+ created_at=task.created_at,
+ updated_at=task.updated_at,
+ created_by=task.created_by,
+ updated_by=task.updated_by,
+ )
+
+def convert_for_create(task: CollectionTaskCreate, task_id: str) -> CollectionTask:
+ return CollectionTask(
+ id=task_id,
+ name=task.name,
+ description=task.description,
+ sync_mode=task.sync_mode,
+ template_id=task.template_id,
+ target_path=f"/dataset/local/{task_id}",
+        config=json.dumps(task.config.model_dump()),
+ schedule_expression=task.schedule_expression,
+ status=TaskStatus.PENDING.name
+ )
+
+def create_execute_record(task: CollectionTask) -> TaskExecution:
+ execution_id = str(uuid.uuid4())
+ return TaskExecution(
+ id=execution_id,
+ task_id=task.id,
+ task_name=task.name,
+ status=TaskStatus.RUNNING.name,
+ started_at=datetime.now(),
+ log_path=f"/flow/data-collection/{task.id}/{execution_id}.log"
+ )
+
+
+class TaskExecutionBase(BaseModel):
+ id: str = Field(..., description="执行记录ID")
+ task_id: str = Field(..., description="任务ID")
+ task_name: str = Field(..., description="任务名称")
+ status: Optional[str] = Field(None, description="执行状态")
+ log_path: Optional[str] = Field(None, description="日志文件路径")
+ started_at: Optional[datetime] = Field(None, description="开始时间")
+ completed_at: Optional[datetime] = Field(None, description="完成时间")
+ duration_seconds: Optional[int] = Field(None, description="执行时长(秒)")
+ error_message: Optional[str] = Field(None, description="错误信息")
+ created_at: Optional[datetime] = Field(None, description="创建时间")
+ updated_at: Optional[datetime] = Field(None, description="更新时间")
+ created_by: Optional[str] = Field(None, description="创建者")
+ updated_by: Optional[str] = Field(None, description="更新者")
+
+ model_config = ConfigDict(
+ alias_generator=to_camel,
+ populate_by_name=True
+ )
+
+
+def converter_execution_to_response(execution: TaskExecution) -> TaskExecutionBase:
+ return TaskExecutionBase(
+ id=execution.id,
+ task_id=execution.task_id,
+ task_name=execution.task_name,
+ status=execution.status,
+ log_path=execution.log_path,
+ started_at=execution.started_at,
+ completed_at=execution.completed_at,
+ duration_seconds=execution.duration_seconds,
+ error_message=execution.error_message,
+ created_at=execution.created_at,
+ updated_at=execution.updated_at,
+ created_by=execution.created_by,
+ updated_by=execution.updated_by,
+ )
+
+
+class CollectionTemplateBase(BaseModel):
+ id: str = Field(..., description="模板ID")
+ name: str = Field(..., description="模板名称")
+ description: Optional[str] = Field(None, description="模板描述")
+ source_type: str = Field(..., description="源数据源类型")
+ source_name: str = Field(..., description="源数据源名称")
+ target_type: str = Field(..., description="目标数据源类型")
+ target_name: str = Field(..., description="目标数据源名称")
+ template_content: dict = Field(..., description="模板内容")
+ built_in: Optional[bool] = Field(None, description="是否系统内置模板")
+ created_at: Optional[datetime] = Field(None, description="创建时间")
+ updated_at: Optional[datetime] = Field(None, description="更新时间")
+ created_by: Optional[str] = Field(None, description="创建者")
+ updated_by: Optional[str] = Field(None, description="更新者")
+
+ model_config = ConfigDict(
+ alias_generator=to_camel,
+ populate_by_name=True
+ )
+
+
+def converter_template_to_response(template: CollectionTemplate) -> CollectionTemplateBase:
+ return CollectionTemplateBase(
+ id=template.id,
+ name=template.name,
+ description=template.description,
+ source_type=template.source_type,
+ source_name=template.source_name,
+ target_type=template.target_type,
+ target_name=template.target_name,
+ template_content=template.template_content,
+ built_in=template.built_in,
+ created_at=template.created_at,
+ updated_at=template.updated_at,
+ created_by=template.created_by,
+ updated_by=template.updated_by,
+ )
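Reviewer note: since these models set `alias_generator=to_camel` with `populate_by_name=True`, the API accepts both camelCase and snake_case payloads and can emit camelCase for the frontend. A short illustration (pydantic v2):

```python
# Illustration of the alias configuration above.
from app.module.collection.schema.collection import CollectionTaskCreate

payload = {
    "name": "nas-sync",
    "templateId": "1",  # camelCase accepted via the to_camel alias
    "config": {"parameter": {}, "reader": {}, "writer": {}},
}
task = CollectionTaskCreate.model_validate(payload)
assert task.template_id == "1"                       # snake_case attribute
assert task.model_dump(by_alias=True)["templateId"] == "1"
```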
diff --git a/runtime/datamate-python/app/module/collection/service/__init__.py b/runtime/datamate-python/app/module/collection/service/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/runtime/datamate-python/app/module/collection/service/collection.py b/runtime/datamate-python/app/module/collection/service/collection.py
new file mode 100644
index 000000000..38671bbb0
--- /dev/null
+++ b/runtime/datamate-python/app/module/collection/service/collection.py
@@ -0,0 +1,70 @@
+import asyncio
+from dataclasses import dataclass
+from typing import Any, Optional
+
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.core.logging import get_logger
+from app.db.models.data_collection import CollectionTask, CollectionTemplate
+from app.db.session import AsyncSessionLocal
+from app.module.collection.client.datax_client import DataxClient
+from app.module.collection.schema.collection import SyncMode, create_execute_record
+from app.module.shared.schema import TaskStatus
+
+logger = get_logger(__name__)
+
+
+@dataclass
+class _RuntimeTask:
+ id: str
+ config: str
+ timeout_seconds: int
+ sync_mode: str
+ status: Optional[str] = None
+
+
+@dataclass
+class _RuntimeExecution:
+ id: str
+ log_path: str
+ started_at: Optional[Any] = None
+ completed_at: Optional[Any] = None
+ duration_seconds: Optional[float] = None
+ error_message: Optional[str] = None
+ status: Optional[str] = None
+
+class CollectionTaskService:
+ def __init__(self, db: AsyncSession):
+ self.db = db
+
+ async def create_task(self, task: CollectionTask) -> CollectionTask:
+ self.db.add(task)
+
+ # If it's a one-time task, execute it immediately
+ if task.sync_mode == SyncMode.ONCE:
+ task.status = TaskStatus.RUNNING.name
+ await self.db.commit()
+ asyncio.create_task(CollectionTaskService.run_async(task.id))
+ return task
+
+ @staticmethod
+ async def run_async(task_id: str):
+ logger.info(f"start to execute task {task_id}")
+ async with AsyncSessionLocal() as session:
+ task = await session.execute(select(CollectionTask).where(CollectionTask.id == task_id))
+ task = task.scalar_one_or_none()
+ if not task:
+ logger.error(f"task {task_id} not exist")
+ return
+ template = await session.execute(select(CollectionTemplate).where(CollectionTemplate.id == task.template_id))
+ if not template:
+ logger.error(f"template {task.template_name} not exist")
+ return
+ task_execution = create_execute_record(task)
+ session.add(task_execution)
+ await session.commit()
+ await asyncio.to_thread(
+ DataxClient(execution=task_execution, task=task).run_datax_job
+ )
+ await session.commit()
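Reviewer note: `asyncio.create_task` in `create_task` is fire-and-forget, and the event loop keeps only a weak reference to tasks, so a long DataX run could in principle be garbage-collected before it completes (this caveat is documented for `asyncio.create_task`). A small guard, as a sketch:

```python
# Hypothetical helper to keep strong references to background tasks.
import asyncio
from typing import Coroutine, Set

_background_tasks: Set[asyncio.Task] = set()

def spawn_background(coro: Coroutine) -> asyncio.Task:
    task = asyncio.create_task(coro)
    _background_tasks.add(task)                        # strong reference
    task.add_done_callback(_background_tasks.discard)  # release when done
    return task
```

`create_task` would then call `spawn_background(CollectionTaskService.run_async(task.id))` instead of calling `asyncio.create_task(...)` directly.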
diff --git a/runtime/datax/package.xml b/runtime/datax/package.xml
index a900328f1..9102336e5 100644
--- a/runtime/datax/package.xml
+++ b/runtime/datax/package.xml
@@ -236,13 +236,13 @@
-
-
-
-
-
-
-
+        <fileSet>
+            <directory>starrocksreader/target/datax/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <outputDirectory>datax</outputDirectory>
+        </fileSet>
@@ -287,13 +287,13 @@
-
-
-
-
-
-
-
+        <fileSet>
+            <directory>starrockswriter/target/datax/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <outputDirectory>datax</outputDirectory>
+        </fileSet>
diff --git a/runtime/datax/pom.xml b/runtime/datax/pom.xml
index 68e9ab172..f275e0072 100644
--- a/runtime/datax/pom.xml
+++ b/runtime/datax/pom.xml
@@ -81,13 +81,13 @@
-
+        <module>starrocksreader</module>
         <module>nfsreader</module>
         <module>mysqlwriter</module>
-
+        <module>starrockswriter</module>
diff --git a/scripts/db/data-collection-init.sql b/scripts/db/data-collection-init.sql
index d28a1099a..11b6912cd 100644
--- a/scripts/db/data-collection-init.sql
+++ b/scripts/db/data-collection-init.sql
@@ -10,35 +10,7 @@ USE datamate;
-- Drop existing tables (idempotent; for repeated runs during development/debugging)
DROP TABLE IF EXISTS t_dc_task_executions;
DROP TABLE IF EXISTS t_dc_collection_tasks;
-DROP TABLE IF EXISTS t_dc_datax_templates;
-
-CREATE TABLE t_dc_task_executions (
- id VARCHAR(36) PRIMARY KEY COMMENT '执行记录ID(UUID)',
- task_id VARCHAR(36) NOT NULL COMMENT '任务ID',
- task_name VARCHAR(255) NOT NULL COMMENT '任务名称',
- status VARCHAR(20) DEFAULT 'RUNNING' COMMENT '执行状态:RUNNING/SUCCESS/FAILED/STOPPED',
- progress DECIMAL(5,2) DEFAULT 0.00 COMMENT '进度百分比',
- records_total BIGINT DEFAULT 0 COMMENT '总记录数',
- records_processed BIGINT DEFAULT 0 COMMENT '已处理记录数',
- records_success BIGINT DEFAULT 0 COMMENT '成功记录数',
- records_failed BIGINT DEFAULT 0 COMMENT '失败记录数',
- throughput DECIMAL(10,2) DEFAULT 0.00 COMMENT '吞吐量(条/秒)',
- data_size_bytes BIGINT DEFAULT 0 COMMENT '数据量(字节)',
- started_at TIMESTAMP NULL COMMENT '开始时间',
- completed_at TIMESTAMP NULL COMMENT '完成时间',
- duration_seconds INT DEFAULT 0 COMMENT '执行时长(秒)',
- config JSON COMMENT '执行配置',
- error_message TEXT COMMENT '错误信息',
- datax_job_id TEXT COMMENT 'datax任务ID',
- result TEXT COMMENT '执行结果',
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
- updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
- created_by VARCHAR(255) COMMENT '创建者',
- updated_by VARCHAR(255) COMMENT '更新者',
- INDEX idx_task_id (task_id),
- INDEX idx_status (status),
- INDEX idx_started_at (started_at)
-) COMMENT='任务执行明细表';
+DROP TABLE IF EXISTS t_dc_collection_templates;
-- Data collection task table
CREATE TABLE t_dc_collection_tasks (
@@ -46,120 +18,60 @@ CREATE TABLE t_dc_collection_tasks (
    name VARCHAR(255) NOT NULL COMMENT 'Task name',
    description TEXT COMMENT 'Task description',
    sync_mode VARCHAR(20) DEFAULT 'ONCE' COMMENT 'Sync mode: ONCE/SCHEDULED',
-    task_type VARCHAR(20) DEFAULT 'NAS' COMMENT '任务类型:NAS/OBS/MYSQL/CUSTOM',
+    template_id VARCHAR(36) NOT NULL COMMENT 'Collection template ID',
+    template_name VARCHAR(255) NOT NULL COMMENT 'Collection template name',
    target_path VARCHAR(1000) DEFAULT '' COMMENT 'Target storage path',
-    config TEXT NOT NULL COMMENT '归集配置(DataX配置),包含源端和目标端配置信息',
+    config JSON NOT NULL COMMENT 'Collection configuration (DataX config) with source and target settings',
    schedule_expression VARCHAR(255) COMMENT 'Cron schedule expression',
    status VARCHAR(20) DEFAULT 'DRAFT' COMMENT 'Task status: DRAFT/READY/RUNNING/SUCCESS/FAILED/STOPPED',
    retry_count INT DEFAULT 3 COMMENT 'Retry count',
    timeout_seconds INT DEFAULT 3600 COMMENT 'Timeout (seconds)',
-    max_records BIGINT COMMENT '最大处理记录数',
-    sort_field VARCHAR(100) COMMENT '增量字段',
    last_execution_id VARCHAR(36) COMMENT 'Last execution ID (UUID)',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
    created_by VARCHAR(255) COMMENT 'Created by',
    updated_by VARCHAR(255) COMMENT 'Updated by',
    INDEX idx_status (status),
-    INDEX idx_created_at (created_at),
-    INDEX idx_schedule (schedule_expression)
+    INDEX idx_created_at (created_at)
) COMMENT='Data collection task table';
--- 任务执行记录表
-CREATE TABLE t_dc_task_log (
+CREATE TABLE t_dc_task_executions (
    id VARCHAR(36) PRIMARY KEY COMMENT 'Execution record ID (UUID)',
    task_id VARCHAR(36) NOT NULL COMMENT 'Task ID',
    task_name VARCHAR(255) NOT NULL COMMENT 'Task name',
-    sync_mode VARCHAR(20) DEFAULT 'FULL' COMMENT '同步模式:FULL/INCREMENTAL',
    status VARCHAR(20) DEFAULT 'RUNNING' COMMENT 'Execution status: RUNNING/SUCCESS/FAILED/STOPPED',
-    start_time TIMESTAMP NULL COMMENT '开始时间',
-    end_time TIMESTAMP NULL COMMENT '结束时间',
-    duration BIGINT COMMENT '执行时长(毫秒)',
-    process_id VARCHAR(50) COMMENT '进程ID',
-    log_path VARCHAR(500) COMMENT '日志文件路径',
-    error_msg LONGTEXT COMMENT '错误信息',
-    result LONGTEXT COMMENT '执行结果',
-    retry_times INT DEFAULT 0 COMMENT '重试次数',
-    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间'
-) COMMENT='任务执行记录表';
-
+    log_path VARCHAR(1000) NOT NULL COMMENT 'Log file path',
+    started_at TIMESTAMP NULL COMMENT 'Start time',
+    completed_at TIMESTAMP NULL COMMENT 'Completion time',
+    duration_seconds INT DEFAULT 0 COMMENT 'Duration (seconds)',
+    error_message TEXT COMMENT 'Error message',
+    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
+    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
+    created_by VARCHAR(255) COMMENT 'Created by',
+    updated_by VARCHAR(255) COMMENT 'Updated by',
+    INDEX idx_task_id (task_id),
+    INDEX idx_status (status),
+    INDEX idx_started_at (started_at)
+) COMMENT='Task execution detail table';
--- DataX模板配置表
-CREATE TABLE t_dc_datax_templates (
+-- Data collection template table
+CREATE TABLE t_dc_collection_templates (
    id VARCHAR(36) PRIMARY KEY COMMENT 'Template ID (UUID)',
    name VARCHAR(255) NOT NULL UNIQUE COMMENT 'Template name',
-    source_type VARCHAR(50) NOT NULL COMMENT '源数据源类型',
-    target_type VARCHAR(50) NOT NULL COMMENT '目标数据源类型',
-    template_content TEXT NOT NULL COMMENT '模板内容',
    description TEXT COMMENT 'Template description',
-    version VARCHAR(20) DEFAULT '1.0.0' COMMENT '版本号',
-    is_system BOOLEAN DEFAULT FALSE COMMENT '是否系统模板',
+    source_type VARCHAR(64) NOT NULL COMMENT 'Source datasource type',
+    source_name VARCHAR(64) NOT NULL COMMENT 'Source datasource name',
+    target_type VARCHAR(64) NOT NULL COMMENT 'Target datasource type',
+    target_name VARCHAR(64) NOT NULL COMMENT 'Target datasource name',
+    template_content JSON NOT NULL COMMENT 'Template content',
+    built_in BOOLEAN DEFAULT FALSE COMMENT 'Whether this is a built-in system template',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
    created_by VARCHAR(255) COMMENT 'Created by',
-    INDEX idx_source_target (source_type, target_type),
-    INDEX idx_system (is_system)
-) COMMENT='DataX模板配置表';
-
--- =====================================
--- DML语句 - 数据操作
--- =====================================
-
--- 插入默认的DataX模板
-INSERT INTO t_dc_datax_templates (id, name, source_type, target_type, template_content, description, is_system, created_by) VALUES
--- MySQL to MySQL 模板
-('e4272e51-d431-4681-a370-1b3d0b036cd0', 'MySQL到MySQL', 'MYSQL', 'MYSQL', JSON_OBJECT(
- 'job', JSON_OBJECT(
- 'setting', JSON_OBJECT(
- 'speed', JSON_OBJECT('channel', 3)
- ),
- 'content', JSON_ARRAY(
- JSON_OBJECT(
- 'reader', JSON_OBJECT(
- 'name', 'mysqlreader',
- 'parameter', JSON_OBJECT(
- 'username', '${source.username}',
- 'password', '${source.password}',
- 'column', JSON_ARRAY('*'),
- 'splitPk', '${source.splitPk:id}',
- 'connection', JSON_ARRAY(
- JSON_OBJECT(
- 'jdbcUrl', JSON_ARRAY('${source.jdbcUrl}'),
- 'table', JSON_ARRAY('${source.table}')
- )
- )
- )
- ),
- 'writer', JSON_OBJECT(
- 'name', 'mysqlwriter',
- 'parameter', JSON_OBJECT(
- 'writeMode', 'insert',
- 'username', '${target.username}',
- 'password', '${target.password}',
- 'column', JSON_ARRAY('*'),
- 'session', JSON_ARRAY('set session sql_mode="PIPES_AS_CONCAT"'),
- 'preSql', JSON_ARRAY('${target.preSql:}'),
- 'connection', JSON_ARRAY(
- JSON_OBJECT(
- 'jdbcUrl', '${target.jdbcUrl}',
- 'table', JSON_ARRAY('${target.table}')
- )
- )
- )
- )
- )
- )
- )
-), 'MySQL到MySQL数据同步模板', TRUE, 'system');
-
--- 插入任务执行记录模拟数据
-INSERT INTO t_dc_task_executions (id, task_id, task_name, status, progress, records_total, records_processed, records_success, records_failed, throughput, data_size_bytes, started_at, completed_at, duration_seconds, config) VALUES
--- 成功执行记录
-('12128059-a266-4d4f-b647-3cb8c24b8aad', '54cefc4d-3071-43d9-9fbf-baeb87932acd', '用户数据同步', 'SUCCESS', 100.00, 15000, 15000, 15000, 0, 125.50, 2048576,
- DATE_SUB(NOW(), INTERVAL 1 DAY), DATE_SUB(NOW(), INTERVAL 1 DAY) + INTERVAL 2 MINUTE, 120,
- JSON_OBJECT('batchSize', 1000, 'parallelism', 3)),
-
-('9d418e0c-fa54-4f01-8633-3a5ad57f46a1', '3039a5c8-c894-42ab-ad49-5c2c5eccda31', '订单增量同步', 'SUCCESS', 100.00, 8500, 8500, 8500, 0, 94.44, 1536000,
- DATE_SUB(NOW(), INTERVAL 12 HOUR), DATE_SUB(NOW(), INTERVAL 12 HOUR) + INTERVAL 90 SECOND, 90,
- JSON_OBJECT('batchSize', 2000, 'parallelism', 2));
+    updated_by VARCHAR(255) COMMENT 'Updated by',
+    INDEX idx_source_target (source_type, target_type)
+) COMMENT='Data collection template table';
+INSERT IGNORE INTO t_dc_collection_templates(id, name, description, source_type, source_name, target_type, target_name, template_content, built_in, created_by, updated_by)
+VALUES ('1', 'NAS归集模板', '将NAS存储上的文件归集到DataMate平台上。', 'nfsreader', 'nfsreader', 'nfswriter', 'nfswriter', '{"parameter": {}, "reader": {}, "writer": {}}', True, 'system', 'system'),
+ ('2', 'OBS归集模板', '将OBS存储上的文件归集到DataMate平台上。', 'obsreader', 'obsreader', 'obswriter', 'obswriter', '{"parameter": {"endpoint": {"name": "服务地址","description": "OBS的服务地址。","type": "input"},"bucket": {"name": "存储桶名称","description": "OBS存储桶名称。","type": "input"},"accessKey": {"name": "访问密钥","description": "OBS访问密钥。","type": "input"},"secretKey": {"name": "密钥","description": "OBS密钥。","type": "input"},"prefix": {"name": "匹配前缀","description": "按照匹配前缀去选中OBS中的文件进行归集。","type": "input"}}, "reader": {}, "writer": {}}', True, 'system', 'system');
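Reviewer note: the seeded OBS template declares its required inputs under the `parameter` key. Before `DataxClient.generate_datx_config` (not shown in this diff) renders a job, a check along these lines could reject incomplete requests early (hypothetical helper):

```python
# Hypothetical validation of user parameters against a template's declared
# `parameter` block; the real config rendering lives in DataxClient.
import json

def check_template_params(template_content, user_params: dict) -> dict:
    content = (json.loads(template_content)
               if isinstance(template_content, str) else template_content)
    declared = content.get("parameter", {})
    missing = [key for key in declared if key not in user_params]
    if missing:
        raise ValueError(f"missing template parameters: {missing}")
    return {key: user_params[key] for key in declared}

# Example shaped like the OBS template seeded above:
check_template_params(
    '{"parameter": {"endpoint": {}, "bucket": {}, "accessKey": {}, "secretKey": {}, "prefix": {}}}',
    {"endpoint": "https://obs.example.com", "bucket": "b",
     "accessKey": "ak", "secretKey": "sk", "prefix": "raw/"},
)
```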
diff --git a/scripts/images/backend-python/Dockerfile b/scripts/images/backend-python/Dockerfile
index 9cd9f3d15..dbc3a1592 100644
--- a/scripts/images/backend-python/Dockerfile
+++ b/scripts/images/backend-python/Dockerfile
@@ -1,9 +1,26 @@
+FROM maven:3-eclipse-temurin-8 AS datax-builder
+
+RUN apt-get update && \
+ apt-get install -y git && \
+ git clone https://github.com/alibaba/DataX.git
+
+COPY runtime/datax/ DataX/
+
+RUN cd DataX && \
+ sed -i "s/com.mysql.jdbc.Driver/com.mysql.cj.jdbc.Driver/g" \
+ plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java && \
+ mvn -U clean package assembly:assembly -Dmaven.test.skip=true
+
FROM python:3.12-slim
-# Single-stage image with build cache optimization using BuildKit cache mounts.
+# Runtime image with build cache optimization using BuildKit cache mounts
+# (DataX is compiled in the separate datax-builder stage above).
# Note: to use the cache mount syntax you must build with BuildKit enabled:
-#   DOCKER_BUILDKIT=1 docker build . -f scripts/images/datamate-python/Dockerfile -t datamate-backend-python
+#   DOCKER_BUILDKIT=1 docker build . -f scripts/images/backend-python/Dockerfile -t datamate-backend-python
+RUN apt-get update \
+ && apt-get install -y --no-install-recommends openjdk-21-jre-headless \
+ && rm -rf /var/lib/apt/lists/*
+
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
# Poetry configuration
@@ -12,7 +29,9 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
POETRY_VIRTUALENVS_CREATE=false \
POETRY_CACHE_DIR=/tmp/poetry_cache
-ENV PATH="/root/.local/bin:$PATH"
+ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
+
+ENV PATH="/root/.local/bin:$JAVA_HOME/bin:$PATH"
WORKDIR /app
@@ -22,6 +41,8 @@ RUN --mount=type=cache,target=/root/.cache/pip \
&& pip install --root-user-action=ignore pipx \
&& pipx install "poetry==$POETRY_VERSION"
+COPY --from=datax-builder /DataX/target/datax/datax /opt/datax
+
# Copy only dependency files first (leverages layer caching when dependencies don't change)
COPY runtime/datamate-python/pyproject.toml runtime/datamate-python/poetry.lock* /app/
diff --git a/scripts/images/gateway/Dockerfile b/scripts/images/gateway/Dockerfile
index e63e493f8..06e642847 100644
--- a/scripts/images/gateway/Dockerfile
+++ b/scripts/images/gateway/Dockerfile
@@ -1,28 +1,19 @@
FROM maven:3-eclipse-temurin-21 AS builder
-RUN apt-get update && \
- apt-get install -y git && \
- git clone https://github.com/ModelEngine-Group/Terrabase.git && \
- cd Terrabase && \
- git -c core.quotepath=false -c log.showSignature=false checkout -b pyh/feat_terrabase_develop origin/pyh/feat_terrabase_develop -- && \
- mvn -U clean package install -Dmaven.test.skip=true
-
COPY backend/ /opt/gateway
RUN cd /opt/gateway/api-gateway && \
- mvn -U clean package -Dmaven.test.skip=true && \
- ls /opt/gateway/api-gateway/target
+ mvn -U clean package -Dmaven.test.skip=true
FROM eclipse-temurin:21-jdk
RUN apt-get update && \
- apt-get install -y vim wget curl python3 python3-pip python-is-python3 dos2unix && \
+ apt-get install -y vim wget curl dos2unix && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
COPY --from=builder /opt/gateway/api-gateway/target/gateway.jar /opt/gateway/gateway.jar
-COPY --from=builder /Terrabase/enterprise-impl-commercial/target/*.jar /opt/terrabase/
COPY scripts/images/gateway/start.sh /opt/gateway/start.sh