11import logging
2+ import json
23from http import HTTPStatus
34from typing import Any , Dict , List , Optional
45
56from fastapi import APIRouter , Body , Depends , Header , HTTPException , Path , Query
67from fastapi .responses import JSONResponse
8+ import re
79
810from consts .model import ChunkCreateRequest , ChunkUpdateRequest , HybridSearchRequest , IndexingResponse
911from nexent .vector_database .base import VectorDatabaseCore
1517)
1618from services .redis_service import get_redis_service
1719from utils .auth_utils import get_current_user_id
20+ from utils .file_management_utils import get_all_files_status
21+ from database .knowledge_db import get_index_name_by_knowledge_name
1822
1923router = APIRouter (prefix = "/indices" )
2024service = ElasticSearchService ()
@@ -49,7 +53,8 @@ def create_new_index(
4953 """Create a new vector index and store it in the knowledge table"""
5054 try :
5155 user_id , tenant_id = get_current_user_id (authorization )
52- return ElasticSearchService .create_index (index_name , embedding_dim , vdb_core , user_id , tenant_id )
56+ # Treat path parameter as user-facing knowledge base name for new creations
57+ return ElasticSearchService .create_knowledge_base (index_name , embedding_dim , vdb_core , user_id , tenant_id )
5358 except Exception as e :
5459 raise HTTPException (
5560 status_code = HTTPStatus .INTERNAL_SERVER_ERROR , detail = f"Error creating index: { str (e )} " )
@@ -99,7 +104,9 @@ def create_index_documents(
99104 data : List [Dict [str , Any ]
100105 ] = Body (..., description = "Document List to process" ),
101106 vdb_core : VectorDatabaseCore = Depends (get_vector_db_core ),
102- authorization : Optional [str ] = Header (None )
107+ authorization : Optional [str ] = Header (None ),
108+ task_id : Optional [str ] = Header (
109+ None , alias = "X-Task-Id" , description = "Task ID for progress tracking" ),
103110):
104111 """
105112 Index documents with embeddings, creating the index if it doesn't exist.
@@ -108,12 +115,21 @@ def create_index_documents(
108115 try :
109116 user_id , tenant_id = get_current_user_id (authorization )
110117 embedding_model = get_embedding_model (tenant_id )
111- return ElasticSearchService .index_documents (embedding_model , index_name , data , vdb_core )
118+ return ElasticSearchService .index_documents (
119+ embedding_model = embedding_model ,
120+ index_name = index_name ,
121+ data = data ,
122+ vdb_core = vdb_core ,
123+ task_id = task_id ,
124+ )
112125 except Exception as e :
113126 error_msg = str (e )
114127 logger .error (f"Error indexing documents: { error_msg } " )
128+
115129 raise HTTPException (
116- status_code = HTTPStatus .INTERNAL_SERVER_ERROR , detail = f"Error indexing documents: { error_msg } " )
130+ status_code = HTTPStatus .INTERNAL_SERVER_ERROR ,
131+ detail = f"Error indexing documents: { error_msg } "
132+ )
117133
118134
119135@router .get ("/{index_name}/files" )
@@ -187,6 +203,66 @@ def delete_documents(
187203 status_code = HTTPStatus .INTERNAL_SERVER_ERROR , detail = f"Error delete indexing documents: { e } " )
188204
189205
206+ @router .get ("/{index_name}/documents/{path_or_url:path}/error-info" )
207+ async def get_document_error_info (
208+ index_name : str = Path (..., description = "Name of the index" ),
209+ path_or_url : str = Path (...,
210+ description = "Path or URL of the document" ),
211+ authorization : Optional [str ] = Header (None )
212+ ):
213+ """Get error information for a document"""
214+ try :
215+ celery_task_files = await get_all_files_status (index_name )
216+ file_status = celery_task_files .get (path_or_url )
217+
218+ if not file_status :
219+ raise HTTPException (
220+ status_code = HTTPStatus .NOT_FOUND ,
221+ detail = f"Document { path_or_url } not found in index { index_name } "
222+ )
223+
224+ task_id = file_status .get ('latest_task_id' , '' )
225+ if not task_id :
226+ return {
227+ "status" : "success" ,
228+ "error_code" : None ,
229+ }
230+
231+ redis_service = get_redis_service ()
232+ raw_error = redis_service .get_error_info (task_id )
233+ error_code = None
234+
235+ if raw_error :
236+ # Try to parse JSON (new format with error_code only)
237+ try :
238+ parsed = json .loads (raw_error )
239+ if isinstance (parsed , dict ) and "error_code" in parsed :
240+ error_code = parsed .get ("error_code" )
241+ except Exception :
242+ # Fallback: regex extraction if JSON parsing fails
243+ try :
244+ match = re .search (
245+ r'["\']error_code["\']\s*:\s*["\']([^"\']+)["\']' , raw_error )
246+ if match :
247+ error_code = match .group (1 )
248+ except Exception :
249+ pass
250+
251+ return {
252+ "status" : "success" ,
253+ "error_code" : error_code ,
254+ }
255+ except HTTPException :
256+ raise
257+ except Exception as e :
258+ logger .error (
259+ f"Error getting error info for document { path_or_url } : { str (e )} " )
260+ raise HTTPException (
261+ status_code = HTTPStatus .INTERNAL_SERVER_ERROR ,
262+ detail = f"Error getting error info: { str (e )} "
263+ )
264+
265+
190266# Health check
191267@router .get ("/health" )
192268def health_check (vdb_core : VectorDatabaseCore = Depends (get_vector_db_core )):
@@ -201,25 +277,35 @@ def health_check(vdb_core: VectorDatabaseCore = Depends(get_vector_db_core)):
201277@router .post ("/{index_name}/chunks" )
202278def get_index_chunks (
203279 index_name : str = Path (...,
204- description = "Name of the index to get chunks from" ),
280+ description = "Name of the index (or knowledge_name) to get chunks from" ),
205281 page : int = Query (
206282 None , description = "Page number (1-based) for pagination" ),
207283 page_size : int = Query (
208284 None , description = "Number of records per page for pagination" ),
209285 path_or_url : Optional [str ] = Query (
210286 None , description = "Filter chunks by document path_or_url" ),
211- vdb_core : VectorDatabaseCore = Depends (get_vector_db_core )
287+ vdb_core : VectorDatabaseCore = Depends (get_vector_db_core ),
288+ authorization : Optional [str ] = Header (None )
212289):
213290 """Get chunks from the specified index, with optional pagination support"""
214291 try :
292+ _ , tenant_id = get_current_user_id (authorization )
293+ actual_index_name = get_index_name_by_knowledge_name (
294+ index_name , tenant_id )
295+
215296 result = ElasticSearchService .get_index_chunks (
216- index_name = index_name ,
297+ index_name = actual_index_name ,
217298 page = page ,
218299 page_size = page_size ,
219300 path_or_url = path_or_url ,
220301 vdb_core = vdb_core ,
221302 )
222303 return JSONResponse (status_code = HTTPStatus .OK , content = result )
304+ except ValueError as e :
305+ raise HTTPException (
306+ status_code = HTTPStatus .NOT_FOUND ,
307+ detail = str (e )
308+ )
223309 except Exception as e :
224310 error_msg = str (e )
225311 logger .error (
@@ -230,21 +316,29 @@ def get_index_chunks(
230316
231317@router .post ("/{index_name}/chunk" )
232318def create_chunk (
233- index_name : str = Path (..., description = "Name of the index" ),
319+ index_name : str = Path (...,
320+ description = "Name of the index (or knowledge_name)" ),
234321 payload : ChunkCreateRequest = Body (..., description = "Chunk data" ),
235322 vdb_core : VectorDatabaseCore = Depends (get_vector_db_core ),
236323 authorization : Optional [str ] = Header (None ),
237324):
238325 """Create a manual chunk."""
239326 try :
240- user_id , _ = get_current_user_id (authorization )
327+ user_id , tenant_id = get_current_user_id (authorization )
328+ actual_index_name = get_index_name_by_knowledge_name (
329+ index_name , tenant_id )
241330 result = ElasticSearchService .create_chunk (
242- index_name = index_name ,
331+ index_name = actual_index_name ,
243332 chunk_request = payload ,
244333 vdb_core = vdb_core ,
245334 user_id = user_id ,
246335 )
247336 return JSONResponse (status_code = HTTPStatus .OK , content = result )
337+ except ValueError as e :
338+ raise HTTPException (
339+ status_code = HTTPStatus .NOT_FOUND ,
340+ detail = str (e )
341+ )
248342 except Exception as exc :
249343 logger .error (
250344 "Error creating chunk for index %s: %s" , index_name , exc , exc_info = True
@@ -256,7 +350,8 @@ def create_chunk(
256350
257351@router .put ("/{index_name}/chunk/{chunk_id}" )
258352def update_chunk (
259- index_name : str = Path (..., description = "Name of the index" ),
353+ index_name : str = Path (...,
354+ description = "Name of the index (or knowledge_name)" ),
260355 chunk_id : str = Path (..., description = "Chunk identifier" ),
261356 payload : ChunkUpdateRequest = Body (...,
262357 description = "Chunk update payload" ),
@@ -265,18 +360,22 @@ def update_chunk(
265360):
266361 """Update an existing chunk."""
267362 try :
268- user_id , _ = get_current_user_id (authorization )
363+ user_id , tenant_id = get_current_user_id (authorization )
364+ actual_index_name = get_index_name_by_knowledge_name (
365+ index_name , tenant_id )
269366 result = ElasticSearchService .update_chunk (
270- index_name = index_name ,
367+ index_name = actual_index_name ,
271368 chunk_id = chunk_id ,
272369 chunk_request = payload ,
273370 vdb_core = vdb_core ,
274371 user_id = user_id ,
275372 )
276373 return JSONResponse (status_code = HTTPStatus .OK , content = result )
277- except ValueError as exc :
374+ except ValueError as e :
278375 raise HTTPException (
279- status_code = HTTPStatus .BAD_REQUEST , detail = str (exc ))
376+ status_code = HTTPStatus .NOT_FOUND ,
377+ detail = str (e )
378+ )
280379 except Exception as exc :
281380 logger .error (
282381 "Error updating chunk %s for index %s: %s" ,
@@ -292,22 +391,28 @@ def update_chunk(
292391
293392@router .delete ("/{index_name}/chunk/{chunk_id}" )
294393def delete_chunk (
295- index_name : str = Path (..., description = "Name of the index" ),
394+ index_name : str = Path (...,
395+ description = "Name of the index (or knowledge_name)" ),
296396 chunk_id : str = Path (..., description = "Chunk identifier" ),
297397 vdb_core : VectorDatabaseCore = Depends (get_vector_db_core ),
298398 authorization : Optional [str ] = Header (None ),
299399):
300400 """Delete a chunk."""
301401 try :
302- get_current_user_id (authorization )
402+ _ , tenant_id = get_current_user_id (authorization )
403+ actual_index_name = get_index_name_by_knowledge_name (
404+ index_name , tenant_id )
303405 result = ElasticSearchService .delete_chunk (
304- index_name = index_name ,
406+ index_name = actual_index_name ,
305407 chunk_id = chunk_id ,
306408 vdb_core = vdb_core ,
307409 )
308410 return JSONResponse (status_code = HTTPStatus .OK , content = result )
309- except ValueError as exc :
310- raise HTTPException (status_code = HTTPStatus .NOT_FOUND , detail = str (exc ))
411+ except ValueError as e :
412+ raise HTTPException (
413+ status_code = HTTPStatus .NOT_FOUND ,
414+ detail = str (e )
415+ )
311416 except Exception as exc :
312417 logger .error (
313418 "Error deleting chunk %s for index %s: %s" ,
0 commit comments