Skip to content

Commit a345d49

Browse files
committed
数据清洗接口
1 parent 3a544e5 commit a345d49

File tree

12 files changed

+317
-69
lines changed

12 files changed

+317
-69
lines changed

backend/openapi/specs/data-cleaning.yaml

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,6 @@ paths:
142142
example: ''
143143
schema:
144144
type: string
145-
format: uuid
146145
responses:
147146
'200':
148147
description: 成功获取任务详情
@@ -183,7 +182,6 @@ paths:
183182
example: ''
184183
schema:
185184
type: string
186-
format: uuid
187185
responses:
188186
'204':
189187
description: 任务删除成功
@@ -288,7 +286,6 @@ paths:
288286
example: ''
289287
schema:
290288
type: string
291-
format: uuid
292289
responses:
293290
'200':
294291
description: 成功获取模板详情
@@ -329,7 +326,6 @@ paths:
329326
example: ''
330327
schema:
331328
type: string
332-
format: uuid
333329
requestBody:
334330
content:
335331
application/json:
@@ -384,7 +380,6 @@ paths:
384380
example: ''
385381
schema:
386382
type: string
387-
format: uuid
388383
responses:
389384
'204':
390385
description: 模板删除成功
@@ -513,7 +508,7 @@ components:
513508
instance:
514509
type: array
515510
items: &ref_3
516-
$ref: '#/components/schemas/OperatorResponse'
511+
$ref: '#/components/schemas/OperatorInstance'
517512
description: 模板定义的清洗规则和配置
518513
CreateCleaningTemplateRequest:
519514
type: object
@@ -527,8 +522,10 @@ components:
527522
description:
528523
type: string
529524
description: 模板描述
530-
instance: &ref_4
531-
$ref: '#/components/schemas/OperatorInstance'
525+
instance:
526+
type: array
527+
items: *ref_3
528+
description: 任务的具体配置(如果非模板创建,则直接定义)'
532529
CleaningTemplate:
533530
type: object
534531
required:
@@ -539,7 +536,6 @@ components:
539536
properties:
540537
id:
541538
type: string
542-
format: uuid
543539
description: 模板唯一标识符
544540
name:
545541
type: string
@@ -549,7 +545,8 @@ components:
549545
description: 模板描述
550546
instance:
551547
type: array
552-
items: *ref_3
548+
items: &ref_4
549+
$ref: '#/components/schemas/OperatorResponse'
553550
description: 模板定义的清洗规则和配置
554551
createdAt:
555552
type: string
@@ -574,7 +571,10 @@ components:
574571
description: 任务描述
575572
sourceType:
576573
type: string
577-
instance: *ref_4
574+
instance:
575+
type: array
576+
items: *ref_3
577+
description: 任务的具体配置(如果非模板创建,则直接定义)
578578
ErrorResponse:
579579
type: object
580580
properties:
@@ -595,7 +595,6 @@ components:
595595
properties:
596596
id:
597597
type: string
598-
format: uuid
599598
description: 任务唯一标识符
600599
name:
601600
type: string
@@ -613,11 +612,10 @@ components:
613612
- failed
614613
templateId:
615614
type: string
616-
format: uuid
617615
description: 关联的模板ID(如果基于模板创建)
618616
instance:
619617
type: array
620-
items: *ref_3
618+
items: *ref_4
621619
description: 任务的具体配置(如果非模板创建,则直接定义)
622620
progress:
623621
$ref: '#/components/schemas/CleaningProcess'
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.dataengine.cleaning.application.scheduler;
2+
3+
public class TaskScheduler {
4+
public void a() {
5+
6+
7+
}
8+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package com.dataengine.cleaning.application.service;
2+
3+
4+
import com.dataengine.cleaning.infrastructure.persistence.mapper.CleaningTaskMapper;
5+
import com.dataengine.cleaning.interfaces.dto.CleaningTask;
6+
import com.dataengine.cleaning.interfaces.dto.CreateCleaningTaskRequest;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.stereotype.Service;
9+
import org.springframework.transaction.annotation.Transactional;
10+
11+
import java.util.List;
12+
import java.util.UUID;
13+
import java.util.concurrent.ExecutorService;
14+
import java.util.concurrent.Executors;
15+
16+
@Service
17+
public class CleaningTaskService {
18+
19+
@Autowired
20+
private CleaningTaskMapper cleaningTaskMapper;
21+
22+
private final ExecutorService taskExecutor = Executors.newFixedThreadPool(5);
23+
24+
public List<CleaningTask> getTasks(String status) {
25+
return cleaningTaskMapper.findTasksByStatus(status);
26+
}
27+
28+
@Transactional
29+
public CleaningTask createTask(CreateCleaningTaskRequest request) {
30+
CleaningTask task = new CleaningTask();
31+
task.setName(request.getName());
32+
task.setDescription(request.getDescription());
33+
task.setStatus(CleaningTask.StatusEnum.PENDING);
34+
task.setId(UUID.randomUUID().toString());
35+
cleaningTaskMapper.insertTask(task);
36+
37+
taskExecutor.submit(() -> executeTask(task));
38+
return task;
39+
}
40+
41+
public CleaningTask getTask(String taskId) {
42+
return cleaningTaskMapper.findTaskById(taskId);
43+
}
44+
45+
@Transactional
46+
public void deleteTask(String taskId) {
47+
cleaningTaskMapper.deleteTask(taskId);
48+
}
49+
50+
private void executeTask(CleaningTask task) {
51+
task.setStatus(CleaningTask.StatusEnum.RUNNING);
52+
cleaningTaskMapper.updateTaskStatus(task);
53+
String raySubmitId = submitTaskToRay(task);
54+
trackTaskStatus(raySubmitId);
55+
task.setStatus(CleaningTask.StatusEnum.COMPLETED);
56+
cleaningTaskMapper.updateTaskStatus(task);
57+
}
58+
59+
private String submitTaskToRay(CleaningTask task) {
60+
return "";
61+
}
62+
63+
private void trackTaskStatus (String submitId) {
64+
65+
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.dataengine.cleaning.application.service;
2+
3+
4+
import com.dataengine.cleaning.infrastructure.persistence.mapper.CleaningTemplateMapper;
5+
import com.dataengine.cleaning.interfaces.dto.CleaningTemplate;
6+
import com.dataengine.cleaning.interfaces.dto.CreateCleaningTemplateRequest;
7+
import com.dataengine.cleaning.interfaces.dto.UpdateCleaningTemplateRequest;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.stereotype.Service;
10+
import org.springframework.transaction.annotation.Transactional;
11+
import java.util.List;
12+
13+
@Service
14+
public class CleaningTemplateService {
15+
16+
@Autowired
17+
private CleaningTemplateMapper cleaningTemplateMapper;
18+
19+
public List<CleaningTemplate> getTemplates() {
20+
return cleaningTemplateMapper.findAllTemplates();
21+
}
22+
23+
@Transactional
24+
public CleaningTemplate createTemplate(CreateCleaningTemplateRequest request) {
25+
CleaningTemplate template = new CleaningTemplate();
26+
template.setName(request.getName());
27+
template.setDescription(request.getDescription());
28+
cleaningTemplateMapper.insertTemplate(template);
29+
return template;
30+
}
31+
32+
public CleaningTemplate getTemplate(String templateId) {
33+
return cleaningTemplateMapper.findTemplateById(templateId);
34+
}
35+
36+
@Transactional
37+
public CleaningTemplate updateTemplate(String templateId, UpdateCleaningTemplateRequest request) {
38+
CleaningTemplate template = cleaningTemplateMapper.findTemplateById(templateId);
39+
if (template != null) {
40+
template.setName(request.getName());
41+
template.setDescription(request.getDescription());
42+
cleaningTemplateMapper.updateTemplate(template);
43+
}
44+
return template;
45+
}
46+
47+
@Transactional
48+
public void deleteTemplate(String templateId) {
49+
cleaningTemplateMapper.deleteTemplate(templateId);
50+
}
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.dataengine.cleaning.infrastructure.persistence.mapper;
2+
3+
import com.dataengine.cleaning.interfaces.dto.CleaningTask;
4+
import org.apache.ibatis.annotations.Mapper;
5+
import org.apache.ibatis.annotations.Param;
6+
7+
import java.util.List;
8+
9+
@Mapper
10+
public interface CleaningTaskMapper {
11+
List<CleaningTask> findTasksByStatus(@Param("status") String status);
12+
13+
CleaningTask findTaskById(@Param("taskId") String taskId);
14+
15+
void insertTask(CleaningTask task);
16+
17+
void updateTaskStatus(CleaningTask task);
18+
19+
void deleteTask(@Param("taskId") String taskId);
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.dataengine.cleaning.infrastructure.persistence.mapper;
2+
3+
import com.dataengine.cleaning.interfaces.dto.CleaningTemplate;
4+
import org.apache.ibatis.annotations.Mapper;
5+
import org.apache.ibatis.annotations.Param;
6+
7+
import java.util.List;
8+
9+
@Mapper
10+
public interface CleaningTemplateMapper {
11+
12+
List<CleaningTemplate> findAllTemplates();
13+
14+
CleaningTemplate findTemplateById(@Param("templateId") String templateId);
15+
16+
void insertTemplate(CleaningTemplate template);
17+
18+
void updateTemplate(CleaningTemplate template);
19+
20+
void deleteTemplate(@Param("templateId") String templateId);
21+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.dataengine.cleaning.interfaces.api;
2+
3+
import com.dataengine.cleaning.interfaces.dto.CleaningTask;
4+
import com.dataengine.cleaning.interfaces.dto.CreateCleaningTaskRequest;
5+
import com.dataengine.cleaning.application.service.CleaningTaskService;
6+
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.http.ResponseEntity;
9+
import org.springframework.web.bind.annotation.*;
10+
11+
import java.util.List;
12+
13+
@RestController
14+
public class CleaningTaskController implements CleaningTaskApi {
15+
16+
@Autowired
17+
private CleaningTaskService cleaningTaskService;
18+
19+
@Override
20+
public ResponseEntity<List<CleaningTask>> cleaningTasksGet(String status) {
21+
return ResponseEntity.ok(cleaningTaskService.getTasks(status));
22+
}
23+
24+
@Override
25+
public ResponseEntity<CleaningTask> cleaningTasksPost(CreateCleaningTaskRequest request) {
26+
return ResponseEntity.ok(cleaningTaskService.createTask(request));
27+
}
28+
29+
@Override
30+
public ResponseEntity<CleaningTask> cleaningTasksTaskIdGet(String taskId) {
31+
return ResponseEntity.ok(cleaningTaskService.getTask(taskId));
32+
}
33+
34+
@Override
35+
public ResponseEntity<Void> cleaningTasksTaskIdDelete(String taskId) {
36+
cleaningTaskService.deleteTask(taskId);
37+
return ResponseEntity.noContent().build();
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.dataengine.cleaning.interfaces.api;
2+
3+
import com.dataengine.cleaning.application.service.CleaningTemplateService;
4+
5+
import com.dataengine.cleaning.interfaces.dto.CleaningTemplate;
6+
import com.dataengine.cleaning.interfaces.dto.CreateCleaningTemplateRequest;
7+
import com.dataengine.cleaning.interfaces.dto.UpdateCleaningTemplateRequest;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.http.ResponseEntity;
10+
import org.springframework.web.bind.annotation.*;
11+
12+
import java.util.List;
13+
14+
@RestController
15+
public class CleaningTemplateController implements CleaningTemplateApi {
16+
17+
@Autowired
18+
private CleaningTemplateService cleaningTemplateService;
19+
20+
@Override
21+
public ResponseEntity<List<CleaningTemplate>> cleaningTemplatesGet() {
22+
return ResponseEntity.ok(cleaningTemplateService.getTemplates());
23+
}
24+
25+
@Override
26+
public ResponseEntity<CleaningTemplate> cleaningTemplatesPost(CreateCleaningTemplateRequest request) {
27+
return ResponseEntity.ok(cleaningTemplateService.createTemplate(request));
28+
}
29+
30+
@Override
31+
public ResponseEntity<CleaningTemplate> cleaningTemplatesTemplateIdGet(String templateId) {
32+
return ResponseEntity.ok(cleaningTemplateService.getTemplate(templateId));
33+
}
34+
35+
@Override
36+
public ResponseEntity<CleaningTemplate> cleaningTemplatesTemplateIdPut(String templateId,
37+
UpdateCleaningTemplateRequest request) {
38+
return ResponseEntity.ok(cleaningTemplateService.updateTemplate(templateId, request));
39+
}
40+
41+
@Override
42+
public ResponseEntity<Void> cleaningTemplatesTemplateIdDelete(String templateId) {
43+
cleaningTemplateService.deleteTemplate(templateId);
44+
return ResponseEntity.noContent().build();
45+
}
46+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?xml version="1.0" encoding="UTF-8" ?>
2+
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
3+
<mapper namespace="com.dataengine.cleaning.infrastructure.persistence.mapper.CleaningTaskMapper">
4+
5+
<select id="findTasksByStatus" resultType="com.dataengine.cleaning.interfaces.dto.CleaningTask">
6+
SELECT * FROM t_clean_task WHERE status = #{status}
7+
</select>
8+
9+
<select id="findTaskById" resultType="com.dataengine.cleaning.interfaces.dto.CleaningTask">
10+
SELECT * FROM t_clean_task WHERE id = #{taskId}
11+
</select>
12+
13+
<insert id="insertTask">
14+
INSERT INTO t_clean_task (id, name, description, status, created_at)
15+
VALUES (#{id}, #{name}, #{description}, #{status}, NOW())
16+
</insert>
17+
18+
<update id="updateTaskStatus">
19+
UPDATE t_clean_task SET status = #{status} WHERE id = #{id}
20+
</update>
21+
22+
<delete id="deleteTask">
23+
DELETE FROM t_clean_task WHERE id = #{taskId}
24+
</delete>
25+
26+
</mapper>

0 commit comments

Comments
 (0)