diff --git a/backend/pom.xml b/backend/pom.xml
index 6e92d847..98841b36 100644
--- a/backend/pom.xml
+++ b/backend/pom.xml
@@ -34,6 +34,7 @@
         <mapstruct.version>1.6.3</mapstruct.version>
         <lombok.version>1.18.32</lombok.version>
         <lombok-mapstruct-binding.version>0.2.0</lombok-mapstruct-binding.version>
+        <poi.version>5.4.0</poi.version>
@@ -132,6 +133,12 @@
             <artifactId>mysql-connector-java</artifactId>
             <version>${mysql.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.poi</groupId>
+            <artifactId>poi</artifactId>
+            <version>${poi.version}</version>
+        </dependency>
@@ -165,6 +172,12 @@
             <artifactId>mapstruct</artifactId>
             <version>${mapstruct.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.poi</groupId>
+            <artifactId>poi</artifactId>
+            <version>${poi.version}</version>
+        </dependency>
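
Note: the `poi` artifact added above covers Apache POI's core plus the legacy binary .xls (HSSF) format; reading .xlsx files would additionally require the poi-ooxml artifact, which this patch does not add. A minimal smoke test of the new dependency (the file name is a placeholder):

    import org.apache.poi.hssf.usermodel.HSSFWorkbook;
    import org.apache.poi.ss.usermodel.Row;
    import org.apache.poi.ss.usermodel.Sheet;
    import org.apache.poi.ss.usermodel.Workbook;

    import java.io.FileInputStream;
    import java.io.IOException;

    public class PoiSmokeTest {
        public static void main(String[] args) throws IOException {
            // Read the first cell of the first sheet from a legacy .xls file.
            try (FileInputStream in = new FileInputStream("sample.xls");
                 Workbook workbook = new HSSFWorkbook(in)) {
                Sheet sheet = workbook.getSheetAt(0);
                Row row = sheet.getRow(0);
                System.out.println(row.getCell(0).toString());
            }
        }
    }
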
createDatasetRequest.getName() + "' already exists"); } - Dataset dataset = new Dataset(); - dataset.setId(UUID.randomUUID().toString()); - dataset.setName(name); - dataset.setDescription(description); - dataset.setDatasetType(datasetType); - dataset.setDataSourceId(dataSourceId); - dataset.setPath(path); - dataset.setFormat(format); - dataset.setStatus(StatusConstants.DatasetStatuses.ACTIVE); - dataset.setCreatedBy(createdBy); - dataset.setUpdatedBy(createdBy); - dataset.setCreatedAt(LocalDateTime.now()); - dataset.setUpdatedAt(LocalDateTime.now()); - datasetMapper.insert(dataset); // 手动设定 UUID 主键 + // 创建数据集对象 + Dataset dataset = DatasetConverter.INSTANCE.convertToDataset(createDatasetRequest); + dataset.initCreateParam(datasetBasePath); + datasetMapper.insert(dataset); // 处理标签 Set processedTags = new HashSet<>(); - if (tagNames != null && !tagNames.isEmpty()) { - processedTags = processTagNames(tagNames); + if (CollectionUtils.isNotEmpty(createDatasetRequest.getTags())) { + processedTags = processTagNames(createDatasetRequest.getTags()); for (Tag t : processedTags) { tagMapper.insertDatasetTag(dataset.getId(), t.getId()); } @@ -150,13 +145,13 @@ public Dataset getDataset(String datasetId) { * 分页查询数据集 */ @Transactional(readOnly = true) - public Page getDatasets(String typeCode, String status, String keyword, - List tagNames, Pageable pageable) { - RowBounds bounds = new RowBounds(pageable.getPageNumber() * pageable.getPageSize(), pageable.getPageSize()); - List content = datasetMapper.findByCriteria(typeCode, status, keyword, tagNames, bounds); + public Page getDatasets(DatasetPagingQuery query) { + RowBounds bounds = new RowBounds(query.getPage() * query.getSize(), query.getSize()); + List content = datasetMapper.findByCriteria(query.getType(), query.getStatus(), query.getKeyword(), query.getTagList(), bounds); + long total = datasetMapper.countByCriteria(query.getType(), query.getStatus(), query.getKeyword(), query.getTagList()); // 为每个数据集填充标签信息 - if (content != null && !content.isEmpty()) { + if (CollectionUtils.isNotEmpty(content)) { for (Dataset dataset : content) { List tags = tagMapper.findByDatasetId(dataset.getId()); if (tags != null) { @@ -164,9 +159,7 @@ public Page getDatasets(String typeCode, String status, String keyword, } } } - - long total = datasetMapper.countByCriteria(typeCode, status, keyword, tagNames); - return new PageImpl<>(content, pageable, total); + return new PageImpl<>(content, PageRequest.of(query.getPage(), query.getSize()), total); } /** diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationService.java index b29331cf..7ecbd831 100644 --- a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationService.java @@ -1,10 +1,21 @@ package com.dataengine.datamanagement.application.service; +import com.dataengine.common.domain.model.ChunkUploadPreRequest; +import com.dataengine.common.domain.model.FileUploadResult; +import com.dataengine.common.domain.service.FileService; +import com.dataengine.common.domain.utils.AnalyzerUtils; +import com.dataengine.datamanagement.domain.contants.DatasetConstant; import 
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationService.java
index b29331cf..7ecbd831 100644
--- a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationService.java
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationService.java
@@ -1,10 +1,21 @@
 package com.dataengine.datamanagement.application.service;
 
+import com.dataengine.common.domain.model.ChunkUploadPreRequest;
+import com.dataengine.common.domain.model.FileUploadResult;
+import com.dataengine.common.domain.service.FileService;
+import com.dataengine.common.domain.utils.AnalyzerUtils;
+import com.dataengine.datamanagement.domain.contants.DatasetConstant;
 import com.dataengine.datamanagement.domain.model.dataset.Dataset;
 import com.dataengine.datamanagement.domain.model.dataset.DatasetFile;
+import com.dataengine.datamanagement.domain.model.dataset.DatasetFileUploadCheckInfo;
 import com.dataengine.datamanagement.domain.model.dataset.StatusConstants;
 import com.dataengine.datamanagement.infrastructure.persistence.mapper.DatasetFileMapper;
 import com.dataengine.datamanagement.infrastructure.persistence.mapper.DatasetMapper;
+import com.dataengine.datamanagement.interfaces.converter.DatasetConverter;
+import com.dataengine.datamanagement.interfaces.dto.UploadFileRequest;
+import com.dataengine.datamanagement.interfaces.dto.UploadFilesPreRequest;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.ibatis.session.RowBounds;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -17,6 +28,7 @@
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.web.multipart.MultipartFile;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.nio.file.Files;
@@ -25,6 +37,7 @@
 import java.nio.file.StandardCopyOption;
 import java.time.LocalDateTime;
 import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
 
 /**
@@ -37,14 +50,19 @@ public class DatasetFileApplicationService {
     private final DatasetFileMapper datasetFileMapper;
     private final DatasetMapper datasetMapper;
     private final Path fileStorageLocation;
+    private final FileService fileService;
+
+    @Value("${dataset.base.path:/dataset}")
+    private String datasetBasePath;
 
     @Autowired
     public DatasetFileApplicationService(DatasetFileMapper datasetFileMapper,
-                                         DatasetMapper datasetMapper,
+                                         DatasetMapper datasetMapper, FileService fileService,
                                          @Value("${app.file.upload-dir:./uploads}") String uploadDir) {
         this.datasetFileMapper = datasetFileMapper;
         this.datasetMapper = datasetMapper;
         this.fileStorageLocation = Paths.get(uploadDir).toAbsolutePath().normalize();
+        this.fileService = fileService;
         try {
             Files.createDirectories(this.fileStorageLocation);
         } catch (Exception ex) {
@@ -170,4 +188,65 @@ private String getFileExtension(String fileName) {
         }
         return fileName.substring(lastDotIndex + 1);
     }
+
+    /**
+     * Pre-upload: register an upload task before any chunk is sent.
+     *
+     * @param chunkUploadRequest upload request
+     * @param datasetId dataset id
+     * @return request id
+     */
+    @Transactional
+    public String preUpload(UploadFilesPreRequest chunkUploadRequest, String datasetId) {
+        ChunkUploadPreRequest request = ChunkUploadPreRequest.builder().build();
+        request.setUploadPath(datasetBasePath + File.separator + datasetId);
+        request.setTotalFileNum(chunkUploadRequest.getTotalFileNum());
+        request.setServiceId(DatasetConstant.SERVICE_ID);
+        DatasetFileUploadCheckInfo checkInfo = new DatasetFileUploadCheckInfo();
+        checkInfo.setDatasetId(datasetId);
+        checkInfo.setHasArchive(chunkUploadRequest.isHasArchive());
+        try {
+            ObjectMapper objectMapper = new ObjectMapper();
+            String checkInfoJson = objectMapper.writeValueAsString(checkInfo);
+            request.setCheckInfo(checkInfoJson);
+        } catch (JsonProcessingException e) {
+            throw new IllegalArgumentException("Failed to serialize checkInfo to JSON", e);
+        }
+        return fileService.preUpload(request);
+    }
+
+    /**
+     * Chunked upload.
+     *
+     * @param datasetId dataset id
+     * @param uploadFileRequest upload request
+     */
+    public void chunkUpload(String datasetId, UploadFileRequest uploadFileRequest) {
+        FileUploadResult uploadResult = fileService.chunkUpload(DatasetConverter.INSTANCE.toChunkUploadRequest(uploadFileRequest));
+        saveFileInfoToDb(uploadResult, uploadFileRequest, datasetId);
+        if (uploadResult.isAllFilesUploaded()) {
+            // Parse the files here if later required, e.g. validating file metadata and parsing semi-structured files.
+        }
+    }
+
+    private void saveFileInfoToDb(FileUploadResult fileUploadResult, UploadFileRequest uploadFile, String datasetId) {
+        if (Objects.isNull(fileUploadResult.getSavedFile())) {
+            // The chunked upload of this file has not finished yet
+            return;
+        }
+        File savedFile = fileUploadResult.getSavedFile();
+        LocalDateTime currentTime = LocalDateTime.now();
+        DatasetFile datasetFile = DatasetFile.builder()
+            .id(UUID.randomUUID().toString())
+            .datasetId(datasetId)
+            .fileSize(savedFile.length())
+            .uploadTime(currentTime)
+            .lastAccessTime(currentTime)
+            .fileName(uploadFile.getFileName())
+            .filePath(savedFile.getPath())
+            .fileType(AnalyzerUtils.getExtension(uploadFile.getFileName()))
+            .build();
+
+        datasetFileMapper.insert(datasetFile);
+    }
 }
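
For orientation, the two new service methods above are meant to be called in sequence: preUpload registers the task and returns a request id, then each chunk is sent through chunkUpload. A hedged driver for the simplest case — one file small enough to travel as a single chunk (this wiring is illustrative, not part of the patch):

    import org.springframework.web.multipart.MultipartFile;

    class UploadFlowSketch {
        // Hypothetical driver for the two-step flow: one file, one chunk.
        static void uploadSingleChunkFile(DatasetFileApplicationService service, String datasetId,
                                          MultipartFile payload, String checksumHex) {
            UploadFilesPreRequest pre = new UploadFilesPreRequest();
            pre.setTotalFileNum(1);
            pre.setHasArchive(false);
            pre.setTotalSize(payload.getSize());
            String reqId = service.preUpload(pre, datasetId); // returns the task id

            UploadFileRequest chunk = new UploadFileRequest();
            chunk.setReqId(reqId);        // ties the chunk to the pre-registered task
            chunk.setFileNo(1);           // first (and only) file of the batch
            chunk.setFileName(payload.getOriginalFilename());
            chunk.setTotalChunkNum(1);    // the whole file fits in one chunk
            chunk.setChunkNo(1);          // chunk numbers start at 1
            chunk.setFile(payload);
            chunk.setCheckSumHex(checksumHex);
            service.chunkUpload(datasetId, chunk);
        }
    }
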
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/contants/DatasetConstant.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/contants/DatasetConstant.java
new file mode 100644
index 00000000..12c6a6c5
--- /dev/null
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/contants/DatasetConstant.java
@@ -0,0 +1,11 @@
+package com.dataengine.datamanagement.domain.contants;
+
+/**
+ * Dataset constants.
+ */
+public interface DatasetConstant {
+    /**
+     * Service ID.
+     */
+    String SERVICE_ID = "DATA_MANAGEMENT";
+}
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/model/dataset/Dataset.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/model/dataset/Dataset.java
index b8ad4cf4..2eea0922 100644
--- a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/model/dataset/Dataset.java
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/model/dataset/Dataset.java
@@ -1,12 +1,19 @@
 package com.dataengine.datamanagement.domain.model.dataset;
 
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.File;
 import java.time.LocalDateTime;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
 
 /**
  * Dataset entity (aligned with database table t_dm_datasets).
 */
+@Getter
+@Setter
 public class Dataset {
 
     private String id; // UUID
@@ -18,7 +25,7 @@ public class Dataset {
     private String category;
 
     // DB: data_source_id
-    private Long dataSourceId;
+    private String dataSourceId;
     // DB: path
     private String path;
     // DB: format
@@ -47,13 +54,13 @@ public class Dataset {
     private String updatedBy;
 
     // Convenience collections within the aggregate (not persisted associations; populated by the application service)
-    private Set<Tag> tags = new HashSet<>();
-    private Set<DatasetFile> files = new HashSet<>();
+    private List<Tag> tags = new ArrayList<>();
+    private List<DatasetFile> files = new ArrayList<>();
 
     public Dataset() {}
 
     public Dataset(String name, String description, String datasetType, String category,
-                   Long dataSourceId, String path, String format, String status, String createdBy) {
+                   String dataSourceId, String path, String format, String status, String createdBy) {
         this.name = name;
         this.description = description;
         this.datasetType = datasetType;
@@ -67,6 +74,16 @@ public Dataset(String name, String description, String datasetType, String categ
         this.updatedAt = LocalDateTime.now();
     }
 
+    public void initCreateParam(String datasetBasePath) {
+        this.id = UUID.randomUUID().toString();
+        this.path = datasetBasePath + File.separator + this.id;
+        this.status = StatusConstants.DatasetStatuses.ACTIVE;
+        this.createdBy = "system";
+        this.updatedBy = "system";
+        this.createdAt = LocalDateTime.now();
+        this.updatedAt = LocalDateTime.now();
+    }
+
     public void updateBasicInfo(String name, String description, String category) {
         if (name != null && !name.isEmpty()) this.name = name;
         if (description != null) this.description = description;
@@ -94,80 +111,4 @@ public void removeFile(DatasetFile file) {
             this.updatedAt = LocalDateTime.now();
         }
     }
-
-    // Getters & Setters
-    public String getId() { return id; }
-    public void setId(String id) { this.id = id; }
-
-    public String getName() { return name; }
-    public void setName(String name) { this.name = name; }
-
-    public String getDescription() { return description; }
-    public void setDescription(String description) { this.description = description; }
-
-    public String getDatasetType() { return datasetType; }
-    public void setDatasetType(String datasetType) { this.datasetType = datasetType; }
-
-    public String getCategory() { return category; }
-    public void setCategory(String category) { this.category = category; }
-
-    public Long getDataSourceId() { return dataSourceId; }
-    public void setDataSourceId(Long dataSourceId) { this.dataSourceId = dataSourceId; }
-
-    public String getPath() { return path; }
-    public void setPath(String path) { this.path = path; }
-
-    public String getFormat() { return format; }
-    public void setFormat(String format) { this.format = format; }
-
-    public Long getSizeBytes() { return sizeBytes; }
-    public void setSizeBytes(Long sizeBytes) { this.sizeBytes = sizeBytes; }
-
-    public Long getFileCount() { return fileCount; }
-    public void setFileCount(Long fileCount) { this.fileCount = fileCount; }
-
-    public Long getRecordCount() { return recordCount; }
-    public void setRecordCount(Long recordCount) { this.recordCount = recordCount; }
-
-    public Double getCompletionRate() { return completionRate; }
-    public void setCompletionRate(Double completionRate) { this.completionRate = completionRate; }
-
-    public Double getQualityScore() { return qualityScore; }
-    public void setQualityScore(Double qualityScore) { this.qualityScore = qualityScore; }
-
-    public String getStatus() { return status; }
-    public void setStatus(String status) { this.status = status; }
-
-    public Boolean getPublic() { return isPublic; }
-    public void setPublic(Boolean aPublic) { isPublic = aPublic; }
-
-    public Boolean getFeatured() { return isFeatured; }
-    public void setFeatured(Boolean featured) { isFeatured = featured; }
-
-    public Long getDownloadCount() { return downloadCount; }
-    public void setDownloadCount(Long downloadCount) { this.downloadCount = downloadCount; }
-
-    public Long getViewCount() { return viewCount; }
-    public void setViewCount(Long viewCount) { this.viewCount = viewCount; }
-
-    public Long getVersion() { return version; }
-    public void setVersion(Long version) { this.version = version; }
-
-    public LocalDateTime getCreatedAt() { return createdAt; }
-    public void setCreatedAt(LocalDateTime createdAt) { this.createdAt = createdAt; }
-
-    public LocalDateTime getUpdatedAt() { return updatedAt; }
-    public void setUpdatedAt(LocalDateTime updatedAt) { this.updatedAt = updatedAt; }
-
-    public String getCreatedBy() { return createdBy; }
-    public void setCreatedBy(String createdBy) { this.createdBy = createdBy; }
-
-    public String getUpdatedBy() { return updatedBy; }
-    public void setUpdatedBy(String updatedBy) { this.updatedBy = updatedBy; }
-
-    public Set<Tag> getTags() { return tags; }
-    public void setTags(Set<Tag> tags) { this.tags = tags; }
-
-    public Set<DatasetFile> getFiles() { return files; }
-    public void setFiles(Set<DatasetFile> files) { this.files = files; }
 }
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/model/dataset/DatasetFile.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/model/dataset/DatasetFile.java
index 94217614..804778c6 100644
--- a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/model/dataset/DatasetFile.java
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/model/dataset/DatasetFile.java
@@ -1,10 +1,17 @@
 package com.dataengine.datamanagement.domain.model.dataset;
 
+import lombok.*;
+
 import java.time.LocalDateTime;
 
 /**
  * Dataset file entity (aligned with database table t_dm_dataset_files).
 */
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class DatasetFile {
 
     private String id; // UUID
@@ -17,47 +24,4 @@ public class DatasetFile {
     private LocalDateTime uploadTime;
     private LocalDateTime lastAccessTime;
     private String status; // UPLOADED, PROCESSING, COMPLETED, ERROR
-
-    public DatasetFile() {}
-
-    public DatasetFile(String datasetId, String fileName, String filePath, String fileType, Long fileSize, String fileFormat) {
-        this.datasetId = datasetId;
-        this.fileName = fileName;
-        this.filePath = filePath;
-        this.fileType = fileType;
-        this.fileSize = fileSize;
-        this.fileFormat = fileFormat;
-        this.status = StatusConstants.DatasetFileStatuses.COMPLETED;
-    }
-
-    // Getters & Setters
-    public String getId() { return id; }
-    public void setId(String id) { this.id = id; }
-
-    public String getDatasetId() { return datasetId; }
-    public void setDatasetId(String datasetId) { this.datasetId = datasetId; }
-
-    public String getFileName() { return fileName; }
-    public void setFileName(String fileName) { this.fileName = fileName; }
-
-    public String getFilePath() { return filePath; }
-    public void setFilePath(String filePath) { this.filePath = filePath; }
-
-    public String getFileType() { return fileType; }
-    public void setFileType(String fileType) { this.fileType = fileType; }
-
-    public Long getFileSize() { return fileSize; }
-    public void setFileSize(Long fileSize) { this.fileSize = fileSize; }
-
-    public String getFileFormat() { return fileFormat; }
-    public void setFileFormat(String fileFormat) { this.fileFormat = fileFormat; }
-
-    public LocalDateTime getUploadTime() { return uploadTime; }
-    public void setUploadTime(LocalDateTime uploadTime) { this.uploadTime = uploadTime; }
-
-    public LocalDateTime getLastAccessTime() { return lastAccessTime; }
-    public void setLastAccessTime(LocalDateTime lastAccessTime) { this.lastAccessTime = lastAccessTime; }
-
-    public String getStatus() { return status; }
-    public void setStatus(String status) { this.status = status; }
 }
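
Since DatasetFile now gets its builder from Lombok, the fluent chain used in saveFileInfoToDb is generated code. Roughly, @Builder expands to a helper like the sketch below (illustrative, abbreviated to three of the ten fields; the real generated build() passes every field to the all-args constructor, leaving unset ones null):

    class DatasetFileBuilderSketch {
        private String id;
        private String datasetId;
        private String fileName;

        DatasetFileBuilderSketch id(String id) { this.id = id; return this; }
        DatasetFileBuilderSketch datasetId(String datasetId) { this.datasetId = datasetId; return this; }
        DatasetFileBuilderSketch fileName(String fileName) { this.fileName = fileName; return this; }

        DatasetFile build() {
            // Generated builders call the @AllArgsConstructor with all fields in declaration order.
            return new DatasetFile(id, datasetId, fileName, null, null, null, null, null, null, null);
        }
    }
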
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/model/dataset/DatasetFileUploadCheckInfo.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/model/dataset/DatasetFileUploadCheckInfo.java
new file mode 100644
index 00000000..996d3bdd
--- /dev/null
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/domain/model/dataset/DatasetFileUploadCheckInfo.java
@@ -0,0 +1,18 @@
+package com.dataengine.datamanagement.domain.model.dataset;
+
+import com.dataengine.common.domain.model.UploadCheckInfo;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Upload check information for dataset files.
+ */
+@Getter
+@Setter
+public class DatasetFileUploadCheckInfo extends UploadCheckInfo {
+    /** Dataset id */
+    private String datasetId;
+
+    /** Whether the upload is an archive */
+    private boolean hasArchive;
+}
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/converter/DatasetConverter.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/converter/DatasetConverter.java
new file mode 100644
index 00000000..baf3b9fe
--- /dev/null
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/converter/DatasetConverter.java
@@ -0,0 +1,50 @@
+package com.dataengine.datamanagement.interfaces.converter;
+
+import com.dataengine.common.domain.model.ChunkUploadRequest;
+import com.dataengine.common.interfaces.PagedResponse;
+import com.dataengine.datamanagement.domain.model.dataset.Dataset;
+import com.dataengine.datamanagement.domain.model.dataset.Tag;
+import com.dataengine.datamanagement.interfaces.dto.CreateDatasetRequest;
+import com.dataengine.datamanagement.interfaces.dto.DatasetResponse;
+import com.dataengine.datamanagement.interfaces.dto.TagResponse;
+import com.dataengine.datamanagement.interfaces.dto.UploadFileRequest;
+import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
+import org.mapstruct.factory.Mappers;
+import org.springframework.data.domain.Page;
+
+/**
+ * Dataset converter.
+ */
+@Mapper
+public interface DatasetConverter {
+    /** Singleton instance */
+    DatasetConverter INSTANCE = Mappers.getMapper(DatasetConverter.class);
+
+    /**
+     * Convert a dataset entity to a response DTO.
+     */
+    DatasetResponse convertToResponse(Dataset dataset);
+
+    /**
+     * Convert a create-dataset request to a dataset entity.
+     */
+    @Mapping(target = "tags", ignore = true)
+    Dataset convertToDataset(CreateDatasetRequest createDatasetRequest);
+
+    /**
+     * Convert a page of dataset entities to a paged response.
+     */
+    @Mapping(source = "number", target = "page")
+    PagedResponse<DatasetResponse> convertToPagedResponse(Page<Dataset> dataset);
+
+    /**
+     * Convert an upload-file request to a chunk upload request.
+     */
+    ChunkUploadRequest toChunkUploadRequest(UploadFileRequest uploadFileRequest);
+
+    /**
+     * Convert a tag entity to a response DTO.
+     */
+    TagResponse convertToResponse(Tag tag);
+}
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/CreateDatasetRequest.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/CreateDatasetRequest.java
index df7c1d65..658c3a26 100644
--- a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/CreateDatasetRequest.java
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/CreateDatasetRequest.java
@@ -1,6 +1,8 @@
 package com.dataengine.datamanagement.interfaces.dto;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
 
 import java.util.List;
@@ -10,13 +12,15 @@
 */
 @Getter
 @Setter
+@NoArgsConstructor
+@AllArgsConstructor
 public class CreateDatasetRequest {
     /** Dataset name */
     private String name;
     /** Dataset description */
     private String description;
     /** Dataset type */
-    private String type;
+    private String datasetType;
     /** Tag list */
     private List<String> tags;
     /** Data source */
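
MapStruct resolves mappings by matching property names at build time, which is why the DTO rename from type to datasetType above matters: it lines the request field up with Dataset.datasetType. A sketch of roughly what the generated convertToDataset looks like (illustrative only; properties whose names differ, such as dataSource vs. dataSourceId or targetLocation vs. path, are not copied unless explicit @Mapping entries are added):

    class DatasetConverterSketch {
        // "tags" is skipped because of @Mapping(target = "tags", ignore = true);
        // the application service resolves tag names to Tag entities itself.
        static Dataset convertToDataset(CreateDatasetRequest request) {
            if (request == null) {
                return null;
            }
            Dataset dataset = new Dataset();
            dataset.setName(request.getName());
            dataset.setDescription(request.getDescription());
            dataset.setDatasetType(request.getDatasetType());
            return dataset;
        }
    }
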
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/DatasetPagingQuery.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/DatasetPagingQuery.java
new file mode 100644
index 00000000..cec36811
--- /dev/null
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/DatasetPagingQuery.java
@@ -0,0 +1,52 @@
+package com.dataengine.datamanagement.interfaces.dto;
+
+import com.dataengine.common.interfaces.PagingQuery;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Paging query for datasets.
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class DatasetPagingQuery extends PagingQuery {
+    /**
+     * Dataset type filter.
+     */
+    private String type;
+
+    /**
+     * Tag filter; multiple tags are separated by commas.
+     */
+    private String tags;
+
+    /**
+     * Keyword search (name or description).
+     */
+    private String keyword;
+
+    /**
+     * Status filter.
+     */
+    private String status;
+
+    /**
+     * Split the comma-separated tag string into a tag list.
+     *
+     * @return the tag list, or null when tags is blank
+     */
+    public List<String> getTagList() {
+        if (tags != null && !tags.trim().isEmpty()) {
+            return Arrays.asList(tags.split(","));
+        }
+        return null;
+    }
+}
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/DatasetResponse.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/DatasetResponse.java
index 5d01efe3..d6f6520b 100644
--- a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/DatasetResponse.java
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/DatasetResponse.java
@@ -19,7 +19,7 @@ public class DatasetResponse {
     /** Dataset description */
     private String description;
     /** Dataset type */
-    private DatasetTypeResponse type;
+    private String datasetType;
     /** Dataset status */
     private String status;
     /** Tag list */
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/UploadFileRequest.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/UploadFileRequest.java
new file mode 100644
index 00000000..f012d343
--- /dev/null
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/UploadFileRequest.java
@@ -0,0 +1,34 @@
+package com.dataengine.datamanagement.interfaces.dto;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.web.multipart.MultipartFile;
+
+/**
+ * Upload file request.
+ * Wraps the parameters of a chunked file upload; supports uploading large files in chunks.
+ */
+@Getter
+@Setter
+public class UploadFileRequest {
+    /** Id returned by pre-upload, used to identify the same upload task */
+    private String reqId;
+
+    /** File number, identifying which file of the batch this is */
+    private int fileNo;
+
+    /** File name */
+    private String fileName;
+
+    /** Total number of chunks of the file */
+    private int totalChunkNum;
+
+    /** Current chunk number, starting from 1 */
+    private int chunkNo;
+
+    /** The uploaded file chunk */
+    private MultipartFile file;
+
+    /** Checksum of the file chunk (hex string), used to verify file integrity */
+    private String checkSumHex;
+}
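
The checkSumHex field above expects a hex-encoded digest of the chunk; the diff does not pin the hash algorithm, so SHA-256 below is an assumption. One way a client could fill it in:

    import java.io.IOException;
    import java.io.InputStream;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    class ChecksumSketch {
        // Hash the chunk bytes and hex-encode the digest.
        static String hexChecksum(InputStream chunk) throws IOException, NoSuchAlgorithmException {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] buffer = new byte[8192];
            for (int read = chunk.read(buffer); read != -1; read = chunk.read(buffer)) {
                digest.update(buffer, 0, read);
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        }
    }
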
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/UploadFilesPreRequest.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/UploadFilesPreRequest.java
new file mode 100644
index 00000000..cec96af6
--- /dev/null
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/dto/UploadFilesPreRequest.java
@@ -0,0 +1,22 @@
+package com.dataengine.datamanagement.interfaces.dto;
+
+import jakarta.validation.constraints.Min;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Pre-upload request for chunked uploads.
+ */
+@Getter
+@Setter
+public class UploadFilesPreRequest {
+    /** Whether the upload is an archive */
+    private boolean hasArchive;
+
+    /** Total number of files */
+    @Min(1)
+    private int totalFileNum;
+
+    /** Total size of all files */
+    private long totalSize;
+}
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/rest/DatasetController.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/rest/DatasetController.java
index 3502d489..a7060774 100644
--- a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/rest/DatasetController.java
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/rest/DatasetController.java
@@ -1,22 +1,18 @@
 package com.dataengine.datamanagement.interfaces.rest;
 
+import com.dataengine.common.interfaces.PagedResponse;
 import com.dataengine.datamanagement.application.service.DatasetApplicationService;
 import com.dataengine.datamanagement.domain.model.dataset.Dataset;
+import com.dataengine.datamanagement.interfaces.converter.DatasetConverter;
 import com.dataengine.datamanagement.interfaces.dto.*;
 import com.dataengine.common.interfaces.Response;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.Page;
-import org.springframework.data.domain.PageRequest;
-import org.springframework.data.domain.Pageable;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 
-import java.time.ZoneOffset;
-import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
  * Dataset REST controller (UUID mode)
@@ -32,56 +28,24 @@ public DatasetController(DatasetApplicationService datasetApplicationService) {
         this.datasetApplicationService = datasetApplicationService;
     }
 
+    /**
+     * List datasets.
+     *
+     * @param query paging query parameters
+     * @return a page of datasets
+     */
     @GetMapping
-    public ResponseEntity<Response<PagedDatasetResponse>> getDatasets(
-            @RequestParam(value = "page", required = false, defaultValue = "0") Integer page,
-            @RequestParam(value = "size", required = false, defaultValue = "20") Integer size,
-            @RequestParam(value = "type", required = false) String type,
-            @RequestParam(value = "tags", required = false) String tags,
-            @RequestParam(value = "keyword", required = false) String keyword,
-            @RequestParam(value = "status", required = false) String status) {
-
-        Pageable pageable = PageRequest.of(page != null ? page : 0, size != null ? size : 20);
-
-        List<String> tagList = null;
-        if (tags != null && !tags.trim().isEmpty()) {
-            tagList = Arrays.asList(tags.split(","));
-        }
-
-        Page<Dataset> datasetsPage = datasetApplicationService.getDatasets(type, status, keyword, tagList, pageable);
-
-        PagedDatasetResponse response = new PagedDatasetResponse();
-        response.setContent(datasetsPage.getContent().stream()
-            .map(this::convertToResponse)
-            .collect(Collectors.toList()));
-        response.setPage(datasetsPage.getNumber());
-        response.setSize(datasetsPage.getSize());
-        response.setTotalElements((int) datasetsPage.getTotalElements());
-        response.setTotalPages(datasetsPage.getTotalPages());
-        response.setFirst(datasetsPage.isFirst());
-        response.setLast(datasetsPage.isLast());
-
-        return ResponseEntity.ok(Response.ok(response));
+    public ResponseEntity<Response<PagedResponse<DatasetResponse>>> getDatasets(DatasetPagingQuery query) {
+        query.initPaging();
+        Page<Dataset> datasetsPage = datasetApplicationService.getDatasets(query);
+        return ResponseEntity.ok(Response.ok(DatasetConverter.INSTANCE.convertToPagedResponse(datasetsPage)));
     }
 
     @PostMapping
     public ResponseEntity<Response<DatasetResponse>> createDataset(@RequestBody CreateDatasetRequest createDatasetRequest) {
         try {
-            Long dataSourceId = null;
-            if (createDatasetRequest.getDataSource() != null) {
-                try { dataSourceId = Long.valueOf(createDatasetRequest.getDataSource()); } catch (NumberFormatException ignore) {}
-            }
-            Dataset dataset = datasetApplicationService.createDataset(
-                createDatasetRequest.getName(),
-                createDatasetRequest.getDescription(),
-                createDatasetRequest.getType(),
-                createDatasetRequest.getTags(),
-                dataSourceId,
-                createDatasetRequest.getTargetLocation(),
-                null,
-                "system"
-            );
-            return ResponseEntity.status(HttpStatus.CREATED).body(Response.ok(convertToResponse(dataset)));
+            Dataset dataset = datasetApplicationService.createDataset(createDatasetRequest);
+            return ResponseEntity.status(HttpStatus.CREATED).body(Response.ok(DatasetConverter.INSTANCE.convertToResponse(dataset)));
         } catch (IllegalArgumentException e) {
             return ResponseEntity.badRequest().body(Response.error("参数错误", null));
         }
@@ -91,7 +55,7 @@ public ResponseEntity<Response<DatasetResponse>> createDataset(@RequestBody Crea
     public ResponseEntity<Response<DatasetResponse>> getDatasetById(@PathVariable("datasetId") String datasetId) {
         try {
             Dataset dataset = datasetApplicationService.getDataset(datasetId);
-            return ResponseEntity.ok(Response.ok(convertToResponse(dataset)));
+            return ResponseEntity.ok(Response.ok(DatasetConverter.INSTANCE.convertToResponse(dataset)));
         } catch (IllegalArgumentException e) {
             return ResponseEntity.status(HttpStatus.NOT_FOUND).body(Response.error("数据集不存在", null));
         }
@@ -109,7 +73,7 @@ public ResponseEntity<Response<DatasetResponse>> updateDataset(
                 updateDatasetRequest.getTags(),
                 updateDatasetRequest.getStatus() != null ? updateDatasetRequest.getStatus() : null
             );
-            return ResponseEntity.ok(Response.ok(convertToResponse(dataset)));
+            return ResponseEntity.ok(Response.ok(DatasetConverter.INSTANCE.convertToResponse(dataset)));
         } catch (IllegalArgumentException e) {
             return ResponseEntity.status(HttpStatus.NOT_FOUND).body(Response.error("数据集不存在", null));
         }
@@ -150,42 +114,4 @@ public ResponseEntity<Response<Map<String, Object>>> getDatasetStatistics(
     public ResponseEntity<Response<AllDatasetStatisticsResponse>> getAllStatistics() {
         return ResponseEntity.ok(Response.ok(datasetApplicationService.getAllDatasetStatistics()));
     }
-
-    private DatasetResponse convertToResponse(Dataset dataset) {
-        DatasetResponse response = new DatasetResponse();
-        response.setId(dataset.getId());
-        response.setName(dataset.getName());
-        response.setDescription(dataset.getDescription());
-
-        DatasetTypeResponse typeResponse = new DatasetTypeResponse();
-        typeResponse.setCode(dataset.getDatasetType());
-        response.setType(typeResponse);
-
-        response.setStatus(dataset.getStatus());
-
-        response.setDataSource(dataset.getDataSourceId() != null ? String.valueOf(dataset.getDataSourceId()) : null);
-        response.setTargetLocation(dataset.getPath());
-        response.setFileCount(dataset.getFileCount() != null ? dataset.getFileCount().intValue() : null);
-        response.setTotalSize(dataset.getSizeBytes());
-        response.setCompletionRate(dataset.getCompletionRate() != null ? dataset.getCompletionRate().floatValue() : null);
-
-        if (dataset.getCreatedAt() != null) response.setCreatedAt(dataset.getCreatedAt().atOffset(ZoneOffset.UTC).toLocalDateTime());
-        if (dataset.getUpdatedAt() != null) response.setUpdatedAt(dataset.getUpdatedAt().atOffset(ZoneOffset.UTC).toLocalDateTime());
-        response.setCreatedBy(dataset.getCreatedBy());
-
-        List<TagResponse> tagResponses = dataset.getTags().stream()
-            .map(tag -> {
-                TagResponse tr = new TagResponse();
-                tr.setId(tag.getId());
-                tr.setName(tag.getName());
-                tr.setColor(tag.getColor());
-                tr.setDescription(tag.getDescription());
-                tr.setUsageCount(tag.getUsageCount() != null ? tag.getUsageCount().intValue() : null);
-                return tr;
-            })
-            .collect(Collectors.toList());
-        response.setTags(tagResponses);
-
-        return response;
-    }
 }
diff --git a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/rest/DatasetFileController.java b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/rest/DatasetFileController.java
index 7a842ca8..1bdad82a 100644
--- a/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/rest/DatasetFileController.java
+++ b/backend/services/data-management-service/src/main/java/com/dataengine/datamanagement/interfaces/rest/DatasetFileController.java
@@ -5,6 +5,10 @@
 import com.dataengine.datamanagement.interfaces.dto.DatasetFileResponse;
 import com.dataengine.datamanagement.interfaces.dto.PagedDatasetFileResponse;
 import com.dataengine.common.interfaces.Response;
+import com.dataengine.datamanagement.interfaces.dto.UploadFileRequest;
+import com.dataengine.datamanagement.interfaces.dto.UploadFilesPreRequest;
+import jakarta.validation.Valid;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.io.Resource;
 import org.springframework.data.domain.Page;
@@ -23,6 +27,7 @@
 /**
  * Dataset file REST controller (UUID mode)
 */
+@Slf4j
 @RestController
 @RequestMapping("/data-management/datasets/{datasetId}/files")
 public class DatasetFileController {
@@ -121,7 +126,7 @@ public ResponseEntity<Resource> downloadDatasetFile(
         }
     }
 
-    private DatasetFileResponse convertToResponse(DatasetFile datasetFile) { 
+    private DatasetFileResponse convertToResponse(DatasetFile datasetFile) {
         DatasetFileResponse response = new DatasetFileResponse();
         response.setId(datasetFile.getId());
         response.setFileName(datasetFile.getFileName());
@@ -137,4 +142,29 @@ private DatasetFileResponse convertToResponse(DatasetFile datasetFile) {
         response.setUploadedBy(null);
         return response;
     }
+
+    /**
+     * Pre-upload for a batch of files.
+     *
+     * @param datasetId dataset id
+     * @param request batch file upload request
+     * @return batch upload request id
+     */
+    @PostMapping("/upload/pre-upload")
+    public ResponseEntity<Response<String>> preUpload(@PathVariable("datasetId") String datasetId, @RequestBody @Valid UploadFilesPreRequest request) {
+        return ResponseEntity.ok(Response.ok(datasetFileApplicationService.preUpload(request, datasetId)));
+    }
+
+    /**
+     * Chunked upload.
+     *
+     * @param datasetId dataset id
+     * @param uploadFileRequest upload file request
+     */
+    @PostMapping("/upload/chunk")
+    public ResponseEntity<Void> chunkUpload(@PathVariable("datasetId") String datasetId, UploadFileRequest uploadFileRequest) {
+        log.info("file upload reqId:{}, fileNo:{}, total chunk num:{}, current chunkNo:{}",
+            uploadFileRequest.getReqId(), uploadFileRequest.getFileNo(), uploadFileRequest.getTotalChunkNum(),
+            uploadFileRequest.getChunkNo());
+        datasetFileApplicationService.chunkUpload(datasetId, uploadFileRequest);
+        return ResponseEntity.ok().build();
+    }
 }
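
A hypothetical client call against the new chunk endpoint, using Spring's RestTemplate; the host, port and single-chunk parameters are placeholders, not part of the patch:

    import org.springframework.core.io.FileSystemResource;
    import org.springframework.http.HttpEntity;
    import org.springframework.http.HttpHeaders;
    import org.springframework.http.MediaType;
    import org.springframework.util.LinkedMultiValueMap;
    import org.springframework.util.MultiValueMap;
    import org.springframework.web.client.RestTemplate;

    import java.io.File;

    class ChunkUploadClientSketch {
        static void sendChunk(String datasetId, String reqId, File chunkFile, String checksumHex) {
            String url = "http://localhost:8080/data-management/datasets/" + datasetId + "/files/upload/chunk";

            // The controller binds UploadFileRequest from multipart form fields.
            MultiValueMap<String, Object> form = new LinkedMultiValueMap<>();
            form.add("reqId", reqId);
            form.add("fileNo", "1");
            form.add("fileName", chunkFile.getName());
            form.add("totalChunkNum", "1");
            form.add("chunkNo", "1");
            form.add("checkSumHex", checksumHex);
            form.add("file", new FileSystemResource(chunkFile));

            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.MULTIPART_FORM_DATA);
            new RestTemplate().postForEntity(url, new HttpEntity<>(form, headers), Void.class);
        }
    }
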
diff --git a/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/application/service/DatasetApplicationServiceTest.java b/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/application/service/DatasetApplicationServiceTest.java
index 64844534..4b868f40 100644
--- a/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/application/service/DatasetApplicationServiceTest.java
+++ b/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/application/service/DatasetApplicationServiceTest.java
@@ -6,6 +6,8 @@
 import com.dataengine.datamanagement.infrastructure.persistence.mapper.DatasetFileMapper;
 import com.dataengine.datamanagement.infrastructure.persistence.mapper.DatasetMapper;
 import com.dataengine.datamanagement.infrastructure.persistence.mapper.TagMapper;
+import com.dataengine.datamanagement.interfaces.dto.CreateDatasetRequest;
+import com.dataengine.datamanagement.interfaces.dto.DatasetPagingQuery;
 import org.apache.ibatis.session.RowBounds;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
@@ -16,8 +18,6 @@
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.springframework.data.domain.Page;
-import org.springframework.data.domain.PageRequest;
-import org.springframework.data.domain.Pageable;
 
 import java.time.LocalDateTime;
 import java.util.*;
@@ -77,8 +77,8 @@ void createDataset_success_withTags() {
         when(tagMapper.insert(any(Tag.class))).thenReturn(1);
 
         // When
-        Dataset result = service.createDataset("New Dataset", "Description", "CSV",
-            tagNames, 1L, "/path", "csv", "user1");
+        Dataset result = service.createDataset(new CreateDatasetRequest("New Dataset", "Description", "CSV",
+            tagNames, "1", "/path"));
 
         // Then
         assertNotNull(result);
@@ -103,7 +103,7 @@ void createDataset_duplicateName() {
         when(datasetMapper.findByName("Duplicate")).thenReturn(sampleDataset);
 
         IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
-            () -> service.createDataset("Duplicate", "desc", "CSV", null, 1L, "/path", "csv", "user1"));
+            () -> service.createDataset(new CreateDatasetRequest("Duplicate", "desc", "CSV", null, "1L", "/path")));
 
         assertTrue(ex.getMessage().contains("already exists"));
         verify(datasetMapper).findByName("Duplicate");
@@ -117,8 +117,8 @@ void createDataset_withoutTags() {
         when(datasetMapper.insert(any(Dataset.class))).thenReturn(1);
         when(datasetMapper.findById(anyString())).thenReturn(sampleDataset);
 
-        Dataset result = service.createDataset("No Tags Dataset", "desc", "CSV",
-            null, 1L, "/path", "csv", "user1");
+        Dataset result = service.createDataset(new CreateDatasetRequest("No Tags Dataset", "desc", "CSV",
+            null, "1L", "/path"));
 
         assertNotNull(result);
         verify(tagMapper, never()).insertDatasetTag(anyString(), anyString());
@@ -214,15 +214,13 @@ void getDataset_notFound() {
     void getDatasets_pagination() {
         List<Dataset> datasets = Arrays.asList(sampleDataset);
         List<Tag> tags = Arrays.asList(sampleTag);
-        Pageable pageable = PageRequest.of(0, 10);
 
         when(datasetMapper.findByCriteria(eq("CSV"), eq("ACTIVE"), eq("test"),
             eq(Arrays.asList("tag1")), any(RowBounds.class))).thenReturn(datasets);
         when(datasetMapper.countByCriteria("CSV", "ACTIVE", "test", Arrays.asList("tag1"))).thenReturn(1L);
         when(tagMapper.findByDatasetId("dataset-id-1")).thenReturn(tags);
 
-        Page<Dataset> result = service.getDatasets("CSV", "ACTIVE", "test",
-            Arrays.asList("tag1"), pageable);
+        Page<Dataset> result = service.getDatasets(new DatasetPagingQuery("CSV", "tag1", "test", "ACTIVE"));
 
         assertNotNull(result);
         assertEquals(1, result.getContent().size());
@@ -236,12 +234,11 @@ void getDatasets_pagination() {
     @Test
     @DisplayName("getDatasets: 空结果集")
     void getDatasets_emptyResult() {
-        Pageable pageable = PageRequest.of(0, 10);
         when(datasetMapper.findByCriteria(isNull(), isNull(), isNull(), isNull(),
             any(RowBounds.class))).thenReturn(Collections.emptyList());
         when(datasetMapper.countByCriteria(null, null, null, null)).thenReturn(0L);
 
-        Page<Dataset> result = service.getDatasets(null, null, null, null, pageable);
+        Page<Dataset> result = service.getDatasets(new DatasetPagingQuery());
 
        assertNotNull(result);
        assertTrue(result.getContent().isEmpty());
@@ -316,7 +313,7 @@ void processTagNames_mixedTags() {
         when(tagMapper.insert(any(Tag.class))).thenReturn(1);
 
         List<String> tagNames = Arrays.asList("existing", "new");
-        service.createDataset("Test Dataset", "desc", "CSV", tagNames, 1L, "/path", "csv", "user1");
+        service.createDataset(new CreateDatasetRequest("Test Dataset", "desc", "CSV", tagNames, "1L", "/path"));
 
         verify(tagMapper).findByName("existing");
         verify(tagMapper).findByName("new");
diff --git a/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationServiceTest.java b/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationServiceTest.java
index c4543bd0..20f50bf2 100644
--- a/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationServiceTest.java
+++ b/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/application/service/DatasetFileApplicationServiceTest.java
@@ -1,5 +1,6 @@
 package com.dataengine.datamanagement.application.service;
 
+import com.dataengine.common.domain.service.FileService;
 import com.dataengine.datamanagement.domain.model.dataset.Dataset;
 import com.dataengine.datamanagement.domain.model.dataset.DatasetFile;
 import com.dataengine.datamanagement.domain.model.dataset.StatusConstants;
@@ -11,7 +12,6 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
-import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.springframework.core.io.Resource;
@@ -24,7 +24,6 @@
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Collections;
@@ -46,6 +45,9 @@ class DatasetFileApplicationServiceTest {
     @Mock
     private MultipartFile multipartFile;
 
+    @Mock
+    private FileService fileService;
+
     private DatasetFileApplicationService service;
     private Dataset sampleDataset;
     private DatasetFile sampleFile;
@@ -54,8 +56,8 @@ class DatasetFileApplicationServiceTest {
     void setUp() {
         // Use a temporary directory for the tests
         String tempDir = System.getProperty("java.io.tmpdir") + "/test-uploads";
-        service = new DatasetFileApplicationService(datasetFileMapper, datasetMapper, tempDir);
-
+        service = new DatasetFileApplicationService(datasetFileMapper, datasetMapper, fileService, tempDir);
+
         sampleDataset = new Dataset();
         sampleDataset.setId("dataset-id-1");
         sampleDataset.setName("Test Dataset");
@@ -92,7 +94,7 @@ void uploadFile_success() throws IOException {
         // Then
         assertNotNull(result);
         verify(datasetMapper).findById("dataset-id-1");
-
+
         ArgumentCaptor<DatasetFile> fileCaptor = ArgumentCaptor.forClass(DatasetFile.class);
         verify(datasetFileMapper).insert(fileCaptor.capture());
         DatasetFile insertedFile = fileCaptor.getValue();
@@ -115,7 +117,7 @@ void uploadFile_datasetNotFound() {
 
         IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
             () -> service.uploadFile("not-exist", multipartFile, "desc", "user1"));
-
+
         assertTrue(ex.getMessage().contains("Dataset not found"));
         verify(datasetMapper).findById("not-exist");
         verify(datasetFileMapper, never()).insert(any());
@@ -151,7 +153,7 @@ void uploadFile_ioException() throws IOException {
 
         RuntimeException ex = assertThrows(RuntimeException.class,
             () -> service.uploadFile("dataset-id-1", multipartFile, "desc", "user1"));
-
+
         assertTrue(ex.getMessage().contains("Could not store file"));
         verify(datasetFileMapper, never()).insert(any());
     }
service.uploadFile("dataset-id-1", multipartFile, "desc", "user1")); - + assertTrue(ex.getMessage().contains("Could not store file")); verify(datasetFileMapper, never()).insert(any()); } @@ -161,16 +163,16 @@ void uploadFile_ioException() throws IOException { void getDatasetFiles_pagination() { List files = Arrays.asList(sampleFile); Pageable pageable = PageRequest.of(0, 10); - - when(datasetFileMapper.findByCriteria(eq("dataset-id-1"), eq("text/csv"), + + when(datasetFileMapper.findByCriteria(eq("dataset-id-1"), eq("text/csv"), eq(StatusConstants.DatasetFileStatuses.COMPLETED), any(RowBounds.class))).thenReturn(files); - Page result = service.getDatasetFiles("dataset-id-1", "text/csv", + Page result = service.getDatasetFiles("dataset-id-1", "text/csv", StatusConstants.DatasetFileStatuses.COMPLETED, pageable); assertNotNull(result); assertEquals(1, result.getContent().size()); - verify(datasetFileMapper).findByCriteria(eq("dataset-id-1"), eq("text/csv"), + verify(datasetFileMapper).findByCriteria(eq("dataset-id-1"), eq("text/csv"), eq(StatusConstants.DatasetFileStatuses.COMPLETED), any(RowBounds.class)); } @@ -178,7 +180,7 @@ void getDatasetFiles_pagination() { @DisplayName("getDatasetFiles: 空结果集") void getDatasetFiles_emptyResult() { Pageable pageable = PageRequest.of(0, 10); - when(datasetFileMapper.findByCriteria(eq("dataset-id-1"), isNull(), + when(datasetFileMapper.findByCriteria(eq("dataset-id-1"), isNull(), isNull(), any(RowBounds.class))).thenReturn(Collections.emptyList()); Page result = service.getDatasetFiles("dataset-id-1", null, null, pageable); @@ -206,7 +208,7 @@ void getDatasetFile_notFound() { IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> service.getDatasetFile("dataset-id-1", "not-exist")); - + assertTrue(ex.getMessage().contains("File not found")); verify(datasetFileMapper).findById("not-exist"); } @@ -220,7 +222,7 @@ void getDatasetFile_wrongDataset() { IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> service.getDatasetFile("dataset-id-1", "file-id-1")); - + assertTrue(ex.getMessage().contains("does not belong to the specified dataset")); verify(datasetFileMapper).findById("file-id-1"); } @@ -238,7 +240,7 @@ void deleteDatasetFile_success() { verify(datasetFileMapper).findById("file-id-1"); verify(datasetFileMapper).deleteById("file-id-1"); verify(datasetMapper).findById("dataset-id-1"); - + ArgumentCaptor captor = ArgumentCaptor.forClass(Dataset.class); verify(datasetMapper).update(captor.capture()); Dataset updated = captor.getValue(); @@ -253,7 +255,7 @@ void deleteDatasetFile_notFound() { IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> service.deleteDatasetFile("dataset-id-1", "not-exist")); - + assertTrue(ex.getMessage().contains("File not found")); verify(datasetFileMapper).findById("not-exist"); verify(datasetFileMapper, never()).deleteById(anyString()); @@ -265,7 +267,7 @@ void deleteDatasetFile_negativeStats() { Dataset smallDataset = new Dataset(); smallDataset.setFileCount(0L); smallDataset.setSizeBytes(0L); - + when(datasetFileMapper.findById("file-id-1")).thenReturn(sampleFile); when(datasetFileMapper.deleteById("file-id-1")).thenReturn(1); when(datasetMapper.findById("dataset-id-1")).thenReturn(smallDataset); @@ -286,7 +288,7 @@ void downloadFile_success() throws Exception { // 创建临时文件用于测试 Path tempFile = Files.createTempFile("test", ".csv"); Files.write(tempFile, "test content".getBytes()); - + sampleFile.setFilePath(tempFile.toString()); 
when(datasetFileMapper.findById("file-id-1")).thenReturn(sampleFile); @@ -295,7 +297,7 @@ void downloadFile_success() throws Exception { assertNotNull(result); assertTrue(result.exists()); verify(datasetFileMapper).findById("file-id-1"); - + // 清理临时文件 Files.deleteIfExists(tempFile); } @@ -308,7 +310,7 @@ void downloadFile_fileNotExist() { RuntimeException ex = assertThrows(RuntimeException.class, () -> service.downloadFile("dataset-id-1", "file-id-1")); - + assertTrue(ex.getMessage().contains("File not found")); verify(datasetFileMapper).findById("file-id-1"); } @@ -320,7 +322,7 @@ void downloadFile_recordNotFound() { IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> service.downloadFile("dataset-id-1", "not-exist")); - + assertTrue(ex.getMessage().contains("File not found")); verify(datasetFileMapper).findById("not-exist"); } diff --git a/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/domain/model/dataset/DatasetFileTest.java b/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/domain/model/dataset/DatasetFileTest.java deleted file mode 100644 index ed14212c..00000000 --- a/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/domain/model/dataset/DatasetFileTest.java +++ /dev/null @@ -1,245 +0,0 @@ -package com.dataengine.datamanagement.domain.model.dataset; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -import java.time.LocalDateTime; - -import static org.junit.jupiter.api.Assertions.*; - -class DatasetFileTest { - - private DatasetFile datasetFile; - - @BeforeEach - void setUp() { - datasetFile = new DatasetFile(); - } - - @Test - @DisplayName("constructor: 默认构造函数") - void testDefaultConstructor() { - DatasetFile file = new DatasetFile(); - - assertNull(file.getId()); - assertNull(file.getDatasetId()); - assertNull(file.getFileName()); - assertNull(file.getFilePath()); - assertNull(file.getFileType()); - assertNull(file.getFileSize()); - assertNull(file.getFileFormat()); - assertNull(file.getUploadTime()); - assertNull(file.getLastAccessTime()); - assertNull(file.getStatus()); - } - - @Test - @DisplayName("constructor: 参数构造函数") - void testParameterizedConstructor() { - String datasetId = "dataset-123"; - String fileName = "test.csv"; - String filePath = "/path/to/test.csv"; - String fileType = "TEXT"; - Long fileSize = 1024L; - String fileFormat = "csv"; - - DatasetFile file = new DatasetFile(datasetId, fileName, filePath, fileType, fileSize, fileFormat); - - assertNull(file.getId()); // ID需要单独设置 - assertEquals(datasetId, file.getDatasetId()); - assertEquals(fileName, file.getFileName()); - assertEquals(filePath, file.getFilePath()); - assertEquals(fileType, file.getFileType()); - assertEquals(fileSize, file.getFileSize()); - assertEquals(fileFormat, file.getFileFormat()); - assertEquals(StatusConstants.DatasetFileStatuses.COMPLETED, file.getStatus()); - assertNull(file.getUploadTime()); - assertNull(file.getLastAccessTime()); - } - - @Test - @DisplayName("gettersAndSetters: 基本属性访问") - void testGettersAndSetters() { - String id = "file-id-123"; - String datasetId = "dataset-id-456"; - String fileName = "document.pdf"; - String filePath = "/uploads/document.pdf"; - String fileType = "APPLICATION"; - Long fileSize = 2048L; - String fileFormat = "pdf"; - LocalDateTime uploadTime = LocalDateTime.now(); - LocalDateTime lastAccessTime = LocalDateTime.now().minusDays(1); - String 
-        String status = StatusConstants.DatasetFileStatuses.PROCESSING;
-
-        datasetFile.setId(id);
-        datasetFile.setDatasetId(datasetId);
-        datasetFile.setFileName(fileName);
-        datasetFile.setFilePath(filePath);
-        datasetFile.setFileType(fileType);
-        datasetFile.setFileSize(fileSize);
-        datasetFile.setFileFormat(fileFormat);
-        datasetFile.setUploadTime(uploadTime);
-        datasetFile.setLastAccessTime(lastAccessTime);
-        datasetFile.setStatus(status);
-
-        assertEquals(id, datasetFile.getId());
-        assertEquals(datasetId, datasetFile.getDatasetId());
-        assertEquals(fileName, datasetFile.getFileName());
-        assertEquals(filePath, datasetFile.getFilePath());
-        assertEquals(fileType, datasetFile.getFileType());
-        assertEquals(fileSize, datasetFile.getFileSize());
-        assertEquals(fileFormat, datasetFile.getFileFormat());
-        assertEquals(uploadTime, datasetFile.getUploadTime());
-        assertEquals(lastAccessTime, datasetFile.getLastAccessTime());
-        assertEquals(status, datasetFile.getStatus());
-    }
-
-    @Test
-    @DisplayName("setters: 设置null值")
-    void testSettersWithNullValues() {
-        // Set some values first
-        datasetFile.setId("test-id");
-        datasetFile.setFileName("test.txt");
-        datasetFile.setFileSize(1024L);
-
-        // Then set them to null
-        datasetFile.setId(null);
-        datasetFile.setFileName(null);
-        datasetFile.setFileSize(null);
-
-        assertNull(datasetFile.getId());
-        assertNull(datasetFile.getFileName());
-        assertNull(datasetFile.getFileSize());
-    }
-
-    @Test
-    @DisplayName("fileSize: 处理0和负数")
-    void testFileSizeEdgeCases() {
-        // Zero size
-        datasetFile.setFileSize(0L);
-        assertEquals(0L, datasetFile.getFileSize());
-
-        // Very large file
-        Long largeSize = Long.MAX_VALUE;
-        datasetFile.setFileSize(largeSize);
-        assertEquals(largeSize, datasetFile.getFileSize());
-    }
-
-    @Test
-    @DisplayName("status: 不同状态值")
-    void testDifferentStatusValues() {
-        String[] statuses = {
-            StatusConstants.DatasetFileStatuses.UPLOADED,
-            StatusConstants.DatasetFileStatuses.PROCESSING,
-            StatusConstants.DatasetFileStatuses.COMPLETED,
-            StatusConstants.DatasetFileStatuses.ERROR
-        };
-
-        for (String status : statuses) {
-            datasetFile.setStatus(status);
-            assertEquals(status, datasetFile.getStatus());
-        }
-    }
-
-    @Test
-    @DisplayName("fileType: 不同文件类型")
-    void testDifferentFileTypes() {
-        String[] fileTypes = {"TEXT", "IMAGE", "VIDEO", "AUDIO", "APPLICATION"};
-
-        for (String fileType : fileTypes) {
-            datasetFile.setFileType(fileType);
-            assertEquals(fileType, datasetFile.getFileType());
-        }
-    }
-
-    @Test
-    @DisplayName("timestamps: 时间戳设置和获取")
-    void testTimestamps() {
-        LocalDateTime now = LocalDateTime.now();
-        LocalDateTime past = now.minusDays(1);
-        LocalDateTime future = now.plusDays(1);
-
-        datasetFile.setUploadTime(now);
-        datasetFile.setLastAccessTime(past);
-
-        assertEquals(now, datasetFile.getUploadTime());
-        assertEquals(past, datasetFile.getLastAccessTime());
-
-        // Updating the access time
-        datasetFile.setLastAccessTime(future);
-        assertEquals(future, datasetFile.getLastAccessTime());
-    }
-
-    @Test
-    @DisplayName("filePath: 不同路径格式")
-    void testDifferentPathFormats() {
-        String[] paths = {
-            "/absolute/path/file.txt",
-            "relative/path/file.txt",
-            "C:\\Windows\\Path\\file.txt",
-            "/home/user/documents/file.pdf",
-            "./local/file.csv"
-        };
-
-        for (String path : paths) {
-            datasetFile.setFilePath(path);
-            assertEquals(path, datasetFile.getFilePath());
-        }
-    }
-
-    @Test
-    @DisplayName("fileName: 不同文件名格式")
-    void testDifferentFileNames() {
-        String[] fileNames = {
-            "simple.txt",
-            "file-with-dashes.csv",
-            "file_with_underscores.json",
-            "file with spaces.pdf",
-            "文件名中文.docx",
"file.with.multiple.dots.zip", - "no-extension" - }; - - for (String fileName : fileNames) { - datasetFile.setFileName(fileName); - assertEquals(fileName, datasetFile.getFileName()); - } - } - - @Test - @DisplayName("constructor: 参数构造函数处理null值") - void testParameterizedConstructorWithNulls() { - DatasetFile file = new DatasetFile(null, null, null, null, null, null); - - assertNull(file.getDatasetId()); - assertNull(file.getFileName()); - assertNull(file.getFilePath()); - assertNull(file.getFileType()); - assertNull(file.getFileSize()); - assertNull(file.getFileFormat()); - assertEquals(StatusConstants.DatasetFileStatuses.COMPLETED, file.getStatus()); - } - - @Test - @DisplayName("immutability: 对象状态变更") - void testObjectStateChanges() { - // 初始状态 - assertNull(datasetFile.getId()); - assertNull(datasetFile.getStatus()); - - // 设置初始值 - datasetFile.setId("initial-id"); - datasetFile.setStatus(StatusConstants.DatasetFileStatuses.UPLOADED); - - assertEquals("initial-id", datasetFile.getId()); - assertEquals(StatusConstants.DatasetFileStatuses.UPLOADED, datasetFile.getStatus()); - - // 更改值 - datasetFile.setId("new-id"); - datasetFile.setStatus(StatusConstants.DatasetFileStatuses.PROCESSING); - - assertEquals("new-id", datasetFile.getId()); - assertEquals(StatusConstants.DatasetFileStatuses.PROCESSING, datasetFile.getStatus()); - } -} diff --git a/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/integration/DataManagementIntegrationTest.java b/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/integration/DataManagementIntegrationTest.java index b65ac3c3..a73a001a 100644 --- a/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/integration/DataManagementIntegrationTest.java +++ b/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/integration/DataManagementIntegrationTest.java @@ -8,6 +8,8 @@ import com.dataengine.datamanagement.infrastructure.persistence.mapper.DatasetFileMapper; import com.dataengine.datamanagement.infrastructure.persistence.mapper.DatasetMapper; import com.dataengine.datamanagement.infrastructure.persistence.mapper.TagMapper; +import com.dataengine.datamanagement.interfaces.dto.CreateDatasetRequest; +import com.dataengine.datamanagement.interfaces.dto.DatasetPagingQuery; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -15,8 +17,6 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.data.domain.Page; -import org.springframework.data.domain.PageImpl; -import org.springframework.data.domain.PageRequest; import java.time.LocalDateTime; import java.util.*; @@ -96,9 +96,8 @@ void completeDatasetWorkflow() { when(tagMapper.updateUsageCount(eq(createdTag.getId()), eq(1L))).thenReturn(1); when(tagMapper.insertDatasetTag(anyString(), eq(createdTag.getId()))).thenReturn(1); - Dataset dataset = datasetService.createDataset( - datasetName, datasetDesc, datasetType, - tagNames, 1L, "/data/mnist", "csv", "user1" + Dataset dataset = datasetService.createDataset(new CreateDatasetRequest( + datasetName, datasetDesc, datasetType,tagNames, "1L", "/data/mnist") ); assertNotNull(dataset); @@ -122,7 +121,6 @@ void completeDatasetWorkflow() { // === 阶段4: 搜索数据集 === List searchResult = Arrays.asList(createdDataset); - Page searchPage = new PageImpl<>(searchResult, PageRequest.of(0, 10), 1); List datasetTags = Arrays.asList(createdTag); 
when(datasetMapper.findByCriteria(eq("IMAGE"), isNull(), eq("MNIST"), @@ -131,9 +129,7 @@ void completeDatasetWorkflow() { eq(Arrays.asList(tagName)))).thenReturn(1L); when(tagMapper.findByDatasetId(anyString())).thenReturn(datasetTags); - Page searchResults = datasetService.getDatasets( - "IMAGE", null, "MNIST", Arrays.asList(tagName), PageRequest.of(0, 10) - ); + Page searchResults = datasetService.getDatasets(new DatasetPagingQuery("IMAGE", null, "MNIST", tagName)); assertEquals(1, searchResults.getContent().size()); assertEquals(1L, searchResults.getTotalElements()); @@ -168,9 +164,9 @@ void tagUsageCountWorkflow() { when(tagMapper.updateUsageCount(eq(nlpTag.getId()), eq(1L))).thenReturn(1); when(tagMapper.insertDatasetTag(anyString(), eq(nlpTag.getId()))).thenReturn(1); - datasetService.createDataset( - "文本分类数据集", "情感分析数据", "TEXT", - Arrays.asList("nlp"), 2L, "/data/text", "json", "user2" + datasetService.createDataset(new CreateDatasetRequest( + "文本分类数据集", "情感分析数据", "TEXT", + Arrays.asList("nlp"), "2L", "/data/text") ); // 第二个数据集使用该标签 @@ -182,10 +178,11 @@ void tagUsageCountWorkflow() { when(datasetMapper.findByName("问答数据集")).thenReturn(null); when(datasetMapper.findById(anyString())).thenReturn(dataset2); when(tagMapper.updateUsageCount(eq(nlpTag.getId()), eq(2L))).thenReturn(1); - when(tagMapper.insertDatasetTag(anyString(), eq(nlpTag.getId()))).thenReturn(1); datasetService.createDataset( + when(tagMapper.insertDatasetTag(anyString(), eq(nlpTag.getId()))).thenReturn(1); + datasetService.createDataset(new CreateDatasetRequest( "问答数据集", "机器阅读理解", "TEXT", - Arrays.asList("nlp"), 3L, "/data/qa", "json", "user3" - ); + Arrays.asList("nlp"), "3L", "/data/qa" + )); // 验证标签使用次数更新 verify(tagMapper).updateUsageCount(eq(nlpTag.getId()), eq(1L)); @@ -224,18 +221,13 @@ void datasetUpdateWorkflow() { when(tagMapper.updateUsageCount(eq(oldTag.getId()), eq(6L))).thenReturn(1); when(tagMapper.insertDatasetTag(anyString(), eq(oldTag.getId()))).thenReturn(1); - Dataset created = datasetService.createDataset( - "初始数据集", "初始描述", "TEXT", - Arrays.asList("old-tag"), 1L, "/path", "csv", "user1" - ); - // 重置部分mock以准备更新操作 reset(tagMapper); when(tagMapper.deleteDatasetTagsByDatasetId("dataset-update-001")).thenReturn(1); when(tagMapper.findByName("new-tag")).thenReturn(newTag); when(tagMapper.updateUsageCount(eq(newTag.getId()), eq(1L))).thenReturn(1); when(tagMapper.insertDatasetTag(eq("dataset-update-001"), eq(newTag.getId()))).thenReturn(1); - + // 更新数据集 - 更改标签和状态 Dataset updatedDataset = new Dataset(); updatedDataset.setId("dataset-update-001"); @@ -327,8 +319,8 @@ void errorHandlingWorkflow() { when(datasetMapper.findByName("existing-dataset")).thenReturn(existingDataset); IllegalArgumentException datasetException = assertThrows(IllegalArgumentException.class, - () -> datasetService.createDataset("existing-dataset", "desc", "TEXT", - null, 1L, "/path", "csv", "user")); + () -> datasetService.createDataset(new CreateDatasetRequest("existing-dataset", "desc", "TEXT", + null, "1L", "/path"))); assertTrue(datasetException.getMessage().contains("already exists")); // 获取不存在的资源 diff --git a/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/interfaces/rest/DatasetControllerTest.java b/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/interfaces/rest/DatasetControllerTest.java index 9a93669d..5c1daa47 100644 --- a/backend/services/data-management-service/src/test/java/com/dataengine/datamanagement/interfaces/rest/DatasetControllerTest.java 
diff --git a/backend/services/main-application/src/main/java/com/dataengine/main/DataEnginePlatformApplication.java b/backend/services/main-application/src/main/java/com/dataengine/main/DataEnginePlatformApplication.java
index a37e1220..ffbc631a 100644
--- a/backend/services/main-application/src/main/java/com/dataengine/main/DataEnginePlatformApplication.java
+++ b/backend/services/main-application/src/main/java/com/dataengine/main/DataEnginePlatformApplication.java
@@ -36,7 +36,8 @@
     "com.dataengine.collection.infrastructure.persistence.mapper",
     "com.dataengine.datamanagement.infrastructure.persistence.mapper",
     "com.dataengine.operator.infrastructure.persistence.mapper",
-    "com.dataengine.cleaning.infrastructure.persistence.mapper"
+    "com.dataengine.cleaning.infrastructure.persistence.mapper",
+    "com.dataengine.common.infrastructure.mapper"
 })
 @EnableTransactionManagement
 @EnableAsync
diff --git a/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/model/ChunkUploadPreRequest.java b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/model/ChunkUploadPreRequest.java
new file mode 100644
index 00000000..4dc87949
--- /dev/null
+++ b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/model/ChunkUploadPreRequest.java
@@ -0,0 +1,48 @@
+package com.dataengine.common.domain.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.time.LocalDateTime;
+
+/**
+ * Chunk-upload pre-request entity (aligned with table t_chunk_upload_request).
+ */
+@Getter
+@Setter
+@Builder
+// Keep a no-arg constructor for MyBatis result mapping; @Builder alone would suppress it.
+@NoArgsConstructor
+@AllArgsConstructor
+public class ChunkUploadPreRequest {
+
+    private String id;                 // UUID
+    private Integer totalFileNum;      // total number of files
+    private Integer uploadedFileNum;   // number of files uploaded so far
+    private String uploadPath;         // upload path
+    private LocalDateTime timeout;     // expiry time of the upload request
+    private String serviceId;          // owning service: DATA-MANAGEMENT (data management)
+    private String checkInfo;          // business payload
+
+    /**
+     * Increment the uploaded-file counter.
+     */
+    public void incrementUploadedFileNum() {
+        if (this.uploadedFileNum == null) {
+            // Treat a missing counter as zero before incrementing,
+            // so the first uploaded file counts once, not twice.
+            this.uploadedFileNum = 0;
+        }
+        this.uploadedFileNum++;
+    }
+
+    /**
+     * Whether every file of the request has been uploaded.
+     */
+    public boolean isUploadComplete() {
+        return this.uploadedFileNum != null && this.uploadedFileNum.equals(this.totalFileNum);
+    }
+
+    /**
+     * Whether the request has timed out.
+     */
+    public boolean isRequestTimeout() {
+        return this.timeout != null && LocalDateTime.now().isAfter(this.timeout);
+    }
+}
diff --git a/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/model/ChunkUploadRequest.java b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/model/ChunkUploadRequest.java
new file mode 100644
index 00000000..f0576c71
--- /dev/null
+++ b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/model/ChunkUploadRequest.java
@@ -0,0 +1,36 @@
+package com.dataengine.common.domain.model;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.web.multipart.MultipartFile;
+
+/**
+ * A single chunk-upload call: one chunk of one file within a batch upload.
+ */
+@Getter
+@Setter
+public class ChunkUploadRequest {
+    /** Id returned by the pre-upload call, identifying the upload task. */
+    private String reqId;
+
+    /** File number, identifying which file of the batch this chunk belongs to. */
+    private int fileNo;
+
+    /** File name. */
+    private String fileName;
+
+    /** Total number of chunks of this file. */
+    private int totalChunkNum;
+
+    /** Current chunk number, starting at 1. */
+    private int chunkNo;
+
+    /** The uploaded chunk content. */
+    private MultipartFile file;
+
+    /** Chunk size in bytes. */
+    private Long fileSize;
+
+    /** Checksum of the chunk (hex string), used to verify integrity. */
+    private String checkSumHex;
+}
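The counter and expiry logic of ChunkUploadPreRequest is easiest to see in isolation; a small illustrative walk-through (the values are made up):

```java
ChunkUploadPreRequest req = ChunkUploadPreRequest.builder()
    .totalFileNum(2)
    .uploadPath("/tmp/upload")        // illustrative path
    .serviceId("DATA-MANAGEMENT")
    .build();

req.incrementUploadedFileNum();       // null counter -> 1
assert !req.isUploadComplete();       // 1 of 2

req.incrementUploadedFileNum();       // 2 of 2
assert req.isUploadComplete();
assert !req.isRequestTimeout();       // timeout is unset here, so the request never expires
```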
diff --git a/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/model/FileUploadResult.java b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/model/FileUploadResult.java
new file mode 100644
index 00000000..3defcf8e
--- /dev/null
+++ b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/model/FileUploadResult.java
@@ -0,0 +1,24 @@
+package com.dataengine.common.domain.model;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.File;
+
+@Getter
+@Setter
+@Builder
+public class FileUploadResult {
+    /** Whether every file of the request has been fully uploaded. */
+    private boolean isAllFilesUploaded;
+
+    /** Business payload carried over from the pre-upload request. */
+    private String checkInfo;
+
+    /** The saved file, or null if this chunk did not complete a file. */
+    private File savedFile;
+
+    /** Real file name. */
+    private String fileName;
+}
diff --git a/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/model/UploadCheckInfo.java b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/model/UploadCheckInfo.java
new file mode 100644
index 00000000..ecb8e710
--- /dev/null
+++ b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/model/UploadCheckInfo.java
@@ -0,0 +1,7 @@
+package com.dataengine.common.domain.model;
+
+/**
+ * Base class for upload check info (the service-specific payload carried in checkInfo).
+ */
+public abstract class UploadCheckInfo {
+}
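UploadCheckInfo is an empty marker base class, so the business payload in checkInfo is presumably a serialized, service-specific subclass. A hypothetical example of what such a subclass could look like on the data-management side (not part of this diff):

```java
// Hypothetical; the concrete check-info types live in the owning services.
public class DatasetUploadCheckInfo extends UploadCheckInfo {
    private String datasetId; // dataset the uploaded files belong to

    public String getDatasetId() { return datasetId; }

    public void setDatasetId(String datasetId) { this.datasetId = datasetId; }
}
```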
diff --git a/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/service/FileService.java b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/service/FileService.java
new file mode 100644
index 00000000..fda714aa
--- /dev/null
+++ b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/service/FileService.java
@@ -0,0 +1,91 @@
+package com.dataengine.common.domain.service;
+
+import com.dataengine.common.domain.model.ChunkUploadPreRequest;
+import com.dataengine.common.domain.model.ChunkUploadRequest;
+import com.dataengine.common.domain.model.FileUploadResult;
+import com.dataengine.common.domain.utils.ChunksSaver;
+import com.dataengine.common.infrastructure.mapper.ChunkUploadRequestMapper;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.io.File;
+import java.time.LocalDateTime;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+
+/**
+ * File service for pre-registered, chunked uploads.
+ */
+@Component
+public class FileService {
+    /** Idle timeout in seconds, refreshed after every successfully handled chunk. */
+    private static final int DEFAULT_TIMEOUT = 120;
+
+    private final ChunkUploadRequestMapper chunkUploadRequestMapper;
+
+    public FileService(ChunkUploadRequestMapper chunkUploadRequestMapper) {
+        this.chunkUploadRequestMapper = chunkUploadRequestMapper;
+    }
+
+    /**
+     * Register an upload (pre-upload) and return its id.
+     */
+    @Transactional
+    public String preUpload(ChunkUploadPreRequest chunkUploadPreRequest) {
+        chunkUploadPreRequest.setId(UUID.randomUUID().toString());
+        chunkUploadPreRequest.setTimeout(LocalDateTime.now().plusSeconds(DEFAULT_TIMEOUT));
+        chunkUploadRequestMapper.insert(chunkUploadPreRequest);
+        return chunkUploadPreRequest.getId();
+    }
+
+    /**
+     * Upload one chunk (or a whole single-chunk file).
+     */
+    @Transactional
+    public FileUploadResult chunkUpload(ChunkUploadRequest uploadFileRequest) {
+        uploadFileRequest.setFileSize(uploadFileRequest.getFile().getSize());
+        ChunkUploadPreRequest preRequest = chunkUploadRequestMapper.findById(uploadFileRequest.getReqId());
+        if (preRequest == null || preRequest.isUploadComplete() || preRequest.isRequestTimeout()) {
+            throw new IllegalArgumentException("Pre-upload request not found, already complete, or timed out");
+        }
+        File savedFile;
+        if (uploadFileRequest.getTotalChunkNum() > 1) {
+            savedFile = uploadChunk(uploadFileRequest, preRequest);
+        } else {
+            savedFile = uploadFile(uploadFileRequest, preRequest);
+        }
+        if (chunkUploadRequestMapper.update(preRequest) == 0) {
+            throw new IllegalArgumentException("Pre-upload request no longer exists");
+        }
+        boolean isFinish = Objects.equals(preRequest.getUploadedFileNum(), preRequest.getTotalFileNum());
+        if (isFinish) {
+            // Remove the temporary directory that held the chunks.
+            ChunksSaver.deleteFiles(new File(preRequest.getUploadPath(),
+                String.format(ChunksSaver.TEMP_DIR_NAME_FORMAT, preRequest.getId())).getPath());
+            chunkUploadRequestMapper.deleteById(preRequest.getId());
+        }
+        return FileUploadResult.builder()
+            .isAllFilesUploaded(isFinish)
+            .checkInfo(preRequest.getCheckInfo())
+            .savedFile(savedFile)
+            .fileName(uploadFileRequest.getFileName())
+            .build();
+    }
+
+    private File uploadFile(ChunkUploadRequest fileUploadRequest, ChunkUploadPreRequest preRequest) {
+        File savedFile = ChunksSaver.saveFile(fileUploadRequest, preRequest);
+        preRequest.setTimeout(LocalDateTime.now().plusSeconds(DEFAULT_TIMEOUT));
+        preRequest.incrementUploadedFileNum();
+        return savedFile;
+    }
+
+    private File uploadChunk(ChunkUploadRequest fileUploadRequest, ChunkUploadPreRequest preRequest) {
+        Optional<File> savedFile = ChunksSaver.save(fileUploadRequest, preRequest);
+        if (savedFile.isPresent()) {
+            // Final chunk: the file is complete.
+            preRequest.incrementUploadedFileNum();
+            return savedFile.get();
+        }
+        // Intermediate chunk: extend the request's deadline and wait for more chunks.
+        preRequest.setTimeout(LocalDateTime.now().plusSeconds(DEFAULT_TIMEOUT));
+        return null;
+    }
+}
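Taken together, the intended calling sequence is one preUpload per batch, then one chunkUpload per chunk in order. A sketch of the happy path for a single file split into two chunks (the MultipartFile slices and the fileService wiring are assumed, not shown in this diff):

```java
// Register the batch: one file, uploaded in chunks.
String reqId = fileService.preUpload(ChunkUploadPreRequest.builder()
    .totalFileNum(1)
    .uploadedFileNum(0)
    .uploadPath("/data/incoming")              // illustrative target directory
    .serviceId("DATA-MANAGEMENT")
    .checkInfo("{\"datasetId\":\"d-1\"}")      // opaque business payload
    .build());

// Stream the chunks in chunkNo order against the returned id.
for (int chunkNo = 1; chunkNo <= 2; chunkNo++) {
    ChunkUploadRequest chunk = new ChunkUploadRequest();
    chunk.setReqId(reqId);
    chunk.setFileNo(1);
    chunk.setFileName("sample.csv");
    chunk.setTotalChunkNum(2);
    chunk.setChunkNo(chunkNo);
    chunk.setFile(multipartChunks[chunkNo - 1]); // MultipartFile slices from the client (assumed)
    FileUploadResult result = fileService.chunkUpload(chunk);
    if (result.isAllFilesUploaded()) {
        File uploaded = result.getSavedFile();   // the reassembled file
    }
}
```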
diff --git a/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/utils/AnalyzerUtils.java b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/utils/AnalyzerUtils.java
new file mode 100644
index 00000000..ba2e2978
--- /dev/null
+++ b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/utils/AnalyzerUtils.java
@@ -0,0 +1,40 @@
+package com.dataengine.common.domain.utils;
+
+import org.springframework.util.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * File-name parsing utilities.
+ */
+public class AnalyzerUtils {
+    /** zip archive extension */
+    public static final String TYPE_ZIP = "zip";
+
+    /** tar.gz archive extension */
+    public static final String TYPE_TAR_GZ = "tar.gz";
+
+    private static final List<String> SPECIAL_EXTENSIONS = Collections.singletonList(TYPE_TAR_GZ);
+
+    /**
+     * Extract the file extension from a file path.
+     *
+     * @param filePath file path
+     * @return the extension in lower case without the leading dot, or "" if there is none
+     */
+    public static String getExtension(final String filePath) {
+        String filename = CommonUtils.trimFilePath(filePath);
+        // Multi-part extensions such as "tar.gz" must be matched before the last-dot rule.
+        for (String ext : SPECIAL_EXTENSIONS) {
+            if (StringUtils.endsWithIgnoreCase(filename, "." + ext)) {
+                return ext;
+            }
+        }
+        int lastDotIndex = filename.lastIndexOf(".");
+        if (lastDotIndex == -1) {
+            return "";
+        }
+        return filename.substring(lastDotIndex + 1).toLowerCase(Locale.ROOT);
+    }
+}
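For reference, the behaviour this gives on a few representative paths (illustrative assertions; the backslash case relies on trimFilePath handling Windows separators, see CommonUtils further down):

```java
assert "tar.gz".equals(AnalyzerUtils.getExtension("/data/archive.tar.gz")); // special-cased double extension
assert "pdf".equals(AnalyzerUtils.getExtension("C:\\data\\Report.PDF"));    // lower-cased via Locale.ROOT
assert "".equals(AnalyzerUtils.getExtension("/data/no-extension"));         // no dot -> empty string
```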
diff --git a/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/utils/ChunksSaver.java b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/utils/ChunksSaver.java
new file mode 100644
index 00000000..9cd1cbcd
--- /dev/null
+++ b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/utils/ChunksSaver.java
@@ -0,0 +1,135 @@
+package com.dataengine.common.domain.utils;
+
+import com.dataengine.common.domain.model.ChunkUploadPreRequest;
+import com.dataengine.common.domain.model.ChunkUploadRequest;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.springframework.web.multipart.MultipartFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+
+@Slf4j
+public class ChunksSaver {
+    /**
+     * Name format of the temporary directory holding the chunks of one request.
+     */
+    public static final String TEMP_DIR_NAME_FORMAT = "req_%s_chunks";
+
+    /**
+     * Save one chunk. Chunks must arrive in chunkNo order: each chunk is appended to a
+     * per-file temp file, and the final chunk triggers the move to the target path.
+     *
+     * @param fileUploadRequest chunk upload request
+     * @param preUploadReq pre-upload request the chunk belongs to
+     * @return the complete file once the last chunk has been appended, otherwise empty
+     */
+    public static Optional<File> save(ChunkUploadRequest fileUploadRequest, ChunkUploadPreRequest preUploadReq) {
+        Path uploadPath = Paths.get(preUploadReq.getUploadPath(),
+            String.format(TEMP_DIR_NAME_FORMAT, preUploadReq.getId()));
+        LocalDateTime startTime = LocalDateTime.now();
+        // The temp file is named after the file's number in the batch.
+        File targetFile = new File(uploadPath.toString(), String.valueOf(fileUploadRequest.getFileNo()));
+
+        // Append this chunk to the end of the temp file.
+        appendToTargetFile(targetFile, getFileInputStream(fileUploadRequest.getFile()));
+
+        // If this is not the last chunk, report progress and return empty.
+        if (fileUploadRequest.getTotalChunkNum() != fileUploadRequest.getChunkNo()) {
+            log.debug("save chunk {} cost {}", fileUploadRequest.getChunkNo(),
+                ChronoUnit.MILLIS.between(startTime, LocalDateTime.now()));
+            return Optional.empty();
+        }
+
+        // Last chunk: move the reassembled file to the upload path under its real name.
+        File finalFile = new File(preUploadReq.getUploadPath(), fileUploadRequest.getFileName());
+        if (!targetFile.renameTo(finalFile)) {
+            log.error("failed to mv file:{}, req Id:{}", targetFile.getName(), fileUploadRequest.getReqId());
+            throw new IllegalArgumentException("failed to move file to target dir");
+        }
+        log.debug("save chunk {} cost {}", fileUploadRequest.getChunkNo(),
+            ChronoUnit.MILLIS.between(startTime, LocalDateTime.now()));
+        return Optional.of(finalFile);
+    }
+
+    private static InputStream getFileInputStream(MultipartFile file) {
+        try {
+            return file.getInputStream();
+        } catch (IOException e) {
+            log.error("get uploaded file input stream failed", e);
+            throw new IllegalArgumentException("failed to read uploaded chunk", e);
+        }
+    }
+
+    /**
+     * Save a single-chunk file directly to the upload path.
+     *
+     * @param fileUploadRequest upload request
+     * @param preUploadReq pre-upload request the file belongs to
+     */
+    public static File saveFile(ChunkUploadRequest fileUploadRequest, ChunkUploadPreRequest preUploadReq) {
+        File targetFile = new File(preUploadReq.getUploadPath(), fileUploadRequest.getFileName());
+        try {
+            // Log the request's chunk size; File#getTotalSpace would report the partition size, not the file's.
+            log.info("file path {}, file size {}", targetFile.toPath(), fileUploadRequest.getFileSize());
+            FileUtils.copyInputStreamToFile(getFileInputStream(fileUploadRequest.getFile()), targetFile);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("failed to save uploaded file", e);
+        }
+        return targetFile;
+    }
+
+    /**
+     * Append a stream to the end of the target file.
+     *
+     * @param targetFile target file
+     * @param inputStream chunk input stream
+     */
+    public static void appendToTargetFile(File targetFile, InputStream inputStream) {
+        // try-with-resources so the multipart stream is always closed.
+        try (InputStream in = inputStream) {
+            byte[] buffer = new byte[1024 * 1024];
+            int byteRead;
+            while ((byteRead = in.read(buffer)) != -1) {
+                FileUtils.writeByteArrayToFile(targetFile, buffer, 0, byteRead, true);
+            }
+        } catch (IOException e) {
+            throw new IllegalArgumentException("failed to append chunk to target file", e);
+        }
+    }
+
+    /**
+     * Recursively delete everything under the given path.
+     *
+     * @param uploadPath directory to delete
+     */
+    public static void deleteFiles(String uploadPath) {
+        File dir = new File(uploadPath);
+        if (!dir.exists()) {
+            return;
+        }
+        File[] files = dir.listFiles();
+        if (files == null || files.length == 0) {
+            dir.delete();
+            return;
+        }
+        try {
+            for (File file : files) {
+                if (file.isDirectory()) {
+                    deleteFiles(file.getPath());
+                } else {
+                    file.delete();
+                }
+            }
+            if (dir.exists()) {
+                dir.delete();
+            }
+        } catch (SecurityException e) {
+            log.warn("Fail to delete file", e);
+        }
+    }
+}
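A note on the on-disk layout the saver assumes: while chunks of file N are arriving they are appended to `<uploadPath>/req_<id>_chunks/N`, and the final chunk triggers a move to `<uploadPath>/<fileName>`. Sketch of the path computation with illustrative values:

```java
ChunkUploadPreRequest pre = ChunkUploadPreRequest.builder()
    .id("42")                        // illustrative request id
    .uploadPath("/data/incoming")
    .build();

// Temp file that accumulates the chunks of fileNo 1:
Path chunkFile = Paths.get(pre.getUploadPath(),
    String.format(ChunksSaver.TEMP_DIR_NAME_FORMAT, pre.getId()), "1");
// -> /data/incoming/req_42_chunks/1

// Destination once the last chunk has been appended:
Path finalFile = Paths.get(pre.getUploadPath(), "sample.csv");
// -> /data/incoming/sample.csv
```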
diff --git a/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/utils/CommonUtils.java b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/utils/CommonUtils.java
new file mode 100644
index 00000000..3c522d98
--- /dev/null
+++ b/backend/shared/domain-common/src/main/java/com/dataengine/common/domain/utils/CommonUtils.java
@@ -0,0 +1,24 @@
+package com.dataengine.common.domain.utils;
+
+/**
+ * Common utilities.
+ */
+public class CommonUtils {
+    /**
+     * Extract the file name (with extension) from a file path.
+     *
+     * @param filePath file path
+     * @return file name (with extension)
+     */
+    public static String trimFilePath(String filePath) {
+        // Handle both unix and windows separators rather than only the host's File.separator.
+        int lastSlashIndex = Math.max(filePath.lastIndexOf('/'), filePath.lastIndexOf('\\'));
+
+        String filename = filePath;
+        if (lastSlashIndex != -1) {
+            filename = filePath.substring(lastSlashIndex + 1);
+        }
+        return filename;
+    }
+}
diff --git a/backend/shared/domain-common/src/main/java/com/dataengine/common/infrastructure/mapper/ChunkUploadRequestMapper.java b/backend/shared/domain-common/src/main/java/com/dataengine/common/infrastructure/mapper/ChunkUploadRequestMapper.java
new file mode 100644
index 00000000..8a70ac05
--- /dev/null
+++ b/backend/shared/domain-common/src/main/java/com/dataengine/common/infrastructure/mapper/ChunkUploadRequestMapper.java
@@ -0,0 +1,49 @@
+package com.dataengine.common.infrastructure.mapper;
+
+import com.dataengine.common.domain.model.ChunkUploadPreRequest;
+import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+/**
+ * Mapper for chunk upload requests.
+ */
+@Mapper
+public interface ChunkUploadRequestMapper {
+
+    /**
+     * Find by id.
+     */
+    ChunkUploadPreRequest findById(@Param("id") String id);
+
+    /**
+     * Find by owning service id.
+     */
+    List<ChunkUploadPreRequest> findByServiceId(@Param("serviceId") String serviceId);
+
+    /**
+     * Find all.
+     */
+    List<ChunkUploadPreRequest> findAll();
+
+    /**
+     * Insert.
+     */
+    int insert(ChunkUploadPreRequest request);
+
+    /**
+     * Update.
+     */
+    int update(ChunkUploadPreRequest request);
+
+    /**
+     * Delete by id.
+     */
+    int deleteById(@Param("id") String id);
+
+    /**
+     * Delete by owning service id.
+     */
+    int deleteByServiceId(@Param("serviceId") String serviceId);
+}
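Nothing in this diff purges requests whose timeout lapses before completion; presumably a housekeeping job is expected to sweep them. A hypothetical sweep built only from the methods declared above (a fragment; scheduling and transaction wiring omitted):

```java
for (ChunkUploadPreRequest req : chunkUploadRequestMapper.findAll()) {
    if (req.isRequestTimeout()) {
        // Drop the half-assembled chunks, then the bookkeeping row.
        ChunksSaver.deleteFiles(new File(req.getUploadPath(),
            String.format(ChunksSaver.TEMP_DIR_NAME_FORMAT, req.getId())).getPath());
        chunkUploadRequestMapper.deleteById(req.getId());
    }
}
```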
diff --git a/backend/shared/domain-common/src/main/java/com/dataengine/common/interfaces/PagedResponse.java b/backend/shared/domain-common/src/main/java/com/dataengine/common/interfaces/PagedResponse.java
new file mode 100644
index 00000000..3c2174fc
--- /dev/null
+++ b/backend/shared/domain-common/src/main/java/com/dataengine/common/interfaces/PagedResponse.java
@@ -0,0 +1,20 @@
+package com.dataengine.common.interfaces;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.util.List;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class PagedResponse<T> {
+    private int page;
+    private int size;
+    private long totalElements;
+    private int totalPages;
+    private List<T> content;
+}
diff --git a/backend/shared/domain-common/src/main/java/com/dataengine/common/interfaces/PagingQuery.java b/backend/shared/domain-common/src/main/java/com/dataengine/common/interfaces/PagingQuery.java
new file mode 100644
index 00000000..c97b908f
--- /dev/null
+++ b/backend/shared/domain-common/src/main/java/com/dataengine/common/interfaces/PagingQuery.java
@@ -0,0 +1,31 @@
+package com.dataengine.common.interfaces;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class PagingQuery {
+
+    /**
+     * Page number, 0-based.
+     */
+    private Integer page = 0;
+
+    /**
+     * Page size.
+     */
+    private Integer size = 20;
+
+    /**
+     * Normalize the paging parameters, falling back to the defaults on null.
+     */
+    public void initPaging() {
+        page = page == null ? 0 : page;
+        size = size == null ? 20 : size;
+    }
+}
diff --git a/backend/shared/domain-common/src/main/resources/mappers/ChunkUploadRequestMapper.xml b/backend/shared/domain-common/src/main/resources/mappers/ChunkUploadRequestMapper.xml
new file mode 100644
index 00000000..585c91ff
--- /dev/null
+++ b/backend/shared/domain-common/src/main/resources/mappers/ChunkUploadRequestMapper.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.dataengine.common.infrastructure.mapper.ChunkUploadRequestMapper">
+
+    <resultMap id="BaseResultMap" type="com.dataengine.common.domain.model.ChunkUploadPreRequest">
+        <id column="id" property="id"/>
+        <result column="total_file_num" property="totalFileNum"/>
+        <result column="uploaded_file_num" property="uploadedFileNum"/>
+        <result column="upload_path" property="uploadPath"/>
+        <result column="timeout" property="timeout"/>
+        <result column="service_id" property="serviceId"/>
+        <result column="check_info" property="checkInfo"/>
+    </resultMap>
+
+    <sql id="Base_Column_List">
+        id, total_file_num, uploaded_file_num, upload_path, timeout, service_id, check_info
+    </sql>
+
+    <select id="findById" resultMap="BaseResultMap">
+        SELECT <include refid="Base_Column_List"/> FROM t_chunk_upload_request WHERE id = #{id}
+    </select>
+
+    <select id="findByServiceId" resultMap="BaseResultMap">
+        SELECT <include refid="Base_Column_List"/> FROM t_chunk_upload_request WHERE service_id = #{serviceId}
+    </select>
+
+    <select id="findAll" resultMap="BaseResultMap">
+        SELECT <include refid="Base_Column_List"/> FROM t_chunk_upload_request
+    </select>
+
+    <insert id="insert" parameterType="com.dataengine.common.domain.model.ChunkUploadPreRequest">
+        INSERT INTO t_chunk_upload_request (
+            id, total_file_num, uploaded_file_num, upload_path, timeout, service_id, check_info
+        ) VALUES (
+            #{id}, #{totalFileNum}, #{uploadedFileNum}, #{uploadPath}, #{timeout}, #{serviceId}, #{checkInfo}
+        )
+    </insert>
+
+    <update id="update" parameterType="com.dataengine.common.domain.model.ChunkUploadPreRequest">
+        UPDATE t_chunk_upload_request
+        SET total_file_num = #{totalFileNum},
+            uploaded_file_num = #{uploadedFileNum},
+            upload_path = #{uploadPath},
+            timeout = #{timeout},
+            service_id = #{serviceId},
+            check_info = #{checkInfo}
+        WHERE id = #{id}
+    </update>
+
+    <delete id="deleteById">
+        DELETE FROM t_chunk_upload_request WHERE id = #{id}
+    </delete>
+
+    <delete id="deleteByServiceId">
+        DELETE FROM t_chunk_upload_request WHERE service_id = #{serviceId}
+    </delete>
+</mapper>
diff --git a/scripts/db/common-init.sql b/scripts/db/common-init.sql
new file mode 100644
index 00000000..ff966601
--- /dev/null
+++ b/scripts/db/common-init.sql
@@ -0,0 +1,22 @@
+-- Shared/common module database initialization script
+-- For the dataengine database (MySQL); column comments are inline because
+-- MySQL does not support PostgreSQL-style COMMENT ON statements.
+
+USE dataengine;
+
+CREATE TABLE IF NOT EXISTS t_chunk_upload_request
+(
+    id                VARCHAR(36) PRIMARY KEY COMMENT 'UUID',
+    total_file_num    INTEGER COMMENT 'Total number of files',
+    uploaded_file_num INTEGER COMMENT 'Number of files uploaded so far',
+    upload_path       VARCHAR(256) COMMENT 'Upload path',
+    timeout           TIMESTAMP DEFAULT NOW() COMMENT 'Upload request timeout',
+    service_id        VARCHAR(64) COMMENT 'Owning service: DATA-MANAGEMENT (data management)',
+    check_info        TEXT COMMENT 'Business payload'
+) COMMENT 'Chunk upload request table';
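The controller tests assert page, size, totalElements and totalPages on a PagedResponse built from a Spring Data Page; the converter itself is not part of this diff. A plausible helper, assuming the all-args constructor order above:

```java
import org.springframework.data.domain.Page;

import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper; the real mapping lives in the interface layer, not shown here.
public final class PagedResponses {
    private PagedResponses() {
    }

    public static <T, R> PagedResponse<R> from(Page<T> page, Function<T, R> mapper) {
        return new PagedResponse<>(
            page.getNumber(),        // 0-based page index
            page.getSize(),
            page.getTotalElements(),
            page.getTotalPages(),
            page.getContent().stream().map(mapper).collect(Collectors.toList()));
    }
}
```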