Skip to content

Commit ccfb84c

Browse files
authored
feature: add mysql collection and starrocks collection (#222)
* fix: fix the path for backend-python image building * feature: add mysql collection and starrocks collection * feature: add mysql collection and starrocks collection * fix: change the permission of those files which are collected from nfs to 754 * fix: delete collected files, config files and log files while deleting collection task * fix: add the collection task detail api * fix: change the log of collecting for dataset * fix: add collection task selecting while creating and updating dataset * fix: set the umask value to 0022 for java process
1 parent 8d61eb2 commit ccfb84c

File tree

13 files changed

+208
-115
lines changed

13 files changed

+208
-115
lines changed

.github/workflows/docker-image-backend-python.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ on:
44
push:
55
branches: [ "main" ]
66
paths:
7-
- 'scripts/images/datamate-python/**'
7+
- 'scripts/images/backend-python/**'
88
- 'runtime/datamate-python/**'
99
- '.github/workflows/docker-image-backend-python.yml'
1010
- '.github/workflows/docker-images-reusable.yml'
1111
pull_request:
1212
branches: [ "main" ]
1313
paths:
14-
- 'scripts/images/datamate-python/**'
14+
- 'scripts/images/backend-python/**'
1515
- 'runtime/datamate-python/**'
1616
- '.github/workflows/docker-image-backend-python.yml'
1717
- '.github/workflows/docker-images-reusable.yml'

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,8 @@ public void processDataSourceAsync(String datasetId, String dataSourceId) {
237237
if (CollectionUtils.isEmpty(filePaths)) {
238238
return;
239239
}
240-
log.info("Starting file scan, total files: {}", filePaths.size());
241240
datasetFileApplicationService.copyFilesToDatasetDir(datasetId, new CopyFilesRequest(filePaths));
241+
log.info("Success file scan, total files: {}", filePaths.size());
242242
} catch (Exception e) {
243243
log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
244244
}

frontend/src/pages/DataCollection/Create/CreateTask.tsx

Lines changed: 98 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,7 @@ export default function CollectionTaskCreate() {
8181
const handleSubmit = async () => {
8282
try {
8383
await form.validateFields();
84-
85-
const values = form.getFieldsValue(true);
86-
const payload = {
87-
name: values.name,
88-
description: values.description,
89-
syncMode: values.syncMode,
90-
scheduleExpression: values.scheduleExpression,
91-
timeoutSeconds: values.timeoutSeconds,
92-
templateId: values.templateId,
93-
config: values.config,
94-
};
95-
96-
await createTaskUsingPost(payload);
84+
await createTaskUsingPost(newTask);
9785
message.success("任务创建成功");
9886
navigate("/data/collection");
9987
} catch (error) {
@@ -104,88 +92,108 @@ export default function CollectionTaskCreate() {
10492
const selectedTemplate = templates.find((t) => t.id === selectedTemplateId);
10593

10694
const renderTemplateFields = (
107-
section: "parameter" | "reader" | "writer",
95+
section: any[],
10896
defs: Record<string, TemplateFieldDef> | undefined
10997
) => {
11098
if (!defs || typeof defs !== "object") return null;
99+
let items_ = []
111100

112-
const items = Object.entries(defs).map(([key, def]) => {
101+
Object.entries(defs).sort(([key1, def1], [key2, def2]) => {
102+
const def1Order = def1?.index || 0;
103+
const def2Order = def2?.index || 0;
104+
return def1Order - def2Order;
105+
}).forEach(([key, def]) => {
113106
const label = def?.name || key;
114107
const description = def?.description;
115108
const fieldType = (def?.type || "input").toLowerCase();
116109
const required = def?.required !== false;
117-
118110
const rules = required
119111
? [{ required: true, message: `请输入${label}` }]
120112
: undefined;
113+
const name = section.concat(key)
121114

122-
if (fieldType === "password") {
123-
return (
124-
<Form.Item
125-
key={`${section}.${key}`}
126-
name={["config", section, key]}
127-
label={label}
128-
tooltip={description}
129-
rules={rules}
130-
>
131-
<Input.Password placeholder={description || `请输入${label}`} />
132-
</Form.Item>
133-
);
134-
}
135-
136-
if (fieldType === "textarea") {
137-
return (
138-
<Form.Item
139-
key={`${section}.${key}`}
140-
name={["config", section, key]}
141-
label={label}
142-
tooltip={description}
143-
rules={rules}
144-
className="md:col-span-2"
145-
>
146-
<TextArea rows={4} placeholder={description || `请输入${label}`} />
147-
</Form.Item>
148-
);
149-
}
150-
151-
if (fieldType === "select") {
152-
const options = (def?.options || []).map((opt: any) => {
153-
if (typeof opt === "string" || typeof opt === "number") {
154-
return { label: String(opt), value: opt };
155-
}
156-
return { label: opt?.label ?? String(opt?.value), value: opt?.value };
157-
});
158-
return (
159-
<Form.Item
160-
key={`${section}.${key}`}
161-
name={["config", section, key]}
162-
label={label}
163-
tooltip={description}
164-
rules={rules}
165-
>
166-
<Select placeholder={description || `请选择${label}`} options={options} />
167-
</Form.Item>
168-
);
115+
switch (fieldType) {
116+
case "password":
117+
items_.push((
118+
<Form.Item
119+
key={`${section}.${key}`}
120+
name={name}
121+
label={label}
122+
tooltip={description}
123+
rules={rules}
124+
>
125+
<Input.Password placeholder={description || `请输入${label}`} />
126+
</Form.Item>
127+
));
128+
break;
129+
case "selecttag":
130+
items_.push((
131+
<Form.Item
132+
name={name}
133+
label={label}
134+
rules={rules}
135+
>
136+
<Select placeholder={description || `请输入${label}`} mode="tags" />
137+
</Form.Item>
138+
));
139+
break;
140+
case "select":
141+
const options = (def?.options || []).map((opt: any) => {
142+
if (typeof opt === "string" || typeof opt === "number") {
143+
return { label: String(opt), value: opt };
144+
}
145+
return { label: opt?.label ?? String(opt?.value), value: opt?.value };
146+
});
147+
items_.push((
148+
<Form.Item
149+
key={`${section}.${key}`}
150+
name={name}
151+
label={label}
152+
tooltip={description}
153+
rules={rules}
154+
>
155+
<Select placeholder={description || `请选择${label}`} options={options} />
156+
</Form.Item>
157+
));
158+
break;
159+
case "multiple":
160+
const itemsMultiple = renderTemplateFields(name, def?.properties)
161+
items_.push(itemsMultiple)
162+
break;
163+
case "multiplelist":
164+
const realName = name.concat(0)
165+
const itemsMultipleList = renderTemplateFields(realName, def?.properties)
166+
items_.push(itemsMultipleList)
167+
break;
168+
case "inputlist":
169+
items_.push((
170+
<Form.Item
171+
key={`${section}.${key}`}
172+
name={name.concat(0)}
173+
label={label}
174+
tooltip={description}
175+
rules={rules}
176+
>
177+
<Input placeholder={description || `请输入${label}`} />
178+
</Form.Item>
179+
));
180+
break;
181+
default:
182+
items_.push((
183+
<Form.Item
184+
key={`${section}.${key}`}
185+
name={name}
186+
label={label}
187+
tooltip={description}
188+
rules={rules}
189+
>
190+
<Input placeholder={description || `请输入${label}`} />
191+
</Form.Item>
192+
));
169193
}
194+
})
170195

171-
return (
172-
<Form.Item
173-
key={`${section}.${key}`}
174-
name={["config", section, key]}
175-
label={label}
176-
tooltip={description}
177-
rules={rules}
178-
>
179-
<Input placeholder={description || `请输入${label}`} />
180-
</Form.Item>
181-
);
182-
});
183-
184-
return (
185-
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
186-
{items}
187-
</div>
188-
);
196+
return items_
189197
};
190198

191199
const getPropertyCountSafe = (obj: any) => {
@@ -342,10 +350,12 @@ export default function CollectionTaskCreate() {
342350
<h3 className="font-medium text-gray-900 pt-2 mb-2">
343351
模板参数
344352
</h3>
353+
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
345354
{renderTemplateFields(
346-
"parameter",
355+
["config", "parameter"],
347356
selectedTemplate.templateContent?.parameter as Record<string, TemplateFieldDef>
348357
)}
358+
</div>
349359
</>
350360
): null}
351361

@@ -354,10 +364,12 @@ export default function CollectionTaskCreate() {
354364
<h3 className="font-medium text-gray-900 pt-2 mb-2">
355365
源端参数
356366
</h3>
367+
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
357368
{renderTemplateFields(
358-
"reader",
369+
["config", "reader"],
359370
selectedTemplate.templateContent?.reader as Record<string, TemplateFieldDef>
360371
)}
372+
</div>
361373
</>
362374
) : null}
363375

@@ -366,10 +378,12 @@ export default function CollectionTaskCreate() {
366378
<h3 className="font-medium text-gray-900 pt-2 mb-2">
367379
目标端参数
368380
</h3>
381+
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
369382
{renderTemplateFields(
370-
"writer",
383+
["config", "writer"],
371384
selectedTemplate.templateContent?.writer as Record<string, TemplateFieldDef>
372385
)}
386+
</div>
373387
</>
374388
) : null}
375389
</>

frontend/src/pages/DataManagement/Create/components/BasicInformation.tsx

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { Input, Select, Form } from "antd";
33
import { datasetTypes } from "../../dataset.const";
44
import { useEffect, useState } from "react";
55
import { queryDatasetTagsUsingGet } from "../../dataset.api";
6+
import {queryTasksUsingGet} from "@/pages/DataCollection/collection.apis.ts";
67

78
export default function BasicInformation({
89
data,
@@ -20,6 +21,7 @@ export default function BasicInformation({
2021
options: { label: JSX.Element; value: string }[];
2122
}[]
2223
>([]);
24+
const [collectionOptions, setCollectionOptions] = useState([]);
2325

2426
// 获取标签
2527
const fetchTags = async () => {
@@ -36,8 +38,23 @@ export default function BasicInformation({
3638
}
3739
};
3840

41+
// 获取归集任务
42+
const fetchCollectionTasks = async () => {
43+
try {
44+
const res = await queryTasksUsingGet({ page: 0, size: 100 });
45+
const options = res.data.content.map((task: any) => ({
46+
label: task.name,
47+
value: task.id,
48+
}));
49+
setCollectionOptions(options);
50+
} catch (error) {
51+
console.error("Error fetching collection tasks:", error);
52+
}
53+
};
54+
3955
useEffect(() => {
4056
fetchTags();
57+
fetchCollectionTasks();
4158
}, []);
4259
return (
4360
<>
@@ -78,6 +95,11 @@ export default function BasicInformation({
7895
/>
7996
</Form.Item>
8097
)}
98+
{!hidden.includes("dataSource") && (
99+
<Form.Item name="dataSource" label="关联归集任务">
100+
<Select placeholder="请选择归集任务" options={collectionOptions} />
101+
</Form.Item>
102+
)}
81103
</>
82104
);
83105
}

runtime/datamate-python/app/module/collection/client/datax_client.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@
1313
logger = get_logger(__name__)
1414

1515
class DataxClient:
16-
def __init__(self, task: CollectionTask, execution: TaskExecution):
16+
def __init__(self, task: CollectionTask, execution: TaskExecution, template: CollectionTemplate):
1717
self.execution = execution
1818
self.task = task
19+
self.template = template
1920
self.config_file_path = f"/flow/data-collection/{task.id}/config.json"
2021
self.python_path = "python"
2122
self.datax_main = "/opt/datax/bin/datax.py"
@@ -53,10 +54,21 @@ def generate_datx_config(task_config: CollectionConfig, template: CollectionTemp
5354
**(task_config.parameter if task_config.parameter else {}),
5455
**(task_config.reader if task_config.reader else {})
5556
}
57+
dest_parameter = {}
58+
if template.target_type == "txtfilewriter":
59+
dest_parameter = {
60+
"path": target_path,
61+
"fileName": "collection_result",
62+
"writeMode": "truncate"
63+
}
64+
elif template.target_type == "nfswriter" or template.target_type == "obswriter":
65+
dest_parameter = {
66+
"destPath": target_path
67+
}
5668
writer_parameter = {
5769
**(task_config.parameter if task_config.parameter else {}),
5870
**(task_config.writer if task_config.writer else {}),
59-
"destPath": target_path
71+
**dest_parameter
6072
}
6173
# 生成任务运行配置
6274
job_config = {
@@ -128,6 +140,7 @@ def run_datax_job(self):
128140
logger.info(f"DataX 任务执行成功: {self.execution.id}")
129141
logger.info(f"执行耗时: {self.execution.duration_seconds:.2f} 秒")
130142
self.execution.status = TaskStatus.COMPLETED.name
143+
self.rename_collection_result()
131144
else:
132145
self.execution.error_message = self.execution.error_message or f"DataX 任务执行失败,退出码: {exit_code}"
133146
self.execution.status = TaskStatus.FAILED.name
@@ -141,6 +154,23 @@ def run_datax_job(self):
141154
if self.task.sync_mode == SyncMode.ONCE:
142155
self.task.status = self.execution.status
143156

157+
def rename_collection_result(self):
158+
if self.template.target_type != "txtfilewriter":
159+
return
160+
target_path = Path(self.task.target_path)
161+
if not target_path.exists():
162+
logger.warning(f"Target path does not exist: {target_path}")
163+
return
164+
# If it's a directory, find all files without extensions
165+
for file_path in target_path.iterdir():
166+
if file_path.is_file() and not file_path.suffix:
167+
new_path = file_path.with_suffix('.csv')
168+
try:
169+
file_path.rename(new_path)
170+
logger.info(f"Renamed {file_path} to {new_path}")
171+
except Exception as e:
172+
logger.error(f"Failed to rename {file_path} to {new_path}: {str(e)}")
173+
144174
def _run_process(self, cmd: list[str], log_f) -> int:
145175
# 启动进程
146176
process = subprocess.Popen(

0 commit comments

Comments
 (0)