Merged
14 commits
12dc3a6
refactor: adjust the data collection implementation, remove unused code, and improve the code structure
hefanli Oct 23, 2025
8809997
Merge remote-tracking branch 'origin/main'
hefanli Oct 23, 2025
008a1e4
Merge branch 'ModelEngine-Group:main' into main
hefanli Oct 24, 2025
8d7bb8a
feature: scan all datasets at 00:00 every day, check whether each dataset has exceeded its preset retention days, and call the delete API to remove datasets past their retention period
hefanli Oct 24, 2025
738127e
fix: change the dataset file deletion logic; files uploaded to a dataset have both their database record and their file-system copy deleted, while collected files only have their database record deleted
hefanli Oct 24, 2025
23a821a
Merge remote-tracking branch 'origin/main'
hefanli Oct 24, 2025
98ee5b3
fix: add parameter validation and API definitions, remove unused endpoints
hefanli Oct 24, 2025
825bd4b
fix: dataset statistics default to 0
hefanli Oct 24, 2025
3b1a043
feature: add dataset status transitions; a dataset is created in DRAFT status and becomes ACTIVE after files are uploaded or collected
hefanli Oct 24, 2025
dc3706d
refactor: rework the paged query code for collection tasks
hefanli Oct 24, 2025
732b4e4
Merge branch 'ModelEngine-Group:main' into main
hefanli Oct 25, 2025
96c2247
Merge remote-tracking branch 'origin/main'
hefanli Oct 25, 2025
4398e56
fix: re-execute the task after an update; add transaction control to collection task execution
hefanli Oct 25, 2025
bb22e89
feature: support creating a dataset together with a collection task, and targeting a specified dataset when updating a collection task
hefanli Oct 25, 2025
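
The retention scan added in 8d7bb8a could be realized with a Spring scheduled job along these lines. This is a minimal sketch, not the PR's actual code: the repository, the retentionDays field, and the deleteDataset call are assumed names.

import java.time.LocalDateTime;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@RequiredArgsConstructor
public class DatasetRetentionScanner {
    // Hypothetical collaborators; the PR's actual bean names may differ.
    private final DatasetRepository datasetRepository;
    private final DatasetApplicationService datasetApplicationService;

    // Cron fields: second minute hour day-of-month month day-of-week -> every day at 00:00.
    @Scheduled(cron = "0 0 0 * * *")
    public void deleteExpiredDatasets() {
        for (Dataset dataset : datasetRepository.findAll()) {
            Integer retentionDays = dataset.getRetentionDays(); // assumed field
            if (retentionDays == null) {
                continue; // no retention configured for this dataset
            }
            LocalDateTime expiresAt = dataset.getCreatedAt().plusDays(retentionDays);
            if (LocalDateTime.now().isAfter(expiresAt)) {
                log.info("Dataset {} exceeded retention of {} days, deleting", dataset.getId(), retentionDays);
                datasetApplicationService.deleteDataset(dataset.getId()); // assumed delete API
            }
        }
    }
}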
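
The deletion rule described in 738127e, sketched with a hypothetical DatasetFile accessor and FileSource flag:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public void deleteDatasetFile(DatasetFile file) throws IOException {
    // Both kinds of file lose their database record.
    datasetFileRepository.removeById(file.getId()); // assumed repository call
    // Only files uploaded directly into the dataset also delete the bytes on disk;
    // collected files keep their physical copy, which the collection task owns.
    if (file.getSource() == FileSource.UPLOADED) { // hypothetical source flag
        Files.deleteIfExists(Paths.get(file.getFilePath()));
    }
}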
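
And the status transition from 3b1a043, assuming an activateIfDraft-style helper; the enum values match the spec change in data-management.yaml below.

public enum DatasetStatus {
    DRAFT, ACTIVE, PROCESSING, ARCHIVED, PUBLISHED, DEPRECATED
}

// Called after a successful file upload or collection run.
public void activateIfDraft(Dataset dataset) {
    if (dataset.getStatus() == DatasetStatus.DRAFT) {
        dataset.setStatus(DatasetStatus.ACTIVE);
        datasetRepository.updateById(dataset); // assumed persistence call
    }
}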
162 changes: 125 additions & 37 deletions backend/openapi/specs/data-management.yaml
@@ -39,7 +39,7 @@ paths:
schema:
type: integer
default: 0
description: Page number, starting from 0
description: Page number, starting from 1
- name: size
in: query
schema:
@@ -65,7 +65,7 @@
in: query
schema:
type: string
enum: [ACTIVE, INACTIVE, PROCESSING]
enum: [DRAFT, ACTIVE, PROCESSING, ARCHIVED, PUBLISHED, DEPRECATED]
description: Dataset status filter
responses:
'200':
@@ -231,40 +231,6 @@ paths:
schema:
$ref: '#/components/schemas/PagedDatasetFileResponse'

post:
tags: [DatasetFile]
summary: Upload a file to a dataset
operationId: uploadDatasetFile
description: Upload a file to the specified dataset
parameters:
- name: datasetId
in: path
required: true
schema:
type: string
description: Dataset ID
requestBody:
required: true
content:
multipart/form-data:
schema:
type: object
properties:
file:
type: string
format: binary
description: File to upload
description:
type: string
description: File description
responses:
'201':
description: Upload successful
content:
application/json:
schema:
$ref: '#/components/schemas/DatasetFileResponse'

/data-management/datasets/{datasetId}/files/{fileId}:
get:
tags: [DatasetFile]
@@ -342,6 +308,78 @@ paths:
type: string
format: binary

/data-management/datasets/{datasetId}/files/download:
get:
tags: [ DatasetFile ]
operationId: downloadDatasetFileAsZip
summary: Download files
description: Download all files in the dataset
parameters:
- name: datasetId
in: path
required: true
schema:
type: string
description: Dataset ID
responses:
'200':
description: File content
content:
application/octet-stream:
schema:
type: string
format: binary

/data-management/datasets/{datasetId}/files/upload/pre-upload:
post:
tags: [ DatasetFile ]
operationId: preUpload
summary: Pre-upload for chunked upload
description: Pre-upload endpoint that returns the request ID required by subsequent chunk uploads
parameters:
- name: datasetId
in: path
required: true
schema:
type: string
description: Dataset ID
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/UploadFilesPreRequest'
responses:
'200':
description: Pre-upload successful, returns the request ID
content:
application/json:
schema:
type: string

/data-management/datasets/{datasetId}/files/upload/chunk:
post:
tags: [ DatasetFile ]
operationId: chunkUpload
summary: Chunked upload
description: Upload a file chunk using the request ID returned by pre-upload
parameters:
- name: datasetId
in: path
required: true
schema:
type: string
description: Dataset ID
requestBody:
required: true
content:
multipart/form-data:
schema:
$ref: '#/components/schemas/UploadFileRequest'
responses:
'200':
description: Upload successful

/data-management/dataset-types:
get:
operationId: getDatasetTypes
@@ -548,9 +586,59 @@ components:
description: Tag list
status:
type: string
enum: [ACTIVE, INACTIVE]
enum: [DRAFT, ACTIVE, PROCESSING, ARCHIVED, PUBLISHED, DEPRECATED]
description: Dataset status

UploadFilesPreRequest:
type: object
description: Pre-upload request for chunked upload
properties:
hasArchive:
type: boolean
description: Whether the upload is an archive
default: false
totalFileNum:
type: integer
format: int32
minimum: 1
description: Total number of files
totalSize:
type: integer
format: int64
description: Total file size in bytes
required: [ totalFileNum ]

UploadFileRequest:
type: object
description: Chunk upload request
properties:
reqId:
type: string
description: Request ID returned by pre-upload
fileNo:
type: integer
format: int32
description: File number (position within the batch)
fileName:
type: string
description: File name
totalChunkNum:
type: integer
format: int32
description: Total number of chunks for the file
chunkNo:
type: integer
format: int32
description: Current chunk number (starting from 1)
file:
type: string
format: binary
description: Chunk binary content
checkSumHex:
type: string
description: Chunk checksum (hexadecimal)
required: [ reqId, fileNo, fileName, totalChunkNum, chunkNo, file ]

DatasetTypeResponse:
type: object
properties:
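
The two chunked-upload endpoints above form a two-step protocol: one pre-upload call returns a request ID, then every chunk is posted as multipart form data under that ID. A client-side sketch follows; RestTemplate, the base URL, the SHA-256 checksum, and the chunk size are illustrative assumptions, while the field names follow the UploadFilesPreRequest and UploadFileRequest schemas.

import java.security.MessageDigest;
import java.util.Arrays;
import java.util.HexFormat;
import java.util.Map;

import org.springframework.core.io.ByteArrayResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

public class ChunkedUploadClient {
    private static final int CHUNK_SIZE = 5 * 1024 * 1024; // 5 MiB, an arbitrary choice
    private final RestTemplate rest = new RestTemplate();
    private final String base = "http://localhost:8080/data-management/datasets"; // illustrative base URL

    public void upload(String datasetId, String fileName, byte[] content) throws Exception {
        // Step 1: pre-upload returns the request ID used by every chunk.
        String reqId = rest.postForObject(base + "/{id}/files/upload/pre-upload",
                Map.of("hasArchive", false, "totalFileNum", 1, "totalSize", content.length),
                String.class, datasetId);

        int totalChunks = (content.length + CHUNK_SIZE - 1) / CHUNK_SIZE;
        for (int chunkNo = 1; chunkNo <= totalChunks; chunkNo++) { // chunk numbers start at 1
            int from = (chunkNo - 1) * CHUNK_SIZE;
            byte[] chunk = Arrays.copyOfRange(content, from, Math.min(from + CHUNK_SIZE, content.length));

            // Step 2: each chunk is one multipart POST carrying its metadata and checksum.
            MultiValueMap<String, Object> form = new LinkedMultiValueMap<>();
            form.add("reqId", reqId);
            form.add("fileNo", 1);
            form.add("fileName", fileName);
            form.add("totalChunkNum", totalChunks);
            form.add("chunkNo", chunkNo);
            form.add("checkSumHex", HexFormat.of().formatHex(
                    MessageDigest.getInstance("SHA-256").digest(chunk))); // checksum algorithm is an assumption
            form.add("file", new ByteArrayResource(chunk) {
                @Override public String getFilename() { return fileName; }
            });

            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.MULTIPART_FORM_DATA);
            rest.postForEntity(base + "/{id}/files/upload/chunk",
                    new HttpEntity<>(form, headers), Void.class, datasetId);
        }
    }
}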
7 changes: 6 additions & 1 deletion backend/services/data-collection-service/pom.xml
@@ -87,7 +87,12 @@
<dependency>
<groupId>com.datamate</groupId>
<artifactId>domain-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.datamate</groupId>
<artifactId>data-management-service</artifactId>
<version>${project.version}</version>
</dependency>

<!-- OpenAPI Dependencies -->
@@ -17,6 +17,7 @@
@EnableTransactionManagement
@ComponentScan(basePackages = {
"com.datamate.collection",
"com.datamate.datamanagement",
"com.datamate.shared"
})
public class DataCollectionServiceConfiguration {
@@ -7,12 +7,12 @@
import com.datamate.collection.domain.model.entity.TaskExecution;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.interfaces.dto.CollectionTaskPagingQuery;
import com.datamate.collection.common.enums.SyncMode;
import com.datamate.common.domain.utils.ChunksSaver;
import com.datamate.datamanagement.application.DatasetApplicationService;
import com.datamate.datamanagement.domain.model.dataset.Dataset;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@@ -25,31 +25,32 @@
@RequiredArgsConstructor
public class CollectionTaskService {
private final TaskExecutionService taskExecutionService;
private final DatasetApplicationService datasetApplicationService;
private final CollectionTaskRepository collectionTaskRepository;

@Transactional
public CollectionTask create(CollectionTask task) {
task.setStatus(TaskStatus.READY);
task.setCreatedAt(LocalDateTime.now());
task.setUpdatedAt(LocalDateTime.now());
public CollectionTask create(CollectionTask task, String datasetId) {
task.initCreateParam();
collectionTaskRepository.save(task);
executeTaskNow(task);
executeTaskNow(task, datasetId);
return task;
}

private void executeTaskNow(CollectionTask task) {
private void executeTaskNow(CollectionTask task, String datasetId) {
if (Objects.equals(task.getSyncMode(), SyncMode.ONCE)) {
TaskExecution exec = taskExecutionService.createExecution(task);
int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
taskExecutionService.runAsync(task, exec.getId(), timeout);
taskExecutionService.runAsync(task, exec.getId(), timeout, datasetId);
log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId());
}
}

@Transactional
public CollectionTask update(CollectionTask task) {
public CollectionTask update(CollectionTask task, String datasetId) {
task.setUpdatedAt(LocalDateTime.now());
task.addPath();
collectionTaskRepository.updateById(task);
executeTaskNow(task, datasetId);
return task;
}

@@ -66,11 +67,8 @@ public CollectionTask get(String id) {
return collectionTaskRepository.getById(id);
}

public IPage<CollectionTask> getTasks(CollectionTaskPagingQuery query) {
LambdaQueryWrapper<CollectionTask> wrapper = new LambdaQueryWrapper<CollectionTask>()
.eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus())
.like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName());
return collectionTaskRepository.page(new Page<>(query.getPage(), query.getSize()), wrapper);
public IPage<CollectionTask> getTasks(Page<CollectionTask> page, LambdaQueryWrapper<CollectionTask> wrapper) {
return collectionTaskRepository.page(page, wrapper);
}

public List<CollectionTask> selectActiveTasks() {
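
After this refactor the service only executes the page query; filter and paging construction move to the caller. Roughly what the calling code might now look like, reusing the wrapper logic removed above (CollectionTaskPagingQuery and the service field are as in the original code):

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.StringUtils;

public IPage<CollectionTask> queryTasks(CollectionTaskPagingQuery query) {
    // Same status/name filters as before, now built at the call site.
    LambdaQueryWrapper<CollectionTask> wrapper = new LambdaQueryWrapper<CollectionTask>()
            .eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus())
            .like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName());
    return collectionTaskService.getTasks(new Page<>(query.getPage(), query.getSize()), wrapper);
}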
@@ -6,6 +6,7 @@
import com.datamate.collection.domain.process.ProcessRunner;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.domain.repository.TaskExecutionRepository;
import com.datamate.datamanagement.application.DatasetApplicationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
@@ -21,6 +22,7 @@ public class TaskExecutionService {
private final ProcessRunner processRunner;
private final TaskExecutionRepository executionRepository;
private final CollectionTaskRepository collectionTaskRepository;
private final DatasetApplicationService datasetApplicationService;


@Transactional
@@ -39,14 +41,16 @@ public TaskExecution selectLatestByTaskId(String taskId) {
}

@Async
public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) {
@Transactional
public void runAsync(CollectionTask task, String executionId, int timeoutSeconds, String datasetId) {
try {
int code = processRunner.runJob(task, executionId, timeoutSeconds);
log.info("DataX finished with code {} for execution {}", code, executionId);
// Simplified: success means completion
executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
0, 0L, 0L, 0L, null);
collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
datasetApplicationService.processDataSourceAsync(datasetId, task.getId());
} catch (Exception e) {
log.error("DataX execution failed", e);
executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),
@@ -10,8 +10,10 @@
import lombok.Getter;
import lombok.Setter;

import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;

/**
* Data collection task entity (aligned with the database table t_dc_collection_tasks)
@@ -46,4 +48,12 @@ public void addPath() {
throw new RuntimeException(e);
}
}

public void initCreateParam() {
this.id = UUID.randomUUID().toString();
this.addPath();
this.status = TaskStatus.READY;
this.createdAt = LocalDateTime.now();
this.updatedAt = LocalDateTime.now();
}
}
@@ -6,6 +6,7 @@

import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.common.enums.SyncMode;
import com.datamate.datamanagement.interfaces.dto.DatasetResponse;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -44,5 +45,7 @@ public class CollectionTaskResponse {

@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime updatedAt;

private DatasetResponse dataset;
}

@@ -1,6 +1,7 @@
package com.datamate.collection.interfaces.dto;

import com.datamate.collection.common.enums.SyncMode;
import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.HashMap;
@@ -49,5 +50,9 @@ public class CreateCollectionTaskRequest {
@Schema(name = "scheduleExpression", description = "Cron schedule expression (required when syncMode=SCHEDULED)", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
@JsonProperty("scheduleExpression")
private String scheduleExpression;

/** Parameters for creating the dataset */
@Valid
private CreateDatasetRequest dataset;
}

@@ -46,5 +46,8 @@ public class UpdateCollectionTaskRequest {
@Schema(name = "scheduleExpression", description = "Cron schedule expression (required when syncMode=SCHEDULED)", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
@JsonProperty("scheduleExpression")
private String scheduleExpression;

/** Dataset ID */
private String datasetId;
}
