Skip to content

Commit 6bbde0e

Browse files
authored
feature: 清洗任务详情页 (#73)
* feature: 清洗任务详情 * fix: 取消构建镜像,改为直接拉取 * fix: 增加清洗任务详情页 * fix: 增加清洗任务详情页 * fix: 算子列表可点击 * fix: 模板详情和更新
1 parent 442e561 commit 6bbde0e

File tree

46 files changed

+1064
-794
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1064
-794
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Reusable workflow: pulls the `latest` image of one DataMate service from GHCR,
# re-tags it with its short local name, saves it to a tarball and uploads the
# tarball as a build artifact named after the service.
name: docker-image-save.yml

on:
  workflow_call:
    inputs:
      service_name:
        required: true
        type: string

jobs:
  pull-and-save:
    runs-on: ubuntu-latest
    # Expression values are handed to the shell through environment variables
    # instead of being expanded into the script text, so an unexpected value
    # cannot be interpreted as shell syntax.
    env:
      SERVICE_NAME: ${{ inputs.service_name }}
      REPO_OWNER: ${{ github.repository_owner }}
    steps:
      - name: Pull Docker Image
        run: |
          # GHCR image paths must be lowercase, the repository owner may not be.
          LOWERCASE_REPO=$(echo "$REPO_OWNER" | tr '[:upper:]' '[:lower:]')
          docker pull "ghcr.io/${LOWERCASE_REPO}/datamate-${SERVICE_NAME}:latest"
          docker tag "ghcr.io/${LOWERCASE_REPO}/datamate-${SERVICE_NAME}:latest" "datamate-${SERVICE_NAME}:latest"

      - name: Save Docker Image
        run: |
          docker save -o "datamate-${SERVICE_NAME}.tar" "datamate-${SERVICE_NAME}:latest"

      - name: Upload Docker Image
        uses: actions/upload-artifact@v4
        with:
          name: datamate-${{ inputs.service_name }}
          path: datamate-${{ inputs.service_name }}.tar

.github/workflows/docker-images-reusable.yml

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,7 @@ jobs:
4747
make build-${{ inputs.service_name }} VERSION=latest
4848
4949
- name: Tag & Push Docker Image
50-
if: github.event_name != 'pull_request' && !startsWith(github.workflow, 'Package')
50+
if: github.event_name != 'pull_request'
5151
run: |
5252
docker tag datamate-${{ inputs.service_name }}:latest ${{ steps.set-tag.outputs.TAGS }}
53-
docker push ${{ steps.set-tag.outputs.TAGS }}
54-
55-
- name: Save Docker Image
56-
if: startsWith(github.workflow, 'Package')
57-
run: |
58-
docker save -o datamate-${{ inputs.service_name }}.tar datamate-${{ inputs.service_name }}:latest
59-
60-
- name: Upload Docker Image
61-
if: startsWith(github.workflow, 'Package')
62-
uses: actions/upload-artifact@v4
63-
with:
64-
name: datamate-${{ inputs.service_name }}
65-
path: datamate-${{ inputs.service_name }}.tar
53+
docker push ${{ steps.set-tag.outputs.TAGS }}

.github/workflows/package.yml

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,33 @@ on:
66
jobs:
77
backend-docker-build:
88
name: Build and Push Backend Docker Image
9-
uses: ./.github/workflows/docker-image-backend.yml
9+
uses: ./.github/workflows/docker-image-save.yml
10+
with:
11+
service_name: backend
1012

1113
frontend-docker-build:
1214
name: Build and Push Frontend Docker Image
13-
uses: ./.github/workflows/docker-image-frontend.yml
15+
uses: ./.github/workflows/docker-image-save.yml
16+
with:
17+
service_name: frontend
1418

1519
database-docker-build:
1620
name: Build and Push Database Docker Image
17-
uses: ./.github/workflows/docker-image-database.yml
21+
uses: ./.github/workflows/docker-image-save.yml
22+
with:
23+
service_name: database
1824

1925
runtime-docker-build:
2026
name: Build and Push Runtime Docker Image
21-
uses: ./.github/workflows/docker-image-runtime.yml
27+
uses: ./.github/workflows/docker-image-save.yml
28+
with:
29+
service_name: runtime
2230

2331
backend-python-docker-build:
2432
name: Build and Push Backend Python Docker Image
25-
uses: ./.github/workflows/docker-image-backend-python.yml
33+
uses: ./.github/workflows/docker-image-save.yml
34+
with:
35+
service_name: backend-python
2636

2737
package-all:
2838
needs:
@@ -54,7 +64,7 @@ jobs:
5464
- name: Upload Package
5565
uses: actions/upload-artifact@v4
5666
with:
57-
name: datamate
67+
name: DataMate
5868
include-hidden-files: true
5969
path: |
6070
deployment/

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@
1111
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
1212

1313
import com.datamate.cleaning.infrastructure.validator.CleanTaskValidator;
14-
import com.datamate.cleaning.interfaces.dto.CleaningProcess;
15-
import com.datamate.cleaning.interfaces.dto.CleaningTaskDto;
16-
import com.datamate.cleaning.interfaces.dto.CreateCleaningTaskRequest;
17-
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
14+
import com.datamate.cleaning.interfaces.dto.*;
1815
import com.datamate.common.infrastructure.exception.BusinessException;
1916
import com.datamate.common.infrastructure.exception.SystemErrorCode;
2017
import com.datamate.datamanagement.application.DatasetApplicationService;
@@ -40,15 +37,19 @@
4037
import java.io.File;
4138
import java.io.FileWriter;
4239
import java.io.IOException;
43-
import java.util.List;
44-
import java.util.Map;
45-
import java.util.UUID;
40+
import java.nio.file.Files;
41+
import java.nio.file.Paths;
42+
import java.util.*;
43+
import java.util.concurrent.atomic.AtomicReference;
44+
import java.util.regex.Matcher;
45+
import java.util.regex.Pattern;
46+
import java.util.stream.Stream;
4647

4748
@Slf4j
4849
@Service
4950
@RequiredArgsConstructor
5051
public class CleaningTaskService {
51-
private final CleaningTaskRepository CleaningTaskRepo;
52+
private final CleaningTaskRepository cleaningTaskRepo;
5253

5354
private final OperatorInstanceRepository operatorInstanceRepo;
5455

@@ -66,19 +67,24 @@ public class CleaningTaskService {
6667

6768
private final String FLOW_PATH = "/flow";
6869

70+
private final Pattern LEVEL_PATTERN = Pattern.compile(
71+
"\\b(TRACE|DEBUG|INFO|WARN|WARNING|ERROR|FATAL)\\b",
72+
Pattern.CASE_INSENSITIVE
73+
);
74+
6975
public List<CleaningTaskDto> getTasks(String status, String keywords, Integer page, Integer size) {
70-
List<CleaningTaskDto> tasks = CleaningTaskRepo.findTasks(status, keywords, page, size);
76+
List<CleaningTaskDto> tasks = cleaningTaskRepo.findTasks(status, keywords, page, size);
7177
tasks.forEach(this::setProcess);
7278
return tasks;
7379
}
7480

7581
private void setProcess(CleaningTaskDto task) {
76-
int count = cleaningResultRepo.countByInstanceId(task.getId());
77-
task.setProgress(CleaningProcess.of(task.getFileCount(), count));
82+
int[] count = cleaningResultRepo.countByInstanceId(task.getId());
83+
task.setProgress(CleaningProcess.of(task.getFileCount(), count[0], count[1]));
7884
}
7985

8086
public int countTasks(String status, String keywords) {
81-
return CleaningTaskRepo.findTasks(status, keywords, null, null).size();
87+
return cleaningTaskRepo.findTasks(status, keywords, null, null).size();
8288
}
8389

8490
@Transactional
@@ -105,7 +111,7 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {
105111
task.setDestDatasetName(destDataset.getName());
106112
task.setBeforeSize(srcDataset.getSizeBytes());
107113
task.setFileCount(srcDataset.getFileCount().intValue());
108-
CleaningTaskRepo.insertTask(task);
114+
cleaningTaskRepo.insertTask(task);
109115

110116
operatorInstanceRepo.insertInstance(taskId, request.getInstance());
111117

@@ -116,14 +122,50 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {
116122
}
117123

118124
public CleaningTaskDto getTask(String taskId) {
119-
CleaningTaskDto task = CleaningTaskRepo.findTaskById(taskId);
125+
CleaningTaskDto task = cleaningTaskRepo.findTaskById(taskId);
120126
setProcess(task);
127+
task.setInstance(operatorInstanceRepo.findOperatorByInstanceId(taskId));
121128
return task;
122129
}
123130

131+
public List<CleaningResultDto> getTaskResults(String taskId) {
132+
return cleaningResultRepo.findByInstanceId(taskId);
133+
}
134+
135+
public List<CleaningTaskLog> getTaskLog(String taskId) {
136+
String logPath = FLOW_PATH + "/" + taskId + "/output.log";
137+
try (Stream<String> lines = Files.lines(Paths.get(logPath))) {
138+
List<CleaningTaskLog> logs = new ArrayList<>();
139+
AtomicReference<String> lastLevel = new AtomicReference<>("INFO");
140+
lines.forEach(line -> {
141+
lastLevel.set(getLogLevel(line, lastLevel.get()));
142+
CleaningTaskLog log = new CleaningTaskLog();
143+
log.setLevel(lastLevel.get());
144+
log.setMessage(line);
145+
logs.add(log);
146+
});
147+
return logs;
148+
} catch (IOException e) {
149+
log.error("Fail to read log file {}", logPath, e);
150+
return Collections.emptyList();
151+
}
152+
}
153+
154+
private String getLogLevel(String logLine, String defaultLevel) {
155+
if (logLine == null || logLine.trim().isEmpty()) {
156+
return defaultLevel;
157+
}
158+
159+
Matcher matcher = LEVEL_PATTERN.matcher(logLine);
160+
if (matcher.find()) {
161+
return matcher.group(1).toUpperCase();
162+
}
163+
return defaultLevel;
164+
}
165+
124166
@Transactional
125167
public void deleteTask(String taskId) {
126-
CleaningTaskRepo.deleteTaskById(taskId);
168+
cleaningTaskRepo.deleteTaskById(taskId);
127169
operatorInstanceRepo.deleteByInstanceId(taskId);
128170
cleaningResultRepo.deleteByInstanceId(taskId);
129171
}
@@ -190,7 +232,7 @@ private void writeListMapToJsonlFile(List<Map<String, Object>> mapList, String f
190232

191233
try (BufferedWriter writer = new BufferedWriter(new FileWriter(fileName))) {
192234
if (!mapList.isEmpty()) { // 检查列表是否为空,避免异常
193-
String jsonString = objectMapper.writeValueAsString(mapList.get(0));
235+
String jsonString = objectMapper.writeValueAsString(mapList.getFirst());
194236
writer.write(jsonString);
195237

196238
for (int i = 1; i < mapList.size(); i++) {

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
66
import com.datamate.cleaning.interfaces.dto.*;
77
import com.datamate.cleaning.domain.model.entity.TemplateWithInstance;
8-
import com.datamate.operator.domain.repository.OperatorRepository;
8+
import com.datamate.operator.domain.repository.OperatorViewRepository;
99
import com.datamate.operator.interfaces.dto.OperatorDto;
1010
import lombok.RequiredArgsConstructor;
1111
import org.apache.commons.lang3.StringUtils;
@@ -26,10 +26,11 @@ public class CleaningTemplateService {
2626

2727
private final OperatorInstanceRepository operatorInstanceRepo;
2828

29-
private final OperatorRepository operatorRepo;
29+
private final OperatorViewRepository operatorViewRepo;
3030

3131
public List<CleaningTemplateDto> getTemplates(String keywords) {
32-
List<OperatorDto> allOperators = operatorRepo.findAllOperators();
32+
List<OperatorDto> allOperators =
33+
operatorViewRepo.findOperatorsByCriteria(null, null, null, null, null);
3334
Map<String, OperatorDto> operatorsMap = allOperators.stream()
3435
.collect(Collectors.toMap(OperatorDto::getId, Function.identity()));
3536
List<TemplateWithInstance> allTemplates = cleaningTemplateRepo.findAllTemplates(keywords);
@@ -39,8 +40,8 @@ public List<CleaningTemplateDto> getTemplates(String keywords) {
3940
List<TemplateWithInstance> value = twi.getValue();
4041
CleaningTemplateDto template = new CleaningTemplateDto();
4142
template.setId(twi.getKey());
42-
template.setName(value.get(0).getName());
43-
template.setDescription(value.get(0).getDescription());
43+
template.setName(value.getFirst().getName());
44+
template.setDescription(value.getFirst().getDescription());
4445
template.setInstance(value.stream().filter(v -> StringUtils.isNotBlank(v.getOperatorId()))
4546
.sorted(Comparator.comparingInt(TemplateWithInstance::getOpIndex))
4647
.map(v -> {
@@ -50,8 +51,8 @@ public List<CleaningTemplateDto> getTemplates(String keywords) {
5051
}
5152
return operator;
5253
}).toList());
53-
template.setCreatedAt(value.get(0).getCreatedAt());
54-
template.setUpdatedAt(value.get(0).getUpdatedAt());
54+
template.setCreatedAt(value.getFirst().getCreatedAt());
55+
template.setUpdatedAt(value.getFirst().getUpdatedAt());
5556
return template;
5657
}).toList();
5758
}
@@ -70,17 +71,22 @@ public CleaningTemplateDto createTemplate(CreateCleaningTemplateRequest request)
7071
}
7172

7273
public CleaningTemplateDto getTemplate(String templateId) {
73-
return cleaningTemplateRepo.findTemplateById(templateId);
74+
CleaningTemplateDto template = cleaningTemplateRepo.findTemplateById(templateId);
75+
template.setInstance(operatorInstanceRepo.findOperatorByInstanceId(templateId));
76+
return template;
7477
}
7578

7679
@Transactional
7780
public CleaningTemplateDto updateTemplate(String templateId, UpdateCleaningTemplateRequest request) {
7881
CleaningTemplateDto template = cleaningTemplateRepo.findTemplateById(templateId);
79-
if (template != null) {
80-
template.setName(request.getName());
81-
template.setDescription(request.getDescription());
82-
cleaningTemplateRepo.updateTemplate(template);
82+
if (template == null) {
83+
return null;
8384
}
85+
template.setName(request.getName());
86+
template.setDescription(request.getDescription());
87+
cleaningTemplateRepo.updateTemplate(template);
88+
operatorInstanceRepo.deleteByInstanceId(templateId);
89+
operatorInstanceRepo.insertInstance(templateId, request.getInstance());
8490
return template;
8591
}
8692

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
public class CleaningTaskScheduler {
1717
private final CleaningTaskRepository cleaningTaskRepo;
1818

19+
private final RuntimeClient runtimeClient;
20+
1921
private final ExecutorService taskExecutor = Executors.newFixedThreadPool(5);
2022

2123
public void executeTask(String taskId) {
@@ -28,11 +30,11 @@ private void submitTask(String taskId) {
2830
task.setStatus(CleaningTaskStatusEnum.RUNNING);
2931
task.setStartedAt(LocalDateTime.now());
3032
cleaningTaskRepo.updateTask(task);
31-
RuntimeClient.submitTask(taskId);
33+
runtimeClient.submitTask(taskId);
3234
}
3335

3436
public void stopTask(String taskId) {
35-
RuntimeClient.stopTask(taskId);
37+
runtimeClient.stopTask(taskId);
3638
CleaningTaskDto task = new CleaningTaskDto();
3739
task.setId(taskId);
3840
task.setStatus(CleaningTaskStatusEnum.STOPPED);

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/Operator.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningResultRepository.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,14 @@
33

44
import com.baomidou.mybatisplus.extension.repository.IRepository;
55
import com.datamate.cleaning.domain.model.entity.CleaningResult;
6+
import com.datamate.cleaning.interfaces.dto.CleaningResultDto;
7+
8+
import java.util.List;
69

710
public interface CleaningResultRepository extends IRepository<CleaningResult> {
811
void deleteByInstanceId(String instanceId);
912

10-
int countByInstanceId(String instanceId);
13+
int[] countByInstanceId(String instanceId);
14+
15+
List<CleaningResultDto> findByInstanceId(String instanceId);
1116
}

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/OperatorInstanceRepository.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@
33
import com.baomidou.mybatisplus.extension.repository.IRepository;
44
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
55
import com.datamate.cleaning.domain.model.entity.OperatorInstance;
6+
import com.datamate.operator.interfaces.dto.OperatorDto;
67

78
import java.util.List;
89

910
public interface OperatorInstanceRepository extends IRepository<OperatorInstance> {
1011
void insertInstance(String instanceId, List<OperatorInstanceDto> instances);
1112

1213
void deleteByInstanceId(String instanceId);
14+
15+
List<OperatorDto> findOperatorByInstanceId(String instanceId);
1316
}

0 commit comments

Comments
 (0)