Skip to content

Commit 2b09c7d

Browse files
authored
feature:mysql数据库归集为csv文件 (#76)
* fix:配比任务需要能够跳转到目标数据集 * feature:增加配比任务详情接口 * fix:删除不存在的配比详情页面 * fix:使用正式的逻辑来展示标签 * fix:参数默认值去掉多余的- * fix:修复配比任务相关操作 * fix:去除不需要的日志打印和import * feature:数据归集创建时将obs、mysql归集也放出 * refactor:重构数据归集的代码 * refactor:重构数据归集的代码 * feature:增加实现mysql归集为csv文件
1 parent b8d7aca commit 2b09c7d

File tree

8 files changed

+146
-54
lines changed

8 files changed

+146
-54
lines changed

backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.datamate.collection.application;
22

3+
import com.datamate.collection.common.enums.TemplateType;
34
import com.datamate.collection.domain.model.entity.CollectionTask;
45
import com.datamate.collection.domain.model.entity.TaskExecution;
56
import com.datamate.collection.common.enums.TaskStatus;
@@ -9,6 +10,7 @@
910
import com.datamate.datamanagement.application.DatasetApplicationService;
1011
import lombok.RequiredArgsConstructor;
1112
import lombok.extern.slf4j.Slf4j;
13+
import org.apache.commons.lang3.StringUtils;
1214
import org.springframework.scheduling.annotation.Async;
1315
import org.springframework.stereotype.Service;
1416
import org.springframework.transaction.annotation.Transactional;
@@ -50,7 +52,9 @@ public void runAsync(CollectionTask task, String executionId, int timeoutSeconds
5052
executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
5153
0, 0L, 0L, 0L, null);
5254
collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
53-
datasetApplicationService.processDataSourceAsync(datasetId, task.getId());
55+
if (StringUtils.isNotBlank(datasetId)) {
56+
datasetApplicationService.processDataSourceAsync(datasetId, task.getId());
57+
}
5458
} catch (Exception e) {
5559
log.error("DataX execution failed", e);
5660
executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),

backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
// java
12
package com.datamate.collection.infrastructure.datax;
23

34
import com.datamate.collection.common.enums.TemplateType;
45
import com.datamate.collection.domain.model.entity.CollectionTask;
56
import com.datamate.collection.domain.process.ProcessRunner;
7+
import com.datamate.collection.infrastructure.datax.config.MysqlConfig;
68
import com.datamate.collection.infrastructure.datax.config.NasConfig;
79
import com.datamate.common.infrastructure.exception.BusinessException;
810
import com.datamate.common.infrastructure.exception.SystemErrorCode;
@@ -15,10 +17,10 @@
1517
import org.springframework.stereotype.Component;
1618

1719
import java.io.*;
18-
import java.nio.file.Files;
19-
import java.nio.file.Path;
20-
import java.nio.file.Paths;
20+
import java.nio.file.*;
2121
import java.time.Duration;
22+
import java.util.*;
23+
import java.util.regex.Pattern;
2224

2325
@Slf4j
2426
@Component
@@ -30,7 +32,10 @@ public class DataxProcessRunner implements ProcessRunner {
3032
@Override
3133
public int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception {
3234
Path job = buildJobFile(task);
33-
return runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
35+
int code = runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
36+
// 任务成功后做后处理(仅针对 MYSQL 类型)
37+
postProcess(task);
38+
return code;
3439
}
3540

3641
private int runJob(File jobFile, String executionId, Duration timeout) throws Exception {
@@ -90,11 +95,12 @@ private String getJobConfig(CollectionTask task) {
9095
switch (templateType) {
9196
case NAS:
9297
// NAS 特殊处理
93-
// 移除 templateType 字段
9498
NasConfig nasConfig = objectMapper.readValue(task.getConfig(), NasConfig.class);
9599
return nasConfig.toJobConfig(objectMapper, task);
96100
case OBS:
97101
case MYSQL:
102+
MysqlConfig mysqlConfig = objectMapper.readValue(task.getConfig(), MysqlConfig.class);
103+
return mysqlConfig.toJobConfig(objectMapper, task);
98104
default:
99105
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR, "Unsupported template type: " + templateType);
100106
}
@@ -103,4 +109,35 @@ private String getJobConfig(CollectionTask task) {
103109
throw new RuntimeException("Failed to parse task config", e);
104110
}
105111
}
112+
113+
private void postProcess(CollectionTask task) throws IOException {
114+
if (task.getTaskType() != TemplateType.MYSQL) {
115+
return;
116+
}
117+
String targetPath = task.getTargetPath();
118+
// 将targetPath下所有不以.csv结尾的文件修改为以.csv结尾
119+
Path dir = Paths.get(targetPath);
120+
if (!Files.exists(dir) || !Files.isDirectory(dir)) {
121+
log.info("Target path {} does not exist or is not a directory for task {}, skip post processing.", targetPath, task.getId());
122+
return;
123+
}
124+
125+
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
126+
for (Path path : stream) {
127+
if (!Files.isRegularFile(path)) continue;
128+
String name = path.getFileName().toString();
129+
if (name.toLowerCase().endsWith(".csv")) continue;
130+
131+
Path target = dir.resolve(name + ".csv");
132+
try {
133+
Files.move(path, target, StandardCopyOption.REPLACE_EXISTING);
134+
log.info("Renamed file for task {}: {} -> {}", task.getId(), name, target.getFileName().toString());
135+
} catch (IOException ex) {
136+
log.warn("Failed to rename file {} for task {}: {}", path, task.getId(), ex.getMessage(), ex);
137+
}
138+
}
139+
} catch (IOException ioe) {
140+
log.warn("Error scanning target directory {} for task {}: {}", targetPath, task.getId(), ioe.getMessage(), ioe);
141+
}
142+
}
106143
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package com.datamate.collection.infrastructure.datax.config;

import com.datamate.collection.domain.model.entity.CollectionTask;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * User-supplied configuration for a MYSQL collection task: JDBC connection
 * settings, the query to run, and optional CSV column headers.
 */
@Getter
@Setter
public class MysqlConfig {
    private String jdbcUrl;

    private String username;

    private String password;

    private String querySql;

    private List<String> headers;

    /**
     * Builds the DataX job JSON for this configuration: a mysqlreader fed by
     * {@code querySql} piped into a txtfilewriter producing a CSV file under
     * the task's target path.
     *
     * @param objectMapper mapper used to serialize the job tree
     * @param task         the task supplying the output directory
     * @return the job configuration as a JSON string
     * @throws Exception if serialization fails
     */
    public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws Exception {
        Map<String, Object> reader = new HashMap<>();
        reader.put("name", "mysqlreader");
        reader.put("parameter", buildReaderParameter());

        Map<String, Object> writer = new HashMap<>();
        writer.put("name", "txtfilewriter");
        writer.put("parameter", buildWriterParameter(task));

        Map<String, Object> content = new HashMap<>();
        content.put("reader", reader);
        content.put("writer", writer);

        // Single channel is sufficient: output goes to one local CSV file.
        Map<String, Object> speed = new HashMap<>();
        speed.put("channel", 1);
        Map<String, Object> setting = new HashMap<>();
        setting.put("speed", speed);

        Map<String, Object> job = new HashMap<>();
        job.put("content", List.of(content));
        job.put("setting", setting);

        Map<String, Object> root = new HashMap<>();
        root.put("job", job);
        return objectMapper.writeValueAsString(root);
    }

    /** Assembles the mysqlreader parameter map; null settings are simply omitted. */
    private Map<String, Object> buildReaderParameter() {
        Map<String, Object> connection = new HashMap<>();
        if (jdbcUrl != null) connection.put("jdbcUrl", Collections.singletonList(jdbcUrl));
        if (querySql != null) connection.put("querySql", Collections.singletonList(querySql));

        Map<String, Object> parameter = new HashMap<>();
        if (username != null) parameter.put("username", username);
        if (password != null) parameter.put("password", password);
        parameter.put("connection", Collections.singletonList(connection));
        return parameter;
    }

    /** Assembles the txtfilewriter parameter map writing CSV into the task's target path. */
    private Map<String, Object> buildWriterParameter(CollectionTask task) {
        Map<String, Object> parameter = new HashMap<>();
        if (CollectionUtils.isNotEmpty(headers)) {
            parameter.put("header", headers);
        }
        parameter.put("path", task.getTargetPath());
        parameter.put("fileName", "collectionResult");
        parameter.put("writeMode", "truncate");
        parameter.put("dateFormat", "yyyy-MM-dd HH:mm:ss");
        parameter.put("fileFormat", "csv");
        parameter.put("encoding", "UTF-8");
        parameter.put("fieldDelimiter", ",");
        return parameter;
    }
}

backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ public interface CollectionTaskConverter {
4141
default Map<String, Object> parseJsonToMap(String json) {
4242
try {
4343
ObjectMapper objectMapper = new ObjectMapper();
44-
return
45-
objectMapper.readValue(json, Map.class);
44+
return objectMapper.readValue(json, Map.class);
4645
} catch (Exception e) {
4746
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER);
4847
}

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -287,19 +287,4 @@ private List<String> getFilePaths(String dataSourceId) {
287287
log.info("获取到归集任务详情: {}", taskDetail);
288288
return Collections.singletonList(taskDetail.getTargetPath());
289289
}
290-
291-
/**
292-
* 解析任务配置
293-
*/
294-
private LocalCollectionConfig parseTaskConfig(Map<String, Object> configMap) {
295-
try {
296-
if (configMap == null || configMap.isEmpty()) {
297-
return null;
298-
}
299-
return objectMapper.convertValue(configMap, LocalCollectionConfig.class);
300-
} catch (Exception e) {
301-
log.error("解析任务配置失败", e);
302-
return null;
303-
}
304-
}
305290
}

frontend/src/pages/DataCollection/Create/CreateTask.tsx

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -321,21 +321,15 @@ export default function CollectionTaskCreate() {
321321
{selectedTemplate === TemplateType.MYSQL && (
322322
<div className="grid grid-cols-2 gap-3 px-2 bg-blue-50 rounded">
323323
<Form.Item
324-
name={["config", "host"]}
325-
rules={[{ required: true, message: "请输入MYSQL主机名" }]}
326-
label="MYSQL主机名"
327-
>
328-
<Input placeholder="192.168.1.100" />
329-
</Form.Item>
330-
<Form.Item
331-
name={["config", "port"]}
332-
rules={[{ required: true, message: "请输入端口号" }]}
333-
label="端口号"
324+
name={["config", "jdbcUrl"]}
325+
rules={[{ required: true, message: "请输入数据库链接" }]}
326+
label="数据库链接"
327+
className="col-span-2"
334328
>
335-
<Input placeholder="3306" />
329+
<Input placeholder="jdbc:mysql://localhost:3306/mysql?useUnicode=true&characterEncoding=utf8" />
336330
</Form.Item>
337331
<Form.Item
338-
name={["config", "user"]}
332+
name={["config", "username"]}
339333
rules={[{ required: true, message: "请输入用户名" }]}
340334
label="用户名"
341335
>
@@ -346,22 +340,22 @@ export default function CollectionTaskCreate() {
346340
rules={[{ required: true, message: "请输入密码" }]}
347341
label="密码"
348342
>
349-
<Input placeholder="" />
350-
</Form.Item>
351-
<Form.Item
352-
name={["config", "schema"]}
353-
rules={[{ required: true, message: "请输入数据库" }]}
354-
label="数据库"
355-
>
356-
<Input placeholder="public" />
343+
<Input type="password" className="h-8 text-xs" placeholder="Secret Key" />
357344
</Form.Item>
358345
<Form.Item
359-
name={["config", "sql"]}
346+
name={["config", "querySql"]}
360347
rules={[{ required: true, message: "请输入查询语句" }]}
361348
label="查询语句"
362349
>
363350
<Input placeholder="select * from your_table" />
364351
</Form.Item>
352+
<Form.Item
353+
name={["config", "headers"]}
354+
label="列名"
355+
className="col-span-2"
356+
>
357+
<Select placeholder="请输入列名" mode="tags" />
358+
</Form.Item>
365359
</div>
366360
)}
367361
</>

runtime/datax/package.xml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -315,13 +315,13 @@
315315
<!-- </includes>-->
316316
<!-- <outputDirectory>datax</outputDirectory>-->
317317
<!-- </fileSet>-->
318-
<!-- <fileSet>-->
319-
<!-- <directory>txtfilewriter/target/datax/</directory>-->
320-
<!-- <includes>-->
321-
<!-- <include>**/*.*</include>-->
322-
<!-- </includes>-->
323-
<!-- <outputDirectory>datax</outputDirectory>-->
324-
<!-- </fileSet>-->
318+
<fileSet>
319+
<directory>txtfilewriter/target/datax/</directory>
320+
<includes>
321+
<include>**/*.*</include>
322+
</includes>
323+
<outputDirectory>datax</outputDirectory>
324+
</fileSet>
325325
<!-- <fileSet>-->
326326
<!-- <directory>ftpwriter/target/datax/</directory>-->
327327
<!-- <includes>-->
@@ -582,4 +582,4 @@
582582
<outputDirectory>datax</outputDirectory>
583583
</fileSet>
584584
</fileSets>
585-
</assembly>
585+
</assembly>

runtime/datax/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@
111111
<!-- <module>kuduwriter</module>-->
112112
<!-- <module>ftpwriter</module>-->
113113
<!-- <module>hdfswriter</module>-->
114-
<!-- <module>txtfilewriter</module>-->
114+
<module>txtfilewriter</module>
115115
<!-- <module>streamwriter</module>-->
116116

117117
<!-- <module>elasticsearchwriter</module>-->
@@ -305,4 +305,4 @@
305305
</plugin>
306306
</plugins>
307307
</build>
308-
</project>
308+
</project>

0 commit comments

Comments
 (0)