Skip to content

Commit 9b25629

Browse files
committed
fix: 增加清洗任务详情页
1 parent 87468c2 commit 9b25629

File tree

24 files changed

+706
-606
lines changed

24 files changed

+706
-606
lines changed

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@
1111
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
1212

1313
import com.datamate.cleaning.infrastructure.validator.CleanTaskValidator;
14-
import com.datamate.cleaning.interfaces.dto.CleaningProcess;
15-
import com.datamate.cleaning.interfaces.dto.CleaningTaskDto;
16-
import com.datamate.cleaning.interfaces.dto.CreateCleaningTaskRequest;
17-
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
14+
import com.datamate.cleaning.interfaces.dto.*;
1815
import com.datamate.common.infrastructure.exception.BusinessException;
1916
import com.datamate.common.infrastructure.exception.SystemErrorCode;
2017
import com.datamate.datamanagement.application.DatasetApplicationService;
@@ -40,9 +37,13 @@
4037
import java.io.File;
4138
import java.io.FileWriter;
4239
import java.io.IOException;
43-
import java.util.List;
44-
import java.util.Map;
45-
import java.util.UUID;
40+
import java.nio.file.Files;
41+
import java.nio.file.Paths;
42+
import java.util.*;
43+
import java.util.concurrent.atomic.AtomicReference;
44+
import java.util.regex.Matcher;
45+
import java.util.regex.Pattern;
46+
import java.util.stream.Stream;
4647

4748
@Slf4j
4849
@Service
@@ -66,15 +67,20 @@ public class CleaningTaskService {
6667

6768
private final String FLOW_PATH = "/flow";
6869

70+
private final Pattern LEVEL_PATTERN = Pattern.compile(
71+
"\\b(TRACE|DEBUG|INFO|WARN|WARNING|ERROR|FATAL)\\b",
72+
Pattern.CASE_INSENSITIVE
73+
);
74+
6975
public List<CleaningTaskDto> getTasks(String status, String keywords, Integer page, Integer size) {
7076
List<CleaningTaskDto> tasks = cleaningTaskRepo.findTasks(status, keywords, page, size);
7177
tasks.forEach(this::setProcess);
7278
return tasks;
7379
}
7480

7581
private void setProcess(CleaningTaskDto task) {
76-
int count = cleaningResultRepo.countByInstanceId(task.getId());
77-
task.setProgress(CleaningProcess.of(task.getFileCount(), count));
82+
int[] count = cleaningResultRepo.countByInstanceId(task.getId());
83+
task.setProgress(CleaningProcess.of(task.getFileCount(), count[0], count[1]));
7884
}
7985

8086
public int countTasks(String status, String keywords) {
@@ -117,11 +123,46 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {
117123

118124
public CleaningTaskDto getTask(String taskId) {
119125
CleaningTaskDto task = cleaningTaskRepo.findTaskById(taskId);
120-
operatorInstanceRepo.
121126
setProcess(task);
127+
task.setInstance(operatorInstanceRepo.findOperatorByInstanceId(taskId));
122128
return task;
123129
}
124130

131+
public List<CleaningResultDto> getTaskResults(String taskId) {
132+
return cleaningResultRepo.findByInstanceId(taskId);
133+
}
134+
135+
public List<CleaningTaskLog> getTaskLog(String taskId) {
136+
String logPath = FLOW_PATH + "/" + taskId + "/output.log";
137+
try (Stream<String> lines = Files.lines(Paths.get(logPath))) {
138+
List<CleaningTaskLog> logs = new ArrayList<>();
139+
AtomicReference<String> lastLevel = new AtomicReference<>("INFO");
140+
lines.forEach(line -> {
141+
lastLevel.set(getLogLevel(line, lastLevel.get()));
142+
CleaningTaskLog log = new CleaningTaskLog();
143+
log.setLevel(lastLevel.get());
144+
log.setMessage(line);
145+
logs.add(log);
146+
});
147+
return logs;
148+
} catch (IOException e) {
149+
log.error("Fail to read log file {}", logPath, e);
150+
return Collections.emptyList();
151+
}
152+
}
153+
154+
private String getLogLevel(String logLine, String defaultLevel) {
155+
if (logLine == null || logLine.trim().isEmpty()) {
156+
return defaultLevel;
157+
}
158+
159+
Matcher matcher = LEVEL_PATTERN.matcher(logLine);
160+
if (matcher.find()) {
161+
return matcher.group(1).toUpperCase();
162+
}
163+
return defaultLevel;
164+
}
165+
125166
@Transactional
126167
public void deleteTask(String taskId) {
127168
cleaningTaskRepo.deleteTaskById(taskId);
@@ -191,7 +232,7 @@ private void writeListMapToJsonlFile(List<Map<String, Object>> mapList, String f
191232

192233
try (BufferedWriter writer = new BufferedWriter(new FileWriter(fileName))) {
193234
if (!mapList.isEmpty()) { // 检查列表是否为空,避免异常
194-
String jsonString = objectMapper.writeValueAsString(mapList.get(0));
235+
String jsonString = objectMapper.writeValueAsString(mapList.getFirst());
195236
writer.write(jsonString);
196237

197238
for (int i = 1; i < mapList.size(); i++) {

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
public class CleaningTaskScheduler {
1717
private final CleaningTaskRepository cleaningTaskRepo;
1818

19+
private final RuntimeClient runtimeClient;
20+
1921
private final ExecutorService taskExecutor = Executors.newFixedThreadPool(5);
2022

2123
public void executeTask(String taskId) {
@@ -28,11 +30,11 @@ private void submitTask(String taskId) {
2830
task.setStatus(CleaningTaskStatusEnum.RUNNING);
2931
task.setStartedAt(LocalDateTime.now());
3032
cleaningTaskRepo.updateTask(task);
31-
RuntimeClient.submitTask(taskId);
33+
runtimeClient.submitTask(taskId);
3234
}
3335

3436
public void stopTask(String taskId) {
35-
RuntimeClient.stopTask(taskId);
37+
runtimeClient.stopTask(taskId);
3638
CleaningTaskDto task = new CleaningTaskDto();
3739
task.setId(taskId);
3840
task.setStatus(CleaningTaskStatusEnum.STOPPED);

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningResultRepository.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,14 @@
33

44
import com.baomidou.mybatisplus.extension.repository.IRepository;
55
import com.datamate.cleaning.domain.model.entity.CleaningResult;
6+
import com.datamate.cleaning.interfaces.dto.CleaningResultDto;
7+
8+
import java.util.List;
69

710
public interface CleaningResultRepository extends IRepository<CleaningResult> {
811
void deleteByInstanceId(String instanceId);
912

10-
int countByInstanceId(String instanceId);
13+
int[] countByInstanceId(String instanceId);
14+
15+
List<CleaningResultDto> findByInstanceId(String instanceId);
1116
}

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/OperatorInstanceRepository.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@
33
import com.baomidou.mybatisplus.extension.repository.IRepository;
44
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
55
import com.datamate.cleaning.domain.model.entity.OperatorInstance;
6+
import com.datamate.operator.interfaces.dto.OperatorDto;
67

78
import java.util.List;
89

910
public interface OperatorInstanceRepository extends IRepository<OperatorInstance> {
1011
void insertInstance(String instanceId, List<OperatorInstanceDto> instances);
1112

1213
void deleteByInstanceId(String instanceId);
14+
15+
List<OperatorDto> findOperatorByInstanceId(String instanceId);
1316
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.datamate.cleaning.infrastructure.converter;
2+
3+
import com.datamate.cleaning.domain.model.entity.CleaningResult;
4+
import com.datamate.cleaning.interfaces.dto.CleaningResultDto;
5+
import org.mapstruct.Mapper;
6+
import org.mapstruct.factory.Mappers;
7+
8+
import java.util.List;
9+
10+
@Mapper
11+
public interface CleaningResultConverter {
12+
CleaningResultConverter INSTANCE = Mappers.getMapper(CleaningResultConverter.class);
13+
14+
List<CleaningResultDto> convertEntityToDto(List<CleaningResult> cleaningResult);
15+
}

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/httpclient/RuntimeClient.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import com.datamate.common.infrastructure.exception.BusinessException;
44
import com.datamate.common.infrastructure.exception.SystemErrorCode;
55
import lombok.extern.slf4j.Slf4j;
6+
import org.springframework.beans.factory.annotation.Value;
7+
import org.springframework.stereotype.Component;
68

79
import java.io.IOException;
810
import java.net.URI;
@@ -13,24 +15,36 @@
1315
import java.time.Duration;
1416

1517
@Slf4j
18+
@Component
1619
public class RuntimeClient {
17-
private static final String BASE_URL = "http://datamate-runtime:8081/api";
20+
private final String CREATE_TASK_URL = "/api/task/{0}/submit";
1821

19-
private static final String CREATE_TASK_URL = BASE_URL + "/task/{0}/submit";
22+
private final String STOP_TASK_URL = "/api/task/{0}/stop";
2023

21-
private static final String STOP_TASK_URL = BASE_URL + "/task/{0}/stop";
24+
@Value("${runtime.protocol:http}")
25+
private String protocol;
2226

23-
private static final HttpClient CLIENT = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(10)).build();
27+
@Value("${runtime.host:datamate-runtime}")
28+
private String host;
2429

25-
public static void submitTask(String taskId) {
26-
send(MessageFormat.format(CREATE_TASK_URL, taskId));
30+
@Value("${runtime.port:8081}")
31+
private int port;
32+
33+
private final HttpClient CLIENT = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(10)).build();
34+
35+
public void submitTask(String taskId) {
36+
send(MessageFormat.format(getRequestUrl(CREATE_TASK_URL), taskId));
37+
}
38+
39+
public void stopTask(String taskId) {
40+
send(MessageFormat.format(getRequestUrl(STOP_TASK_URL), taskId));
2741
}
2842

29-
public static void stopTask(String taskId) {
30-
send(MessageFormat.format(STOP_TASK_URL, taskId));
43+
private String getRequestUrl(String url) {
44+
return protocol + "://" + host + ":" + port + url;
3145
}
3246

33-
private static void send(String url) {
47+
private void send(String url) {
3448
HttpRequest request = HttpRequest.newBuilder()
3549
.uri(URI.create(url))
3650
.timeout(Duration.ofSeconds(30))

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningResultRepositoryImpl.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,18 @@
22

33
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
44
import com.baomidou.mybatisplus.extension.repository.CrudRepository;
5+
import com.datamate.cleaning.common.enums.CleaningTaskStatusEnum;
56
import com.datamate.cleaning.domain.model.entity.CleaningResult;
67
import com.datamate.cleaning.domain.repository.CleaningResultRepository;
8+
import com.datamate.cleaning.infrastructure.converter.CleaningResultConverter;
79
import com.datamate.cleaning.infrastructure.persistence.mapper.CleaningResultMapper;
10+
import com.datamate.cleaning.interfaces.dto.CleaningResultDto;
811
import lombok.RequiredArgsConstructor;
12+
import org.apache.commons.lang3.StringUtils;
913
import org.springframework.stereotype.Repository;
1014

15+
import java.util.List;
16+
1117
@Repository
1218
@RequiredArgsConstructor
1319
public class CleaningResultRepositoryImpl extends CrudRepository<CleaningResultMapper, CleaningResult>
@@ -22,9 +28,20 @@ public void deleteByInstanceId(String instanceId) {
2228
}
2329

2430
@Override
25-
public int countByInstanceId(String instanceId) {
31+
public int[] countByInstanceId(String instanceId) {
2632
LambdaQueryWrapper<CleaningResult> lambdaWrapper = new LambdaQueryWrapper<>();
2733
lambdaWrapper.eq(CleaningResult::getInstanceId, instanceId);
28-
return mapper.selectCount(lambdaWrapper).intValue();
34+
List<CleaningResult> cleaningResults = mapper.selectList(lambdaWrapper);
35+
int succeed = Math.toIntExact(cleaningResults.stream()
36+
.filter(result ->
37+
StringUtils.equals(result.getStatus(), CleaningTaskStatusEnum.COMPLETED.getValue()))
38+
.count());
39+
return new int[] {succeed, cleaningResults.size() - succeed};
40+
}
41+
42+
public List<CleaningResultDto> findByInstanceId(String instanceId) {
43+
LambdaQueryWrapper<CleaningResult> queryWrapper = new LambdaQueryWrapper<>();
44+
queryWrapper.eq(CleaningResult::getInstanceId, instanceId);
45+
return CleaningResultConverter.INSTANCE.convertEntityToDto(mapper.selectList(queryWrapper));
2946
}
3047
}

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/OperatorInstanceRepositoryImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.datamate.cleaning.domain.model.entity.OperatorInstance;
88
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
99
import com.datamate.cleaning.infrastructure.persistence.mapper.OperatorInstanceMapper;
10+
import com.datamate.operator.interfaces.dto.OperatorDto;
1011
import lombok.RequiredArgsConstructor;
1112
import org.springframework.stereotype.Repository;
1213

@@ -38,10 +39,8 @@ public void deleteByInstanceId(String instanceId) {
3839
mapper.delete(lambdaWrapper);
3940
}
4041

41-
public void findByInstanceId(String instanceId) {
42-
LambdaQueryWrapper<OperatorInstance> lambdaWrapper = new LambdaQueryWrapper<>();
43-
lambdaWrapper.eq(OperatorInstance::getInstanceId, instanceId);
44-
List<OperatorInstance> operatorInstances = mapper.selectList(lambdaWrapper);
42+
public List<OperatorDto> findOperatorByInstanceId(String instanceId) {
43+
return OperatorInstanceConverter.INSTANCE.fromEntityToDto(mapper.findOperatorByInstanceId(instanceId));
4544

4645
}
4746
}
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
package com.datamate.cleaning.infrastructure.persistence.mapper;
22

33
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
4+
import com.datamate.cleaning.domain.model.entity.Operator;
45
import com.datamate.cleaning.domain.model.entity.OperatorInstance;
56
import org.apache.ibatis.annotations.Mapper;
7+
import org.apache.ibatis.annotations.Select;
8+
9+
import java.util.List;
610

711

812
@Mapper
913
public interface OperatorInstanceMapper extends BaseMapper<OperatorInstance> {
14+
@Select("SELECT id, name, description, version, inputs, outputs, runtime, settings, created_at, updated_at " +
15+
"FROM t_operator_instance toi LEFT JOIN datamate.t_operator o ON toi.operator_id = o.id " +
16+
"WHERE toi.instance_id = #{instanceId} ORDER BY toi.op_index")
17+
List<Operator> findOperatorByInstanceId(String instanceId);
1018
}

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningProcess.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,37 @@
1616
public class CleaningProcess {
1717
private Float process;
1818

19+
private Float successRate;
20+
1921
private Integer totalFileNum;
2022

23+
private Integer succeedFileNum;
24+
25+
private Integer failedFileNum;
26+
2127
private Integer finishedFileNum;
2228

23-
public CleaningProcess(int totalFileNum, int finishedFileNum) {
29+
public CleaningProcess(int totalFileNum, int succeedFileNum, int failedFileNum) {
2430
this.totalFileNum = totalFileNum;
25-
this.finishedFileNum = finishedFileNum;
31+
this.succeedFileNum = succeedFileNum;
32+
this.failedFileNum = failedFileNum;
33+
this.finishedFileNum = succeedFileNum + failedFileNum;
2634
if (totalFileNum == 0) {
2735
this.process = 0.0f;
2836
} else {
2937
this.process = BigDecimal.valueOf(finishedFileNum * 100L)
3038
.divide(BigDecimal.valueOf(totalFileNum), 2, RoundingMode.HALF_UP).floatValue();
3139
}
40+
if (finishedFileNum == 0) {
41+
this.successRate = 0f;
42+
} else {
43+
this.successRate = BigDecimal.valueOf(succeedFileNum * 100L)
44+
.divide(BigDecimal.valueOf(finishedFileNum), 2, RoundingMode.HALF_UP).floatValue();
45+
}
3246
}
3347

34-
public static CleaningProcess of(int totalFileNum, int finishedFileNum) {
35-
return new CleaningProcess(totalFileNum, finishedFileNum);
48+
public static CleaningProcess of(int totalFileNum, int succeedFileNum, int failedFileNum) {
49+
return new CleaningProcess(totalFileNum, succeedFileNum, failedFileNum);
3650
}
3751
}
3852

0 commit comments

Comments
 (0)