Skip to content

Commit 9858388

Browse files
authored
feat: Refactor dataset file pagination and enhance retrieval functionality with new request structure #98
* feat: Enhance knowledge base management with collection renaming, imp…
* feat: Update Milvus integration with new API, enhance collection mana…
* Merge branch 'refs/heads/main' into dev
* feat: Refactor dataset file pagination and enhance retrieval function…
* Merge branch 'main' into dev
1 parent 536ef9f commit 9858388

File tree

19 files changed

+398
-105
lines changed

19 files changed

+398
-105
lines changed

backend/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,13 @@
144144
<artifactId>poi</artifactId>
145145
<version>${poi.version}</version>
146146
</dependency>
147+
148+
<dependency>
149+
<groupId>io.milvus</groupId>
150+
<artifactId>milvus-sdk-java</artifactId>
151+
<version>2.6.6</version>
152+
</dependency>
153+
147154
</dependencies>
148155
</dependencyManagement>
149156

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@
44
import com.datamate.cleaning.application.scheduler.CleaningTaskScheduler;
55
import com.datamate.cleaning.common.enums.CleaningTaskStatusEnum;
66
import com.datamate.cleaning.common.enums.ExecutorType;
7-
87
import com.datamate.cleaning.domain.model.TaskProcess;
98
import com.datamate.cleaning.domain.repository.CleaningResultRepository;
109
import com.datamate.cleaning.domain.repository.CleaningTaskRepository;
1110
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
12-
1311
import com.datamate.cleaning.infrastructure.validator.CleanTaskValidator;
1412
import com.datamate.cleaning.interfaces.dto.*;
1513
import com.datamate.common.infrastructure.exception.BusinessException;
1614
import com.datamate.common.infrastructure.exception.SystemErrorCode;
15+
import com.datamate.common.interfaces.PagedResponse;
16+
import com.datamate.common.interfaces.PagingQuery;
1717
import com.datamate.datamanagement.application.DatasetApplicationService;
1818
import com.datamate.datamanagement.application.DatasetFileApplicationService;
1919
import com.datamate.datamanagement.common.enums.DatasetType;
@@ -26,8 +26,6 @@
2626
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
2727
import lombok.RequiredArgsConstructor;
2828
import lombok.extern.slf4j.Slf4j;
29-
import org.springframework.data.domain.Page;
30-
import org.springframework.data.domain.PageRequest;
3129
import org.springframework.stereotype.Service;
3230
import org.springframework.transaction.annotation.Transactional;
3331
import org.yaml.snakeyaml.DumperOptions;
@@ -208,10 +206,10 @@ private void prepareTask(CleaningTaskDto task, List<OperatorInstanceDto> instanc
208206
private void scanDataset(String taskId, String srcDatasetId) {
209207
int pageNumber = 0;
210208
int pageSize = 500;
211-
PageRequest pageRequest = PageRequest.of(pageNumber, pageSize);
212-
Page<DatasetFile> datasetFiles;
209+
PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize);
210+
PagedResponse<DatasetFile> datasetFiles;
213211
do {
214-
datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null, pageRequest);
212+
datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null,null, pageRequest);
215213
if (datasetFiles.getContent().isEmpty()) {
216214
break;
217215
}

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package com.datamate.datamanagement.application;
22

3+
import com.baomidou.mybatisplus.core.metadata.IPage;
34
import com.datamate.common.domain.model.ChunkUploadPreRequest;
45
import com.datamate.common.domain.model.FileUploadResult;
56
import com.datamate.common.domain.service.FileService;
67
import com.datamate.common.domain.utils.AnalyzerUtils;
78
import com.datamate.common.infrastructure.exception.BusinessAssert;
89
import com.datamate.common.infrastructure.exception.BusinessException;
910
import com.datamate.common.infrastructure.exception.SystemErrorCode;
11+
import com.datamate.common.interfaces.PagedResponse;
12+
import com.datamate.common.interfaces.PagingQuery;
1013
import com.datamate.datamanagement.common.enums.DuplicateMethod;
1114
import com.datamate.datamanagement.domain.contants.DatasetConstant;
1215
import com.datamate.datamanagement.domain.model.dataset.Dataset;
@@ -23,14 +26,10 @@
2326
import com.fasterxml.jackson.databind.ObjectMapper;
2427
import jakarta.servlet.http.HttpServletResponse;
2528
import lombok.extern.slf4j.Slf4j;
26-
import org.apache.ibatis.session.RowBounds;
2729
import org.springframework.beans.factory.annotation.Autowired;
2830
import org.springframework.beans.factory.annotation.Value;
2931
import org.springframework.core.io.Resource;
3032
import org.springframework.core.io.UrlResource;
31-
import org.springframework.data.domain.Page;
32-
import org.springframework.data.domain.PageImpl;
33-
import org.springframework.data.domain.Pageable;
3433
import org.springframework.http.HttpHeaders;
3534
import org.springframework.stereotype.Service;
3635
import org.springframework.transaction.annotation.Transactional;
@@ -82,12 +81,10 @@ public DatasetFileApplicationService(DatasetFileRepository datasetFileRepository
8281
* 获取数据集文件列表
8382
*/
8483
@Transactional(readOnly = true)
85-
public Page<DatasetFile> getDatasetFiles(String datasetId, String fileType,
86-
String status, Pageable pageable) {
87-
RowBounds bounds = new RowBounds(pageable.getPageNumber() * pageable.getPageSize(), pageable.getPageSize());
88-
List<DatasetFile> content = datasetFileRepository.findByCriteria(datasetId, fileType, status, bounds);
89-
long total = content.size() < pageable.getPageSize() && pageable.getPageNumber() == 0 ? content.size() : content.size() + (long) pageable.getPageNumber() * pageable.getPageSize();
90-
return new PageImpl<>(content, pageable, total);
// New version (added lines below): paging is delegated to the repository through a MyBatis-Plus
// Page built from pagingQuery.getPage() / pagingQuery.getSize(), and the repository result is
// wrapped with PagedResponse.of(...). An extra `name` filter is passed through to the repository.
// The removed lines above computed the offset with RowBounds and estimated the total locally.
// NOTE(review): the removed code treated the page number as 0-based (offset = pageNumber * pageSize).
// The first argument of MyBatis-Plus Page is the "current" page, presumably 1-based — confirm which
// convention PagingQuery.getPage() follows, since callers in this diff still start from page 0.
84+
public PagedResponse<DatasetFile> getDatasetFiles(String datasetId, String fileType, String status, String name, PagingQuery pagingQuery) {
85+
IPage<DatasetFile> page = new com.baomidou.mybatisplus.extension.plugins.pagination.Page<>(pagingQuery.getPage(), pagingQuery.getSize());
86+
IPage<DatasetFile> files = datasetFileRepository.findByCriteria(datasetId, fileType, status, name, page);
87+
return PagedResponse.of(files);
9188
}
9289

9390
/**

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/DatasetFileRepository.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package com.datamate.datamanagement.infrastructure.persistence.repository;
22

3+
import com.baomidou.mybatisplus.core.metadata.IPage;
34
import com.baomidou.mybatisplus.extension.repository.IRepository;
45
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
5-
import org.apache.ibatis.session.RowBounds;
66

77
import java.util.List;
88

@@ -23,5 +23,6 @@ public interface DatasetFileRepository extends IRepository<DatasetFile> {
2323

2424
DatasetFile findByDatasetIdAndFileName(String datasetId, String fileName);
2525

26-
List<DatasetFile> findByCriteria(String datasetId, String fileType, String status, RowBounds bounds);
26+
IPage<DatasetFile> findByCriteria(String datasetId, String fileType, String status, String name,
27+
IPage<DatasetFile> page);
2728
}

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/impl/DatasetFileRepositoryImpl.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package com.datamate.datamanagement.infrastructure.persistence.repository.impl;
22

33
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
4+
import com.baomidou.mybatisplus.core.metadata.IPage;
45
import com.baomidou.mybatisplus.extension.repository.CrudRepository;
56
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
67
import com.datamate.datamanagement.infrastructure.persistence.mapper.DatasetFileMapper;
78
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
89
import lombok.RequiredArgsConstructor;
9-
import org.apache.ibatis.session.RowBounds;
1010
import org.springframework.stereotype.Repository;
11+
import org.springframework.util.StringUtils;
1112

1213
import java.util.List;
1314

@@ -47,8 +48,12 @@ public DatasetFile findByDatasetIdAndFileName(String datasetId, String fileName)
4748
return datasetFileMapper.findByDatasetIdAndFileName(datasetId, fileName);
4849
}
4950

50-
@Override
51-
public List<DatasetFile> findByCriteria(String datasetId, String fileType, String status, RowBounds bounds) {
52-
return datasetFileMapper.findByCriteria(datasetId, fileType, status, bounds);
51+
/**
 * Runs a paged query for the files of one dataset via datasetFileMapper.selectPage.
 *
 * The dataset id is always matched by equality. The fileType and status equality filters and the
 * file-name LIKE filter are each added only when the corresponding argument has text
 * (StringUtils.hasText), so a null or blank value leaves that filter out.
 *
 * @param datasetId id of the dataset whose files are listed
 * @param fileType  optional file type to match exactly
 * @param status    optional status to match exactly
 * @param name      optional text matched against the file name with LIKE
 * @param page      paging object handed to selectPage
 * @return the page returned by datasetFileMapper.selectPage
 *
 * NOTE(review): the findByCriteria version removed by this diff carried @Override; this
 * replacement does not — consider restoring it so the compiler checks it against the interface.
 */
public IPage<DatasetFile> findByCriteria(String datasetId, String fileType, String status, String name,
52+
IPage<DatasetFile> page) {
53+
return datasetFileMapper.selectPage(page, new LambdaQueryWrapper<DatasetFile>()
54+
.eq(DatasetFile::getDatasetId, datasetId)
55+
.eq(StringUtils.hasText(fileType), DatasetFile::getFileType, fileType)
56+
.eq(StringUtils.hasText(status), DatasetFile::getStatus, status)
57+
.like(StringUtils.hasText(name), DatasetFile::getFileName, name));
5358
}
5459
}

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetFileController.java

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,27 @@
33
import com.datamate.common.infrastructure.common.IgnoreResponseWrap;
44
import com.datamate.common.infrastructure.common.Response;
55
import com.datamate.common.infrastructure.exception.SystemErrorCode;
6+
import com.datamate.common.interfaces.PagedResponse;
7+
import com.datamate.common.interfaces.PagingQuery;
68
import com.datamate.datamanagement.application.DatasetFileApplicationService;
79
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
810
import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
9-
import com.datamate.datamanagement.interfaces.dto.*;
11+
import com.datamate.datamanagement.interfaces.dto.CopyFilesRequest;
12+
import com.datamate.datamanagement.interfaces.dto.DatasetFileResponse;
13+
import com.datamate.datamanagement.interfaces.dto.UploadFileRequest;
14+
import com.datamate.datamanagement.interfaces.dto.UploadFilesPreRequest;
1015
import jakarta.servlet.http.HttpServletResponse;
1116
import jakarta.validation.Valid;
1217
import lombok.extern.slf4j.Slf4j;
1318
import org.springframework.beans.factory.annotation.Autowired;
1419
import org.springframework.core.io.Resource;
15-
import org.springframework.data.domain.Page;
16-
import org.springframework.data.domain.PageRequest;
17-
import org.springframework.data.domain.Pageable;
1820
import org.springframework.http.HttpHeaders;
1921
import org.springframework.http.HttpStatus;
2022
import org.springframework.http.MediaType;
2123
import org.springframework.http.ResponseEntity;
2224
import org.springframework.web.bind.annotation.*;
2325

2426
import java.util.List;
25-
import java.util.stream.Collectors;
2627

2728
/**
2829
* 数据集文件 REST 控制器(UUID 模式)
@@ -40,29 +41,17 @@ public DatasetFileController(DatasetFileApplicationService datasetFileApplicatio
4041
}
4142

4243
/**
 * Lists the files of a dataset one page at a time.
 *
 * Request parameters: page (default "0"), size (default "20"), and the optional filters
 * fileType, status and (new in this diff) name. All of them are forwarded to
 * DatasetFileApplicationService.getDatasetFiles.
 *
 * NOTE(review): the removed version returned ResponseEntity<Response<PagedDatasetFileResponse>>
 * and mapped every file through DatasetConverter.INSTANCE::convertToResponse. The new version
 * returns Response<PagedResponse<DatasetFile>>, i.e. the DatasetFile model itself — confirm that
 * the changed response body shape is intended for existing API clients.
 */
@GetMapping
43-
public ResponseEntity<Response<PagedDatasetFileResponse>> getDatasetFiles(
44+
public Response<PagedResponse<DatasetFile>> getDatasetFiles(
4445
@PathVariable("datasetId") String datasetId,
4546
@RequestParam(value = "page", required = false, defaultValue = "0") Integer page,
4647
@RequestParam(value = "size", required = false, defaultValue = "20") Integer size,
4748
@RequestParam(value = "fileType", required = false) String fileType,
48-
@RequestParam(value = "status", required = false) String status) {
49-
Pageable pageable = PageRequest.of(page != null ? page : 0, size != null ? size : 20);
50-
51-
Page<DatasetFile> filesPage = datasetFileApplicationService.getDatasetFiles(
52-
datasetId, fileType, status, pageable);
53-
54-
PagedDatasetFileResponse response = new PagedDatasetFileResponse();
55-
response.setContent(filesPage.getContent().stream()
56-
.map(DatasetConverter.INSTANCE::convertToResponse)
57-
.collect(Collectors.toList()));
58-
response.setPage(filesPage.getNumber());
59-
response.setSize(filesPage.getSize());
60-
response.setTotalElements((int) filesPage.getTotalElements());
61-
response.setTotalPages(filesPage.getTotalPages());
62-
response.setFirst(filesPage.isFirst());
63-
response.setLast(filesPage.isLast());
64-
65-
return ResponseEntity.ok(Response.ok(response));
// New version (added lines below): the page/size values are wrapped in a PagingQuery and the
// service result is returned as-is inside Response.ok(...), with no per-item conversion.
49+
@RequestParam(value = "status", required = false) String status,
50+
@RequestParam(value = "name", required = false) String name) {
51+
PagingQuery pagingQuery = new PagingQuery(page, size);
52+
PagedResponse<DatasetFile> filesPage = datasetFileApplicationService.getDatasetFiles(
53+
datasetId, fileType, status, name, pagingQuery);
54+
return Response.ok(filesPage);
6655
}
6756

6857
@GetMapping("/{fileId}")

backend/services/rag-indexer-service/src/main/java/com/datamate/rag/indexer/application/KnowledgeBaseService.java

Lines changed: 80 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
import com.datamate.common.infrastructure.exception.KnowledgeBaseErrorCode;
77
import com.datamate.common.interfaces.PagedResponse;
88
import com.datamate.common.interfaces.PagingQuery;
9+
import com.datamate.common.setting.domain.entity.ModelConfig;
910
import com.datamate.common.setting.domain.repository.ModelConfigRepository;
11+
import com.datamate.common.setting.infrastructure.client.ModelClient;
1012
import com.datamate.rag.indexer.domain.model.FileStatus;
1113
import com.datamate.rag.indexer.domain.model.KnowledgeBase;
1214
import com.datamate.rag.indexer.domain.model.RagChunk;
@@ -16,8 +18,14 @@
1618
import com.datamate.rag.indexer.infrastructure.event.DataInsertedEvent;
1719
import com.datamate.rag.indexer.infrastructure.milvus.MilvusService;
1820
import com.datamate.rag.indexer.interfaces.dto.*;
19-
import io.milvus.param.collection.DropCollectionParam;
20-
import io.milvus.param.dml.DeleteParam;
21+
import dev.langchain4j.data.embedding.Embedding;
22+
import dev.langchain4j.model.embedding.EmbeddingModel;
23+
import io.milvus.v2.service.collection.request.DropCollectionReq;
24+
import io.milvus.v2.service.collection.request.RenameCollectionReq;
25+
import io.milvus.v2.service.vector.request.DeleteReq;
26+
import io.milvus.v2.service.vector.request.QueryReq;
27+
import io.milvus.v2.service.vector.response.QueryResp;
28+
import io.milvus.v2.service.vector.response.SearchResp;
2129
import lombok.RequiredArgsConstructor;
2230
import org.jetbrains.annotations.NotNull;
2331
import org.springframework.beans.BeanUtils;
@@ -26,6 +34,7 @@
2634
import org.springframework.transaction.annotation.Transactional;
2735
import org.springframework.util.StringUtils;
2836

37+
import java.util.Collections;
2938
import java.util.List;
3039
import java.util.Optional;
3140

@@ -63,10 +72,15 @@ public String create(KnowledgeBaseCreateReq request) {
6372
* @param knowledgeBaseId 知识库 ID
6473
* @param request 知识库更新请求
6574
*/
75+
@Transactional(rollbackFor = Exception.class)
6676
public void update(String knowledgeBaseId, KnowledgeBaseUpdateReq request) {
6777
KnowledgeBase knowledgeBase = Optional.ofNullable(knowledgeBaseRepository.getById(knowledgeBaseId))
6878
.orElseThrow(() -> BusinessException.of(KnowledgeBaseErrorCode.KNOWLEDGE_BASE_NOT_FOUND));
6979
if (StringUtils.hasText(request.getName())) {
80+
milvusService.getMilvusClient().renameCollection(RenameCollectionReq.builder()
81+
.collectionName(knowledgeBase.getName())
82+
.newCollectionName(request.getName())
83+
.build());
7084
knowledgeBase.setName(request.getName());
7185
}
7286
if (StringUtils.hasText(request.getDescription())) {
@@ -75,13 +89,19 @@ public void update(String knowledgeBaseId, KnowledgeBaseUpdateReq request) {
7589
knowledgeBaseRepository.updateById(knowledgeBase);
7690
}
7791

78-
@Transactional
92+
93+
/**
94+
* Deletes a knowledge base: removes its record, removes its file records, and drops the
* Milvus collection named after the knowledge base.
95+
*
96+
* @param knowledgeBaseId ID of the knowledge base to delete
* @throws BusinessException with KNOWLEDGE_BASE_NOT_FOUND when no knowledge base has this ID
97+
*/
98+
@Transactional(rollbackFor = Exception.class)
7999
public void delete(String knowledgeBaseId) {
80100
KnowledgeBase knowledgeBase = Optional.ofNullable(knowledgeBaseRepository.getById(knowledgeBaseId))
81101
.orElseThrow(() -> BusinessException.of(KnowledgeBaseErrorCode.KNOWLEDGE_BASE_NOT_FOUND));
82102
knowledgeBaseRepository.removeById(knowledgeBaseId);
83103
ragFileRepository.removeByKnowledgeBaseId(knowledgeBaseId);
// The Milvus collection is dropped last, after both repository removals.
// NOTE(review): presumably an exception thrown by dropCollection rolls back the two removals
// above (rollbackFor = Exception.class); the Milvus call itself is outside the DB transaction — confirm.
84-
milvusService.getMilvusClient().dropCollection(DropCollectionParam.newBuilder().withCollectionName(knowledgeBase.getName()).build());
104+
milvusService.getMilvusClient().dropCollection(DropCollectionReq.builder().collectionName(knowledgeBase.getName()).build());
85105
}
86106

87107
public KnowledgeBaseResp getById(String knowledgeBaseId) {
@@ -147,14 +167,65 @@ public void deleteFiles(String knowledgeBaseId, DeleteFilesReq request) {
147167
KnowledgeBase knowledgeBase = Optional.ofNullable(knowledgeBaseRepository.getById(knowledgeBaseId))
148168
.orElseThrow(() -> BusinessException.of(KnowledgeBaseErrorCode.KNOWLEDGE_BASE_NOT_FOUND));
149169
ragFileRepository.removeByIds(request.getIds());
150-
milvusService.getMilvusClient().delete(DeleteParam.newBuilder()
151-
.withCollectionName(knowledgeBase.getName())
152-
.withExpr("metadata[\"rag_file_id\"] in [" + org.apache.commons.lang3.StringUtils.join(request.getIds().stream().map(id -> "\"" + id + "\"").toArray(), ",") + "]")
170+
milvusService.getMilvusClient().delete(DeleteReq.builder()
171+
.collectionName(knowledgeBase.getName())
172+
.filter("metadata[\"rag_file_id\"] in [" + org.apache.commons.lang3.StringUtils.join(request.getIds().stream().map(id -> "\"" + id + "\"").toArray(), ",") + "]")
153173
.build());
154174
}
155175

156176
/**
 * Returns one page of the chunks stored in Milvus for a single file of a knowledge base.
 *
 * The new implementation looks up the knowledge base, queries the Milvus collection named after
 * it for entities whose metadata "rag_file_id" equals ragFileId, maps each entity to a RagChunk
 * built from its "id", "text" and "metadata" fields, and runs a second count(*) query with the
 * same filter to obtain the total.
 *
 * @param knowledgeBaseId ID of the knowledge base
 * @param ragFileId       value compared against metadata["rag_file_id"]
 * @param pagingQuery     supplies the page number and page size
 * @return the chunks of the requested page, the page number, the total count, and the page count
 *         computed as ceil(total / size)
 * @throws BusinessException with KNOWLEDGE_BASE_NOT_FOUND when no knowledge base has this ID
 */
public PagedResponse<RagChunk> getChunks(String knowledgeBaseId, String ragFileId, PagingQuery pagingQuery) {
157-
IPage<RagChunk> page = new Page<>(pagingQuery.getPage(), pagingQuery.getSize());
158-
return PagedResponse.of(page.getRecords(), page.getCurrent(), page.getTotal(), page.getPages());
177+
KnowledgeBase knowledgeBase = Optional.ofNullable(knowledgeBaseRepository.getById(knowledgeBaseId))
178+
.orElseThrow(() -> BusinessException.of(KnowledgeBaseErrorCode.KNOWLEDGE_BASE_NOT_FOUND));
// First query: fetch one page of entities for this file, with all output fields.
// NOTE(review): ragFileId is concatenated into the filter expression unescaped; a value containing
// a double quote would alter the expression — confirm it is validated before it reaches this method.
// NOTE(review): the offset is (page - 1) * size, which assumes getPage() is 1-based; a page of 0
// yields a negative offset — confirm PagingQuery's convention.
179+
QueryResp results = milvusService.getMilvusClient().query(QueryReq.builder()
180+
.collectionName(knowledgeBase.getName())
181+
.filter("metadata[\"rag_file_id\"] == \"" + ragFileId + "\"")
182+
.outputFields(Collections.singletonList("*"))
183+
.limit(Long.valueOf(pagingQuery.getSize()))
184+
.offset((long) (pagingQuery.getPage() - 1) * pagingQuery.getSize())
185+
.build());
186+
List<QueryResp.QueryResult> queryResults = results.getQueryResults();
// NOTE(review): toString() is called on each looked-up field without a null check, so an entity
// lacking "id", "text" or "metadata" presumably fails with a NullPointerException.
187+
List<RagChunk> ragChunks = queryResults.stream()
188+
.map(QueryResp.QueryResult::getEntity)
189+
.map(item -> new RagChunk(
190+
item.get("id").toString(),
191+
item.get("text").toString(),
192+
item.get("metadata").toString()
193+
)).toList();
194+
195+
// Fetch the total count with a second query that uses the same filter.
196+
QueryResp countResults = milvusService.getMilvusClient().query(QueryReq.builder()
197+
.collectionName(knowledgeBase.getName())
198+
.filter("metadata[\"rag_file_id\"] == \"" + ragFileId + "\"")
199+
.outputFields(Collections.singletonList("count(*)"))
200+
.build());
201+
// NOTE(review): getFirst() assumes the count query always returns at least one row — confirm.
202+
long totalCount = Long.parseLong(countResults.getQueryResults().getFirst().getEntity().get("count(*)").toString());
203+
return PagedResponse.of(ragChunks, pagingQuery.getPage(), totalCount, (int) Math.ceil((double) totalCount / pagingQuery.getSize()));
204+
}
205+
206+
/**
207+
* Retrieves content from a knowledge base: embeds the query text with the knowledge base's
* embedding model and runs a hybrid search against the collection named after the knowledge base.
208+
*
209+
* @param request retrieval request carrying the knowledge base IDs, the query text and topK
210+
* @return the SearchResp returned by milvusService.hybridSearch
* @throws BusinessException with KNOWLEDGE_BASE_NOT_FOUND when the first ID matches no knowledge base
211+
*/
212+
public SearchResp retrieve(RetrieveReq request) {
// NOTE(review): only the first entry of getKnowledgeBaseIds() is used; any further IDs are ignored,
// and an empty list presumably makes getFirst() throw — confirm the request is validated upstream.
213+
KnowledgeBase knowledgeBase = Optional.ofNullable(knowledgeBaseRepository.getById(request.getKnowledgeBaseIds().getFirst()))
214+
.orElseThrow(() -> BusinessException.of(KnowledgeBaseErrorCode.KNOWLEDGE_BASE_NOT_FOUND));
// NOTE(review): unlike knowledgeBase, modelConfig is passed on without a null check.
215+
ModelConfig modelConfig = modelConfigRepository.getById(knowledgeBase.getEmbeddingModel());
216+
EmbeddingModel embeddingModel = ModelClient.invokeEmbeddingModel(modelConfig);
217+
Embedding embedding = embeddingModel.embed(request.getQuery()).content();
218+
SearchResp searchResp = milvusService.hybridSearch(knowledgeBase.getName(), request.getQuery(), embedding.vector(), request.getTopK());
219+
return searchResp;
220+
// NOTE(review): everything below is commented-out code placed after the return statement. It looks
// like an unfinished attempt to search every requested knowledge base; finish it or delete it.
221+
// request.getKnowledgeBaseIds().forEach(knowledgeId -> {
222+
// KnowledgeBase knowledgeBase = Optional.ofNullable(knowledgeBaseRepository.getById(knowledgeId))
223+
// .orElseThrow(() -> BusinessException.of(KnowledgeBaseErrorCode.KNOWLEDGE_BASE_NOT_FOUND));
224+
// ModelConfig modelConfig = modelConfigRepository.getById(knowledgeBase.getEmbeddingModel());
225+
// EmbeddingModel embeddingModel = ModelClient.invokeEmbeddingModel(modelConfig);
226+
// Embedding embedding = embeddingModel.embed(request.getQuery()).content();
227+
// searchResp = milvusService.hybridSearch(knowledgeBase.getName(), request.getQuery(), embedding.vector(), request.getTopK());
228+
// });
229+
// return searchResp;
159230
}
160231
}

0 commit comments

Comments
 (0)