Skip to content

Commit 46dfb38

Browse files
authored
feature:增加定时清除超出保留期限数据集的功能;增加数据归集任务绑定数据集的接口 (#24)
* refactor: 修改调整数据归集实现,删除无用代码,优化代码结构
* feature: 每天凌晨00:00扫描所有数据集,检查数据集是否超过了预设的保留天数,超出保留天数的数据集调用删除接口进行删除
* fix: 修改删除数据集文件的逻辑,上传到数据集中的文件会同时删除数据库中的记录和文件系统中的文件,归集过来的文件仅删除数据库中的记录
* fix: 增加参数校验和接口定义,删除不使用的接口
* fix: 数据集统计数据默认为0
* feature: 数据集状态增加流转,创建时为草稿状态,上传文件或者归集文件后修改为活动状态
* refactor: 修改分页查询归集任务的代码
* fix: 更新后重新执行;归集任务执行增加事务控制
* feature: 创建归集任务时能够同步创建数据集,更新归集任务时能更新到指定数据集
1 parent 871ba57 commit 46dfb38

File tree

21 files changed

+375
-212
lines changed

21 files changed

+375
-212
lines changed

backend/openapi/specs/data-management.yaml

Lines changed: 125 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ paths:
3939
schema:
4040
type: integer
4141
default: 0
42-
description: 页码,从0开始
42+
description: 页码,从1开始
4343
- name: size
4444
in: query
4545
schema:
@@ -65,7 +65,7 @@ paths:
6565
in: query
6666
schema:
6767
type: string
68-
enum: [ACTIVE, INACTIVE, PROCESSING]
68+
enum: [DRAFT, ACTIVE, PROCESSING, ARCHIVED, PUBLISHED, DEPRECATED]
6969
description: 数据集状态过滤
7070
responses:
7171
'200':
@@ -231,40 +231,6 @@ paths:
231231
schema:
232232
$ref: '#/components/schemas/PagedDatasetFileResponse'
233233

234-
post:
235-
tags: [DatasetFile]
236-
summary: 上传文件到数据集
237-
operationId: uploadDatasetFile
238-
description: 向指定数据集上传文件
239-
parameters:
240-
- name: datasetId
241-
in: path
242-
required: true
243-
schema:
244-
type: string
245-
description: 数据集ID
246-
requestBody:
247-
required: true
248-
content:
249-
multipart/form-data:
250-
schema:
251-
type: object
252-
properties:
253-
file:
254-
type: string
255-
format: binary
256-
description: 要上传的文件
257-
description:
258-
type: string
259-
description: 文件描述
260-
responses:
261-
'201':
262-
description: 上传成功
263-
content:
264-
application/json:
265-
schema:
266-
$ref: '#/components/schemas/DatasetFileResponse'
267-
268234
/data-management/datasets/{datasetId}/files/{fileId}:
269235
get:
270236
tags: [DatasetFile]
@@ -342,6 +308,78 @@ paths:
342308
type: string
343309
format: binary
344310

311+
/data-management/datasets/{datasetId}/files/download:
312+
get:
313+
tags: [ DatasetFile ]
314+
operationId: downloadDatasetFileAsZip
315+
summary: 下载文件
316+
description: 下载数据集中全部文件
317+
parameters:
318+
- name: datasetId
319+
in: path
320+
required: true
321+
schema:
322+
type: string
323+
description: 数据集ID
324+
responses:
325+
'200':
326+
description: 文件内容
327+
content:
328+
application/octet-stream:
329+
schema:
330+
type: string
331+
format: binary
332+
333+
/data-management/datasets/{datasetId}/files/upload/pre-upload:
334+
post:
335+
tags: [ DatasetFile ]
336+
operationId: preUpload
337+
summary: 切片上传预上传
338+
description: 预上传接口,返回后续分片上传所需的请求ID
339+
parameters:
340+
- name: datasetId
341+
in: path
342+
required: true
343+
schema:
344+
type: string
345+
description: 数据集ID
346+
requestBody:
347+
required: true
348+
content:
349+
application/json:
350+
schema:
351+
$ref: '#/components/schemas/UploadFilesPreRequest'
352+
responses:
353+
'200':
354+
description: 预上传成功,返回请求ID
355+
content:
356+
application/json:
357+
schema:
358+
type: string
359+
360+
/data-management/datasets/{datasetId}/files/upload/chunk:
361+
post:
362+
tags: [ DatasetFile ]
363+
operationId: chunkUpload
364+
summary: 切片上传
365+
description: 使用预上传返回的请求ID进行分片上传
366+
parameters:
367+
- name: datasetId
368+
in: path
369+
required: true
370+
schema:
371+
type: string
372+
description: 数据集ID
373+
requestBody:
374+
required: true
375+
content:
376+
multipart/form-data:
377+
schema:
378+
$ref: '#/components/schemas/UploadFileRequest'
379+
responses:
380+
'200':
381+
description: 上传成功
382+
345383
/data-management/dataset-types:
346384
get:
347385
operationId: getDatasetTypes
@@ -548,9 +586,59 @@ components:
548586
description: 标签列表
549587
status:
550588
type: string
551-
enum: [ACTIVE, INACTIVE]
589+
enum: [DRAFT, ACTIVE, PROCESSING, ARCHIVED, PUBLISHED, DEPRECATED]
552590
description: 数据集状态
553591

592+
UploadFilesPreRequest:
593+
type: object
594+
description: 切片上传预上传请求
595+
properties:
596+
hasArchive:
597+
type: boolean
598+
description: 是否为压缩包上传
599+
default: false
600+
totalFileNum:
601+
type: integer
602+
format: int32
603+
minimum: 1
604+
description: 总文件数量
605+
totalSize:
606+
type: integer
607+
format: int64
608+
description: 总文件大小(字节)
609+
required: [ totalFileNum ]
610+
611+
UploadFileRequest:
612+
type: object
613+
description: 分片上传请求
614+
properties:
615+
reqId:
616+
type: string
617+
description: 预上传返回的请求ID
618+
fileNo:
619+
type: integer
620+
format: int32
621+
description: 文件编号(批量中的第几个)
622+
fileName:
623+
type: string
624+
description: 文件名称
625+
totalChunkNum:
626+
type: integer
627+
format: int32
628+
description: 文件总分片数量
629+
chunkNo:
630+
type: integer
631+
format: int32
632+
description: 当前分片编号(从1开始)
633+
file:
634+
type: string
635+
format: binary
636+
description: 分片二进制内容
637+
checkSumHex:
638+
type: string
639+
description: 分片校验和(十六进制)
640+
required: [ reqId, fileNo, fileName, totalChunkNum, chunkNo, file ]
641+
554642
DatasetTypeResponse:
555643
type: object
556644
properties:

backend/services/data-collection-service/pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,12 @@
8787
<dependency>
8888
<groupId>com.datamate</groupId>
8989
<artifactId>domain-common</artifactId>
90-
<version>1.0.0-SNAPSHOT</version>
90+
<version>${project.version}</version>
91+
</dependency>
92+
<dependency>
93+
<groupId>com.datamate</groupId>
94+
<artifactId>data-management-service</artifactId>
95+
<version>${project.version}</version>
9196
</dependency>
9297

9398
<!-- OpenAPI Dependencies -->

backend/services/data-collection-service/src/main/java/com/datamate/collection/DataCollectionServiceConfiguration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
@EnableTransactionManagement
1818
@ComponentScan(basePackages = {
1919
"com.datamate.collection",
20+
"com.datamate.datamanagement",
2021
"com.datamate.shared"
2122
})
2223
public class DataCollectionServiceConfiguration {

backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77
import com.datamate.collection.domain.model.entity.TaskExecution;
88
import com.datamate.collection.common.enums.TaskStatus;
99
import com.datamate.collection.domain.repository.CollectionTaskRepository;
10-
import com.datamate.collection.interfaces.dto.CollectionTaskPagingQuery;
1110
import com.datamate.collection.common.enums.SyncMode;
1211
import com.datamate.common.domain.utils.ChunksSaver;
12+
import com.datamate.datamanagement.application.DatasetApplicationService;
13+
import com.datamate.datamanagement.domain.model.dataset.Dataset;
1314
import lombok.RequiredArgsConstructor;
1415
import lombok.extern.slf4j.Slf4j;
15-
import org.apache.commons.lang3.StringUtils;
1616
import org.springframework.stereotype.Service;
1717
import org.springframework.transaction.annotation.Transactional;
1818

@@ -25,31 +25,32 @@
2525
@RequiredArgsConstructor
2626
public class CollectionTaskService {
2727
private final TaskExecutionService taskExecutionService;
28+
private final DatasetApplicationService datasetApplicationService;
2829
private final CollectionTaskRepository collectionTaskRepository;
2930

3031
@Transactional
31-
public CollectionTask create(CollectionTask task) {
32-
task.setStatus(TaskStatus.READY);
33-
task.setCreatedAt(LocalDateTime.now());
34-
task.setUpdatedAt(LocalDateTime.now());
32+
public CollectionTask create(CollectionTask task, String datasetId) {
33+
task.initCreateParam();
3534
collectionTaskRepository.save(task);
36-
executeTaskNow(task);
35+
executeTaskNow(task, datasetId);
3736
return task;
3837
}
3938

40-
private void executeTaskNow(CollectionTask task) {
39+
private void executeTaskNow(CollectionTask task, String datasetId) {
4140
if (Objects.equals(task.getSyncMode(), SyncMode.ONCE)) {
4241
TaskExecution exec = taskExecutionService.createExecution(task);
4342
int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
44-
taskExecutionService.runAsync(task, exec.getId(), timeout);
43+
taskExecutionService.runAsync(task, exec.getId(), timeout, datasetId);
4544
log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId());
4645
}
4746
}
4847

4948
@Transactional
50-
public CollectionTask update(CollectionTask task) {
49+
public CollectionTask update(CollectionTask task, String datasetId) {
5150
task.setUpdatedAt(LocalDateTime.now());
51+
task.addPath();
5252
collectionTaskRepository.updateById(task);
53+
executeTaskNow(task, datasetId);
5354
return task;
5455
}
5556

@@ -66,11 +67,8 @@ public CollectionTask get(String id) {
6667
return collectionTaskRepository.getById(id);
6768
}
6869

69-
public IPage<CollectionTask> getTasks(CollectionTaskPagingQuery query) {
70-
LambdaQueryWrapper<CollectionTask> wrapper = new LambdaQueryWrapper<CollectionTask>()
71-
.eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus())
72-
.like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName());
73-
return collectionTaskRepository.page(new Page<>(query.getPage(), query.getSize()), wrapper);
70+
public IPage<CollectionTask> getTasks(Page<CollectionTask> page, LambdaQueryWrapper<CollectionTask> wrapper) {
71+
return collectionTaskRepository.page(page, wrapper);
7472
}
7573

7674
public List<CollectionTask> selectActiveTasks() {

backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.datamate.collection.domain.process.ProcessRunner;
77
import com.datamate.collection.domain.repository.CollectionTaskRepository;
88
import com.datamate.collection.domain.repository.TaskExecutionRepository;
9+
import com.datamate.datamanagement.application.DatasetApplicationService;
910
import lombok.RequiredArgsConstructor;
1011
import lombok.extern.slf4j.Slf4j;
1112
import org.springframework.scheduling.annotation.Async;
@@ -21,6 +22,7 @@ public class TaskExecutionService {
2122
private final ProcessRunner processRunner;
2223
private final TaskExecutionRepository executionRepository;
2324
private final CollectionTaskRepository collectionTaskRepository;
25+
private final DatasetApplicationService datasetApplicationService;
2426

2527

2628
@Transactional
@@ -39,14 +41,16 @@ public TaskExecution selectLatestByTaskId(String taskId) {
3941
}
4042

4143
@Async
42-
public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) {
44+
@Transactional
45+
public void runAsync(CollectionTask task, String executionId, int timeoutSeconds, String datasetId) {
4346
try {
4447
int code = processRunner.runJob(task, executionId, timeoutSeconds);
4548
log.info("DataX finished with code {} for execution {}", code, executionId);
4649
// 简化:成功即完成
4750
executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
4851
0, 0L, 0L, 0L, null);
4952
collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
53+
datasetApplicationService.processDataSourceAsync(datasetId, task.getId());
5054
} catch (Exception e) {
5155
log.error("DataX execution failed", e);
5256
executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),

backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
import lombok.Getter;
1111
import lombok.Setter;
1212

13+
import java.time.LocalDateTime;
1314
import java.util.Collections;
1415
import java.util.Map;
16+
import java.util.UUID;
1517

1618
/**
1719
* 数据采集任务实体(与数据库表 t_dc_collection_tasks 对齐)
@@ -46,4 +48,12 @@ public void addPath() {
4648
throw new RuntimeException(e);
4749
}
4850
}
51+
52+
public void initCreateParam() {
53+
this.id = UUID.randomUUID().toString();
54+
this.addPath();
55+
this.status = TaskStatus.READY;
56+
this.createdAt = LocalDateTime.now();
57+
this.updatedAt = LocalDateTime.now();
58+
}
4959
}

backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.datamate.collection.common.enums.TaskStatus;
88
import com.datamate.collection.common.enums.SyncMode;
9+
import com.datamate.datamanagement.interfaces.dto.DatasetResponse;
910
import lombok.AllArgsConstructor;
1011
import lombok.Getter;
1112
import lombok.NoArgsConstructor;
@@ -44,5 +45,7 @@ public class CollectionTaskResponse {
4445

4546
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
4647
private LocalDateTime updatedAt;
48+
49+
private DatasetResponse dataset;
4750
}
4851

backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.datamate.collection.interfaces.dto;
22

33
import com.datamate.collection.common.enums.SyncMode;
4+
import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest;
45
import com.fasterxml.jackson.annotation.JsonProperty;
56

67
import java.util.HashMap;
@@ -49,5 +50,9 @@ public class CreateCollectionTaskRequest {
4950
@Schema(name = "scheduleExpression", description = "Cron调度表达式 (syncMode=SCHEDULED 时必填)", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
5051
@JsonProperty("scheduleExpression")
5152
private String scheduleExpression;
53+
54+
/** 创建数据集参数 */
55+
@Valid
56+
private CreateDatasetRequest dataset;
5257
}
5358

backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/UpdateCollectionTaskRequest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,8 @@ public class UpdateCollectionTaskRequest {
4646
@Schema(name = "scheduleExpression", description = "Cron调度表达式 (syncMode=SCHEDULED 时必填)", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
4747
@JsonProperty("scheduleExpression")
4848
private String scheduleExpression;
49+
50+
/** 数据集id */
51+
private String datasetId;
4952
}
5053

0 commit comments

Comments
 (0)