Skip to content

Commit c58467c

Browse files
committed
feat: Knowledge write
1 parent ec72140 commit c58467c

File tree

7 files changed

+279
-36
lines changed

7 files changed

+279
-36
lines changed

apps/application/flow/step_node/ai_chat_step_node/i_chat_node.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
5454
return ChatNodeSerializer
5555

5656
def _run(self):
57-
if [WorkflowMode.KNOWLEDGE, WorkflowMode.APPLICATION_LOOP].__contains__(
57+
if [WorkflowMode.KNOWLEDGE, WorkflowMode.KNOWLEDGE_LOOP].__contains__(
5858
self.workflow_manage.flow.workflow_mode):
5959
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data,
6060
**{'history_chat_record': [], 'stream': True, 'chat_id': None, 'chat_record_id': None})

apps/application/flow/step_node/data_source_web_node/i_data_source_web_node.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def get_form_list(node):
2222
pass
2323

2424
def _run(self):
    """Invoke ``execute`` with the flow-level parameters only."""
    flow_params = self.flow_params_serializer.data
    return self.execute(**flow_params)
2626

2727
def execute(self, **kwargs) -> NodeResult:
    """Run the node; this version has no behaviour and returns ``None``."""

apps/application/flow/step_node/data_source_web_node/impl/base_data_source_web_node.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,39 @@
66
@date:2025/11/12 13:47
77
@desc:
88
"""
9+
import traceback
10+
11+
from django.utils.translation import gettext_lazy as _
12+
913
from application.flow.i_step_node import NodeResult
1014
from application.flow.step_node.data_source_web_node.i_data_source_web_node import IDataSourceWebNode
1115
from common import forms
1216
from common.forms import BaseForm
17+
from common.utils.fork import ForkManage, Fork, ChildLink
18+
from common.utils.logger import maxkb_logger
1319

1420

1521
class BaseDataSourceWebNodeForm(BaseForm):
    """Input form of the web data source node."""

    # Address to collect from; mandatory.
    source_url = forms.TextInputField('source url', required=True)
    # Optional selector; the form's default value is "body".
    selector = forms.TextInputField(
        'knowledge selector',
        required=False,
        default_value="body",
    )
24+
25+
26+
def get_collect_handler():
    """Create a collecting callback together with the list it fills.

    Returns:
        A ``(handler, results)`` tuple. ``handler(child_link, response)``
        appends ``{"name": ..., "content": ...}`` to ``results`` for every
        response whose status is 200. ``results`` is the very list the handler
        writes to, so the caller reads the collected documents from it.
    """
    results = []

    def handler(child_link: ChildLink, response: Fork.Response):
        if response.status != 200:
            # Only successful responses are collected.
            return
        try:
            tag = child_link.tag
            # The link text can be missing, None or blank; in all of those
            # cases the document is named after its URL instead of failing.
            link_text = ((tag.text if tag is not None else None) or '').strip()
            document_name = link_text if link_text else child_link.url
            results.append({
                "name": document_name.strip(),
                "content": response.content,
            })
        except Exception as e:
            # Best effort: one broken page must not abort the whole collection.
            maxkb_logger.error(f'{str(e)}:{traceback.format_exc()}')

    return handler, results
1842

1943

2044
class BaseDataSourceWebNode(IDataSourceWebNode):
@@ -26,4 +50,37 @@ def get_form_list(node):
2650
return BaseDataSourceWebNodeForm().to_form_list()
2751

2852
def execute(self, **kwargs) -> NodeResult:
    """Collect documents from the configured web source.

    Reads the ``data_source`` entry of the workflow parameters, validates it
    with ``BaseDataSourceWebNodeForm`` and runs ``ForkManage`` on
    ``source_url`` with the space-separated ``selector`` (``"body"`` when
    none is given).

    Returns:
        A ``NodeResult`` whose node variables contain ``document_list`` (the
        list filled by the collect handler) and whose second argument is the
        ``knowledge_base`` workflow parameter, or ``{}`` when it is unset.

    Raises:
        Exception: whatever the collection raises; it is logged and re-raised
            so the caller never receives ``None`` in place of a result.
    """
    data_source = self.workflow_params.get("data_source")
    BaseDataSourceWebNodeForm().valid_form(data_source)

    node_id = data_source.get("node_id")
    source_url = data_source.get("source_url")
    # A missing or empty selector falls back to "body", so it is never None here.
    selector = data_source.get("selector") or "body"

    collect_handler, document_list = get_collect_handler()

    try:
        ForkManage(source_url, selector.split(" ")).fork(1, set(), collect_handler)
    except Exception as e:
        # The placeholder is {node_id}; passing any other keyword makes
        # str.format raise KeyError and hides the real error.
        maxkb_logger.error(_('data source web node:{node_id} error{error}{traceback}').format(
            node_id=node_id, error=str(e), traceback=traceback.format_exc()))
        raise

    return NodeResult({'document_list': document_list},
                      self.workflow_manage.params.get('knowledge_base') or {})
72+
73+
74+
75+
def get_details(self, index: int, **kwargs):
    """Return a dictionary describing this node's run.

    Args:
        index: Value reported under the ``index`` key.
    """
    context = self.context
    # NOTE(review): 'source_url' and 'selector' are read from the node context,
    # but the execute() visible next to this method only returns
    # 'document_list' in its node result — confirm they are stored somewhere.
    input_params = {
        "source_url": context.get("source_url"),
        "selector": context.get('selector'),
    }
    details = {
        'name': self.node.properties.get('stepName'),
        "index": index,
        'run_time': context.get('run_time'),
        'type': self.node.type,
        'input_params': input_params,
        'output_params': context.get('document_list'),
        'knowledge_base': self.workflow_params.get('knowledge_base'),
        'status': self.status,
        'err_message': self.err_message,
    }
    return details

apps/application/flow/step_node/direct_reply_node/i_reply_node.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@ def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
4646
return ReplyNodeParamsSerializer
4747

4848
def _run(self):
    """Call ``execute`` with the node and flow parameters.

    In the KNOWLEDGE and KNOWLEDGE_LOOP workflow modes ``stream=True`` is
    passed as an additional keyword argument; in every other mode only the
    serializer data is passed.
    """
    # NOTE(review): WorkflowMode must be importable in this module — confirm
    # the import exists, it is not part of the visible change.
    knowledge_modes = (WorkflowMode.KNOWLEDGE, WorkflowMode.KNOWLEDGE_LOOP)
    if self.workflow_manage.flow.workflow_mode in knowledge_modes:
        extra_params = {'stream': True}
    else:
        extra_params = {}
    # The three mappings stay separate unpackings so a duplicated key is still
    # reported as an error rather than silently overridden.
    return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data,
                        **extra_params)
5055

5156
def execute(self, reply_type, stream, fields=None, content=None, **kwargs) -> NodeResult:
    """Run the node; this version has no behaviour and returns ``None``."""

apps/application/flow/step_node/knowledge_write_node/i_knowledge_write_node.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,27 @@
1616

1717

1818
class KnowledgeWriteNodeParamSerializer(serializers.Serializer):
    """Node parameters of the knowledge write node."""

    # List of strings; the key is required but its value may be null.
    document_list = serializers.ListField(
        label=_('document list'),
        required=True,
        allow_null=True,
        child=serializers.CharField(required=True),
    )
2120

2221

2322
class IKnowledgeWriteNode(INode):
2423

24+
def save_context(self, details, workflow_manage):
    """Do nothing; this version restores no context and returns ``None``."""
26+
2527
def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
    """Return the serializer class for this node's parameters."""
    serializer_class = KnowledgeWriteNodeParamSerializer
    return serializer_class
2729

2830
def _run(self):
    """Resolve the referenced documents and pass them to ``execute``.

    ``document_list`` from the node parameters is treated as a reference: its
    first element and the remaining elements are handed to
    ``workflow_manage.get_reference_field``. The resolved value is forwarded
    to ``execute`` as ``documents`` alongside the node and flow parameters.

    Raises:
        ValueError: if ``document_list`` is null or empty, so there is no
            reference to resolve.
    """
    node_params = self.node_params_serializer.data
    reference = node_params.get('document_list')
    if not reference:
        # The serializer allows null, which would otherwise fail on
        # indexing with an opaque TypeError/IndexError.
        raise ValueError("knowledge write node: 'document_list' reference is empty")

    documents = self.workflow_manage.get_reference_field(reference[0], reference[1:])

    return self.execute(**node_params, **self.flow_params_serializer.data, documents=documents)
3038

31-
def execute(self, paragraph_list, chunk_length, **kwargs) -> NodeResult:
39+
def execute(self, documents, **kwargs) -> NodeResult:
    """Run the node; this version has no behaviour and returns ``None``."""
3341

3442
type = 'knowledge-write-node'

apps/application/flow/step_node/knowledge_write_node/impl/base_knowledge_write_node.py

Lines changed: 197 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,209 @@
66
@date:2025/11/13 11:19
77
@desc:
88
"""
9+
from functools import reduce
10+
from typing import Dict, List
11+
import uuid_utils.compat as uuid
12+
from django.db.models import QuerySet
13+
from django.db.models.aggregates import Max
14+
15+
from rest_framework import serializers
16+
from django.utils.translation import gettext_lazy as _
917
from application.flow.i_step_node import NodeResult
1018
from application.flow.step_node.knowledge_write_node.i_knowledge_write_node import IKnowledgeWriteNode
19+
from common.utils.common import bulk_create_in_batches
20+
from knowledge.models import Document, KnowledgeType, Paragraph, File, FileSourceType, Problem, ProblemParagraphMapping
21+
from knowledge.serializers.common import ProblemParagraphObject, ProblemParagraphManage
22+
23+
24+
class ParagraphInstanceSerializer(serializers.Serializer):
    """Shape of one paragraph inside a document to be written."""

    # Paragraph text; the key is required, null and blank values are accepted.
    content = serializers.CharField(
        label=_('content'),
        required=True,
        min_length=1,
        max_length=102400,
        allow_null=True,
        allow_blank=True,
    )
    # Optional section title, at most 256 characters.
    title = serializers.CharField(
        label=_('section title'),
        required=False,
        max_length=256,
        allow_null=True,
        allow_blank=True,
    )
    # Optional list of strings.
    problem_list = serializers.ListField(required=False, child=serializers.CharField(required=True))
    is_active = serializers.BooleanField(required=False, label=_('Is active'))
    # Optional list of strings.
    chunks = serializers.ListField(required=False, child=serializers.CharField(required=True))
32+
33+
34+
class KnowledgeWriteParamSerializer(serializers.Serializer):
    """Shape of one document handed to the knowledge write node."""

    # ``source`` names the attribute/key a field is bound to, not a display
    # label. Leaving it unset binds the field to its own name, "name", instead
    # of a translated, locale-dependent key.
    name = serializers.CharField(required=True, label=_('document name'), max_length=128, min_length=1)
    # NOTE(review): only ``name`` and ``paragraphs`` are declared here, so other
    # keys a caller may expect in ``serializer.data`` (e.g. ``source_file_id``,
    # ``meta``, ``type``) are presumably dropped — confirm this is intended.
    paragraphs = ParagraphInstanceSerializer(required=False, many=True, allow_null=True)
38+
39+
40+
def convert_uuid_to_str(obj):
    """Return ``obj`` with every ``uuid.UUID`` replaced by its string form.

    Dicts and lists are rebuilt recursively (dict keys are kept as they are);
    any other value is returned unchanged.
    """
    if isinstance(obj, uuid.UUID):
        return str(obj)
    if isinstance(obj, list):
        return [convert_uuid_to_str(item) for item in obj]
    if isinstance(obj, dict):
        return {key: convert_uuid_to_str(value) for key, value in obj.items()}
    return obj
49+
50+
def link_file(source_file_id, document_id):
    """Copy the file ``source_file_id`` into a new file bound to ``document_id``.

    Nothing happens when ``source_file_id`` is ``None`` or no such file is
    found. Otherwise a new ``File`` with a fresh id is saved with the same
    name, size, bytes and a copy of the metadata.
    """
    if source_file_id is None:
        return

    source_file = QuerySet(File).filter(id=source_file_id).first()
    if not source_file:
        return

    file_content = source_file.get_bytes()
    copied_meta = source_file.meta.copy() if source_file.meta else {}

    duplicate = File(
        id=uuid.uuid7(),
        file_name=source_file.file_name,
        file_size=source_file.file_size,
        source_type=FileSourceType.DOCUMENT,
        source_id=document_id,  # the copy points at the given document id
        meta=copied_meta,
    )

    # Save the file content together with its metadata.
    duplicate.save(file_content)
68+
69+
70+
def get_paragraph_problem_model(knowledge_id: str, document_id: str, instance: Dict):
    """Build the unsaved ``Paragraph`` for ``instance`` plus its problem objects.

    Args:
        knowledge_id: Id stored on the paragraph and on every problem object.
        document_id: Id of the document the paragraph belongs to.
        instance: Paragraph data; ``content``, ``title`` and ``problem_list``
            are read. A missing ``title`` becomes ``''`` and a missing
            ``problem_list`` becomes an empty list.

    Returns:
        ``{'paragraph': Paragraph, 'problem_paragraph_object_list': [...]}``.
    """
    title = instance.get("title", '')
    problems = instance.get('problem_list', [])

    paragraph = Paragraph(
        id=uuid.uuid7(),
        document_id=document_id,
        content=instance.get("content"),
        knowledge_id=knowledge_id,
        title=title,
    )
    paragraph_id = str(paragraph.id)

    problem_paragraph_object_list = []
    for problem in problems:
        problem_paragraph_object_list.append(
            ProblemParagraphObject(knowledge_id, document_id, paragraph_id, problem)
        )

    return {
        'paragraph': paragraph,
        'problem_paragraph_object_list': problem_paragraph_object_list,
    }
87+
88+
89+
def get_paragraph_model(document_model, paragraph_list: List):
    """Build the paragraph and problem objects for one document.

    Args:
        document_model: Document whose ``id`` and ``knowledge_id`` are stamped
            on every paragraph.
        paragraph_list: Paragraph dictionaries; ``None`` is treated as an
            empty list (a document may carry a null ``paragraphs`` value).

    Returns:
        ``{'document': document_model, 'paragraph_model_list': [...],
        'problem_paragraph_object_list': [...]}`` with both lists in input
        order.
    """
    knowledge_id = document_model.knowledge_id

    paragraph_model_list = []
    problem_paragraph_object_list = []
    for paragraph in paragraph_list or []:
        models = get_paragraph_problem_model(knowledge_id, document_model.id, paragraph)
        paragraph_model_list.append(models.get('paragraph'))
        problem_paragraph_object_list.extend(models.get('problem_paragraph_object_list'))

    return {
        'document': document_model,
        'paragraph_model_list': paragraph_model_list,
        'problem_paragraph_object_list': problem_paragraph_object_list,
    }
109+
110+
111+
def get_document_paragraph_model(knowledge_id: str, instance: Dict):
    """Build the unsaved ``Document`` for ``instance`` and its paragraph models.

    Args:
        knowledge_id: Id of the knowledge base the document belongs to.
        instance: Document data; ``name``, ``meta``, ``type``,
            ``source_file_id`` and ``paragraphs`` are read. A missing or null
            ``paragraphs`` is treated as empty, and a paragraph whose
            ``content`` is null counts as zero characters.

    Returns:
        The dictionary produced by ``get_paragraph_model`` for the new document.
    """
    source_file_id = instance.get("source_file_id")
    source_meta = {'source_file_id': source_file_id} if source_file_id else {}
    meta = {**(instance.get('meta') or {}), **source_meta}
    meta = {**convert_uuid_to_str(meta), 'allow_download': True}

    # ``paragraphs`` may be present but null, so .get(..., []) is not enough.
    paragraphs = instance.get('paragraphs') or []
    document_type = instance.get('type')

    document_model = Document(
        knowledge_id=knowledge_id,
        id=uuid.uuid7(),
        name=instance.get('name'),
        char_length=sum(len(p.get('content') or '') for p in paragraphs),
        meta=meta,
        type=document_type if document_type is not None else KnowledgeType.WORKFLOW,
    )

    return get_paragraph_model(document_model, paragraphs)
11134

12135

13136
class BaseKnowledgeWriteNode(IKnowledgeWriteNode):
14137

15138
def save_context(self, details, workflow_manage):
    """Do nothing; this node restores no context and returns ``None``."""
17140

18-
def execute(self, paragraph_list, chunk_length, **kwargs) -> NodeResult:
19-
pass
141+
def save(self, document_list):
    """Validate ``document_list`` and persist it as documents and paragraphs.

    Args:
        document_list: Document dictionaries, validated with
            ``KnowledgeWriteParamSerializer`` (``many=True``).

    Returns:
        ``(document_model_list, knowledge_id, workspace_id)``, where
        ``knowledge_id`` comes from the workflow parameters and
        ``workspace_id`` is the constant ``"default"``.

    Raises:
        rest_framework.exceptions.ValidationError: if validation fails.
    """
    serializer = KnowledgeWriteParamSerializer(data=document_list, many=True)
    serializer.is_valid(raise_exception=True)
    document_list = serializer.data

    knowledge_id = self.workflow_params.get("knowledge_id")
    workspace_id = "default"

    document_model_list = []
    # Paragraphs stay grouped per document as they are built, so they never
    # have to be re-filtered out of one flat list afterwards.
    paragraph_groups = []
    problem_paragraph_object_list = []

    for document in document_list:
        models = get_document_paragraph_model(knowledge_id, document)
        document_instance = models.get('document')
        # NOTE(review): ``document`` is serializer output here; confirm that
        # 'source_file_id' actually survives serialization.
        link_file(document.get("source_file_id"), document_instance.id)
        document_model_list.append(document_instance)
        paragraph_groups.append(models.get("paragraph_model_list"))
        problem_paragraph_object_list.extend(models.get("problem_paragraph_object_list"))

    problem_model_list, problem_paragraph_mapping_list = (
        ProblemParagraphManage(problem_paragraph_object_list, knowledge_id).to_problem_model_list()
    )

    # NOTE(review): the writes below are separate statements; a failure half
    # way leaves earlier rows in place — consider one transaction.
    if document_model_list:
        QuerySet(Document).bulk_create(document_model_list)

    for paragraphs in paragraph_groups:
        if not paragraphs:
            continue
        # Every document was created above with a freshly generated id, so it
        # has no stored paragraphs yet and positions simply start at 1.
        for position, paragraph in enumerate(paragraphs, start=1):
            paragraph.position = position
        QuerySet(Paragraph).bulk_create(paragraphs)

    bulk_create_in_batches(Problem, problem_model_list, batch_size=1000)

    bulk_create_in_batches(ProblemParagraphMapping, problem_paragraph_mapping_list, batch_size=1000)

    return document_model_list, knowledge_id, workspace_id
187+
188+
189+
190+
def execute(self, documents, **kwargs) -> NodeResult:
    """Persist ``documents`` and report a short preview of what was written.

    Args:
        documents: Document dictionaries with ``name`` and an optional
            ``paragraphs`` list.

    Returns:
        A ``NodeResult`` whose ``write_content`` holds, per document, its name
        and the title/content of at most its first four paragraphs.
    """
    self.save(documents)

    write_content_list = [{
        "name": document.get("name"),
        "paragraphs": [{
            "title": p.get("title"),
            "content": p.get("content"),
        # ``paragraphs`` is optional and may be null; treat both as "none".
        } for p in (document.get("paragraphs") or [])[:4]]
    } for document in documents]

    return NodeResult({'write_content': write_content_list}, {})
203+
204+
205+
def get_details(self, index: int, **kwargs):
    """Return a dictionary describing this node's run.

    Args:
        index: Value reported under the ``index`` key.
    """
    context = self.context
    details = {
        'name': self.node.properties.get('stepName'),
        "index": index,
        'run_time': context.get('run_time'),
        'type': self.node.type,
        'write_content': context.get("write_content"),
        'status': self.status,
        'err_message': self.err_message,
    }
    return details

0 commit comments

Comments
 (0)