Commit d9ce140

Author: uname
Merge branch 'main' of github.com:ModelEngine-Group/DataMate
2 parents: a728bc3 + f8b3250

File tree: 17 files changed (+362 additions, −51 deletions)

backend/pom.xml

Lines changed: 7 additions & 0 deletions
@@ -37,6 +37,7 @@
         <lombok-mapstruct-binding.version>0.2.0</lombok-mapstruct-binding.version>
         <poi.version>5.4.0</poi.version>
         <log4j2.version>2.21.1</log4j2.version>
+        <commons-compress.version>1.26.1</commons-compress.version>
     </properties>

     <modules>
@@ -151,6 +152,12 @@
             <version>2.6.6</version>
         </dependency>

+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+            <version>${commons-compress.version}</version>
+        </dependency>
+
     </dependencies>
 </dependencyManagement>

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java

Lines changed: 40 additions & 12 deletions
@@ -5,8 +5,10 @@
 import com.datamate.common.domain.model.FileUploadResult;
 import com.datamate.common.domain.service.FileService;
 import com.datamate.common.domain.utils.AnalyzerUtils;
+import com.datamate.common.domain.utils.ArchiveAnalyzer;
 import com.datamate.common.infrastructure.exception.BusinessAssert;
 import com.datamate.common.infrastructure.exception.BusinessException;
+import com.datamate.common.infrastructure.exception.CommonErrorCode;
 import com.datamate.common.infrastructure.exception.SystemErrorCode;
 import com.datamate.common.interfaces.PagedResponse;
 import com.datamate.common.interfaces.PagingQuery;
@@ -213,6 +215,9 @@ private void addToZipFile(DatasetFile file, ZipOutputStream zos) throws IOException {
      */
     @Transactional
     public String preUpload(UploadFilesPreRequest chunkUploadRequest, String datasetId) {
+        if (Objects.isNull(datasetRepository.getById(datasetId))) {
+            throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
+        }
         ChunkUploadPreRequest request = ChunkUploadPreRequest.builder().build();
         request.setUploadPath(datasetBasePath + File.separator + datasetId);
         request.setTotalFileNum(chunkUploadRequest.getTotalFileNum());
@@ -225,7 +230,7 @@ public String preUpload(UploadFilesPreRequest chunkUploadRequest, String datasetId) {
             String checkInfoJson = objectMapper.writeValueAsString(checkInfo);
             request.setCheckInfo(checkInfoJson);
         } catch (JsonProcessingException e) {
-            throw new IllegalArgumentException("Failed to serialize checkInfo to JSON", e);
+            log.warn("Failed to serialize checkInfo to JSON", e);
         }
         return fileService.preUpload(request);
     }
@@ -238,31 +243,54 @@ public String preUpload(UploadFilesPreRequest chunkUploadRequest, String datasetId) {
     @Transactional
     public void chunkUpload(String datasetId, UploadFileRequest uploadFileRequest) {
         FileUploadResult uploadResult = fileService.chunkUpload(DatasetConverter.INSTANCE.toChunkUploadRequest(uploadFileRequest));
-        saveFileInfoToDb(uploadResult, uploadFileRequest, datasetId);
+        saveFileInfoToDb(uploadResult, datasetId);
     }

-    private void saveFileInfoToDb(FileUploadResult fileUploadResult, UploadFileRequest uploadFile, String datasetId) {
+    private void saveFileInfoToDb(FileUploadResult fileUploadResult, String datasetId) {
         if (Objects.isNull(fileUploadResult.getSavedFile())) {
             // The chunked upload has not finished yet
             return;
         }
+        DatasetFileUploadCheckInfo checkInfo;
+        try {
+            ObjectMapper objectMapper = new ObjectMapper();
+            checkInfo = objectMapper.readValue(fileUploadResult.getCheckInfo(), DatasetFileUploadCheckInfo.class);
+            if (!Objects.equals(checkInfo.getDatasetId(), datasetId)) {
+                throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
+            }
+        } catch (IllegalArgumentException | JsonProcessingException e) {
+            log.warn("Failed to convert checkInfo to DatasetFileUploadCheckInfo", e);
+            throw BusinessException.of(CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST);
+        }
+        List<FileUploadResult> files;
+        if (checkInfo.isHasArchive() && AnalyzerUtils.isPackage(fileUploadResult.getSavedFile().getPath())) {
+            files = ArchiveAnalyzer.process(fileUploadResult);
+        } else {
+            files = Collections.singletonList(fileUploadResult);
+        }
+        addFileToDataset(datasetId, files);
+    }
+
+    private void addFileToDataset(String datasetId, List<FileUploadResult> unpacked) {
         Dataset dataset = datasetRepository.getById(datasetId);
-        File savedFile = fileUploadResult.getSavedFile();
-        LocalDateTime currentTime = LocalDateTime.now();
-        DatasetFile datasetFile = DatasetFile.builder()
+        dataset.setFiles(datasetFileRepository.findAllByDatasetId(datasetId));
+        for (FileUploadResult file : unpacked) {
+            File savedFile = file.getSavedFile();
+            LocalDateTime currentTime = LocalDateTime.now();
+            DatasetFile datasetFile = DatasetFile.builder()
                 .id(UUID.randomUUID().toString())
                 .datasetId(datasetId)
                 .fileSize(savedFile.length())
                 .uploadTime(currentTime)
                 .lastAccessTime(currentTime)
-                .fileName(uploadFile.getFileName())
+                .fileName(file.getFileName())
                 .filePath(savedFile.getPath())
-                .fileType(AnalyzerUtils.getExtension(uploadFile.getFileName()))
+                .fileType(AnalyzerUtils.getExtension(file.getFileName()))
                 .build();
-        dataset.setFiles(datasetFileRepository.findAllByDatasetId(datasetId));
-        setDatasetFileId(datasetFile, dataset);
-        datasetFileRepository.saveOrUpdate(datasetFile);
-        dataset.addFile(datasetFile);
+            setDatasetFileId(datasetFile, dataset);
+            datasetFileRepository.saveOrUpdate(datasetFile);
+            dataset.addFile(datasetFile);
+        }
         dataset.active();
         datasetRepository.updateById(dataset);
     }
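For orientation, a minimal sketch (not part of the commit) of the check-info handshake the two methods above now share: preUpload serializes a DatasetFileUploadCheckInfo into the pre-upload request, and saveFileInfoToDb reads it back and verifies the dataset ID before any file is persisted. The setHasArchive setter and the "ds-001" ID are assumptions for illustration; only the datasetId field and the isHasArchive() accessor are visible in this diff.

    // Sketch only; assumes it runs in a method that declares throws JsonProcessingException.
    ObjectMapper objectMapper = new ObjectMapper();

    // preUpload side: embed the target dataset in the check info carried by the pre-upload request.
    DatasetFileUploadCheckInfo checkInfo = new DatasetFileUploadCheckInfo();
    checkInfo.setDatasetId("ds-001");           // placeholder dataset ID
    // checkInfo.setHasArchive(true);           // assumed setter matching isHasArchive() used above
    String checkInfoJson = objectMapper.writeValueAsString(checkInfo);

    // chunkUpload side: restore the check info and reject uploads bound to a different dataset.
    DatasetFileUploadCheckInfo restored =
            objectMapper.readValue(checkInfoJson, DatasetFileUploadCheckInfo.class);
    if (!Objects.equals(restored.getDatasetId(), "ds-001")) {
        throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
    }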

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFileUploadCheckInfo.java

Lines changed: 5 additions & 2 deletions
@@ -1,15 +1,18 @@
 package com.datamate.datamanagement.domain.model.dataset;

-import com.datamate.common.domain.model.UploadCheckInfo;
+import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;

 /**
  * Dataset file upload check information
  */
 @Getter
 @Setter
-public class DatasetFileUploadCheckInfo extends UploadCheckInfo {
+@NoArgsConstructor
+@AllArgsConstructor
+public class DatasetFileUploadCheckInfo {
     /** Dataset ID */
     private String datasetId;

backend/services/main-application/pom.xml

Lines changed: 0 additions & 1 deletion
@@ -48,7 +48,6 @@
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-compress</artifactId>
-            <version>1.26.1</version>
         </dependency>

         <!-- Core service dependencies -->

backend/shared/domain-common/pom.xml

Lines changed: 4 additions & 0 deletions
@@ -42,5 +42,9 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-data-redis</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+        </dependency>
     </dependencies>
 </project>

backend/shared/domain-common/src/main/java/com/datamate/common/domain/model/UploadCheckInfo.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

backend/shared/domain-common/src/main/java/com/datamate/common/domain/service/FileService.java

Lines changed: 4 additions & 2 deletions
@@ -4,6 +4,8 @@
 import com.datamate.common.domain.model.ChunkUploadRequest;
 import com.datamate.common.domain.model.FileUploadResult;
 import com.datamate.common.domain.utils.ChunksSaver;
+import com.datamate.common.infrastructure.exception.BusinessException;
+import com.datamate.common.infrastructure.exception.CommonErrorCode;
 import com.datamate.common.infrastructure.mapper.ChunkUploadRequestMapper;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
@@ -46,7 +48,7 @@ public FileUploadResult chunkUpload(ChunkUploadRequest uploadFileRequest) {
         uploadFileRequest.setFileSize(uploadFileRequest.getFile().getSize());
         ChunkUploadPreRequest preRequest = chunkUploadRequestMapper.findById(uploadFileRequest.getReqId());
         if (preRequest == null || preRequest.isUploadComplete() || preRequest.isRequestTimeout()) {
-            throw new IllegalArgumentException("预上传请求不存在");
+            throw BusinessException.of(CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST);
         }
         File savedFile;
         if (uploadFileRequest.getTotalChunkNum() > 1) {
@@ -55,7 +57,7 @@ public FileUploadResult chunkUpload(ChunkUploadRequest uploadFileRequest) {
             savedFile = uploadFile(uploadFileRequest, preRequest);
         }
         if (chunkUploadRequestMapper.update(preRequest) == 0) {
-            throw new IllegalArgumentException("预上传请求不存在");
+            throw BusinessException.of(CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST);
         }
         boolean isFinish = Objects.equals(preRequest.getUploadedFileNum(), preRequest.getTotalFileNum());
         if (isFinish) {
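For context, a hypothetical caller-side sketch (not part of the commit) of what the switch from IllegalArgumentException to BusinessException with CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST enables: a missing or expired pre-upload request can be handled as a business error rather than a generic unchecked exception. The fileService, chunkUploadRequest, and log variables are assumed surrounding context.

    try {
        FileUploadResult result = fileService.chunkUpload(chunkUploadRequest);
        // ... continue with the saved file ...
    } catch (BusinessException e) {
        // e.g. translate to an API response asking the client to call preUpload again
        log.warn("Chunk upload rejected: pre-upload request missing or expired", e);
    }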

backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/AnalyzerUtils.java

Lines changed: 11 additions & 0 deletions
@@ -37,4 +37,15 @@ public static String getExtension(final String filePath) {
         }
         return filename.substring(firstDotIndex + 1).toLowerCase(Locale.ROOT);
     }
+
+    /**
+     * Checks whether a file is an archive.
+     *
+     * @param filePath file path
+     * @return true if the file is a supported archive (zip or tar.gz)
+     */
+    public static boolean isPackage(String filePath) {
+        String extension = getExtension(filePath);
+        return extension.toLowerCase(Locale.ROOT).equals(TYPE_ZIP) || extension.toLowerCase(Locale.ROOT).equals(TYPE_TAR_GZ);
+    }
 }
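A minimal behavior sketch (not part of the commit) for the new helper, assuming the TYPE_ZIP and TYPE_TAR_GZ constants hold "zip" and "tar.gz" (their values are not shown in this diff); note that getExtension keeps everything after the first dot, so a double extension like tar.gz matches as a whole.

    AnalyzerUtils.isPackage("/data/upload/images.zip");     // true
    AnalyzerUtils.isPackage("/data/upload/corpus.tar.gz");  // true: extension resolves to "tar.gz"
    AnalyzerUtils.isPackage("/data/upload/table.csv");      // false: treated as a single regular file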
backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ArchiveAnalyzer.java

Lines changed: 192 additions & 0 deletions (new file)
@@ -0,0 +1,192 @@
+package com.datamate.common.domain.utils;
+
+import com.datamate.common.domain.model.FileUploadResult;
+import com.datamate.common.infrastructure.exception.BusinessException;
+import com.datamate.common.infrastructure.exception.SystemErrorCode;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+
+/**
+ * Responsible for validating and unpacking archive files.
+ *
+ * @since 2023-11-17
+ */
+@Slf4j
+public class ArchiveAnalyzer {
+    private static final int DEFAULT_BUFFER_SIZE = 4096;
+
+    /**
+     * Process list.
+     *
+     * @param fileDto The uploaded file DTO
+     * @return the list
+     */
+    public static List<FileUploadResult> process(FileUploadResult fileDto) {
+        log.info("Start unpacking [{}]", fileDto.getFileName());
+        File file = fileDto.getSavedFile();
+        Path archivePath;
+        try {
+            archivePath = Paths.get(file.getCanonicalPath());
+        } catch (IOException e) {
+            log.error("Failed to get the archive file path.");
+            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Failed to get the archive file path.");
+        }
+
+        List<FileUploadResult> unpacked;
+        String extension = AnalyzerUtils.getExtension(fileDto.getFileName());
+        if (AnalyzerUtils.TYPE_ZIP.equalsIgnoreCase(extension)) {
+            log.info("ZIP unpacking [{}]", fileDto.getFileName());
+            unpacked = processZip(archivePath);
+            log.info("ZIP unpacking FINISHED [{}]", fileDto.getFileName());
+        } else if (AnalyzerUtils.TYPE_TAR_GZ.equalsIgnoreCase(extension)) {
+            unpacked = processTarGz(archivePath);
+        } else {
+            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Unrecognized archive format.");
+        }
+
+        if (!archivePath.toFile().delete()) {
+            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Unable to delete the archive file " + archivePath.toAbsolutePath());
+        }
+
+        log.info("Finished unpacking [{}]", fileDto.getFileName());
+        return unpacked;
+    }
+
+    private static List<FileUploadResult> processZip(Path archivePath) {
+        try (ArchiveInputStream<ZipArchiveEntry> inputStream = new ZipArchiveInputStream(
+                new BufferedInputStream(Files.newInputStream(archivePath)))) {
+            return unpackArchive(inputStream, archivePath);
+        } catch (IOException e) {
+            log.error("Failed to unpack zip archive:", e);
+            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Failed to unpack zip archive.");
+        }
+    }
+
+    private static List<FileUploadResult> processTarGz(Path archivePath) {
+        try (ArchiveInputStream<TarArchiveEntry> inputStream = new TarArchiveInputStream(
+                new GzipCompressorInputStream(new BufferedInputStream(Files.newInputStream(archivePath))),
+                StandardCharsets.UTF_8.toString())) {
+            return unpackArchive(inputStream, archivePath);
+        } catch (IOException e) {
+            log.error("Failed to unpack tar.gz archive:", e);
+            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Failed to unpack tar.gz archive.");
+        }
+    }
+
+    private static List<FileUploadResult> unpackArchive(ArchiveInputStream<?> archiveInputStream, Path archivePath) throws IOException {
+        List<FileUploadResult> unpacked = new ArrayList<>();
+        long unpackedSize = 0L;
+        try {
+            ArchiveEntry archiveEntry;
+            int entryCount = 0;
+            while ((archiveEntry = archiveInputStream.getNextEntry()) != null) {
+                if (isSymlink(archiveEntry)) {
+                    // Skip symlink entries during extraction
+                    continue;
+                }
+                entryCount++;
+                if (checkUnpackSizeAndFileSize(entryCount, unpacked) || checkVersionSize(unpackedSize, archiveEntry.getSize())) {
+                    break;
+                }
+                if (!archiveEntry.isDirectory()) {
+                    unpackedSize = addFileAndCountFileSize(archiveInputStream, archiveEntry, unpacked,
+                            unpackedSize, archivePath);
+                }
+            }
+        } catch (IOException e) {
+            unpacked.forEach(v -> deleteFile(v.getSavedFile()));
+            throw e;
+        }
+
+        return unpacked;
+    }
+
+    private static boolean checkVersionSize(long unpackedSize, long currFileSize) {
+        return false;
+    }
+
+    private static long addFileAndCountFileSize(ArchiveInputStream<?> archiveInputStream, ArchiveEntry archiveEntry,
+            List<FileUploadResult> unpacked, long unpackedSize, Path archivePath) throws IOException {
+        Optional<FileUploadResult> uploadFileDto = extractEntity(archiveInputStream, archiveEntry, archivePath);
+        long newSize = unpackedSize;
+        if (uploadFileDto.isPresent()) {
+            FileUploadResult dto = uploadFileDto.get();
+            unpacked.add(dto);
+            newSize += dto.getSavedFile().length();
+        }
+        return newSize;
+    }
+
+    private static boolean checkUnpackSizeAndFileSize(int entryCount, List<FileUploadResult> unpacked) {
+        return false;
+    }
+
+    private static Optional<FileUploadResult> extractEntity(ArchiveInputStream<?> archiveInputStream, ArchiveEntry archiveEntry, Path archivePath)
+            throws IOException {
+        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+        Path path = Paths.get(archivePath.getParent().toString(), archiveEntry.getName());
+        File file = path.toFile();
+        long fileSize = 0L;
+        String extension = AnalyzerUtils.getExtension(archiveEntry.getName());
+
+        long supportFileSize = 1024*1024*1024; // Upload size tentatively limited to 1 GB
+        try (OutputStream outputStream = new BufferedOutputStream(Files.newOutputStream(file.toPath()))) {
+            int byteRead;
+            while ((byteRead = archiveInputStream.read(buffer)) != -1) {
+                outputStream.write(buffer, 0, byteRead);
+                fileSize += byteRead;
+                if (fileSize > supportFileSize) {
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            log.error("error happened while write entry to file system");
+            file.delete();
+            throw e;
+        }
+
+        if (fileSize > supportFileSize) {
+            // The file exceeds the size limit; delete it
+            log.info("file {} size exceeds limit", archiveEntry.getName());
+            file.delete();
+            return Optional.empty();
+        }
+        return Optional.of(FileUploadResult.builder().savedFile(file).fileName(CommonUtils.trimFilePath(archiveEntry.getName())).build());
+    }
+
+    private static void deleteFile(File file) {
+        Path fileToDeletePath = Paths.get(file.getPath());
+        if (Files.exists(fileToDeletePath)) {
+            try {
+                Files.delete(fileToDeletePath);
+            } catch (IOException e1) {
+                log.error("Failed to delete file.", e1);
+            }
+        }
+    }
+
+    private static boolean isSymlink(ArchiveEntry archiveEntry) {
+        if (archiveEntry instanceof TarArchiveEntry) {
+            return ((TarArchiveEntry) archiveEntry).isSymbolicLink();
+        }
+        return false;
+    }
+}
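For orientation, a minimal usage sketch (not part of the commit) mirroring the call site in DatasetFileApplicationService; the path, file name, and log variable are placeholders. process() unpacks the archive next to the saved file, skips symlink entries, deletes the original archive, and returns one FileUploadResult per extracted regular file.

    // Hypothetical input: a completed chunk upload whose saved file is a zip archive.
    FileUploadResult uploadResult = FileUploadResult.builder()
            .savedFile(new File("/data/datasets/ds-001/images.zip"))  // placeholder path
            .fileName("images.zip")
            .build();

    List<FileUploadResult> entries = ArchiveAnalyzer.process(uploadResult);
    for (FileUploadResult entry : entries) {
        log.info("extracted {} -> {}", entry.getFileName(), entry.getSavedFile().getPath());
    }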
