Skip to content

Commit 07029d0

Browse files
authored
优化清洗重试机制,优化清洗进度展示,修复模板无法展示参数 (#113)
* bugfix: 模板无法展示参数 * bugfix: 优化清洗进度展示 * bugfix: 优化清洗重试机制
1 parent f1bffdc commit 07029d0

File tree

11 files changed

+93
-21
lines changed

11 files changed

+93
-21
lines changed

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.concurrent.atomic.AtomicReference;
4242
import java.util.regex.Matcher;
4343
import java.util.regex.Pattern;
44+
import java.util.stream.Collectors;
4445
import java.util.stream.Stream;
4546

4647
@Slf4j
@@ -116,7 +117,7 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {
116117

117118
prepareTask(task, request.getInstance());
118119
scanDataset(taskId, request.getSrcDatasetId());
119-
executeTask(taskId);
120+
taskScheduler.executeTask(taskId);
120121
return task;
121122
}
122123

@@ -170,6 +171,11 @@ public void deleteTask(String taskId) {
170171
}
171172

172173
public void executeTask(String taskId) {
174+
List<CleaningResultDto> failed = cleaningResultRepo.findByInstanceId(taskId, "FAILED");
175+
Set<String> failedSet = failed.stream().map(CleaningResultDto::getSrcFileId).collect(Collectors.toSet());
176+
CleaningTaskDto task = cleaningTaskRepo.findTaskById(taskId);
177+
scanDataset(taskId, task.getSrcDatasetId(), failedSet);
178+
cleaningResultRepo.deleteByInstanceId(taskId, "FAILED");
173179
taskScheduler.executeTask(taskId);
174180
}
175181

@@ -226,6 +232,29 @@ private void scanDataset(String taskId, String srcDatasetId) {
226232
} while (pageNumber < datasetFiles.getTotalPages());
227233
}
228234

235+
private void scanDataset(String taskId, String srcDatasetId, Set<String> failedFiles) {
236+
int pageNumber = 0;
237+
int pageSize = 500;
238+
PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize);
239+
PagedResponse<DatasetFile> datasetFiles;
240+
do {
241+
datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null,null, pageRequest);
242+
if (datasetFiles.getContent().isEmpty()) {
243+
break;
244+
}
245+
List<Map<String, Object>> files = datasetFiles.getContent().stream()
246+
.filter(content -> failedFiles.contains(content.getId()))
247+
.map(content -> Map.of("fileName", (Object) content.getFileName(),
248+
"fileSize", content.getFileSize(),
249+
"filePath", content.getFilePath(),
250+
"fileType", content.getFileType(),
251+
"fileId", content.getId()))
252+
.toList();
253+
writeListMapToJsonlFile(files, FLOW_PATH + "/" + taskId + "/dataset.jsonl");
254+
pageNumber += 1;
255+
} while (pageNumber < datasetFiles.getTotalPages());
256+
}
257+
229258
private void writeListMapToJsonlFile(List<Map<String, Object>> mapList, String fileName) {
230259
ObjectMapper objectMapper = new ObjectMapper();
231260

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,13 @@
66
import com.datamate.cleaning.infrastructure.validator.CleanTaskValidator;
77
import com.datamate.cleaning.interfaces.dto.*;
88
import com.datamate.cleaning.domain.model.entity.TemplateWithInstance;
9+
import com.datamate.common.infrastructure.exception.BusinessException;
10+
import com.datamate.operator.application.OperatorService;
911
import com.datamate.operator.domain.repository.OperatorViewRepository;
12+
import com.datamate.operator.infrastructure.exception.OperatorErrorCode;
1013
import com.datamate.operator.interfaces.dto.OperatorDto;
14+
import com.fasterxml.jackson.core.JsonProcessingException;
15+
import com.fasterxml.jackson.databind.ObjectMapper;
1116
import lombok.RequiredArgsConstructor;
1217
import org.apache.commons.lang3.StringUtils;
1318
import org.springframework.stereotype.Service;
@@ -31,6 +36,10 @@ public class CleaningTemplateService {
3136

3237
private final CleanTaskValidator cleanTaskValidator;
3338

39+
private final OperatorService operatorService;
40+
41+
private final ObjectMapper objectMapper = new ObjectMapper();
42+
3443
public List<CleaningTemplateDto> getTemplates(String keywords) {
3544
List<OperatorDto> allOperators =
3645
operatorViewRepo.findOperatorsByCriteria(null, null, null, null, null);
@@ -50,7 +59,12 @@ public List<CleaningTemplateDto> getTemplates(String keywords) {
5059
.map(v -> {
5160
OperatorDto operator = operatorsMap.get(v.getOperatorId());
5261
if (StringUtils.isNotBlank(v.getSettingsOverride())) {
53-
operator.setSettings(v.getSettingsOverride());
62+
try {
63+
operator.setOverrides(objectMapper.readValue(v.getSettingsOverride(), Map.class));
64+
} catch (JsonProcessingException e) {
65+
throw BusinessException.of(OperatorErrorCode.SETTINGS_PARSE_FAILED, e.getMessage());
66+
}
67+
operatorService.overrideSettings(operator);
5468
}
5569
return operator;
5670
}).toList());

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningResultRepository.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@
1010
/**
 * Repository for {@link CleaningResult} rows, addressed by the id of the cleaning
 * instance they belong to.
 */
public interface CleaningResultRepository extends IRepository<CleaningResult> {
1111
/** Deletes the results of the given instance. */
void deleteByInstanceId(String instanceId);
1212

13+
/**
 * Deletes the results of the given instance that have the given status.
 * NOTE(review): the implementation in this commit ignores a blank status - confirm
 * that is the intended contract.
 */
void deleteByInstanceId(String instanceId, String status);
14+
1315
/** Counts results of the given instance. NOTE(review): meaning of the array slots is not visible here - confirm. */
int[] countByInstanceId(String instanceId);
1416

1517
/** Returns the results of the given instance. */
List<CleaningResultDto> findByInstanceId(String instanceId);
18+
19+
/**
 * Returns the results of the given instance that have the given status.
 * NOTE(review): the implementation in this commit ignores a blank status - confirm
 * that is the intended contract.
 */
List<CleaningResultDto> findByInstanceId(String instanceId, String status);
1620
}

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningResultRepositoryImpl.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,14 @@ public class CleaningResultRepositoryImpl extends CrudRepository<CleaningResultM
2222

2323
@Override
2424
public void deleteByInstanceId(String instanceId) {
25+
deleteByInstanceId(instanceId, null);
26+
}
27+
28+
@Override
29+
public void deleteByInstanceId(String instanceId, String status) {
2530
LambdaQueryWrapper<CleaningResult> queryWrapper = new LambdaQueryWrapper<>();
26-
queryWrapper.eq(CleaningResult::getInstanceId, instanceId);
31+
queryWrapper.eq(CleaningResult::getInstanceId, instanceId)
32+
.eq(StringUtils.isNotBlank(status), CleaningResult::getStatus, status);
2733
mapper.delete(queryWrapper);
2834
}
2935

@@ -40,8 +46,13 @@ public int[] countByInstanceId(String instanceId) {
4046
}
4147

4248
public List<CleaningResultDto> findByInstanceId(String instanceId) {
49+
return findByInstanceId(instanceId, null);
50+
}
51+
52+
public List<CleaningResultDto> findByInstanceId(String instanceId, String status) {
4353
LambdaQueryWrapper<CleaningResult> queryWrapper = new LambdaQueryWrapper<>();
44-
queryWrapper.eq(CleaningResult::getInstanceId, instanceId);
54+
queryWrapper.eq(CleaningResult::getInstanceId, instanceId)
55+
.eq(StringUtils.isNotBlank(status), CleaningResult::getStatus, status);
4556
return CleaningResultConverter.INSTANCE.convertEntityToDto(mapper.selectList(queryWrapper));
4657
}
4758
}

backend/services/operator-market-service/src/main/java/com/datamate/operator/application/OperatorService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ private String getExtractPath(String fileName) {
128128
return operatorBasePath + File.separator + "extract" + File.separator + fileName;
129129
}
130130

131-
private void overrideSettings(OperatorDto operatorDto) {
131+
public void overrideSettings(OperatorDto operatorDto) {
132132
if (StringUtils.isBlank(operatorDto.getSettings()) || MapUtils.isEmpty(operatorDto.getOverrides())) {
133133
return;
134134
}

frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,10 @@ export default function BasicInfo({ task }: { task: CleansingTask }) {
110110
{/* 处理进度 */}
111111
<div>
112112
<h3 className="text-lg font-semibold text-gray-900 mb-4">处理进度</h3>
113-
<Progress percent={task?.progress?.process} showInfo />
113+
{ task?.status?.value === TaskStatus.FAILED ?
114+
<Progress percent={task?.progress?.process} size="small" status="exception" />
115+
: <Progress percent={task?.progress?.process} size="small"/>
116+
}
114117
<div className="grid grid-cols-2 gap-4 text-sm mt-4">
115118
<div className="flex items-center gap-2">
116119
<span className="w-3 h-3 bg-green-500 rounded-full inline-block" />

frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,14 +250,22 @@ export default function FileTable({result, fetchTaskResult}) {
250250
key: "action",
251251
render: (_text: string, record: any) => (
252252
<div className="flex">
253-
{record.status === "COMPLETED" && (
253+
{record.status === "COMPLETED" ? (
254254
<Button
255255
type="link"
256256
size="small"
257257
onClick={() => handleViewFileCompare(record)}
258258
>
259259
对比
260260
</Button>
261+
) : (
262+
<Button
263+
type="link"
264+
size="small"
265+
disabled
266+
>
267+
对比
268+
</Button>
261269
)}
262270
<Popover content="暂未开放">
263271
<Button type="link" size="small" disabled>下载</Button>

frontend/src/pages/DataCleansing/Home/components/TaskList.tsx

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,13 @@ export default function TaskList() {
177177
title: "进度",
178178
dataIndex: "process",
179179
key: "process",
180-
width: 200,
181-
render: (progress: number) => (
182-
<Progress percent={progress} size="small" />
183-
),
180+
width: 150,
181+
render: (_, record: CleansingTask) => {
182+
if (record?.status?.value == TaskStatus.FAILED) {
183+
return <Progress percent={record?.progress?.process} size="small" status="exception" />;
184+
}
185+
return <Progress percent={record?.progress?.process} size="small"/>;
186+
},
184187
},
185188
{
186189
title: "已处理文件数",

frontend/src/pages/DataCleansing/Home/components/TemplateList.tsx

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,6 @@ export default function TemplateList() {
4545
};
4646

4747
const templateColumns = [
48-
{
49-
title: "模板ID",
50-
dataIndex: "id",
51-
key: "id",
52-
fixed: "left",
53-
width: 100,
54-
},
5548
{
5649
title: "模板名称",
5750
dataIndex: "name",
@@ -71,6 +64,13 @@ export default function TemplateList() {
7164
</Button>
7265
);
7366
}},
67+
{
68+
title: "模板ID",
69+
dataIndex: "id",
70+
key: "id",
71+
fixed: "left",
72+
width: 150,
73+
},
7474
{
7575
title: "算子数量",
7676
dataIndex: "num",

runtime/ops/filter/img_duplicated_images_cleaner/process.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,10 @@ def execute_sql(self, md5: str, file_name: str,
9191
with self.conn as connection:
9292
connection.execute(text(create_tables_sql))
9393
# 判断是否有重复文件
94-
result = connection.execute(text(query_sql, query_sql_params)).fetchall()
94+
result = connection.execute(text(query_sql), query_sql_params).fetchall()
9595
# 查询记录为空,无重复图片, 插入新文件特征
9696
if not result:
97-
connection.execute(text(insert_sql, insert_sql_params))
97+
connection.execute(text(insert_sql), insert_sql_params)
9898
return img_bytes
9999
logger.info(f"taskId: {self.task_uuid} fileName: {file_name}, method: Duplicate ImagesCleaner. "
100100
f"The image is duplicated and filtered ")

0 commit comments

Comments
 (0)