1616from datetime import datetime , timezone
1717from typing import Any , Dict , Generator , List , Optional
1818
19- from dotenv import load_dotenv
2019from fastapi import Body , Depends , Path , Query
2120from fastapi .responses import StreamingResponse
2221from jinja2 import Template , StrictUndefined
4342logger = logging .getLogger ("elasticsearch_service" )
4443
4544
46- def generate_knowledge_summary_stream (keywords : str , language : str , tenant_id : str ) -> Generator :
45+
46+
47+ def generate_knowledge_summary_stream (keywords : str , language : str , tenant_id : str , model_id : Optional [int ] = None ) -> Generator :
4748 """
4849 Generate a knowledge base summary based on keywords
4950
@@ -55,9 +56,6 @@ def generate_knowledge_summary_stream(keywords: str, language: str, tenant_id: s
5556 Returns:
5657 str: Generate a knowledge base summary
5758 """
58- # Load environment variables
59- load_dotenv ()
60-
6159 # Load prompt words based on language
6260 prompts = get_knowledge_summary_prompt_template (language )
6361
@@ -73,20 +71,47 @@ def generate_knowledge_summary_stream(keywords: str, language: str, tenant_id: s
7371 {"role" : MESSAGE_ROLE ["USER" ], "content" : user_prompt }
7472 ]
7573
76- # Get model configuration from tenant config manager
77- model_config = tenant_config_manager .get_model_config (
78- key = MODEL_CONFIG_MAPPING ["llmSecondary" ], tenant_id = tenant_id )
74+ # Get model configuration
75+ if model_id :
76+ try :
77+ from database .model_management_db import get_model_by_model_id
78+ model_info = get_model_by_model_id (model_id , tenant_id )
79+ if model_info :
80+ model_config = {
81+ 'api_key' : model_info .get ('api_key' , '' ),
82+ 'base_url' : model_info .get ('base_url' , '' ),
83+ 'model_name' : model_info .get ('model_name' , '' ),
84+ 'model_repo' : model_info .get ('model_repo' , '' )
85+ }
86+ else :
87+ # Fallback to default model if specified model not found
88+ logger .warning (f"Specified model { model_id } not found, falling back to default LLM." )
89+ model_config = tenant_config_manager .get_model_config (
90+ key = MODEL_CONFIG_MAPPING ["llm" ], tenant_id = tenant_id )
91+ except Exception as e :
92+ logger .warning (f"Failed to get model { model_id } , using default model: { e } " )
93+ model_config = tenant_config_manager .get_model_config (
94+ key = MODEL_CONFIG_MAPPING ["llm" ], tenant_id = tenant_id )
95+ else :
96+ # Use default model configuration
97+ model_config = tenant_config_manager .get_model_config (
98+ key = MODEL_CONFIG_MAPPING ["llm" ], tenant_id = tenant_id )
7999
80100 # initialize OpenAI client
81101 client = OpenAI (api_key = model_config .get ('api_key' , "" ),
82102 base_url = model_config .get ('base_url' , "" ))
83103
84104 try :
85105 # Create stream chat completion request
86- max_tokens = KNOWLEDGE_SUMMARY_MAX_TOKENS_ZH if language == LANGUAGE ["ZH" ] else KNOWLEDGE_SUMMARY_MAX_TOKENS_EN
106+ max_tokens = KNOWLEDGE_SUMMARY_MAX_TOKENS_ZH if language == LANGUAGE [
107+ "ZH" ] else KNOWLEDGE_SUMMARY_MAX_TOKENS_EN
108+ # Get model name for the request
109+ model_name_for_request = model_config .get ("model_name" , "" )
110+ if model_config .get ("model_repo" ):
111+ model_name_for_request = f"{ model_config ['model_repo' ]} /{ model_name_for_request } "
112+
87113 stream = client .chat .completions .create (
88- model = get_model_name_from_config (model_config ) if model_config .get (
89- "model_name" ) else "" , # use model name from config
114+ model = model_name_for_request ,
90115 messages = messages ,
91116 max_tokens = max_tokens , # add max_tokens limit
92117 stream = True # enable stream output
@@ -385,7 +410,8 @@ async def delete_index(
385410 }
386411 success = delete_knowledge_record (update_data )
387412 if not success :
388- raise Exception (f"Error deleting knowledge record for index { index_name } " )
413+ raise Exception (
414+ f"Error deleting knowledge record for index { index_name } " )
389415
390416 return {"status" : "success" , "message" : f"Index { index_name } and associated files deleted successfully" }
391417 except Exception as e :
@@ -397,8 +423,10 @@ def list_indices(
397423 "*" , description = "Pattern to match index names" ),
398424 include_stats : bool = Query (
399425 False , description = "Whether to include index stats" ),
400- tenant_id : str = Body (description = "ID of the tenant listing the knowledge base" ),
401- user_id : str = Body (description = "ID of the user listing the knowledge base" ),
426+ tenant_id : str = Body (
427+ description = "ID of the tenant listing the knowledge base" ),
428+ user_id : str = Body (
429+ description = "ID of the user listing the knowledge base" ),
402430 es_core : ElasticSearchCore = Depends (get_es_core )
403431 ):
404432 """
@@ -424,7 +452,8 @@ def list_indices(
424452 for record in db_record :
425453 # async PG database to sync ES, remove the data that is not in ES
426454 if record ["index_name" ] not in all_indices_list :
427- delete_knowledge_record ({"index_name" : record ["index_name" ], "user_id" : user_id })
455+ delete_knowledge_record (
456+ {"index_name" : record ["index_name" ], "user_id" : user_id })
428457 continue
429458 if record ["embedding_model_name" ] is None :
430459 model_name_is_none_list .append (record ["index_name" ])
@@ -449,8 +478,9 @@ def list_indices(
449478 "stats" : index_stats
450479 })
451480 if index_name in model_name_is_none_list :
452- update_model_name_by_index_name (index_name ,
453- index_stats .get ("base_info" , {}).get ("embedding_model" , "" ),
481+ update_model_name_by_index_name (index_name ,
482+ index_stats .get ("base_info" , {}).get (
483+ "embedding_model" , "" ),
454484 tenant_id , user_id )
455485 response ["indices_info" ] = stats_info
456486
@@ -514,11 +544,14 @@ def get_index_name(
514544 error_msg = str (e )
515545 # Check if it's an ElasticSearch connection issue
516546 if "503" in error_msg or "search_phase_execution_exception" in error_msg :
517- raise Exception (f"ElasticSearch service unavailable for index { index_name } : { error_msg } " )
547+ raise Exception (
548+ f"ElasticSearch service unavailable for index { index_name } : { error_msg } " )
518549 elif "ApiError" in error_msg :
519- raise Exception (f"ElasticSearch API error for index { index_name } : { error_msg } " )
550+ raise Exception (
551+ f"ElasticSearch API error for index { index_name } : { error_msg } " )
520552 else :
521- raise Exception (f"Error getting info for index { index_name } : { error_msg } " )
553+ raise Exception (
554+ f"Error getting info for index { index_name } : { error_msg } " )
522555
523556 @staticmethod
524557 def index_documents (
@@ -551,7 +584,8 @@ def index_documents(
551584 index_name , es_core = es_core )
552585 logger .info (f"Created new index { index_name } " )
553586 except Exception as create_error :
554- raise Exception (f"Failed to create index { index_name } : { str (create_error )} " )
587+ raise Exception (
588+ f"Failed to create index { index_name } : { str (create_error )} " )
555589
556590 # Transform indexing request results to documents
557591 documents = []
@@ -783,7 +817,8 @@ async def list_files(
783817 return {"files" : files }
784818
785819 except Exception as e :
786- raise Exception (f"Error getting file list for index { index_name } : { str (e )} " )
820+ raise Exception (
821+ f"Error getting file list for index { index_name } : { str (e )} " )
787822
788823 @staticmethod
789824 def delete_documents (
@@ -828,9 +863,12 @@ async def summary_index_name(self,
828863 1000 , description = "Number of documents to retrieve per batch" ),
829864 es_core : ElasticSearchCore = Depends (
830865 get_es_core ),
866+ user_id : Optional [str ] = Body (
867+ None , description = "ID of the user delete the knowledge base" ),
831868 tenant_id : Optional [str ] = Body (
832869 None , description = "ID of the tenant" ),
833- language : str = LANGUAGE ["ZH" ]
870+ language : str = LANGUAGE ["ZH" ],
871+ model_id : Optional [int ] = None
834872 ):
835873 """
836874 Generate a summary for the specified index based on its content
@@ -848,7 +886,8 @@ async def summary_index_name(self,
848886 try :
849887 # Get all documents
850888 if not tenant_id :
851- raise Exception ("Tenant ID is required for summary generation." )
889+ raise Exception (
890+ "Tenant ID is required for summary generation." )
852891 all_documents = ElasticSearchService .get_random_documents (
853892 index_name , batch_size , es_core )
854893 all_chunks = self ._clean_chunks_for_summary (all_documents )
@@ -860,7 +899,7 @@ async def summary_index_name(self,
860899 async def generate_summary ():
861900 token_join = []
862901 try :
863- for new_token in generate_knowledge_summary_stream (keywords_for_summary , language , tenant_id ):
902+ for new_token in generate_knowledge_summary_stream (keywords_for_summary , language , tenant_id , model_id ):
864903 if new_token == "END" :
865904 break
866905 else :
@@ -947,7 +986,8 @@ def get_random_documents(
947986 }
948987
949988 except Exception as e :
950- raise Exception (f"Error retrieving random documents from index { index_name } : { str (e )} " )
989+ raise Exception (
990+ f"Error retrieving random documents from index { index_name } : { str (e )} " )
951991
952992 @staticmethod
953993 def change_summary (
0 commit comments