Skip to content

Commit 0ed42e6

Browse files
authored
♻️ Refactor: refactor elasticsearch core to vectordb core
♻️ Refactor: refactor elasticsearch core to vectordb core
2 parents a1bf5ca + 16427e2 commit 0ed42e6

35 files changed

+1870
-1483
lines changed

backend/agents/create_agent_info.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@
99
from nexent.core.agents.agent_model import AgentRunInfo, ModelConfig, AgentConfig, ToolConfig
1010
from nexent.memory.memory_service import search_memory_in_levels
1111

12-
from services.elasticsearch_service import ElasticSearchService, elastic_core, get_embedding_model
12+
from services.vectordatabase_service import (
13+
ElasticSearchService,
14+
get_vector_db_core,
15+
get_embedding_model,
16+
)
1317
from services.tenant_config_service import get_selected_knowledge_list
1418
from services.remote_mcp_service import get_remote_mcp_server_list
1519
from services.memory_config_service import build_memory_context
@@ -227,9 +231,11 @@ async def create_tool_config_list(agent_id, tenant_id, user_id):
227231
tenant_id=tenant_id, user_id=user_id)
228232
index_names = [knowledge_info.get(
229233
"index_name") for knowledge_info in knowledge_info_list]
230-
tool_config.metadata = {"index_names": index_names,
231-
"es_core": elastic_core,
232-
"embedding_model": get_embedding_model(tenant_id=tenant_id)}
234+
tool_config.metadata = {
235+
"index_names": index_names,
236+
"vdb_core": get_vector_db_core(),
237+
"embedding_model": get_embedding_model(tenant_id=tenant_id),
238+
}
233239
tool_config_list.append(tool_config)
234240

235241
return tool_config_list

backend/apps/config_app.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from apps.agent_app import agent_config_router as agent_router
88
from apps.config_sync_app import router as config_sync_router
9-
from apps.elasticsearch_app import router as elasticsearch_router
9+
from apps.vectordatabase_app import router as vectordatabase_router
1010
from apps.file_management_app import file_management_config_router as file_manager_router
1111
from apps.image_app import router as proxy_router
1212
from apps.knowledge_summary_app import router as summary_router
@@ -41,7 +41,7 @@
4141
app.include_router(model_manager_router)
4242
app.include_router(config_sync_router)
4343
app.include_router(agent_router)
44-
app.include_router(elasticsearch_router)
44+
app.include_router(vectordatabase_router)
4545
app.include_router(voice_router)
4646
app.include_router(file_manager_router)
4747
app.include_router(proxy_router)

backend/apps/knowledge_summary_app.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33

44
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query, Request
55
from fastapi.responses import StreamingResponse
6-
from nexent.vector_database.elasticsearch_core import ElasticSearchCore
6+
from nexent.vector_database.base import VectorDatabaseCore
77

88
from consts.model import ChangeSummaryRequest
9-
from services.elasticsearch_service import ElasticSearchService, get_es_core
9+
from services.vectordatabase_service import ElasticSearchService, get_vector_db_core
1010
from utils.auth_utils import get_current_user_id, get_current_user_info
1111

1212
router = APIRouter(prefix="/summary")
@@ -22,7 +22,7 @@ async def auto_summary(
2222
1000, description="Number of documents to retrieve per batch"),
2323
model_id: Optional[int] = Query(
2424
None, description="Model ID to use for summary generation"),
25-
es_core: ElasticSearchCore = Depends(get_es_core),
25+
vdb_core: VectorDatabaseCore = Depends(get_vector_db_core),
2626
authorization: Optional[str] = Header(None)
2727
):
2828
"""Summary Elasticsearch index_name by model"""
@@ -34,15 +34,16 @@ async def auto_summary(
3434
return await service.summary_index_name(
3535
index_name=index_name,
3636
batch_size=batch_size,
37-
es_core=es_core,
37+
vdb_core=vdb_core,
3838
tenant_id=tenant_id,
3939
language=language,
4040
model_id=model_id
4141
)
4242
except Exception as e:
43-
logger.error("Knowledge base summary generation failed", exc_info=True)
43+
logger.error(
44+
f"Knowledge base summary generation failed: {e}", exc_info=True)
4445
return StreamingResponse(
45-
f"data: {{\"status\": \"error\", \"message\": \"Knowledge base summary generation failed due to an internal error.\"}}\n\n",
46+
"data: {{\"status\": \"error\", \"message\": \"Knowledge base summary generation failed due to an internal error.\"}}\n\n",
4647
media_type="text/event-stream",
4748
status_code=500
4849
)
Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,31 @@
55
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query
66

77
from consts.model import IndexingResponse
8-
from nexent.vector_database.elasticsearch_core import ElasticSearchCore
9-
from services.elasticsearch_service import ElasticSearchService, get_embedding_model, get_es_core, \
10-
check_knowledge_base_exist_impl
8+
from nexent.vector_database.base import VectorDatabaseCore
9+
from services.vectordatabase_service import (
10+
ElasticSearchService,
11+
get_embedding_model,
12+
get_vector_db_core,
13+
check_knowledge_base_exist_impl,
14+
)
1115
from services.redis_service import get_redis_service
1216
from utils.auth_utils import get_current_user_id
1317

1418
router = APIRouter(prefix="/indices")
1519
service = ElasticSearchService()
16-
logger = logging.getLogger("elasticsearch_app")
20+
logger = logging.getLogger("vectordatabase_app")
1721

1822

1923
@router.get("/check_exist/{index_name}")
2024
async def check_knowledge_base_exist(
2125
index_name: str = Path(..., description="Name of the index to check"),
22-
es_core: ElasticSearchCore = Depends(get_es_core),
26+
vdb_core: VectorDatabaseCore = Depends(get_vector_db_core),
2327
authorization: Optional[str] = Header(None)
2428
):
2529
"""Check if a knowledge base name exists and in which scope."""
2630
try:
2731
user_id, tenant_id = get_current_user_id(authorization)
28-
return check_knowledge_base_exist_impl(index_name=index_name, es_core=es_core, user_id=user_id, tenant_id=tenant_id)
32+
return check_knowledge_base_exist_impl(index_name=index_name, vdb_core=vdb_core, user_id=user_id, tenant_id=tenant_id)
2933
except Exception as e:
3034
logger.error(
3135
f"Error checking knowledge base existence for '{index_name}': {str(e)}", exc_info=True)
@@ -38,13 +42,13 @@ def create_new_index(
3842
index_name: str = Path(..., description="Name of the index to create"),
3943
embedding_dim: Optional[int] = Query(
4044
None, description="Dimension of the embedding vectors"),
41-
es_core: ElasticSearchCore = Depends(get_es_core),
45+
vdb_core: VectorDatabaseCore = Depends(get_vector_db_core),
4246
authorization: Optional[str] = Header(None)
4347
):
4448
"""Create a new vector index and store it in the knowledge table"""
4549
try:
4650
user_id, tenant_id = get_current_user_id(authorization)
47-
return ElasticSearchService.create_index(index_name, embedding_dim, es_core, user_id, tenant_id)
51+
return ElasticSearchService.create_index(index_name, embedding_dim, vdb_core, user_id, tenant_id)
4852
except Exception as e:
4953
raise HTTPException(
5054
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Error creating index: {str(e)}")
@@ -53,15 +57,15 @@ def create_new_index(
5357
@router.delete("/{index_name}")
5458
async def delete_index(
5559
index_name: str = Path(..., description="Name of the index to delete"),
56-
es_core: ElasticSearchCore = Depends(get_es_core),
60+
vdb_core: VectorDatabaseCore = Depends(get_vector_db_core),
5761
authorization: Optional[str] = Header(None)
5862
):
5963
"""Delete an index and all its related data by calling the centralized service."""
6064
logger.debug(f"Received request to delete knowledge base: {index_name}")
6165
try:
6266
user_id, tenant_id = get_current_user_id(authorization)
6367
# Call the centralized full deletion service
64-
result = await ElasticSearchService.full_delete_knowledge_base(index_name, es_core, user_id)
68+
result = await ElasticSearchService.full_delete_knowledge_base(index_name, vdb_core, user_id)
6569
return result
6670
except Exception as e:
6771
logger.error(
@@ -75,13 +79,13 @@ def get_list_indices(
7579
pattern: str = Query("*", description="Pattern to match index names"),
7680
include_stats: bool = Query(
7781
False, description="Whether to include index stats"),
78-
es_core: ElasticSearchCore = Depends(get_es_core),
82+
vdb_core: VectorDatabaseCore = Depends(get_vector_db_core),
7983
authorization: Optional[str] = Header(None),
8084
):
8185
"""List all user indices with optional stats"""
8286
try:
8387
user_id, tenant_id = get_current_user_id(authorization)
84-
return ElasticSearchService.list_indices(pattern, include_stats, tenant_id, user_id, es_core)
88+
return ElasticSearchService.list_indices(pattern, include_stats, tenant_id, user_id, vdb_core)
8589
except Exception as e:
8690
raise HTTPException(
8791
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"Error get index: {str(e)}")
@@ -93,7 +97,7 @@ def create_index_documents(
9397
index_name: str = Path(..., description="Name of the index"),
9498
data: List[Dict[str, Any]
9599
] = Body(..., description="Document List to process"),
96-
es_core: ElasticSearchCore = Depends(get_es_core),
100+
vdb_core: VectorDatabaseCore = Depends(get_vector_db_core),
97101
authorization: Optional[str] = Header(None)
98102
):
99103
"""
@@ -103,7 +107,7 @@ def create_index_documents(
103107
try:
104108
user_id, tenant_id = get_current_user_id(authorization)
105109
embedding_model = get_embedding_model(tenant_id)
106-
return ElasticSearchService.index_documents(embedding_model, index_name, data, es_core)
110+
return ElasticSearchService.index_documents(embedding_model, index_name, data, vdb_core)
107111
except Exception as e:
108112
error_msg = str(e)
109113
logger.error(f"Error indexing documents: {error_msg}")
@@ -114,11 +118,11 @@ def create_index_documents(
114118
@router.get("/{index_name}/files")
115119
async def get_index_files(
116120
index_name: str = Path(..., description="Name of the index"),
117-
es_core: ElasticSearchCore = Depends(get_es_core)
121+
vdb_core: VectorDatabaseCore = Depends(get_vector_db_core)
118122
):
119123
"""Get all files from an index, including those that are not yet stored in ES"""
120124
try:
121-
result = await ElasticSearchService.list_files(index_name, include_chunks=False, es_core=es_core)
125+
result = await ElasticSearchService.list_files(index_name, include_chunks=False, vdb_core=vdb_core)
122126
# Transform result to match frontend expectations
123127
return {
124128
"status": "success",
@@ -136,13 +140,13 @@ def delete_documents(
136140
index_name: str = Path(..., description="Name of the index"),
137141
path_or_url: str = Query(...,
138142
description="Path or URL of documents to delete"),
139-
es_core: ElasticSearchCore = Depends(get_es_core)
143+
vdb_core: VectorDatabaseCore = Depends(get_vector_db_core)
140144
):
141145
"""Delete documents by path or URL and clean up related Redis records"""
142146
try:
143147
# First delete the documents using existing service
144148
result = ElasticSearchService.delete_documents(
145-
index_name, path_or_url, es_core)
149+
index_name, path_or_url, vdb_core)
146150

147151
# Then clean up Redis records related to this specific document
148152
try:
@@ -184,10 +188,10 @@ def delete_documents(
184188

185189
# Health check
186190
@router.get("/health")
187-
def health_check(es_core: ElasticSearchCore = Depends(get_es_core)):
191+
def health_check(vdb_core: VectorDatabaseCore = Depends(get_vector_db_core)):
188192
"""Check API and Elasticsearch health"""
189193
try:
190194
# Try to list indices as a health check
191-
return ElasticSearchService.health_check(es_core)
195+
return ElasticSearchService.health_check(vdb_core)
192196
except Exception as e:
193197
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=f"{str(e)}")

backend/consts/const.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
from enum import Enum
23
from dotenv import load_dotenv
34

45
# Load environment variables
@@ -10,6 +11,11 @@
1011
os.path.dirname(__file__)), 'assets', 'test.wav')
1112

1213

14+
# Vector database providers
15+
class VectorDatabaseType(str, Enum):
16+
ELASTICSEARCH = "elasticsearch"
17+
18+
1319
# ModelEngine Configuration
1420
MODEL_ENGINE_HOST = os.getenv('MODEL_ENGINE_HOST')
1521
MODEL_ENGINE_APIKEY = os.getenv('MODEL_ENGINE_APIKEY')

backend/services/file_management_service.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
list_files
2121
)
2222
from utils.attachment_utils import convert_image_to_text, convert_long_text_to_text
23-
from services.elasticsearch_service import ElasticSearchService, get_es_core
23+
from services.vectordatabase_service import ElasticSearchService, get_vector_db_core
2424
from utils.prompt_template_utils import get_file_processing_messages_template
2525
from utils.file_management_utils import save_upload_file
2626

@@ -79,8 +79,8 @@ async def upload_files_impl(destination: str, file: List[UploadFile], folder: st
7979
# Resolve filename conflicts against existing KB documents by renaming (e.g., name -> name_1)
8080
if index_name:
8181
try:
82-
es_core = get_es_core()
83-
existing = await ElasticSearchService.list_files(index_name, include_chunks=False, es_core=es_core)
82+
vdb_core = get_vector_db_core()
83+
existing = await ElasticSearchService.list_files(index_name, include_chunks=False, vdb_core=vdb_core)
8484
existing_files = existing.get(
8585
"files", []) if isinstance(existing, dict) else []
8686
# Prefer 'file' field; fall back to 'filename' if present

backend/services/model_management_service.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from typing import List, Dict, Any, Optional
2+
from typing import List, Dict, Any
33

44
from consts.const import LOCALHOST_IP, LOCALHOST_NAME, DOCKER_INTERNAL_HOST
55
from consts.model import ModelConnectStatusEnum
@@ -25,7 +25,7 @@
2525
sort_models_by_id,
2626
)
2727
from utils.memory_utils import build_memory_config as build_memory_config_for_tenant
28-
from services.elasticsearch_service import get_es_core
28+
from services.vectordatabase_service import get_vector_db_core
2929
from nexent.memory.memory_service import clear_model_memories
3030

3131
logger = logging.getLogger("model_management_service")
@@ -244,12 +244,12 @@ async def delete_model_for_tenant(user_id: str, tenant_id: str, display_name: st
244244

245245
# Best-effort memory cleanup using the fetched variants
246246
try:
247-
es_core = get_es_core()
247+
vdb_core = get_vector_db_core()
248248
base_memory_config = build_memory_config_for_tenant(tenant_id)
249249
for t, m in models_by_type.items():
250250
try:
251251
await clear_model_memories(
252-
es_core=es_core,
252+
vdb_core=vdb_core,
253253
model_repo=m.get("model_repo", ""),
254254
model_name=m.get("model_name", ""),
255255
embedding_dims=int(m.get("max_tokens") or 0),

backend/services/tool_configuration_service.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
query_all_tools,
2121
query_tool_instances_by_id,
2222
update_tool_table_from_scan_tool_list,
23-
search_last_tool_instance_by_tool_id
23+
search_last_tool_instance_by_tool_id,
2424
)
2525
from database.user_tenant_db import get_all_tenant_ids
26-
from services.elasticsearch_service import get_embedding_model, elastic_core
26+
from services.vectordatabase_service import get_embedding_model, get_vector_db_core
2727
from services.tenant_config_service import get_selected_knowledge_list
2828

2929
logger = logging.getLogger("tool_configuration_service")
@@ -605,11 +605,12 @@ def _validate_local_tool(
605605
index_names = [knowledge_info.get("index_name")
606606
for knowledge_info in knowledge_info_list]
607607
embedding_model = get_embedding_model(tenant_id=tenant_id)
608+
vdb_core = get_vector_db_core()
608609
params = {
609610
**instantiation_params,
610611
'index_names': index_names,
611-
'es_core': elastic_core,
612-
'embedding_model': embedding_model
612+
'vdb_core': vdb_core,
613+
'embedding_model': embedding_model,
613614
}
614615
tool_instance = tool_class(**params)
615616
else:

0 commit comments

Comments
 (0)