|
18 | 18 |
|
19 | 19 | from fastapi import Body, Depends, Path, Query |
20 | 20 | from fastapi.responses import StreamingResponse |
21 | | -from jinja2 import Template, StrictUndefined |
22 | 21 | from nexent.core.models.embedding_model import OpenAICompatibleEmbedding, JinaEmbedding, BaseEmbedding |
23 | 22 | from nexent.core.nlp.tokenizer import calculate_term_weights |
24 | 23 | from nexent.vector_database.elasticsearch_core import ElasticSearchCore |
25 | | -from openai import OpenAI |
26 | | -from openai.types.chat import ChatCompletionMessageParam |
27 | 24 |
|
28 | | -from consts.const import ES_API_KEY, ES_HOST, LANGUAGE, MODEL_CONFIG_MAPPING, MESSAGE_ROLE, KNOWLEDGE_SUMMARY_MAX_TOKENS_ZH, KNOWLEDGE_SUMMARY_MAX_TOKENS_EN |
| 25 | +from consts.const import ES_API_KEY, ES_HOST, LANGUAGE |
29 | 26 | from database.attachment_db import delete_file |
30 | 27 | from database.knowledge_db import ( |
31 | 28 | create_knowledge_record, |
|
36 | 33 | from services.redis_service import get_redis_service |
37 | 34 | from utils.config_utils import tenant_config_manager, get_model_name_from_config |
38 | 35 | from utils.file_management_utils import get_all_files_status, get_file_size |
39 | | -from utils.prompt_template_utils import get_knowledge_summary_prompt_template |
40 | 36 |
|
41 | 37 | # Configure logging |
42 | 38 | logger = logging.getLogger("elasticsearch_service") |
43 | 39 |
|
44 | 40 |
|
45 | 41 |
|
46 | 42 |
|
47 | | -def generate_knowledge_summary_stream(keywords: str, language: str, tenant_id: str, model_id: Optional[int] = None) -> Generator: |
48 | | - """ |
49 | | - Generate a knowledge base summary based on keywords |
50 | | -
|
51 | | - Args: |
52 | | - keywords: Keywords that frequently appear in the knowledge base content |
53 | | - language: Language of the knowledge base content |
54 | | - tenant_id: The tenant ID for configuration |
55 | | -
|
56 | | - Returns: |
57 | | - str: Generate a knowledge base summary |
58 | | - """ |
59 | | - # Load prompt words based on language |
60 | | - prompts = get_knowledge_summary_prompt_template(language) |
61 | | - |
62 | | - # Render templates using Jinja2 |
63 | | - system_prompt = Template( |
64 | | - prompts['system_prompt'], undefined=StrictUndefined).render({}) |
65 | | - user_prompt = Template(prompts['user_prompt'], undefined=StrictUndefined).render( |
66 | | - {'content': keywords}) |
67 | | - |
68 | | - # Build messages |
69 | | - messages: List[ChatCompletionMessageParam] = [ |
70 | | - {"role": MESSAGE_ROLE["SYSTEM"], "content": system_prompt}, |
71 | | - {"role": MESSAGE_ROLE["USER"], "content": user_prompt} |
72 | | - ] |
73 | | - |
74 | | - # Get model configuration |
75 | | - if model_id: |
76 | | - try: |
77 | | - from database.model_management_db import get_model_by_model_id |
78 | | - model_info = get_model_by_model_id(model_id, tenant_id) |
79 | | - if model_info: |
80 | | - model_config = { |
81 | | - 'api_key': model_info.get('api_key', ''), |
82 | | - 'base_url': model_info.get('base_url', ''), |
83 | | - 'model_name': model_info.get('model_name', ''), |
84 | | - 'model_repo': model_info.get('model_repo', '') |
85 | | - } |
86 | | - else: |
87 | | - # Fallback to default model if specified model not found |
88 | | - logger.warning(f"Specified model {model_id} not found, falling back to default LLM.") |
89 | | - model_config = tenant_config_manager.get_model_config( |
90 | | - key=MODEL_CONFIG_MAPPING["llm"], tenant_id=tenant_id) |
91 | | - except Exception as e: |
92 | | - logger.warning(f"Failed to get model {model_id}, using default model: {e}") |
93 | | - model_config = tenant_config_manager.get_model_config( |
94 | | - key=MODEL_CONFIG_MAPPING["llm"], tenant_id=tenant_id) |
95 | | - else: |
96 | | - # Use default model configuration |
97 | | - model_config = tenant_config_manager.get_model_config( |
98 | | - key=MODEL_CONFIG_MAPPING["llm"], tenant_id=tenant_id) |
99 | | - |
100 | | - # initialize OpenAI client |
101 | | - client = OpenAI(api_key=model_config.get('api_key', ""), |
102 | | - base_url=model_config.get('base_url', "")) |
103 | | - |
104 | | - try: |
105 | | - # Create stream chat completion request |
106 | | - max_tokens = KNOWLEDGE_SUMMARY_MAX_TOKENS_ZH if language == LANGUAGE[ |
107 | | - "ZH"] else KNOWLEDGE_SUMMARY_MAX_TOKENS_EN |
108 | | - # Get model name for the request |
109 | | - model_name_for_request = model_config.get("model_name", "") |
110 | | - if model_config.get("model_repo"): |
111 | | - model_name_for_request = f"{model_config['model_repo']}/{model_name_for_request}" |
112 | | - |
113 | | - stream = client.chat.completions.create( |
114 | | - model=model_name_for_request, |
115 | | - messages=messages, |
116 | | - max_tokens=max_tokens, # add max_tokens limit |
117 | | - stream=True # enable stream output |
118 | | - ) |
119 | | - |
120 | | - # Iterate through stream response |
121 | | - for chunk in stream: |
122 | | - new_token = chunk.choices[0].delta.content |
123 | | - if new_token is not None: |
124 | | - yield new_token |
125 | | - yield "END" |
126 | | - |
127 | | - except Exception as e: |
128 | | - logger.error(f"Error occurred: {str(e)}") |
129 | | - yield f"Error: {str(e)}" |
 | 43 | +# Old keyword-based summary method removed; replaced with a Map-Reduce approach
 | 44 | +# See utils/document_vector_utils.py for the new implementation
130 | 45 |
|
131 | 46 |
|
132 | 47 | # Initialize ElasticSearchCore instance with HTTPS support |
@@ -871,62 +786,85 @@ async def summary_index_name(self, |
871 | 786 | model_id: Optional[int] = None |
872 | 787 | ): |
873 | 788 | """ |
874 | | - Generate a summary for the specified index based on its content |
 | 789 | +    Generate a summary for the specified index using a Map-Reduce approach
| 790 | + |
| 791 | + New implementation: |
| 792 | + 1. Get documents and cluster them by semantic similarity |
| 793 | + 2. Map: Summarize each document individually |
| 794 | + 3. Reduce: Merge document summaries into cluster summaries |
| 795 | + 4. Return: Combined knowledge base summary |
875 | 796 |
|
876 | 797 | Args: |
877 | 798 | index_name: Name of the index to summarize |
878 | | - batch_size: Number of documents to process per batch |
 | 799 | +        batch_size: Sampling budget; min(batch_size // 5, 200) documents are sampled (default: 1000)
879 | 800 | es_core: ElasticSearchCore instance |
880 | 801 | tenant_id: ID of the tenant |
881 | 802 | language: Language of the summary (default: 'zh') |
| 803 | + model_id: Model ID for LLM summarization |
882 | 804 |
|
883 | 805 | Returns: |
884 | 806 | StreamingResponse containing the generated summary |
885 | 807 | """ |
886 | 808 | try: |
887 | | - # Get all documents |
| 809 | + from utils.document_vector_utils import ( |
| 810 | + process_documents_for_clustering, |
| 811 | + kmeans_cluster_documents, |
| 812 | + summarize_clusters_map_reduce, |
| 813 | + merge_cluster_summaries |
| 814 | + ) |
| 815 | + |
888 | 816 | if not tenant_id: |
889 | | - raise Exception( |
890 | | - "Tenant ID is required for summary generation.") |
891 | | - all_documents = ElasticSearchService.get_random_documents( |
892 | | - index_name, batch_size, es_core) |
893 | | - all_chunks = self._clean_chunks_for_summary(all_documents) |
894 | | - keywords_dict = calculate_term_weights(all_chunks) |
895 | | - keywords_for_summary = "" |
896 | | - for _, key in enumerate(keywords_dict): |
897 | | - keywords_for_summary = keywords_for_summary + ", " + key |
898 | | - |
| 817 | + raise Exception("Tenant ID is required for summary generation.") |
| 818 | + |
 | 819 | +        # Cap the sample so clustering and per-document LLM calls stay bounded
 | 820 | +        sample_count = min(batch_size // 5, 200)  # e.g. batch_size=1000 -> 200 docs
| 821 | + |
| 822 | + # Step 1: Get documents and calculate embeddings |
| 823 | + document_samples, doc_embeddings = process_documents_for_clustering( |
| 824 | + index_name=index_name, |
| 825 | + es_core=es_core, |
| 826 | + sample_doc_count=sample_count |
| 827 | + ) |
| 828 | + |
| 829 | + if not document_samples: |
| 830 | + raise Exception("No documents found in index.") |
| 831 | + |
 | 832 | +        # Step 2: Cluster documents by embedding similarity (k=None lets the helper choose k)
| 833 | + clusters = kmeans_cluster_documents(doc_embeddings, k=None) |
| 834 | + |
| 835 | + # Step 3: Map-Reduce summarization |
| 836 | + cluster_summaries = summarize_clusters_map_reduce( |
| 837 | + document_samples=document_samples, |
| 838 | + clusters=clusters, |
| 839 | + language=language, |
| 840 | + doc_max_words=100, |
| 841 | + cluster_max_words=150, |
| 842 | + model_id=model_id, |
| 843 | + tenant_id=tenant_id |
| 844 | + ) |
| 845 | + |
| 846 | + # Step 4: Merge into final summary |
| 847 | + final_summary = merge_cluster_summaries(cluster_summaries) |
| 848 | + |
| 849 | + # Stream the result |
899 | 850 | async def generate_summary(): |
900 | | - token_join = [] |
901 | 851 | try: |
902 | | - for new_token in generate_knowledge_summary_stream(keywords_for_summary, language, tenant_id, model_id): |
903 | | - if new_token == "END": |
904 | | - break |
905 | | - else: |
906 | | - token_join.append(new_token) |
907 | | - yield f"data: {{\"status\": \"success\", \"message\": \"{new_token}\"}}\n\n" |
908 | | - await asyncio.sleep(0.1) |
 | 852 | +                # Stream char by char; json.dumps escapes quotes/newlines (json import assumed)
 | 853 | +                for char in final_summary:
 | 854 | +                    yield f"data: {json.dumps({'status': 'success', 'message': char})}\n\n"
 | 855 | +                    await asyncio.sleep(0.01)
 | 856 | +                yield "data: {\"status\": \"completed\"}\n\n"
909 | 857 | except Exception as e: |
910 | 858 | yield f"data: {{\"status\": \"error\", \"message\": \"{e}\"}}\n\n" |
911 | | - |
912 | | - # Return the flow response |
| 859 | + |
913 | 860 | return StreamingResponse( |
914 | 861 | generate_summary(), |
915 | 862 | media_type="text/event-stream" |
916 | 863 | ) |
917 | | - |
| 864 | + |
918 | 865 | except Exception as e: |
919 | | - raise Exception(f"{str(e)}") |
920 | | - |
921 | | - @staticmethod |
922 | | - def _clean_chunks_for_summary(all_documents): |
923 | | - # Only use these three fields for summarization |
924 | | - all_chunks = "" |
925 | | - for _, chunk in enumerate(all_documents['documents']): |
926 | | - all_chunks = all_chunks + "\n" + \ |
927 | | - chunk["title"] + "\n" + chunk["filename"] + \ |
928 | | - "\n" + chunk["content"] |
929 | | - return all_chunks |
| 866 | + logger.error(f"Knowledge base summary generation failed: {str(e)}", exc_info=True) |
| 867 | + raise Exception(f"Failed to generate summary: {str(e)}") |
930 | 868 |
|
931 | 869 | @staticmethod |
932 | 870 | def get_random_documents( |
|
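For reference, the rewritten endpoint emits the precomputed summary as one SSE event per character, followed by a terminal event; a client sees a stream shaped like this (illustrative payload):

```
data: {"status": "success", "message": "T"}

data: {"status": "success", "message": "h"}

data: {"status": "completed"}
```

Note the trade-off versus the removed implementation: the old path relayed tokens as the LLM produced them, while the new path computes `final_summary` in full before the stream starts, so the per-character yield (with its 0.01 s sleep) only simulates incremental delivery.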