diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java index 2d4c874f5..6d27a65ad 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java @@ -1,5 +1,6 @@ package com.datamate.collection.application; +import com.datamate.collection.common.enums.TemplateType; import com.datamate.collection.domain.model.entity.CollectionTask; import com.datamate.collection.domain.model.entity.TaskExecution; import com.datamate.collection.common.enums.TaskStatus; @@ -9,6 +10,7 @@ import com.datamate.datamanagement.application.DatasetApplicationService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -50,7 +52,9 @@ public void runAsync(CollectionTask task, String executionId, int timeoutSeconds executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(), 0, 0L, 0L, 0L, null); collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name()); - datasetApplicationService.processDataSourceAsync(datasetId, task.getId()); + if (StringUtils.isNotBlank(datasetId)) { + datasetApplicationService.processDataSourceAsync(datasetId, task.getId()); + } } catch (Exception e) { log.error("DataX execution failed", e); executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(), diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java index e74f16f73..41858a087 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java @@ -1,8 +1,10 @@ +// java package com.datamate.collection.infrastructure.datax; import com.datamate.collection.common.enums.TemplateType; import com.datamate.collection.domain.model.entity.CollectionTask; import com.datamate.collection.domain.process.ProcessRunner; +import com.datamate.collection.infrastructure.datax.config.MysqlConfig; import com.datamate.collection.infrastructure.datax.config.NasConfig; import com.datamate.common.infrastructure.exception.BusinessException; import com.datamate.common.infrastructure.exception.SystemErrorCode; @@ -15,10 +17,10 @@ import org.springframework.stereotype.Component; import java.io.*; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; +import java.nio.file.*; import java.time.Duration; +import java.util.*; +import java.util.regex.Pattern; @Slf4j @Component @@ -30,7 +32,10 @@ public class DataxProcessRunner implements ProcessRunner { @Override public int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception { Path job = buildJobFile(task); - return runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds)); + int code = runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds)); + // 任务成功后做后处理(仅针对 MYSQL 类型) + postProcess(task); + return code; } private int runJob(File jobFile, String executionId, Duration timeout) throws Exception { @@ -90,11 +95,12 @@ private String getJobConfig(CollectionTask task) { switch (templateType) { case NAS: // NAS 特殊处理 - // 移除 templateType 字段 NasConfig nasConfig = objectMapper.readValue(task.getConfig(), NasConfig.class); return nasConfig.toJobConfig(objectMapper, task); case OBS: case MYSQL: + MysqlConfig mysqlConfig = objectMapper.readValue(task.getConfig(), MysqlConfig.class); + return mysqlConfig.toJobConfig(objectMapper, task); default: throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR, "Unsupported template type: " + templateType); } @@ -103,4 +109,35 @@ private String getJobConfig(CollectionTask task) { throw new RuntimeException("Failed to parse task config", e); } } + + private void postProcess(CollectionTask task) throws IOException { + if (task.getTaskType() != TemplateType.MYSQL) { + return; + } + String targetPath = task.getTargetPath(); + // 将targetPath下所有不以.csv结尾的文件修改为以.csv结尾 + Path dir = Paths.get(targetPath); + if (!Files.exists(dir) || !Files.isDirectory(dir)) { + log.info("Target path {} does not exist or is not a directory for task {}, skip post processing.", targetPath, task.getId()); + return; + } + + try (DirectoryStream stream = Files.newDirectoryStream(dir)) { + for (Path path : stream) { + if (!Files.isRegularFile(path)) continue; + String name = path.getFileName().toString(); + if (name.toLowerCase().endsWith(".csv")) continue; + + Path target = dir.resolve(name + ".csv"); + try { + Files.move(path, target, StandardCopyOption.REPLACE_EXISTING); + log.info("Renamed file for task {}: {} -> {}", task.getId(), name, target.getFileName().toString()); + } catch (IOException ex) { + log.warn("Failed to rename file {} for task {}: {}", path, task.getId(), ex.getMessage(), ex); + } + } + } catch (IOException ioe) { + log.warn("Error scanning target directory {} for task {}: {}", targetPath, task.getId(), ioe.getMessage(), ioe); + } + } } diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/MysqlConfig.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/MysqlConfig.java new file mode 100644 index 000000000..4ba006727 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/MysqlConfig.java @@ -0,0 +1,73 @@ +package com.datamate.collection.infrastructure.datax.config; + +import com.datamate.collection.domain.model.entity.CollectionTask; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.collections4.CollectionUtils; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Getter +@Setter +public class MysqlConfig { + private String jdbcUrl; + + private String username; + + private String password; + + private String querySql; + + private List headers; + + /** + * 将当前 MYSQL 配置构造成 DataX 所需的 job JSON 字符串。 + */ + public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws Exception { + Map mysqlParameter = new HashMap<>(); + Map connection = new HashMap<>(); + if (username != null) mysqlParameter.put("username", username); + if (password != null) mysqlParameter.put("password", password); + if (jdbcUrl != null) connection.put("jdbcUrl", Collections.singletonList(jdbcUrl)); + if (querySql != null) connection.put("querySql", Collections.singletonList(querySql)); + mysqlParameter.put("connection", Collections.singletonList(connection)); + + Map job = new HashMap<>(); + Map content = new HashMap<>(); + Map reader = new HashMap<>(); + reader.put("name", "mysqlreader"); + reader.put("parameter", mysqlParameter); + content.put("reader", reader); + + Map writer = new HashMap<>(); + Map writerParameter = new HashMap<>(); + writer.put("name", "txtfilewriter"); + if (CollectionUtils.isNotEmpty(headers)) { + writerParameter.put("header", headers); + } + writerParameter.put("path", task.getTargetPath()); + writerParameter.put("fileName", "collectionResult"); + writerParameter.put("writeMode", "truncate"); + writerParameter.put("dateFormat", "yyyy-MM-dd HH:mm:ss"); + writerParameter.put("fileFormat", "csv"); + writerParameter.put("encoding", "UTF-8"); + writerParameter.put("fieldDelimiter", ","); + writer.put("parameter", writerParameter); + content.put("writer", writer); + + job.put("content", List.of(content)); + Map setting = new HashMap<>(); + Map channel = new HashMap<>(); + channel.put("channel", 1); + setting.put("speed", channel); + job.put("setting", setting); + + Map jobConfig = new HashMap<>(); + jobConfig.put("job", job); + return objectMapper.writeValueAsString(jobConfig); + } +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java index 2b4538398..b54931903 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java @@ -41,8 +41,7 @@ public interface CollectionTaskConverter { default Map parseJsonToMap(String json) { try { ObjectMapper objectMapper = new ObjectMapper(); - return - objectMapper.readValue(json, Map.class); + return objectMapper.readValue(json, Map.class); } catch (Exception e) { throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER); } diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java index f4c61d8d8..0c13bb0cd 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java @@ -287,19 +287,4 @@ private List getFilePaths(String dataSourceId) { log.info("获取到归集任务详情: {}", taskDetail); return Collections.singletonList(taskDetail.getTargetPath()); } - - /** - * 解析任务配置 - */ - private LocalCollectionConfig parseTaskConfig(Map configMap) { - try { - if (configMap == null || configMap.isEmpty()) { - return null; - } - return objectMapper.convertValue(configMap, LocalCollectionConfig.class); - } catch (Exception e) { - log.error("解析任务配置失败", e); - return null; - } - } } diff --git a/frontend/src/pages/DataCollection/Create/CreateTask.tsx b/frontend/src/pages/DataCollection/Create/CreateTask.tsx index ed208af99..8b23b6db3 100644 --- a/frontend/src/pages/DataCollection/Create/CreateTask.tsx +++ b/frontend/src/pages/DataCollection/Create/CreateTask.tsx @@ -321,21 +321,15 @@ export default function CollectionTaskCreate() { {selectedTemplate === TemplateType.MYSQL && (
- - - - + @@ -346,22 +340,22 @@ export default function CollectionTaskCreate() { rules={[{ required: true, message: "请输入密码" }]} label="密码" > - - - - + + +