Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.KnowledgeBaseErrorCode;
import com.datamate.common.interfaces.PagedResponse;
import com.datamate.common.interfaces.PagingQuery;
import com.datamate.common.setting.domain.repository.ModelConfigRepository;
import com.datamate.rag.indexer.domain.model.FileStatus;
import com.datamate.rag.indexer.domain.model.KnowledgeBase;
Expand All @@ -10,11 +14,10 @@
import com.datamate.rag.indexer.domain.repository.KnowledgeBaseRepository;
import com.datamate.rag.indexer.domain.repository.RagFileRepository;
import com.datamate.rag.indexer.infrastructure.event.DataInsertedEvent;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.KnowledgeBaseErrorCode;
import com.datamate.common.interfaces.PagedResponse;
import com.datamate.common.interfaces.PagingQuery;
import com.datamate.rag.indexer.interfaces.dto.*;
import io.milvus.client.MilvusClient;
import io.milvus.param.collection.DropCollectionParam;
import io.milvus.param.dml.DeleteParam;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.BeanUtils;
Expand All @@ -39,7 +42,7 @@ public class KnowledgeBaseService {
private final RagFileRepository ragFileRepository;
private final ApplicationEventPublisher eventPublisher;
private final ModelConfigRepository modelConfigRepository;

private final MilvusClient milvusClient;

/**
* 创建知识库
Expand Down Expand Up @@ -72,10 +75,13 @@ public void update(String knowledgeBaseId, KnowledgeBaseUpdateReq request) {
knowledgeBaseRepository.updateById(knowledgeBase);
}

@Transactional
public void delete(String knowledgeBaseId) {
KnowledgeBase knowledgeBase = Optional.ofNullable(knowledgeBaseRepository.getById(knowledgeBaseId))
.orElseThrow(() -> BusinessException.of(KnowledgeBaseErrorCode.KNOWLEDGE_BASE_NOT_FOUND));
knowledgeBaseRepository.removeById(knowledgeBaseId);
ragFileRepository.removeByKnowledgeBaseId(knowledgeBaseId);
// TODO: 删除知识库关联的所有文档
milvusClient.dropCollection(DropCollectionParam.newBuilder().withCollectionName(knowledgeBase.getName()).build());
}

public KnowledgeBaseResp getById(String knowledgeBaseId) {
Expand Down Expand Up @@ -136,8 +142,15 @@ public PagedResponse<RagFile> listFiles(String knowledgeBaseId, RagFileReq reque
return PagedResponse.of(page.getRecords(), page.getCurrent(), page.getTotal(), page.getPages());
}

@Transactional(rollbackFor = Exception.class)
public void deleteFiles(String knowledgeBaseId, DeleteFilesReq request) {
KnowledgeBase knowledgeBase = Optional.ofNullable(knowledgeBaseRepository.getById(knowledgeBaseId))
.orElseThrow(() -> BusinessException.of(KnowledgeBaseErrorCode.KNOWLEDGE_BASE_NOT_FOUND));
ragFileRepository.removeByIds(request.getIds());
milvusClient.delete(DeleteParam.newBuilder()
.withCollectionName(knowledgeBase.getName())
.withExpr("metadata[\"rag_file_id\"] in [" + org.apache.commons.lang3.StringUtils.join(request.getIds().stream().map(id -> "\"" + id + "\"").toArray(), ",") + "]")
.build());
}

public PagedResponse<RagChunk> getChunks(String knowledgeBaseId, String ragFileId, PagingQuery pagingQuery) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.datamate.rag.indexer.domain.model.FileStatus;
import com.datamate.rag.indexer.domain.model.RagFile;
import com.datamate.rag.indexer.domain.repository.RagFileRepository;
import com.datamate.rag.indexer.infrastructure.milvus.MilvusService;
import com.datamate.rag.indexer.interfaces.dto.ProcessType;
import com.google.common.collect.Lists;
import dev.langchain4j.data.document.Document;
Expand All @@ -27,11 +28,8 @@
import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingStore;
import dev.langchain4j.store.embedding.milvus.MilvusEmbeddingStore;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.event.TransactionPhase;
Expand All @@ -55,10 +53,7 @@
public class RagEtlService {
private static final Semaphore SEMAPHORE = new Semaphore(10);

@Value("${datamate.rag.milvus-host:milvus-standalone}")
private String milvusHost;
@Value("${datamate.rag.milvus-port:19530}")
private int milvusPort;
private final MilvusService milvusService;

private final RagFileRepository ragFileRepository;

Expand Down Expand Up @@ -112,7 +107,7 @@ private void processRagFile(RagFile ragFile, DataInsertedEvent event) {
if (Arrays.asList("html", "htm").contains(file.getFileType().toLowerCase())) {
document = new HtmlToTextDocumentTransformer().transform(document);
}
document.metadata().put("fileId", ragFile.getFileId());
document.metadata().put("rag_file_id", ragFile.getId());
// 使用文档分块器对文档进行分块
DocumentSplitter splitter = documentSplitter(event.addFilesReq().getProcessType());
List<TextSegment> split = splitter.split(document);
Expand All @@ -129,7 +124,7 @@ private void processRagFile(RagFile ragFile, DataInsertedEvent event) {
Lists.partition(split, 20).forEach(partition -> {
List<Embedding> content = embeddingModel.embedAll(partition).content();
// 存储嵌入向量到 Milvus
embeddingStore(embeddingModel, event.knowledgeBase().getName()).addAll(content, partition);
milvusService.embeddingStore(embeddingModel, event.knowledgeBase().getName()).addAll(content, partition);
});
}

Expand Down Expand Up @@ -159,13 +154,4 @@ public DocumentSplitter documentSplitter(ProcessType processType) {
case DEFAULT_CHUNK -> new DocumentByLineSplitter(1000, 100);
};
}

public EmbeddingStore<TextSegment> embeddingStore(EmbeddingModel embeddingModel, String knowledgeBaseName) {
return MilvusEmbeddingStore.builder()
.host(milvusHost)
.port(milvusPort)
.collectionName(knowledgeBaseName)
.dimension(embeddingModel.dimension())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.datamate.rag.indexer.infrastructure.milvus;

import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingStore;
import dev.langchain4j.store.embedding.milvus.MilvusEmbeddingStore;
import io.milvus.client.MilvusClient;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.ConnectParam;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
* Milvus 服务类
*
* @author dallas
* @since 2025-11-17
*/
@Component
public class MilvusService {
@Value("${datamate.rag.milvus-host:milvus-standalone}")
private String milvusHost;
@Value("${datamate.rag.milvus-port:19530}")
private int milvusPort;

public EmbeddingStore<TextSegment> embeddingStore(EmbeddingModel embeddingModel, String knowledgeBaseName) {
return MilvusEmbeddingStore.builder()
.host(milvusHost)
.port(milvusPort)
.collectionName(knowledgeBaseName)
.dimension(embeddingModel.dimension())
.build();
}

@Bean
public MilvusClient milvusClient() {
ConnectParam connectParam = ConnectParam.newBuilder()
.withHost(milvusHost)
.withPort(milvusPort)
.build();
return new MilvusServiceClient(connectParam);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const KnowledgeBaseDetailPage: React.FC = () => {
setSearchParams,
handleFiltersChange,
} = useFetchData<KBFile>(
(params) => queryKnowledgeBaseFilesUsingGet(knowledgeBase?.id, params),
(params) => id ? queryKnowledgeBaseFilesUsingGet(id, params) : Promise.resolve({ data: [] }),
mapFileData
);

Expand Down Expand Up @@ -237,4 +237,4 @@ const KnowledgeBaseDetailPage: React.FC = () => {
);
};

export default KnowledgeBaseDetailPage;
export default KnowledgeBaseDetailPage;