1313import logging
1414import os
1515import time
16+ import uuid
1617from datetime import datetime , timezone
1718from typing import Any , Dict , Generator , List , Optional
1819
2425from nexent .vector_database .elasticsearch_core import ElasticSearchCore
2526
2627from consts .const import ES_API_KEY , ES_HOST , LANGUAGE , VectorDatabaseType
28+ from consts .model import ChunkCreateRequest , ChunkUpdateRequest
2729from database .attachment_db import delete_file
2830from database .knowledge_db import (
2931 create_knowledge_record ,
3537from utils .config_utils import tenant_config_manager , get_model_name_from_config
3638from utils .file_management_utils import get_all_files_status , get_file_size
3739
# Names of the chunk document fields treated as allowed.
# NOTE(review): the consumers of this set are outside this excerpt.
ALLOWED_CHUNK_FIELDS = {
    # identity and source
    "id",
    "filename",
    "path_or_url",
    # body
    "title",
    "content",
    # descriptive metadata
    "language",
    "author",
    "date",
    "create_time",
}

# Module-level logger for this service.
logger = logging.getLogger("vectordatabase_service")
@@ -997,6 +1008,105 @@ def get_index_chunks(
9971008 logger .error (error_msg )
9981009 raise Exception (error_msg )
9991010
@staticmethod
def create_chunk(
    index_name: str,
    chunk_request: ChunkCreateRequest,
    vdb_core: VectorDatabaseCore = Depends(get_vector_db_core),
    user_id: Optional[str] = None,
):
    """
    Create a manual chunk entry in the specified index.

    Args:
        index_name: Name of the index that receives the chunk.
        chunk_request: Request object supplying ``chunk_id``, ``title``,
            ``filename``, ``path_or_url``, ``content`` and ``metadata``.
        vdb_core: Vector database core whose ``create_chunk`` persists
            the payload.
        user_id: Stored as ``created_by``; left out of the payload when None.

    Returns:
        Dict with ``status``, ``message`` and ``chunk_id``, where the id is
        the one reported back by ``vdb_core.create_chunk``.

    Raises:
        Exception: On any failure. The underlying error is logged and
            chained as the cause of the raised exception.
    """
    try:
        chunk_payload = ElasticSearchService._build_chunk_payload(
            base_fields={
                # Fall back to a generated id when the caller supplies none.
                "id": chunk_request.chunk_id or ElasticSearchService._generate_chunk_id(),
                "title": chunk_request.title,
                "filename": chunk_request.filename,
                "path_or_url": chunk_request.path_or_url,
                "content": chunk_request.content,
                "created_by": user_id,
            },
            metadata=chunk_request.metadata,
            ensure_create_time=True,
        )
        result = vdb_core.create_chunk(index_name, chunk_payload)
        return {
            "status": "success",
            "message": f"Chunk {result.get('id')} created successfully",
            "chunk_id": result.get("id"),
        }
    except Exception as exc:
        logger.error("Error creating chunk in index %s: %s",
                     index_name, exc, exc_info=True)
        # Chain explicitly so the original traceback is kept as __cause__.
        raise Exception(f"Error creating chunk: {exc}") from exc
1044+
@staticmethod
def update_chunk(
    index_name: str,
    chunk_id: str,
    chunk_request: ChunkUpdateRequest,
    vdb_core: VectorDatabaseCore = Depends(get_vector_db_core),
    user_id: Optional[str] = None,
):
    """
    Update a chunk document.

    Args:
        index_name: Name of the index that holds the chunk.
        chunk_id: Identifier of the chunk to update.
        chunk_request: Request object; only the fields that were explicitly
            set (``metadata`` excluded) plus the non-None metadata entries
            are sent as changes.
        vdb_core: Vector database core whose ``update_chunk`` applies
            the payload.
        user_id: Stored as ``updated_by``; left out of the payload when None.

    Returns:
        Dict with ``status``, ``message`` and ``chunk_id``, where the id is
        the one reported back by ``vdb_core.update_chunk``.

    Raises:
        Exception: On any failure, including a request that carries no
            field to change. The underlying error is logged and chained
            as the cause of the raised exception.
    """
    try:
        update_fields = chunk_request.dict(
            exclude_unset=True, exclude={"metadata"})
        metadata = chunk_request.metadata or {}

        # Validate before the audit fields are added: "update_time" is always
        # present afterwards, so checking the final payload for emptiness
        # could never reject an empty update.
        has_changes = any(
            value is not None
            for value in (*update_fields.values(), *metadata.values())
        )
        if not has_changes:
            raise ValueError("No update fields supplied.")

        update_payload = ElasticSearchService._build_chunk_payload(
            base_fields={
                **update_fields,
                "updated_by": user_id,
                # Aware UTC clock; the rendered string carries no offset,
                # so the stored format is unchanged.
                "update_time": datetime.now(timezone.utc).strftime(
                    "%Y-%m-%dT%H:%M:%S"),
            },
            metadata=metadata,
            ensure_create_time=False,
        )

        result = vdb_core.update_chunk(
            index_name, chunk_id, update_payload)
        return {
            "status": "success",
            "message": f"Chunk {result.get('id')} updated successfully",
            "chunk_id": result.get("id"),
        }
    except Exception as exc:
        logger.error("Error updating chunk %s in index %s: %s",
                     chunk_id, index_name, exc, exc_info=True)
        # Chain explicitly so the original traceback is kept as __cause__.
        raise Exception(f"Error updating chunk: {exc}") from exc
1085+
@staticmethod
def delete_chunk(
    index_name: str,
    chunk_id: str,
    vdb_core: VectorDatabaseCore = Depends(get_vector_db_core),
):
    """
    Delete a chunk document by id.

    Args:
        index_name: Name of the index that holds the chunk.
        chunk_id: Identifier of the chunk to delete.
        vdb_core: Vector database core whose ``delete_chunk`` performs
            the removal.

    Returns:
        Dict with ``status``, ``message`` and the deleted ``chunk_id``.

    Raises:
        Exception: On any failure, including a falsy result from
            ``vdb_core.delete_chunk`` (reported as "not found"). The
            underlying error is logged and chained as the cause.
    """
    try:
        deleted = vdb_core.delete_chunk(index_name, chunk_id)
        if not deleted:
            raise ValueError(
                f"Chunk {chunk_id} not found in index {index_name}")
        return {
            "status": "success",
            "message": f"Chunk {chunk_id} deleted successfully",
            "chunk_id": chunk_id,
        }
    except Exception as exc:
        logger.error("Error deleting chunk %s in index %s: %s",
                     chunk_id, index_name, exc, exc_info=True)
        # Chain explicitly so the original traceback is kept as __cause__.
        raise Exception(f"Error deleting chunk: {exc}") from exc
1109+
10001110 @staticmethod
10011111 def search_hybrid (
10021112 * ,
@@ -1058,4 +1168,32 @@ def search_hybrid(
10581168 f"Hybrid search failed for indices { index_names } : { exc } " ,
10591169 exc_info = True ,
10601170 )
1061- raise Exception (f"Error executing hybrid search: { str (exc )} " )
1171+ raise Exception (f"Error executing hybrid search: { str (exc )} " )
1172+
@staticmethod
def _generate_chunk_id() -> str:
    """
    Generate a new random chunk id.

    The id is ``chunk_`` followed by the 32-character hex form of a
    ``uuid.uuid4()`` value, so every call returns a different string.
    """
    return f"chunk_{uuid.uuid4().hex}"
1177+
@staticmethod
def _build_chunk_payload(
    base_fields: Dict[str, Any],
    metadata: Optional[Dict[str, Any]],
    ensure_create_time: bool = True,
) -> Dict[str, Any]:
    """
    Merge and sanitize chunk payload fields.

    Args:
        base_fields: Primary chunk fields; entries whose value is None are
            dropped. None is treated as an empty mapping.
        metadata: Optional extra fields; non-None entries are merged on top
            of ``base_fields`` and win on key collisions.
        ensure_create_time: When True and the merged payload has no
            ``create_time``, stamp the current UTC time formatted as
            ``%Y-%m-%dT%H:%M:%S``.

    Returns:
        A new dict holding the merged, None-free payload.
    """
    payload = {
        key: value
        for key, value in (base_fields or {}).items()
        if value is not None
    }
    # Metadata is applied last, so it overrides base fields of the same name.
    payload.update(
        (key, value)
        for key, value in (metadata or {}).items()
        if value is not None
    )

    if ensure_create_time and "create_time" not in payload:
        # Aware UTC clock instead of the deprecated datetime.utcnow(); the
        # rendered string carries no offset, so the format is unchanged.
        payload["create_time"] = datetime.now(timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%S")

    return payload
0 commit comments