66 @date:2025/11/13 11:19
77 @desc:
88"""
9+ from functools import reduce
10+ from typing import Dict , List
11+ import uuid_utils .compat as uuid
12+ from django .db .models import QuerySet
13+ from django .db .models .aggregates import Max
14+
15+ from rest_framework import serializers
16+ from django .utils .translation import gettext_lazy as _
917from application .flow .i_step_node import NodeResult
1018from application .flow .step_node .knowledge_write_node .i_knowledge_write_node import IKnowledgeWriteNode
19+ from common .utils .common import bulk_create_in_batches
20+ from knowledge .models import Document , KnowledgeType , Paragraph , File , FileSourceType , Problem , ProblemParagraphMapping
21+ from knowledge .serializers .common import ProblemParagraphObject , ProblemParagraphManage
22+
23+
class ParagraphInstanceSerializer(serializers.Serializer):
    """Validate a single paragraph entry inside a document payload."""

    # Paragraph text; required key, but null and blank values are accepted.
    content = serializers.CharField(
        required=True,
        label=_('content'),
        max_length=102400,
        min_length=1,
        allow_null=True,
        allow_blank=True,
    )
    # Optional section title (up to 256 characters); may be null or blank.
    title = serializers.CharField(
        required=False,
        max_length=256,
        label=_('section title'),
        allow_null=True,
        allow_blank=True,
    )
    # Optional list of problem strings attached to the paragraph.
    problem_list = serializers.ListField(
        required=False,
        child=serializers.CharField(required=True),
    )
    # Optional boolean flag.
    is_active = serializers.BooleanField(
        required=False,
        label=_('Is active'),
    )
    # Optional list of chunk strings.
    chunks = serializers.ListField(
        required=False,
        child=serializers.CharField(required=True),
    )
32+
33+
class KnowledgeWriteParamSerializer(serializers.Serializer):
    """Validate one document payload handed to the knowledge-write node.

    The node works from ``serializer.data``, which only contains declared
    fields. ``meta`` and ``source_file_id`` are therefore declared here so
    that the values read later by ``get_document_paragraph_model`` and
    ``link_file`` are not silently dropped during validation.
    """

    # Document name. No ``source`` override: ``source`` is an attribute path,
    # not a display label, so the translated label must not be used for it.
    name = serializers.CharField(
        required=True,
        label=_('document name'),
        max_length=128,
        min_length=1,
    )
    # Optional free-form metadata merged into the stored document's ``meta``.
    meta = serializers.DictField(required=False, allow_null=True)
    # Optional paragraph list; may be omitted or null.
    paragraphs = ParagraphInstanceSerializer(required=False, many=True, allow_null=True)
    # Optional id of an existing File to copy and attach to the new document.
    source_file_id = serializers.UUIDField(required=False, allow_null=True)
38+
39+
def convert_uuid_to_str(obj):
    """Return ``obj`` with every UUID nested in dicts/lists turned into ``str``.

    Dict values and list items are processed recursively; dict keys are left
    untouched. Any other value (including tuples) is returned unchanged.
    """
    if isinstance(obj, dict):
        return {key: convert_uuid_to_str(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_uuid_to_str(item) for item in obj]
    if isinstance(obj, uuid.UUID):
        return str(obj)
    return obj
49+
def link_file(source_file_id, document_id):
    """Copy the File identified by ``source_file_id`` and attach the copy to ``document_id``.

    Returns without doing anything when ``source_file_id`` is None or when no
    File with that id exists.
    """
    if source_file_id is None:
        return

    original = QuerySet(File).filter(id=source_file_id).first()
    if not original:
        return

    payload = original.get_bytes()
    copied_meta = original.meta.copy() if original.meta else {}

    duplicate = File(
        id=uuid.uuid7(),
        file_name=original.file_name,
        file_size=original.file_size,
        source_type=FileSourceType.DOCUMENT,
        source_id=document_id,  # the copy is owned by the id passed in
        meta=copied_meta,
    )

    # Persist the file content together with its metadata.
    duplicate.save(payload)
68+
69+
def get_paragraph_problem_model(knowledge_id: str, document_id: str, instance: Dict):
    """Build an unsaved Paragraph plus the problem links described by ``instance``.

    :param knowledge_id: id stored on the paragraph and on every problem link
    :param document_id: id of the document the paragraph belongs to
    :param instance: mapping read for ``content``, ``title`` and ``problem_list``
    :return: dict with keys ``paragraph`` and ``problem_paragraph_object_list``
    """
    new_paragraph = Paragraph(
        id=uuid.uuid7(),
        document_id=document_id,
        content=instance.get("content"),
        knowledge_id=knowledge_id,
        # A missing title key falls back to an empty string.
        title=instance.get("title", ''),
    )

    paragraph_key = str(new_paragraph.id)
    links = []
    for problem in instance.get('problem_list', []):
        links.append(ProblemParagraphObject(knowledge_id, document_id, paragraph_key, problem))

    return {
        'paragraph': new_paragraph,
        'problem_paragraph_object_list': links,
    }
87+
88+
def get_paragraph_model(document_model, paragraph_list: List):
    """Build paragraph and problem-link objects for every entry of ``paragraph_list``.

    :param document_model: document whose ``id`` and ``knowledge_id`` are applied to each paragraph
    :param paragraph_list: paragraph mappings passed to ``get_paragraph_problem_model``
    :return: dict with keys ``document``, ``paragraph_model_list`` and
        ``problem_paragraph_object_list`` (the latter flattened across paragraphs)
    """
    knowledge_id = document_model.knowledge_id

    collected_paragraphs = []
    collected_links = []
    for entry in paragraph_list:
        built = get_paragraph_problem_model(knowledge_id, document_model.id, entry)
        collected_links.extend(built.get('problem_paragraph_object_list'))
        collected_paragraphs.append(built.get('paragraph'))

    return {
        'document': document_model,
        'paragraph_model_list': collected_paragraphs,
        'problem_paragraph_object_list': collected_links,
    }
109+
110+
def get_document_paragraph_model(knowledge_id: str, instance: Dict):
    """Build an unsaved Document and its paragraph/problem objects from ``instance``.

    :param knowledge_id: id of the knowledge base the document belongs to
    :param instance: mapping read for ``name``, ``paragraphs``, ``meta``,
        ``source_file_id`` and ``type``
    :return: the dict produced by ``get_paragraph_model`` (keys ``document``,
        ``paragraph_model_list``, ``problem_paragraph_object_list``)
    """
    # ``paragraphs`` may be absent or explicitly None; treat both as "no paragraphs"
    # instead of failing while iterating None.
    paragraphs = instance.get('paragraphs') or []

    # Start from the caller-supplied meta (if any) and record the source file id on top.
    meta = dict(instance.get('meta') or {})
    source_file_id = instance.get("source_file_id")
    if source_file_id:
        meta['source_file_id'] = source_file_id
    # UUID values are stringified before being stored in the document's meta.
    meta = {**convert_uuid_to_str(meta), 'allow_download': True}

    document_type = instance.get('type')
    if document_type is None:
        document_type = KnowledgeType.WORKFLOW

    document_model = Document(
        knowledge_id=knowledge_id,
        id=uuid.uuid7(),
        name=instance.get('name'),
        # A paragraph whose content is None contributes zero characters.
        char_length=sum(len(p.get('content') or '') for p in paragraphs),
        meta=meta,
        type=document_type,
    )

    return get_paragraph_model(document_model, paragraphs)
11134
12135
class BaseKnowledgeWriteNode(IKnowledgeWriteNode):
    """Workflow node that writes documents, paragraphs and problems into a knowledge base."""

    # Number of leading paragraphs per document echoed back in the node result.
    _PREVIEW_PARAGRAPH_COUNT = 4

    def save_context(self, details, workflow_manage):
        """Currently a no-op."""
        pass

    def save(self, document_list):
        """Validate ``document_list`` and persist the documents with their paragraphs and problems.

        :param document_list: list of document payloads, validated with
            ``KnowledgeWriteParamSerializer`` (raises on invalid input)
        :return: tuple ``(document_model_list, knowledge_id, workspace_id)``
        """
        serializer = KnowledgeWriteParamSerializer(data=document_list, many=True)
        serializer.is_valid(raise_exception=True)
        document_list = serializer.data

        knowledge_id = self.workflow_params.get("knowledge_id")
        workspace_id = "default"

        document_model_list = []
        # One paragraph list per document, kept in document order, so the
        # paragraphs never have to be regrouped by document id afterwards.
        paragraphs_per_document = []
        problem_paragraph_object_list = []

        for document in document_list:
            built = get_document_paragraph_model(knowledge_id, document)
            document_instance = built.get('document')
            link_file(document.get("source_file_id"), document_instance.id)
            document_model_list.append(document_instance)
            paragraphs_per_document.append(built.get("paragraph_model_list"))
            problem_paragraph_object_list.extend(built.get("problem_paragraph_object_list"))

        problem_model_list, problem_paragraph_mapping_list = (
            ProblemParagraphManage(problem_paragraph_object_list, knowledge_id).to_problem_model_list()
        )

        if document_model_list:
            QuerySet(Document).bulk_create(document_model_list)

        for paragraphs in paragraphs_per_document:
            if not paragraphs:
                continue
            # Every document id was generated during this call, so no paragraph
            # can already exist for it: positions simply start at 1.
            for position, paragraph in enumerate(paragraphs, start=1):
                paragraph.position = position
            QuerySet(Paragraph).bulk_create(paragraphs)

        bulk_create_in_batches(Problem, problem_model_list, batch_size=1000)
        bulk_create_in_batches(ProblemParagraphMapping, problem_paragraph_mapping_list, batch_size=1000)

        return document_model_list, knowledge_id, workspace_id

    def execute(self, documents, **kwargs) -> NodeResult:
        """Persist ``documents`` and return a short preview of what was written.

        The preview holds each document's name plus the title and content of
        its first few paragraphs.
        """
        self.save(documents)

        write_content_list = [
            {
                "name": document.get("name"),
                "paragraphs": [
                    {"title": p.get("title"), "content": p.get("content")}
                    # ``paragraphs`` is optional and nullable, so guard the slice.
                    for p in (document.get("paragraphs") or [])[:self._PREVIEW_PARAGRAPH_COUNT]
                ],
            }
            for document in documents
        ]

        return NodeResult({'write_content': write_content_list}, {})

    def get_details(self, index: int, **kwargs):
        """Return the execution details of this node as a plain dict."""
        return {
            'name': self.node.properties.get('stepName'),
            "index": index,
            'run_time': self.context.get('run_time'),
            'type': self.node.type,
            'write_content': self.context.get("write_content"),
            'status': self.status,
            'err_message': self.err_message,
        }
0 commit comments