Skip to content

Commit c2ae4c7

Browse files
authored
Merge pull request #352 from ELK-milu/main
feat: 为kb的操作接口使用了minio管理
2 parents 0ff771d + 31a4e54 commit c2ae4c7

File tree

10 files changed

+328
-106
lines changed

10 files changed

+328
-106
lines changed

server/routers/knowledge_router.py

Lines changed: 83 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from fastapi import APIRouter, Body, Depends, File, HTTPException, Query, Request, UploadFile
1010
from fastapi.responses import FileResponse
11-
from starlette.responses import FileResponse as StarletteFileResponse
11+
from starlette.responses import StreamingResponse
1212

1313
from src.storage.db.models import User
1414
from server.utils.auth_middleware import get_admin_user
@@ -17,10 +17,47 @@
1717
from src.knowledge.indexing import SUPPORTED_FILE_EXTENSIONS, is_supported_file_extension, process_file_to_markdown
1818
from src.knowledge.utils import calculate_content_hash, merge_processing_params
1919
from src.models.embed import test_embedding_model_status, test_all_embedding_models_status
20+
from src.storage.minio.client import aupload_file_to_minio, get_minio_client
2021
from src.utils import hashstr, logger
2122

2223
knowledge = APIRouter(prefix="/knowledge", tags=["knowledge"])
2324

25+
media_types = {
26+
".pdf": "application/pdf",
27+
".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
28+
".doc": "application/msword",
29+
".txt": "text/plain",
30+
".md": "text/markdown",
31+
".json": "application/json",
32+
".csv": "text/csv",
33+
".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
34+
".xls": "application/vnd.ms-excel",
35+
".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
36+
".ppt": "application/vnd.ms-powerpoint",
37+
".jpg": "image/jpeg",
38+
".jpeg": "image/jpeg",
39+
".png": "image/png",
40+
".gif": "image/gif",
41+
".bmp": "image/bmp",
42+
".svg": "image/svg+xml",
43+
".zip": "application/zip",
44+
".rar": "application/x-rar-compressed",
45+
".7z": "application/x-7z-compressed",
46+
".tar": "application/x-tar",
47+
".gz": "application/gzip",
48+
".html": "text/html",
49+
".htm": "text/html",
50+
".xml": "text/xml",
51+
".css": "text/css",
52+
".js": "application/javascript",
53+
".py": "text/x-python",
54+
".java": "text/x-java-source",
55+
".cpp": "text/x-c++src",
56+
".c": "text/x-csrc",
57+
".h": "text/x-chdr",
58+
".hpp": "text/x-c++hdr",
59+
}
60+
2461
# =============================================================================
2562
# === 数据库管理分组 ===
2663
# =============================================================================
@@ -179,12 +216,6 @@ async def export_database(
179216
if not os.path.exists(file_path):
180217
raise HTTPException(status_code=404, detail="Exported file not found.")
181218

182-
media_types = {
183-
"csv": "text/csv",
184-
"xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
185-
"md": "text/markdown",
186-
"txt": "text/plain",
187-
}
188219
# The shared module-level media_types table is keyed by dotted extension (".csv"), while `format` is a bare name ("csv"); normalize before lookup so exports keep their real MIME type.
media_type = media_types.get(f".{str(format).lstrip('.').lower()}", "application/octet-stream")
189220

190221
return FileResponse(path=file_path, filename=os.path.basename(file_path), media_type=media_type)
@@ -277,13 +308,25 @@ async def run_ingest(context: TaskContext):
277308

278309
item_type = "URL" if content_type == "url" else "文件"
279310
failed_count = len([_p for _p in processed_items if _p.get("status") == "failed"])
311+
success_items = [_p for _p in processed_items if _p.get("status") == "done"]
280312
summary = {
281313
"db_id": db_id,
282314
"item_type": item_type,
283315
"submitted": len(processed_items),
284316
"failed": failed_count,
285317
}
286318
message = f"{item_type}处理完成,失败 {failed_count} 个" if failed_count else f"{item_type}处理完成"
319+
320+
for success_item in success_items:
321+
# 使用异步上传到minio的对应知识库,同名文件会被覆盖
322+
async with aiofiles.open(success_item["path"], "rb") as f:
323+
file_bytes = await f.read()
324+
# 上传的bucket名为ref-{refdb},refdb中的_替换为-
325+
refdb = db_id.replace("_", "-")
326+
url = await aupload_file_to_minio(
327+
f"ref-{refdb}", success_item["filename"], file_bytes, success_item["file_type"]
328+
)
329+
logger.info(f"上传文件成功: {url}")
287330
await context.set_result(summary | {"items": processed_items})
288331
await context.set_progress(100.0, message)
289332
return summary | {"items": processed_items}
@@ -354,6 +397,10 @@ async def delete_document(db_id: str, doc_id: str, current_user: User = Depends(
354397
"""删除文档"""
355398
logger.debug(f"DELETE document {doc_id} info in {db_id}")
356399
try:
400+
file_meta_info = await knowledge_base.get_file_basic_info(db_id, doc_id)
401+
file_name = file_meta_info.get("meta", {}).get("filename")
402+
minio_client = get_minio_client()
403+
await minio_client.adelete_file("ref-" + db_id.replace("_", "-"), file_name)
357404
await knowledge_base.delete_file(db_id, doc_id)
358405
return {"message": "删除成功"}
359406
except Exception as e:
@@ -473,24 +520,6 @@ async def download_document(db_id: str, doc_id: str, request: Request, current_u
473520
logger.debug(f"Download document {doc_id} from {db_id}")
474521
try:
475522
file_info = await knowledge_base.get_file_basic_info(db_id, doc_id)
476-
if not file_info:
477-
raise HTTPException(status_code=404, detail="File not found")
478-
479-
file_path = file_info.get("meta", {}).get("path")
480-
if not file_path:
481-
raise HTTPException(status_code=404, detail="File path not found in metadata")
482-
483-
# 安全检查:验证文件路径
484-
from src.knowledge.utils.kb_utils import validate_file_path
485-
486-
try:
487-
normalized_path = validate_file_path(file_path, db_id)
488-
except ValueError as e:
489-
raise HTTPException(status_code=403, detail=str(e))
490-
491-
if not os.path.exists(normalized_path):
492-
raise HTTPException(status_code=404, detail=f"File not found on disk: {file_info=}")
493-
494523
# 获取文件扩展名和MIME类型,解码URL编码的文件名
495524
filename = file_info.get("meta", {}).get("filename", "file")
496525
logger.debug(f"Original filename from database: {filename}")
@@ -505,46 +534,31 @@ async def download_document(db_id: str, doc_id: str, request: Request, current_u
505534

506535
_, ext = os.path.splitext(decoded_filename)
507536

508-
media_types = {
509-
".pdf": "application/pdf",
510-
".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
511-
".doc": "application/msword",
512-
".txt": "text/plain",
513-
".md": "text/markdown",
514-
".json": "application/json",
515-
".csv": "text/csv",
516-
".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
517-
".xls": "application/vnd.ms-excel",
518-
".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
519-
".ppt": "application/vnd.ms-powerpoint",
520-
".jpg": "image/jpeg",
521-
".jpeg": "image/jpeg",
522-
".png": "image/png",
523-
".gif": "image/gif",
524-
".bmp": "image/bmp",
525-
".svg": "image/svg+xml",
526-
".zip": "application/zip",
527-
".rar": "application/x-rar-compressed",
528-
".7z": "application/x-7z-compressed",
529-
".tar": "application/x-tar",
530-
".gz": "application/gzip",
531-
".html": "text/html",
532-
".htm": "text/html",
533-
".xml": "text/xml",
534-
".css": "text/css",
535-
".js": "application/javascript",
536-
".py": "text/x-python",
537-
".java": "text/x-java-source",
538-
".cpp": "text/x-c++src",
539-
".c": "text/x-csrc",
540-
".h": "text/x-chdr",
541-
".hpp": "text/x-c++hdr",
542-
}
543537
media_type = media_types.get(ext.lower(), "application/octet-stream")
544538

545-
# 创建自定义FileResponse,避免文件名编码问题
546-
response = StarletteFileResponse(path=normalized_path, media_type=media_type)
539+
minio_client = get_minio_client()
540+
minio_response = await minio_client.adownload_response(
541+
bucket_name="ref-" + db_id.replace("_", "-"),
542+
object_name=filename,
543+
)
547544

545+
# 创建流式生成器
546+
async def minio_stream():
547+
try:
548+
while True:
549+
chunk = await asyncio.to_thread(minio_response.read, 8192)
550+
if not chunk:
551+
break
552+
yield chunk
553+
finally:
554+
minio_response.close()
555+
minio_response.release_conn()
556+
557+
# 创建StreamingResponse
558+
response = StreamingResponse(
559+
minio_stream(),
560+
media_type=media_type,
561+
)
548562
# 正确处理中文文件名的HTTP头部设置
549563
# HTTP头部只能包含ASCII字符,所以需要对中文文件名进行编码
550564
try:
@@ -1041,23 +1055,24 @@ async def upload_file(
10411055
upload_dir = os.path.join(config.save_dir, "database", "uploads")
10421056

10431057
basename, ext = os.path.splitext(file.filename)
1044-
filename = f"{basename}_{hashstr(basename, 4, with_salt=True)}{ext}".lower()
1058+
# TODO:
1059+
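# Later: when a file with the same name is uploaded, prompt in the upload area whether to delete the old file; the filename will then no longer need a hash suffix.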
1060+
filename = f"{basename}_{hashstr(basename, 4, with_salt=True, salt='fixed_salt')}{ext}".lower()
1061+
10451062
file_path = os.path.join(upload_dir, filename)
10461063

10471064
# 在线程池中执行同步文件系统操作,避免阻塞事件循环
10481065
await asyncio.to_thread(os.makedirs, upload_dir, exist_ok=True)
10491066

10501067
file_bytes = await file.read()
10511068

1052-
# 在线程池中执行计算密集型操作,避免阻塞事件循环
1053-
content_hash = await asyncio.to_thread(calculate_content_hash, file_bytes)
1069+
content_hash = await calculate_content_hash(file_bytes)
10541070

1055-
# 在线程池中执行同步数据库查询,避免阻塞事件循环
1056-
file_exists = await asyncio.to_thread(knowledge_base.file_existed_in_db, db_id, content_hash)
1071+
file_exists = await knowledge_base.file_existed_in_db(db_id, content_hash)
10571072
if file_exists:
10581073
raise HTTPException(
10591074
status_code=409,
1060-
detail="数据库中已经存在了相同文件,File with the same content already exists in this database",
1075+
detail="数据库中已经存在了相同内容文件,File with the same content already exists in this database",
10611076
)
10621077

10631078
# 使用异步文件写入,避免阻塞事件循环

src/knowledge/implementations/chroma.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ def _split_text_into_chunks(self, text: str, file_id: str, filename: str, params
173173

174174
return chunks
175175

176-
async def add_content(self, db_id: str, items: list[str], params: dict | None) -> list[dict]:
176+
async def add_content(self, db_id: str, items: list[str], params: dict | None = None) -> list[dict]:
177177
"""添加内容(文件/URL)"""
178178
if db_id not in self.databases_meta:
179179
raise ValueError(f"Database {db_id} not found")
@@ -187,7 +187,7 @@ async def add_content(self, db_id: str, items: list[str], params: dict | None) -
187187

188188
for item in items:
189189
# 准备文件元数据
190-
metadata = prepare_item_metadata(item, content_type, db_id, params=params)
190+
metadata = await prepare_item_metadata(item, content_type, db_id, params=params)
191191
file_id = metadata["file_id"]
192192
filename = metadata["filename"]
193193

src/knowledge/implementations/lightrag.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ async def add_content(self, db_id: str, items: list[str], params: dict | None =
226226

227227
for item in items:
228228
# 准备文件元数据
229-
metadata = prepare_item_metadata(item, content_type, db_id, params=params)
229+
metadata = await prepare_item_metadata(item, content_type, db_id, params=params)
230230
file_id = metadata["file_id"]
231231
item_path = metadata["path"]
232232

src/knowledge/implementations/milvus.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ async def add_content(self, db_id: str, items: list[str], params: dict | None =
226226
processed_items_info = []
227227

228228
for item in items:
229-
metadata = prepare_item_metadata(item, content_type, db_id, params=params)
229+
metadata = await prepare_item_metadata(item, content_type, db_id, params=params)
230230
file_id = metadata["file_id"]
231231
filename = metadata["filename"]
232232

0 commit comments

Comments
 (0)