Skip to content

Commit b8d7aca

Browse files
authored
refactor:重构数据归集部分代码 (#75)
* fix:配比任务需要能够跳转到目标数据集 * feature:增加配比任务详情接口 * fix:删除不存在的配比详情页面 * fix:使用正式的逻辑来展示标签 * fix:参数默认值去掉多余的- * fix:修复配比任务相关操作 * fix:去除不需要的日志打印和import * feature:数据归集创建时将obs、mysql归集也放出 * refactor:重构数据归集的代码 * refactor:重构数据归集的代码
1 parent aa01f52 commit b8d7aca

File tree

14 files changed

+247
-110
lines changed

14 files changed

+247
-110
lines changed

backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,9 @@
55
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
66
import com.datamate.collection.domain.model.entity.CollectionTask;
77
import com.datamate.collection.domain.model.entity.TaskExecution;
8-
import com.datamate.collection.common.enums.TaskStatus;
98
import com.datamate.collection.domain.repository.CollectionTaskRepository;
109
import com.datamate.collection.common.enums.SyncMode;
1110
import com.datamate.common.domain.utils.ChunksSaver;
12-
import com.datamate.datamanagement.application.DatasetApplicationService;
13-
import com.datamate.datamanagement.domain.model.dataset.Dataset;
1411
import lombok.RequiredArgsConstructor;
1512
import lombok.extern.slf4j.Slf4j;
1613
import org.springframework.stereotype.Service;
@@ -25,7 +22,6 @@
2522
@RequiredArgsConstructor
2623
public class CollectionTaskService {
2724
private final TaskExecutionService taskExecutionService;
28-
private final DatasetApplicationService datasetApplicationService;
2925
private final CollectionTaskRepository collectionTaskRepository;
3026

3127
@Transactional

backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/SyncMode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ public enum SyncMode {
77
/** 一次性(ONCE) */
88
ONCE,
99
/// 定时(SCHEDULED)
10-
SCHEDULED;
10+
SCHEDULED
1111
}
1212

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.datamate.collection.common.enums;
2+
3+
/**
4+
* 模板类型枚举
5+
*
6+
*/
7+
public enum TemplateType {
8+
NAS,
9+
OBS,
10+
MYSQL
11+
}

backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.baomidou.mybatisplus.annotation.TableName;
44
import com.datamate.collection.common.enums.SyncMode;
55
import com.datamate.collection.common.enums.TaskStatus;
6+
import com.datamate.collection.common.enums.TemplateType;
67
import com.datamate.common.domain.model.base.BaseEntity;
78
import com.fasterxml.jackson.core.JsonProcessingException;
89
import com.fasterxml.jackson.core.type.TypeReference;
@@ -24,6 +25,8 @@
2425
public class CollectionTask extends BaseEntity<String> {
2526
private String name;
2627
private String description;
28+
private TemplateType taskType; // 任务类型
29+
private String targetPath; // 目标存储路径
2730
private String config; // DataX JSON 配置,包含源端和目标端配置信息
2831
private TaskStatus status;
2932
private SyncMode syncMode; // ONCE / SCHEDULED
@@ -51,7 +54,7 @@ public void addPath() {
5154

5255
public void initCreateParam() {
5356
this.id = UUID.randomUUID().toString();
54-
this.addPath();
57+
this.targetPath = "/dataset/local/" + id;
5558
this.status = TaskStatus.READY;
5659
this.createdAt = LocalDateTime.now();
5760
this.updatedAt = LocalDateTime.now();

backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java

Lines changed: 18 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,24 @@
11
package com.datamate.collection.infrastructure.datax;
22

3+
import com.datamate.collection.common.enums.TemplateType;
34
import com.datamate.collection.domain.model.entity.CollectionTask;
45
import com.datamate.collection.domain.process.ProcessRunner;
6+
import com.datamate.collection.infrastructure.datax.config.NasConfig;
57
import com.datamate.common.infrastructure.exception.BusinessException;
68
import com.datamate.common.infrastructure.exception.SystemErrorCode;
7-
import com.fasterxml.jackson.core.type.TypeReference;
89
import com.fasterxml.jackson.databind.ObjectMapper;
910
import lombok.RequiredArgsConstructor;
1011
import lombok.extern.slf4j.Slf4j;
1112
import org.apache.commons.exec.*;
13+
import org.apache.commons.io.output.TeeOutputStream;
1214
import org.apache.commons.lang3.StringUtils;
1315
import org.springframework.stereotype.Component;
1416

15-
import java.io.File;
16-
import java.io.FileNotFoundException;
17-
import java.io.FileWriter;
18-
import java.io.IOException;
17+
import java.io.*;
1918
import java.nio.file.Files;
2019
import java.nio.file.Path;
2120
import java.nio.file.Paths;
2221
import java.time.Duration;
23-
import java.util.HashMap;
24-
import java.util.List;
25-
import java.util.Map;
2622

2723
@Slf4j
2824
@Component
@@ -61,10 +57,8 @@ private static DefaultExecutor getExecutor(Duration timeout, File logFile) throw
6157
}
6258

6359
ExecuteStreamHandler streamHandler = new PumpStreamHandler(
64-
new org.apache.commons.io.output.TeeOutputStream(
65-
new java.io.FileOutputStream(logFile, true), System.out),
66-
new org.apache.commons.io.output.TeeOutputStream(
67-
new java.io.FileOutputStream(logFile, true), System.err)
60+
new TeeOutputStream(new FileOutputStream(logFile, true), System.out),
61+
new TeeOutputStream(new FileOutputStream(logFile, true), System.err)
6862
);
6963
executor.setStreamHandler(streamHandler);
7064

@@ -92,30 +86,18 @@ private Path buildJobFile(CollectionTask task) throws IOException {
9286
private String getJobConfig(CollectionTask task) {
9387
try {
9488
ObjectMapper objectMapper = new ObjectMapper();
95-
Map<String, Object> parameter = objectMapper.readValue(
96-
task.getConfig(),
97-
new TypeReference<>() {
98-
}
99-
);
100-
Map<String, Object> job = new HashMap<>();
101-
Map<String, Object> content = new HashMap<>();
102-
Map<String, Object> reader = new HashMap<>();
103-
reader.put("name", "nfsreader");
104-
reader.put("parameter", parameter);
105-
content.put("reader", reader);
106-
Map<String, Object> writer = new HashMap<>();
107-
writer.put("name", "nfswriter");
108-
writer.put("parameter", parameter);
109-
content.put("writer", writer);
110-
job.put("content", List.of(content));
111-
Map<String, Object> setting = new HashMap<>();
112-
Map<String, Object> channel = new HashMap<>();
113-
channel.put("channel", 2);
114-
setting.put("speed", channel);
115-
job.put("setting", setting);
116-
Map<String, Object> jobConfig = new HashMap<>();
117-
jobConfig.put("job", job);
118-
return objectMapper.writeValueAsString(jobConfig);
89+
TemplateType templateType = task.getTaskType();
90+
switch (templateType) {
91+
case NAS:
92+
// NAS 特殊处理
93+
// 移除 templateType 字段
94+
NasConfig nasConfig = objectMapper.readValue(task.getConfig(), NasConfig.class);
95+
return nasConfig.toJobConfig(objectMapper, task);
96+
case OBS:
97+
case MYSQL:
98+
default:
99+
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR, "Unsupported template type: " + templateType);
100+
}
119101
} catch (Exception e) {
120102
log.error("Failed to parse task config", e);
121103
throw new RuntimeException("Failed to parse task config", e);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package com.datamate.collection.infrastructure.datax.config;
2+
3+
public interface BaseConfig {
4+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
package com.datamate.collection.infrastructure.datax.config;

import com.datamate.collection.domain.model.entity.CollectionTask;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * NAS source configuration for a DataX collection task, deserialized from
 * {@code CollectionTask#getConfig()} (see {@code DataxProcessRunner#getJobConfig}).
 */
@Getter
@Setter
public class NasConfig implements BaseConfig {
    /** NAS server address. */
    private String ip;

    /** Source path on the NAS to read from. */
    private String path;

    /** Files to collect; may be null, in which case it is omitted from the job parameters. */
    private List<String> files;

    /**
     * Builds the DataX job JSON for this NAS configuration.
     *
     * <p>The resulting document has the shape
     * {@code {"job": {"content": [{"reader": ..., "writer": ...}], "setting": {"speed": {"channel": 2}}}}}
     * with an {@code nfsreader}/{@code nfswriter} pair. Reader and writer deliberately
     * share one parameter map — both plugins pick out the fields they need — which
     * mirrors the inline builder this class replaced.</p>
     *
     * @param objectMapper mapper used to serialize the job document
     * @param task         task supplying the destination path ({@code targetPath})
     * @return the DataX job configuration as a JSON string
     * @throws JsonProcessingException if the job document cannot be serialized
     */
    public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws JsonProcessingException {
        // Only non-null fields are forwarded so the job JSON stays free of null entries.
        Map<String, Object> parameter = new HashMap<>();
        if (ip != null) {
            parameter.put("ip", ip);
        }
        if (path != null) {
            parameter.put("path", path);
        }
        if (files != null) {
            parameter.put("files", files);
        }
        parameter.put("destPath", task.getTargetPath());

        Map<String, Object> reader = new HashMap<>();
        reader.put("name", "nfsreader");
        reader.put("parameter", parameter);

        Map<String, Object> writer = new HashMap<>();
        writer.put("name", "nfswriter");
        writer.put("parameter", parameter);

        Map<String, Object> content = new HashMap<>();
        content.put("reader", reader);
        content.put("writer", writer);

        // Fixed transfer concurrency of 2 channels, carried over unchanged from the
        // previous inline implementation in DataxProcessRunner.
        Map<String, Object> channel = new HashMap<>();
        channel.put("channel", 2);
        Map<String, Object> setting = new HashMap<>();
        setting.put("speed", channel);

        Map<String, Object> job = new HashMap<>();
        job.put("content", List.of(content));
        job.put("setting", setting);

        Map<String, Object> jobConfig = new HashMap<>();
        jobConfig.put("job", job);
        return objectMapper.writeValueAsString(jobConfig);
    }
}

backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ public class CollectionTaskResponse {
2929

3030
private String description;
3131

32-
@Valid
32+
private String targetPath;
33+
3334
private Map<String, Object> config = new HashMap<>();
3435

3536
private TaskStatus status;

backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.datamate.collection.interfaces.dto;
22

33
import com.datamate.collection.common.enums.SyncMode;
4+
import com.datamate.collection.common.enums.TemplateType;
45
import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest;
56
import com.fasterxml.jackson.annotation.JsonProperty;
67

@@ -35,6 +36,11 @@ public class CreateCollectionTaskRequest {
3536
@JsonProperty("description")
3637
private String description;
3738

39+
@NotNull
40+
@Schema(name = "taskType", description = "任务类型", requiredMode = Schema.RequiredMode.REQUIRED)
41+
@JsonProperty("taskType")
42+
private TemplateType taskType;
43+
3844
@Valid
3945
@NotNull
4046
@Schema(name = "config", description = "归集配置,包含源端和目标端配置信息", requiredMode = Schema.RequiredMode.REQUIRED)

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -285,12 +285,7 @@ private List<String> getFilePaths(String dataSourceId) {
285285
return Collections.emptyList();
286286
}
287287
log.info("获取到归集任务详情: {}", taskDetail);
288-
LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig());
289-
if (config == null) {
290-
log.warn("解析任务配置失败,任务ID: {}", dataSourceId);
291-
return Collections.emptyList();
292-
}
293-
return config.getFilePaths();
288+
return Collections.singletonList(taskDetail.getTargetPath());
294289
}
295290

296291
/**

0 commit comments

Comments
 (0)