
Commit ca6c60e

Merge branch 'main' into develop_clean
2 parents: 6d422c4 + 442e561

83 files changed: 4215 additions, 581 deletions

README.md (1 addition, 1 deletion)

@@ -7,7 +7,7 @@
 ![GitHub Stars](https://img.shields.io/github/stars/ModelEngine-Group/DataMate)
 ![GitHub Forks](https://img.shields.io/github/forks/ModelEngine-Group/DataMate)
 ![GitHub Issues](https://img.shields.io/github/issues/ModelEngine-Group/DataMate)
-![GitHub License](https://img.shields.io/github/license/ModelEngine-Group/DataMate)
+![GitHub License](https://img.shields.io/github/license/ModelEngine-Group/datamate-docs)

 **DataMate is an enterprise-level data processing platform for model fine-tuning and RAG retrieval, supporting core
 functions such as data collection, data management, operator marketplace, data cleaning, data synthesis, data

backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java (0 additions, 4 deletions)

@@ -5,12 +5,9 @@
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.datamate.collection.domain.model.entity.CollectionTask;
 import com.datamate.collection.domain.model.entity.TaskExecution;
-import com.datamate.collection.common.enums.TaskStatus;
 import com.datamate.collection.domain.repository.CollectionTaskRepository;
 import com.datamate.collection.common.enums.SyncMode;
 import com.datamate.common.domain.utils.ChunksSaver;
-import com.datamate.datamanagement.application.DatasetApplicationService;
-import com.datamate.datamanagement.domain.model.dataset.Dataset;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -25,7 +22,6 @@
 @RequiredArgsConstructor
 public class CollectionTaskService {
     private final TaskExecutionService taskExecutionService;
-    private final DatasetApplicationService datasetApplicationService;
     private final CollectionTaskRepository collectionTaskRepository;

     @Transactional

backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java (5 additions, 1 deletion)

@@ -1,5 +1,6 @@
 package com.datamate.collection.application;

+import com.datamate.collection.common.enums.TemplateType;
 import com.datamate.collection.domain.model.entity.CollectionTask;
 import com.datamate.collection.domain.model.entity.TaskExecution;
 import com.datamate.collection.common.enums.TaskStatus;
@@ -9,6 +10,7 @@
 import com.datamate.datamanagement.application.DatasetApplicationService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
@@ -50,7 +52,9 @@ public void runAsync(CollectionTask task, String executionId, int timeoutSeconds
             executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
                     0, 0L, 0L, 0L, null);
             collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
-            datasetApplicationService.processDataSourceAsync(datasetId, task.getId());
+            if (StringUtils.isNotBlank(datasetId)) {
+                datasetApplicationService.processDataSourceAsync(datasetId, task.getId());
+            }
         } catch (Exception e) {
             log.error("DataX execution failed", e);
             executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),

backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/SyncMode.java (1 addition, 1 deletion)

@@ -7,6 +7,6 @@ public enum SyncMode {
     /** One-time (ONCE) */
     ONCE,
     /// Scheduled (SCHEDULED)
-    SCHEDULED;
+    SCHEDULED
 }

backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/TemplateType.java (11 additions, 0 deletions)

@@ -0,0 +1,11 @@
+package com.datamate.collection.common.enums;
+
+/**
+ * Template type enum
+ *
+ */
+public enum TemplateType {
+    NAS,
+    OBS,
+    MYSQL
+}

backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java (4 additions, 1 deletion)

@@ -3,6 +3,7 @@
 import com.baomidou.mybatisplus.annotation.TableName;
 import com.datamate.collection.common.enums.SyncMode;
 import com.datamate.collection.common.enums.TaskStatus;
+import com.datamate.collection.common.enums.TemplateType;
 import com.datamate.common.domain.model.base.BaseEntity;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -24,6 +25,8 @@
 public class CollectionTask extends BaseEntity<String> {
     private String name;
     private String description;
+    private TemplateType taskType; // task type
+    private String targetPath; // target storage path
     private String config; // DataX JSON config, including source and target settings
     private TaskStatus status;
     private SyncMode syncMode; // ONCE / SCHEDULED
@@ -51,7 +54,7 @@ public void addPath() {

     public void initCreateParam() {
         this.id = UUID.randomUUID().toString();
-        this.addPath();
+        this.targetPath = "/dataset/local/" + id;
         this.status = TaskStatus.READY;
         this.createdAt = LocalDateTime.now();
         this.updatedAt = LocalDateTime.now();

backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java (59 additions, 40 deletions)

@@ -1,28 +1,26 @@
+// java
 package com.datamate.collection.infrastructure.datax;

+import com.datamate.collection.common.enums.TemplateType;
 import com.datamate.collection.domain.model.entity.CollectionTask;
 import com.datamate.collection.domain.process.ProcessRunner;
+import com.datamate.collection.infrastructure.datax.config.MysqlConfig;
+import com.datamate.collection.infrastructure.datax.config.NasConfig;
 import com.datamate.common.infrastructure.exception.BusinessException;
 import com.datamate.common.infrastructure.exception.SystemErrorCode;
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.exec.*;
+import org.apache.commons.io.output.TeeOutputStream;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Component;

-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.io.*;
+import java.nio.file.*;
 import java.time.Duration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.regex.Pattern;

 @Slf4j
 @Component
@@ -34,7 +32,10 @@ public class DataxProcessRunner implements ProcessRunner {
     @Override
     public int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception {
         Path job = buildJobFile(task);
-        return runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
+        int code = runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
+        // post-process after a successful run (MYSQL type only)
+        postProcess(task);
+        return code;
     }

     private int runJob(File jobFile, String executionId, Duration timeout) throws Exception {
@@ -61,10 +62,8 @@ private static DefaultExecutor getExecutor(Duration timeout, File logFile) throw
         }

         ExecuteStreamHandler streamHandler = new PumpStreamHandler(
-                new org.apache.commons.io.output.TeeOutputStream(
-                        new java.io.FileOutputStream(logFile, true), System.out),
-                new org.apache.commons.io.output.TeeOutputStream(
-                        new java.io.FileOutputStream(logFile, true), System.err)
+                new TeeOutputStream(new FileOutputStream(logFile, true), System.out),
+                new TeeOutputStream(new FileOutputStream(logFile, true), System.err)
         );
         executor.setStreamHandler(streamHandler);

@@ -92,33 +91,53 @@ private Path buildJobFile(CollectionTask task) throws IOException {
     private String getJobConfig(CollectionTask task) {
         try {
             ObjectMapper objectMapper = new ObjectMapper();
-            Map<String, Object> parameter = objectMapper.readValue(
-                    task.getConfig(),
-                    new TypeReference<>() {
-                    }
-            );
-            Map<String, Object> job = new HashMap<>();
-            Map<String, Object> content = new HashMap<>();
-            Map<String, Object> reader = new HashMap<>();
-            reader.put("name", "nfsreader");
-            reader.put("parameter", parameter);
-            content.put("reader", reader);
-            Map<String, Object> writer = new HashMap<>();
-            writer.put("name", "nfswriter");
-            writer.put("parameter", parameter);
-            content.put("writer", writer);
-            job.put("content", List.of(content));
-            Map<String, Object> setting = new HashMap<>();
-            Map<String, Object> channel = new HashMap<>();
-            channel.put("channel", 2);
-            setting.put("speed", channel);
-            job.put("setting", setting);
-            Map<String, Object> jobConfig = new HashMap<>();
-            jobConfig.put("job", job);
-            return objectMapper.writeValueAsString(jobConfig);
+            TemplateType templateType = task.getTaskType();
+            switch (templateType) {
+                case NAS:
+                    // NAS needs special handling
+                    NasConfig nasConfig = objectMapper.readValue(task.getConfig(), NasConfig.class);
+                    return nasConfig.toJobConfig(objectMapper, task);
+                case OBS:
+                case MYSQL:
+                    MysqlConfig mysqlConfig = objectMapper.readValue(task.getConfig(), MysqlConfig.class);
+                    return mysqlConfig.toJobConfig(objectMapper, task);
+                default:
+                    throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR, "Unsupported template type: " + templateType);
+            }
         } catch (Exception e) {
             log.error("Failed to parse task config", e);
             throw new RuntimeException("Failed to parse task config", e);
         }
     }
+
+    private void postProcess(CollectionTask task) throws IOException {
+        if (task.getTaskType() != TemplateType.MYSQL) {
+            return;
+        }
+        String targetPath = task.getTargetPath();
+        // rename every file under targetPath that does not already end with .csv
+        Path dir = Paths.get(targetPath);
+        if (!Files.exists(dir) || !Files.isDirectory(dir)) {
+            log.info("Target path {} does not exist or is not a directory for task {}, skip post processing.", targetPath, task.getId());
+            return;
+        }
+
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
+            for (Path path : stream) {
+                if (!Files.isRegularFile(path)) continue;
+                String name = path.getFileName().toString();
+                if (name.toLowerCase().endsWith(".csv")) continue;
+
+                Path target = dir.resolve(name + ".csv");
+                try {
+                    Files.move(path, target, StandardCopyOption.REPLACE_EXISTING);
+                    log.info("Renamed file for task {}: {} -> {}", task.getId(), name, target.getFileName().toString());
+                } catch (IOException ex) {
+                    log.warn("Failed to rename file {} for task {}: {}", path, task.getId(), ex.getMessage(), ex);
+                }
+            }
+        } catch (IOException ioe) {
+            log.warn("Error scanning target directory {} for task {}: {}", targetPath, task.getId(), ioe.getMessage(), ioe);
+        }
+    }
 }
backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/BaseConfig.java (4 additions, 0 deletions)

@@ -0,0 +1,4 @@
+package com.datamate.collection.infrastructure.datax.config;
+
+public interface BaseConfig {
+}
backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/MysqlConfig.java (73 additions, 0 deletions)

@@ -0,0 +1,73 @@
+package com.datamate.collection.infrastructure.datax.config;
+
+import com.datamate.collection.domain.model.entity.CollectionTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Getter
+@Setter
+public class MysqlConfig {
+    private String jdbcUrl;
+
+    private String username;
+
+    private String password;
+
+    private String querySql;
+
+    private List<String> headers;
+
+    /**
+     * Build the DataX job JSON string from this MYSQL config.
+     */
+    public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws Exception {
+        Map<String, Object> mysqlParameter = new HashMap<>();
+        Map<String, Object> connection = new HashMap<>();
+        if (username != null) mysqlParameter.put("username", username);
+        if (password != null) mysqlParameter.put("password", password);
+        if (jdbcUrl != null) connection.put("jdbcUrl", Collections.singletonList(jdbcUrl));
+        if (querySql != null) connection.put("querySql", Collections.singletonList(querySql));
+        mysqlParameter.put("connection", Collections.singletonList(connection));
+
+        Map<String, Object> job = new HashMap<>();
+        Map<String, Object> content = new HashMap<>();
+        Map<String, Object> reader = new HashMap<>();
+        reader.put("name", "mysqlreader");
+        reader.put("parameter", mysqlParameter);
+        content.put("reader", reader);
+
+        Map<String, Object> writer = new HashMap<>();
+        Map<String, Object> writerParameter = new HashMap<>();
+        writer.put("name", "txtfilewriter");
+        if (CollectionUtils.isNotEmpty(headers)) {
+            writerParameter.put("header", headers);
+        }
+        writerParameter.put("path", task.getTargetPath());
+        writerParameter.put("fileName", "collectionResult");
+        writerParameter.put("writeMode", "truncate");
+        writerParameter.put("dateFormat", "yyyy-MM-dd HH:mm:ss");
+        writerParameter.put("fileFormat", "csv");
+        writerParameter.put("encoding", "UTF-8");
+        writerParameter.put("fieldDelimiter", ",");
+        writer.put("parameter", writerParameter);
+        content.put("writer", writer);
+
+        job.put("content", List.of(content));
+        Map<String, Object> setting = new HashMap<>();
+        Map<String, Object> channel = new HashMap<>();
+        channel.put("channel", 1);
+        setting.put("speed", channel);
+        job.put("setting", setting);
+
+        Map<String, Object> jobConfig = new HashMap<>();
+        jobConfig.put("job", job);
+        return objectMapper.writeValueAsString(jobConfig);
+    }
+}
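
For orientation, here is a minimal usage sketch of MysqlConfig.toJobConfig. It is not part of the commit: the JDBC URL, credentials, query, headers, and the assumption that CollectionTask has a usable no-arg constructor are all invented for illustration.

package com.datamate.collection.infrastructure.datax.config;

import com.datamate.collection.domain.model.entity.CollectionTask;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;

public class MysqlConfigSketch {
    public static void main(String[] args) throws Exception {
        MysqlConfig config = new MysqlConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/demo"); // placeholder URL
        config.setUsername("demo");                            // placeholder
        config.setPassword("secret");                          // placeholder
        config.setQuerySql("SELECT id, name FROM users");      // placeholder query
        config.setHeaders(List.of("id", "name"));

        CollectionTask task = new CollectionTask(); // assumes a no-arg constructor
        task.initCreateParam(); // sets targetPath to /dataset/local/<uuid>

        // Prints a DataX job JSON roughly shaped like:
        // {"job":{"setting":{"speed":{"channel":1}},
        //   "content":[{"reader":{"name":"mysqlreader","parameter":{...}},
        //               "writer":{"name":"txtfilewriter","parameter":{"path":"/dataset/local/<uuid>",
        //                         "fileName":"collectionResult","fileFormat":"csv",...}}}]}}
        System.out.println(config.toJobConfig(new ObjectMapper(), task));
    }
}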
backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/NasConfig.java (54 additions, 0 deletions)

@@ -0,0 +1,54 @@
+package com.datamate.collection.infrastructure.datax.config;
+
+import com.datamate.collection.domain.model.entity.CollectionTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Getter
+@Setter
+public class NasConfig implements BaseConfig {
+    private String ip;
+
+    private String path;
+
+    private List<String> files;
+
+    /**
+     * Build the DataX job JSON string from this NAS config.
+     */
+    public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws Exception {
+        Map<String, Object> parameter = new HashMap<>();
+        if (ip != null) parameter.put("ip", ip);
+        if (path != null) parameter.put("path", path);
+        if (files != null) parameter.put("files", files);
+        parameter.put("destPath", task.getTargetPath());
+
+        Map<String, Object> job = new HashMap<>();
+        Map<String, Object> content = new HashMap<>();
+        Map<String, Object> reader = new HashMap<>();
+        reader.put("name", "nfsreader");
+        reader.put("parameter", parameter);
+        content.put("reader", reader);
+
+        Map<String, Object> writer = new HashMap<>();
+        writer.put("name", "nfswriter");
+        writer.put("parameter", parameter);
+        content.put("writer", writer);
+
+        job.put("content", List.of(content));
+        Map<String, Object> setting = new HashMap<>();
+        Map<String, Object> channel = new HashMap<>();
+        channel.put("channel", 2);
+        setting.put("speed", channel);
+        job.put("setting", setting);
+
+        Map<String, Object> jobConfig = new HashMap<>();
+        jobConfig.put("job", job);
+        return objectMapper.writeValueAsString(jobConfig);
+    }
+}
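
A matching hypothetical sketch for NasConfig, under the same assumptions (the NAS host, export path, and file names below are placeholders). It highlights the design choice that nfsreader and nfswriter share one parameter map, with the copy destination injected as destPath from task.getTargetPath().

package com.datamate.collection.infrastructure.datax.config;

import com.datamate.collection.domain.model.entity.CollectionTask;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;

public class NasConfigSketch {
    public static void main(String[] args) throws Exception {
        NasConfig nas = new NasConfig();
        nas.setIp("192.168.0.10");               // placeholder NAS host
        nas.setPath("/export/share");            // placeholder NFS export
        nas.setFiles(List.of("a.txt", "b.txt")); // placeholder file list

        CollectionTask task = new CollectionTask(); // assumes a no-arg constructor
        task.initCreateParam(); // targetPath becomes /dataset/local/<uuid>

        // Reader ("nfsreader") and writer ("nfswriter") receive the same
        // parameter map; the destination is parameter["destPath"].
        System.out.println(nas.toJobConfig(new ObjectMapper(), task));
    }
}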
