Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/docker-image-backend-python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ on:
push:
branches: [ "main" ]
paths:
- 'scripts/images/datamate-python/**'
- 'scripts/images/backend-python/**'
- 'runtime/datamate-python/**'
- '.github/workflows/docker-image-backend-python.yml'
- '.github/workflows/docker-images-reusable.yml'
pull_request:
branches: [ "main" ]
paths:
- 'scripts/images/datamate-python/**'
- 'scripts/images/backend-python/**'
- 'runtime/datamate-python/**'
- '.github/workflows/docker-image-backend-python.yml'
- '.github/workflows/docker-images-reusable.yml'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ public void processDataSourceAsync(String datasetId, String dataSourceId) {
if (CollectionUtils.isEmpty(filePaths)) {
return;
}
log.info("Starting file scan, total files: {}", filePaths.size());
datasetFileApplicationService.copyFilesToDatasetDir(datasetId, new CopyFilesRequest(filePaths));
log.info("Success file scan, total files: {}", filePaths.size());
} catch (Exception e) {
log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
}
Expand Down
182 changes: 98 additions & 84 deletions frontend/src/pages/DataCollection/Create/CreateTask.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,7 @@ export default function CollectionTaskCreate() {
const handleSubmit = async () => {
try {
await form.validateFields();

const values = form.getFieldsValue(true);
const payload = {
name: values.name,
description: values.description,
syncMode: values.syncMode,
scheduleExpression: values.scheduleExpression,
timeoutSeconds: values.timeoutSeconds,
templateId: values.templateId,
config: values.config,
};

await createTaskUsingPost(payload);
await createTaskUsingPost(newTask);
message.success("任务创建成功");
navigate("/data/collection");
} catch (error) {
Expand All @@ -104,88 +92,108 @@ export default function CollectionTaskCreate() {
const selectedTemplate = templates.find((t) => t.id === selectedTemplateId);

const renderTemplateFields = (
section: "parameter" | "reader" | "writer",
section: any[],
defs: Record<string, TemplateFieldDef> | undefined
) => {
if (!defs || typeof defs !== "object") return null;
let items_ = []

const items = Object.entries(defs).map(([key, def]) => {
Object.entries(defs).sort(([key1, def1], [key2, def2]) => {
const def1Order = def1?.index || 0;
const def2Order = def2?.index || 0;
return def1Order - def2Order;
}).forEach(([key, def]) => {
const label = def?.name || key;
const description = def?.description;
const fieldType = (def?.type || "input").toLowerCase();
const required = def?.required !== false;

const rules = required
? [{ required: true, message: `请输入${label}` }]
: undefined;
const name = section.concat(key)

if (fieldType === "password") {
return (
<Form.Item
key={`${section}.${key}`}
name={["config", section, key]}
label={label}
tooltip={description}
rules={rules}
>
<Input.Password placeholder={description || `请输入${label}`} />
</Form.Item>
);
}

if (fieldType === "textarea") {
return (
<Form.Item
key={`${section}.${key}`}
name={["config", section, key]}
label={label}
tooltip={description}
rules={rules}
className="md:col-span-2"
>
<TextArea rows={4} placeholder={description || `请输入${label}`} />
</Form.Item>
);
}

if (fieldType === "select") {
const options = (def?.options || []).map((opt: any) => {
if (typeof opt === "string" || typeof opt === "number") {
return { label: String(opt), value: opt };
}
return { label: opt?.label ?? String(opt?.value), value: opt?.value };
});
return (
<Form.Item
key={`${section}.${key}`}
name={["config", section, key]}
label={label}
tooltip={description}
rules={rules}
>
<Select placeholder={description || `请选择${label}`} options={options} />
</Form.Item>
);
switch (fieldType) {
case "password":
items_.push((
<Form.Item
key={`${section}.${key}`}
name={name}
label={label}
tooltip={description}
rules={rules}
>
<Input.Password placeholder={description || `请输入${label}`} />
</Form.Item>
));
break;
case "selecttag":
items_.push((
<Form.Item
name={name}
label={label}
rules={rules}
>
<Select placeholder={description || `请输入${label}`} mode="tags" />
</Form.Item>
));
break;
case "select":
const options = (def?.options || []).map((opt: any) => {
if (typeof opt === "string" || typeof opt === "number") {
return { label: String(opt), value: opt };
}
return { label: opt?.label ?? String(opt?.value), value: opt?.value };
});
items_.push((
<Form.Item
key={`${section}.${key}`}
name={name}
label={label}
tooltip={description}
rules={rules}
>
<Select placeholder={description || `请选择${label}`} options={options} />
</Form.Item>
));
break;
case "multiple":
const itemsMultiple = renderTemplateFields(name, def?.properties)
items_.push(itemsMultiple)
break;
case "multiplelist":
const realName = name.concat(0)
const itemsMultipleList = renderTemplateFields(realName, def?.properties)
items_.push(itemsMultipleList)
break;
case "inputlist":
items_.push((
<Form.Item
key={`${section}.${key}`}
name={name.concat(0)}
label={label}
tooltip={description}
rules={rules}
>
<Input placeholder={description || `请输入${label}`} />
</Form.Item>
));
break;
default:
items_.push((
<Form.Item
key={`${section}.${key}`}
name={name}
label={label}
tooltip={description}
rules={rules}
>
<Input placeholder={description || `请输入${label}`} />
</Form.Item>
));
}
})

return (
<Form.Item
key={`${section}.${key}`}
name={["config", section, key]}
label={label}
tooltip={description}
rules={rules}
>
<Input placeholder={description || `请输入${label}`} />
</Form.Item>
);
});

return (
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
{items}
</div>
);
return items_
};

const getPropertyCountSafe = (obj: any) => {
Expand Down Expand Up @@ -342,10 +350,12 @@ export default function CollectionTaskCreate() {
<h3 className="font-medium text-gray-900 pt-2 mb-2">
模板参数
</h3>
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
{renderTemplateFields(
"parameter",
["config", "parameter"],
selectedTemplate.templateContent?.parameter as Record<string, TemplateFieldDef>
)}
</div>
</>
): null}

Expand All @@ -354,10 +364,12 @@ export default function CollectionTaskCreate() {
<h3 className="font-medium text-gray-900 pt-2 mb-2">
源端参数
</h3>
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
{renderTemplateFields(
"reader",
["config", "reader"],
selectedTemplate.templateContent?.reader as Record<string, TemplateFieldDef>
)}
</div>
</>
) : null}

Expand All @@ -366,10 +378,12 @@ export default function CollectionTaskCreate() {
<h3 className="font-medium text-gray-900 pt-2 mb-2">
目标端参数
</h3>
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
{renderTemplateFields(
"writer",
["config", "writer"],
selectedTemplate.templateContent?.writer as Record<string, TemplateFieldDef>
)}
</div>
</>
) : null}
</>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Input, Select, Form } from "antd";
import { datasetTypes } from "../../dataset.const";
import { useEffect, useState } from "react";
import { queryDatasetTagsUsingGet } from "../../dataset.api";
import {queryTasksUsingGet} from "@/pages/DataCollection/collection.apis.ts";

export default function BasicInformation({
data,
Expand All @@ -20,6 +21,7 @@ export default function BasicInformation({
options: { label: JSX.Element; value: string }[];
}[]
>([]);
const [collectionOptions, setCollectionOptions] = useState([]);

// 获取标签
const fetchTags = async () => {
Expand All @@ -36,8 +38,23 @@ export default function BasicInformation({
}
};

// 获取归集任务
const fetchCollectionTasks = async () => {
try {
const res = await queryTasksUsingGet({ page: 0, size: 100 });
const options = res.data.content.map((task: any) => ({
label: task.name,
value: task.id,
}));
setCollectionOptions(options);
} catch (error) {
console.error("Error fetching collection tasks:", error);
}
};

useEffect(() => {
fetchTags();
fetchCollectionTasks();
}, []);
return (
<>
Expand Down Expand Up @@ -78,6 +95,11 @@ export default function BasicInformation({
/>
</Form.Item>
)}
{!hidden.includes("dataSource") && (
<Form.Item name="dataSource" label="关联归集任务">
<Select placeholder="请选择归集任务" options={collectionOptions} />
</Form.Item>
)}
</>
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
logger = get_logger(__name__)

class DataxClient:
def __init__(self, task: CollectionTask, execution: TaskExecution):
def __init__(self, task: CollectionTask, execution: TaskExecution, template: CollectionTemplate):
self.execution = execution
self.task = task
self.template = template
self.config_file_path = f"/flow/data-collection/{task.id}/config.json"
self.python_path = "python"
self.datax_main = "/opt/datax/bin/datax.py"
Expand Down Expand Up @@ -53,10 +54,21 @@ def generate_datx_config(task_config: CollectionConfig, template: CollectionTemp
**(task_config.parameter if task_config.parameter else {}),
**(task_config.reader if task_config.reader else {})
}
dest_parameter = {}
if template.target_type == "txtfilewriter":
dest_parameter = {
"path": target_path,
"fileName": "collection_result",
"writeMode": "truncate"
}
elif template.target_type == "nfswriter" or template.target_type == "obswriter":
dest_parameter = {
"destPath": target_path
}
writer_parameter = {
**(task_config.parameter if task_config.parameter else {}),
**(task_config.writer if task_config.writer else {}),
"destPath": target_path
**dest_parameter
}
# 生成任务运行配置
job_config = {
Expand Down Expand Up @@ -128,6 +140,7 @@ def run_datax_job(self):
logger.info(f"DataX 任务执行成功: {self.execution.id}")
logger.info(f"执行耗时: {self.execution.duration_seconds:.2f} 秒")
self.execution.status = TaskStatus.COMPLETED.name
self.rename_collection_result()
else:
self.execution.error_message = self.execution.error_message or f"DataX 任务执行失败,退出码: {exit_code}"
self.execution.status = TaskStatus.FAILED.name
Expand All @@ -141,6 +154,23 @@ def run_datax_job(self):
if self.task.sync_mode == SyncMode.ONCE:
self.task.status = self.execution.status

def rename_collection_result(self):
if self.template.target_type != "txtfilewriter":
return
target_path = Path(self.task.target_path)
if not target_path.exists():
logger.warning(f"Target path does not exist: {target_path}")
return
# If it's a directory, find all files without extensions
for file_path in target_path.iterdir():
if file_path.is_file() and not file_path.suffix:
new_path = file_path.with_suffix('.csv')
try:
file_path.rename(new_path)
logger.info(f"Renamed {file_path} to {new_path}")
except Exception as e:
logger.error(f"Failed to rename {file_path} to {new_path}: {str(e)}")

def _run_process(self, cmd: list[str], log_f) -> int:
# 启动进程
process = subprocess.Popen(
Expand Down
Loading
Loading