Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0120f75
fix:配比任务需要能够跳转到目标数据集
szc0616 Nov 6, 2025
b64bcc9
feature:增加配比任务详情接口
szc0616 Nov 6, 2025
d43048f
Merge branch 'ModelEngine-Group:main' into main
szc0616 Nov 6, 2025
99cf8a8
fix:删除不存在的配比详情页面
szc0616 Nov 6, 2025
a8cc247
fix:使用正式的逻辑来展示标签
szc0616 Nov 7, 2025
4856f95
Merge remote-tracking branch 'origin/main'
szc0616 Nov 7, 2025
a0f20c5
Merge remote-tracking branch 'origin/main'
szc0616 Nov 7, 2025
798d633
fix:参数默认值去掉多余的-
szc0616 Nov 7, 2025
3006026
fix:修复配比任务相关操作
szc0616 Nov 7, 2025
cc22e47
Merge branch 'ModelEngine-Group:main' into main
szc0616 Nov 7, 2025
1ea7f17
fix:去除不需要的日志打印和import
szc0616 Nov 10, 2025
9061dc4
Merge branch 'ModelEngine-Group:main' into main
szc0616 Nov 10, 2025
6d069bc
Merge remote-tracking branch 'origin/main'
szc0616 Nov 10, 2025
8391160
feature:数据归集创建时将obs、mysql归集也放出
szc0616 Nov 11, 2025
bdfd609
refactor:重构数据归集的代码
szc0616 Nov 11, 2025
8fdb42c
refactor:重构数据归集的代码
szc0616 Nov 11, 2025
da9b96b
Merge branch 'ModelEngine-Group:main' into main
szc0616 Nov 11, 2025
26265a9
Merge remote-tracking branch 'origin/main'
szc0616 Nov 12, 2025
30b43aa
Merge branch 'ModelEngine-Group:main' into main
szc0616 Nov 12, 2025
734b5e1
feature:增加实现mysql归集为csv文件
szc0616 Nov 12, 2025
b8e3799
Merge branch 'ModelEngine-Group:main' into main
szc0616 Nov 12, 2025
41bb77a
Merge remote-tracking branch 'origin/main'
szc0616 Nov 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.datamate.collection.application;

import com.datamate.collection.common.enums.TemplateType;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.model.entity.TaskExecution;
import com.datamate.collection.common.enums.TaskStatus;
Expand All @@ -9,6 +10,7 @@
import com.datamate.datamanagement.application.DatasetApplicationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand Down Expand Up @@ -50,7 +52,9 @@ public void runAsync(CollectionTask task, String executionId, int timeoutSeconds
executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
0, 0L, 0L, 0L, null);
collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
datasetApplicationService.processDataSourceAsync(datasetId, task.getId());
if (StringUtils.isNotBlank(datasetId)) {
datasetApplicationService.processDataSourceAsync(datasetId, task.getId());
}
} catch (Exception e) {
log.error("DataX execution failed", e);
executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// java
package com.datamate.collection.infrastructure.datax;

import com.datamate.collection.common.enums.TemplateType;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.process.ProcessRunner;
import com.datamate.collection.infrastructure.datax.config.MysqlConfig;
import com.datamate.collection.infrastructure.datax.config.NasConfig;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
Expand All @@ -15,10 +17,10 @@
import org.springframework.stereotype.Component;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.*;
import java.time.Duration;
import java.util.*;
import java.util.regex.Pattern;

@Slf4j
@Component
Expand All @@ -30,7 +32,10 @@ public class DataxProcessRunner implements ProcessRunner {
@Override
public int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception {
Path job = buildJobFile(task);
return runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
int code = runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
// 任务成功后做后处理(仅针对 MYSQL 类型)
postProcess(task);
return code;
}

private int runJob(File jobFile, String executionId, Duration timeout) throws Exception {
Expand Down Expand Up @@ -90,11 +95,12 @@ private String getJobConfig(CollectionTask task) {
switch (templateType) {
case NAS:
// NAS 特殊处理
// 移除 templateType 字段
NasConfig nasConfig = objectMapper.readValue(task.getConfig(), NasConfig.class);
return nasConfig.toJobConfig(objectMapper, task);
case OBS:
case MYSQL:
MysqlConfig mysqlConfig = objectMapper.readValue(task.getConfig(), MysqlConfig.class);
return mysqlConfig.toJobConfig(objectMapper, task);
default:
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR, "Unsupported template type: " + templateType);
}
Expand All @@ -103,4 +109,35 @@ private String getJobConfig(CollectionTask task) {
throw new RuntimeException("Failed to parse task config", e);
}
}

private void postProcess(CollectionTask task) throws IOException {
if (task.getTaskType() != TemplateType.MYSQL) {
return;
}
String targetPath = task.getTargetPath();
// 将targetPath下所有不以.csv结尾的文件修改为以.csv结尾
Path dir = Paths.get(targetPath);
if (!Files.exists(dir) || !Files.isDirectory(dir)) {
log.info("Target path {} does not exist or is not a directory for task {}, skip post processing.", targetPath, task.getId());
return;
}

try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
for (Path path : stream) {
if (!Files.isRegularFile(path)) continue;
String name = path.getFileName().toString();
if (name.toLowerCase().endsWith(".csv")) continue;

Path target = dir.resolve(name + ".csv");
try {
Files.move(path, target, StandardCopyOption.REPLACE_EXISTING);
log.info("Renamed file for task {}: {} -> {}", task.getId(), name, target.getFileName().toString());
} catch (IOException ex) {
log.warn("Failed to rename file {} for task {}: {}", path, task.getId(), ex.getMessage(), ex);
}
}
} catch (IOException ioe) {
log.warn("Error scanning target directory {} for task {}: {}", targetPath, task.getId(), ioe.getMessage(), ioe);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.datamate.collection.infrastructure.datax.config;

import com.datamate.collection.domain.model.entity.CollectionTask;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Configuration for a MYSQL collection task, deserialized from the task's JSON
 * config. Produces a DataX job definition wiring a {@code mysqlreader} to a
 * {@code txtfilewriter} that emits one CSV file into the task's target path.
 */
@Getter
@Setter
public class MysqlConfig {
    /** Full JDBC URL, e.g. {@code jdbc:mysql://host:3306/db?useUnicode=true}. */
    private String jdbcUrl;

    /** Database user name. */
    private String username;

    /** Database password (plain text inside the generated job JSON). */
    private String password;

    /** SQL executed by the reader; its result set becomes the CSV rows. */
    private String querySql;

    /** Optional CSV header row; omitted from the job when null/empty. */
    private List<String> headers;

    /**
     * Builds the DataX job JSON for this MYSQL configuration.
     *
     * @param objectMapper mapper used to serialize the job structure
     * @param task         the owning task; supplies the writer's target path
     * @return the DataX job definition as a JSON string
     * @throws Exception if JSON serialization fails
     */
    public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws Exception {
        Map<String, Object> mysqlParameter = new HashMap<>();
        Map<String, Object> connection = new HashMap<>();
        // Only include fields that were actually configured; DataX rejects nulls.
        if (username != null) {
            mysqlParameter.put("username", username);
        }
        if (password != null) {
            mysqlParameter.put("password", password);
        }
        if (jdbcUrl != null) {
            connection.put("jdbcUrl", Collections.singletonList(jdbcUrl));
        }
        if (querySql != null) {
            connection.put("querySql", Collections.singletonList(querySql));
        }
        mysqlParameter.put("connection", Collections.singletonList(connection));

        Map<String, Object> job = new HashMap<>();
        Map<String, Object> content = new HashMap<>();
        Map<String, Object> reader = new HashMap<>();
        reader.put("name", "mysqlreader");
        reader.put("parameter", mysqlParameter);
        content.put("reader", reader);

        Map<String, Object> writer = new HashMap<>();
        Map<String, Object> writerParameter = new HashMap<>();
        writer.put("name", "txtfilewriter");
        if (CollectionUtils.isNotEmpty(headers)) {
            writerParameter.put("header", headers);
        }
        writerParameter.put("path", task.getTargetPath());
        writerParameter.put("fileName", "collectionResult");
        // truncate: previous output under the target path is removed each run.
        writerParameter.put("writeMode", "truncate");
        writerParameter.put("dateFormat", "yyyy-MM-dd HH:mm:ss");
        writerParameter.put("fileFormat", "csv");
        writerParameter.put("encoding", "UTF-8");
        writerParameter.put("fieldDelimiter", ",");
        writer.put("parameter", writerParameter);
        content.put("writer", writer);

        job.put("content", List.of(content));
        Map<String, Object> setting = new HashMap<>();
        Map<String, Object> channel = new HashMap<>();
        channel.put("channel", 1);
        setting.put("speed", channel);
        job.put("setting", setting);

        Map<String, Object> jobConfig = new HashMap<>();
        jobConfig.put("job", job);
        return objectMapper.writeValueAsString(jobConfig);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public interface CollectionTaskConverter {
default Map<String, Object> parseJsonToMap(String json) {
try {
ObjectMapper objectMapper = new ObjectMapper();
return
objectMapper.readValue(json, Map.class);
return objectMapper.readValue(json, Map.class);
} catch (Exception e) {
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,19 +287,4 @@ private List<String> getFilePaths(String dataSourceId) {
log.info("获取到归集任务详情: {}", taskDetail);
return Collections.singletonList(taskDetail.getTargetPath());
}

/**
* 解析任务配置
*/
private LocalCollectionConfig parseTaskConfig(Map<String, Object> configMap) {
try {
if (configMap == null || configMap.isEmpty()) {
return null;
}
return objectMapper.convertValue(configMap, LocalCollectionConfig.class);
} catch (Exception e) {
log.error("解析任务配置失败", e);
return null;
}
}
}
36 changes: 15 additions & 21 deletions frontend/src/pages/DataCollection/Create/CreateTask.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -321,21 +321,15 @@ export default function CollectionTaskCreate() {
{selectedTemplate === TemplateType.MYSQL && (
<div className="grid grid-cols-2 gap-3 px-2 bg-blue-50 rounded">
<Form.Item
name={["config", "host"]}
rules={[{ required: true, message: "请输入MYSQL主机名" }]}
label="MYSQL主机名"
>
<Input placeholder="192.168.1.100" />
</Form.Item>
<Form.Item
name={["config", "port"]}
rules={[{ required: true, message: "请输入端口号" }]}
label="端口号"
name={["config", "jdbcUrl"]}
rules={[{ required: true, message: "请输入数据库链接" }]}
label="数据库链接"
className="col-span-2"
>
<Input placeholder="3306" />
<Input placeholder="jdbc:mysql://localhost:3306/mysql?useUnicode=true&characterEncoding=utf8" />
</Form.Item>
<Form.Item
name={["config", "user"]}
name={["config", "username"]}
rules={[{ required: true, message: "请输入用户名" }]}
label="用户名"
>
Expand All @@ -346,22 +340,22 @@ export default function CollectionTaskCreate() {
rules={[{ required: true, message: "请输入密码" }]}
label="密码"
>
<Input placeholder="" />
</Form.Item>
<Form.Item
name={["config", "schema"]}
rules={[{ required: true, message: "请输入数据库" }]}
label="数据库"
>
<Input placeholder="public" />
<Input type="password" className="h-8 text-xs" placeholder="Secret Key" />
</Form.Item>
<Form.Item
name={["config", "sql"]}
name={["config", "querySql"]}
rules={[{ required: true, message: "请输入查询语句" }]}
label="查询语句"
>
<Input placeholder="select * from your_table" />
</Form.Item>
<Form.Item
name={["config", "headers"]}
label="列名"
className="col-span-2"
>
<Select placeholder="请输入列名" mode="tags" />
</Form.Item>
</div>
)}
</>
Expand Down
16 changes: 8 additions & 8 deletions runtime/datax/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,13 @@
<!-- </includes>-->
<!-- <outputDirectory>datax</outputDirectory>-->
<!-- </fileSet>-->
<!-- <fileSet>-->
<!-- <directory>txtfilewriter/target/datax/</directory>-->
<!-- <includes>-->
<!-- <include>**/*.*</include>-->
<!-- </includes>-->
<!-- <outputDirectory>datax</outputDirectory>-->
<!-- </fileSet>-->
<fileSet>
<directory>txtfilewriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<!-- <fileSet>-->
<!-- <directory>ftpwriter/target/datax/</directory>-->
<!-- <includes>-->
Expand Down Expand Up @@ -582,4 +582,4 @@
<outputDirectory>datax</outputDirectory>
</fileSet>
</fileSets>
</assembly>
</assembly>
4 changes: 2 additions & 2 deletions runtime/datax/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<!-- <module>kuduwriter</module>-->
<!-- <module>ftpwriter</module>-->
<!-- <module>hdfswriter</module>-->
<!-- <module>txtfilewriter</module>-->
<module>txtfilewriter</module>
<!-- <module>streamwriter</module>-->

<!-- <module>elasticsearchwriter</module>-->
Expand Down Expand Up @@ -305,4 +305,4 @@
</plugin>
</plugins>
</build>
</project>
</project>