Commit bfae088

shaohuzhang1, liuruibin, zhanweizhang7, wangdan-fit2cloud authored
feat: knowledge workflow (#4399)
* feat: init knowledge workflow
* feat: add knowledge workflow and version models, serializers, and API views
* feat: knowledge workflow
* feat: knowledge workflow
* feat: add KnowledgeWorkflowModelSerializer and Operate class for workflow management
* fix: route
* feat: knowledge workflow
* feat: Knowledge workflow permission
* feat: knowledge workflow
* feat: knowledge workflow
* feat: knowledge workflow
* feat: knowledge workflow
* feat: Data source web node
* fix: Back route
* feat: knowledge workflow
* feat: knowledge workflow
* feat: Knowledge write node
* feat: add Data Source tool functionality and localization
* feat: add Data Source tool functionality and localization
* feat: knowledge workflow
* feat: knowledge workflow
* fix: simplify export tool permission check in ToolListContainer.vue
* fix: simplify export condition in ToolResourceIndex.vue
* fix: simplify condition for copying tool in ToolListContainer
* feat: knowledge workflow
* fix: Upload local files and add output fields
* feat: Knowledge write
* feat: add Document Split Node functionality and localization
* feat: add Document Split Node functionality and localization
* feat: Knowledge write
* feat: enhance Document Split Node with result processing and problem list generation
* fix: Allow problem be blank
* feat: enhance Document Split Node with result processing and problem list generation
* feat: tool datasource
* fix: Optimization of knowledge base workflow execution logic
* refactor: streamline image handling by updating application and knowledge ID management
* refactor: streamline image handling by updating application and knowledge ID management
* feat: extend support modes in variable aggregation node to include knowledge workflows
* feat: Chunks stored
* refactor: simplify file handling in document extraction by removing unnecessary byte conversion and enhancing file saving logic
* refactor: update file ID assignment in document extraction to use provided metadata
* feat: Workflow menu that distinguishes between applications and knowledge bases
* refactor: update file ID assignment in document extraction to use provided metadata
* fix: Add workspace ID as workflow execution parameter
* feat: add code template for Data Source tool form functionality
* refactor: remove unused sys import and improve module handling
* feat: Execution details support loading status
* refactor: update tool type handling and improve category merging logic
* feat: Alter fork depth
* fix: ensure filterList is properly initialized and updated in getList function
* refactor: simplify ToolStoreDialog by removing unused toolType logic
* perf: Optimize the style
* style: adjust div width for improved layout in Tree component
* refactor: improve polling mechanism for knowledge workflow action
* fix: Get workspace_id from workflow params
* fix: filter out 'file_bytes' from result in get_details method
* feat: add recursive filtering for file_bytes in context data
* fix: append results to paragraph_list instead of replacing it
* perf: Optimize translation files
* fix: include document name in bytes_to_uploaded_file call for better file handling
* refactor: optimize buffer retrieval in document processing
* refactor: remove redundant parameter from bytes_to_uploaded_file call
* fix: Page style optimization
* feat: add slider for setting limit in document rules form
* feat: add workflow knowledge management endpoints and related functionality
* fix: swap file size and file count limits in form inputs
* refactor: update tool_config args to use list format for improved readability
* feat: Node supports knowledge base workflow
* feat: Node supports knowledge base workflow
* fix: Basic node data cannot be obtained in the workflow
* style: Knowledge base workflow debugging page style adjustment
* fix: Loop nodes cannot be used in the knowledge base workflow
* fix: Knowledge base workflow variable assignment node
* feat: add chunk size slider to form for custom split strategy
* fix: Workflow style optimization

---------

Co-authored-by: CaptainB <[email protected]>
Co-authored-by: zhangzhanwei <[email protected]>
Co-authored-by: wangdan-fit2cloud <[email protected]>
1 parent cf59cca commit bfae088

266 files changed: +8386 -1996 lines changed

apps/application/flow/common.py

Lines changed: 20 additions & 7 deletions
@@ -6,7 +6,7 @@
 @date:2024/12/11 17:57
 @desc:
 """
-
+from enum import Enum
 from typing import List, Dict

 from django.db.models import QuerySet
@@ -90,6 +90,16 @@ def __init__(self, edge, node):
         self.node = node


+class WorkflowMode(Enum):
+    APPLICATION = "application"
+
+    APPLICATION_LOOP = "application-loop"
+
+    KNOWLEDGE = "knowledge"
+
+    KNOWLEDGE_LOOP = "knowledge-loop"
+
+
 class Workflow:
     """
     节点列表
@@ -112,7 +122,10 @@ class Workflow:
     """
     next_node_map: Dict[str, List[EdgeNode]]

-    def __init__(self, nodes: List[Node], edges: List[Edge]):
+    workflow_mode: WorkflowMode
+
+    def __init__(self, nodes: List[Node], edges: List[Edge],
+                 workflow_mode: WorkflowMode = WorkflowMode.APPLICATION.value):
         self.nodes = nodes
         self.edges = edges
         self.node_map = {node.id: node for node in nodes}
@@ -125,6 +138,7 @@ def __init__(self, nodes: List[Node], edges: List[Edge]):
         self.next_node_map = {key: [EdgeNode(edge, self.node_map.get(edge.targetNodeId)) for edge in edges] for
                               key, edges in
                               group_by(edges, key=lambda edge: edge.sourceNodeId).items()}
+        self.workflow_mode = workflow_mode

     def get_node(self, node_id):
         """
@@ -167,13 +181,13 @@ def get_next_nodes(self, node_id) -> List[Node]:
         return [en.node for en in self.next_node_map.get(node_id, [])]

     @staticmethod
-    def new_instance(flow_obj: Dict):
+    def new_instance(flow_obj: Dict, workflow_mode: WorkflowMode = WorkflowMode.APPLICATION):
         nodes = flow_obj.get('nodes')
         edges = flow_obj.get('edges')
         nodes = [Node(node.get('id'), node.get('type'), **node)
                  for node in nodes]
         edges = [Edge(edge.get('id'), edge.get('type'), **edge) for edge in edges]
-        return Workflow(nodes, edges)
+        return Workflow(nodes, edges, workflow_mode)

     def get_start_node(self):
         return self.get_node('start-node')
@@ -190,10 +204,9 @@ def is_valid(self):
         self.is_valid_base_node()
         self.is_valid_work_flow()

-    @staticmethod
-    def is_valid_node_params(node: Node):
+    def is_valid_node_params(self, node: Node):
         from application.flow.step_node import get_node
-        get_node(node.type)(node, None, None)
+        get_node(node.type, self.workflow_mode)(node, None, None)

     def is_valid_node(self, node: Node):
         self.is_valid_node_params(node)
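
One detail in this file may be worth checking: `Workflow.__init__` defaults `workflow_mode` to `WorkflowMode.APPLICATION.value` (the string `"application"`), while `Workflow.new_instance` defaults to the enum member `WorkflowMode.APPLICATION`. A plain `Enum` member does not compare equal to its value, so a dictionary keyed by members (as the node registry in `step_node/__init__.py` appears to be) would miss on the string. The sketch below illustrates the difference with a hypothetical `registry` and a hypothetical `lookup` helper that normalises either form; it is not code from the commit.

```python
from enum import Enum


class WorkflowMode(Enum):
    APPLICATION = "application"
    KNOWLEDGE = "knowledge"


# Hypothetical registry keyed by enum members (illustration only).
registry = {
    WorkflowMode.APPLICATION: "application handler",
    WorkflowMode.KNOWLEDGE: "knowledge handler",
}


def lookup(mode):
    """Accept a member or its string value and normalise it to a member first."""
    return registry.get(WorkflowMode(mode))


member_hit = registry.get(WorkflowMode.APPLICATION)         # found
string_miss = registry.get(WorkflowMode.APPLICATION.value)  # None: "application" is not a key
normalised_hit = lookup("application")                      # found after normalisation
```

Calling `WorkflowMode(...)` on an existing member returns that member unchanged, so `lookup` is safe for both kinds of input.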

apps/application/flow/i_step_node.py

Lines changed: 26 additions & 4 deletions
@@ -21,6 +21,7 @@
 from application.models import ApplicationChatUserStats
 from application.models import ChatRecord, ChatUserType
 from common.field.common import InstanceField
+from knowledge.models.knowledge_action import KnowledgeAction, State

 chat_cache = cache

@@ -78,7 +79,8 @@ def handler(self, workflow):
                                      message_tokens=message_tokens,
                                      answer_tokens=answer_tokens,
                                      answer_text_list=answer_text_list,
-                                     run_time=time.time() - workflow.context['start_time'],
+                                     run_time=time.time() - workflow.context.get('start_time') if workflow.context.get(
+                                         'start_time') is not None else 0,
                                      index=0)

         self.chat_info.append_chat_record(chat_record)
@@ -97,6 +99,16 @@ def handler(self, workflow):
         self.chat_info = None


+class KnowledgeWorkflowPostHandler(WorkFlowPostHandler):
+    def __init__(self, chat_info, knowledge_action_id):
+        super().__init__(chat_info)
+        self.knowledge_action_id = knowledge_action_id
+
+    def handler(self, workflow):
+        QuerySet(KnowledgeAction).filter(id=self.knowledge_action_id).update(
+            state=State.SUCCESS)
+
+
 class NodeResult:
     def __init__(self, node_variable: Dict, workflow_variable: Dict,
                  _write_context=write_context, _is_interrupt=is_interrupt):
@@ -153,6 +165,14 @@ class FlowParamsSerializer(serializers.Serializer):
     debug = serializers.BooleanField(required=True, label="是否debug")


+class KnowledgeFlowParamsSerializer(serializers.Serializer):
+    knowledge_id = serializers.UUIDField(required=True, label="知识库id")
+    workspace_id = serializers.CharField(required=True, label="工作空间id")
+    knowledge_action_id = serializers.UUIDField(required=True, label="知识库任务执行器id")
+    data_source = serializers.DictField(required=True, label="数据源")
+    knowledge_base = serializers.DictField(required=False, label="知识库设置")
+
+
 class INode:
     view_type = 'many_view'

@@ -165,7 +185,8 @@ def get_answer_list(self) -> List[Answer] | None:
             return None
         reasoning_content_enable = self.context.get('model_setting', {}).get('reasoning_content_enable', False)
         return [
-            Answer(self.answer_text, self.view_type, self.runtime_node_id, self.workflow_params['chat_record_id'], {},
+            Answer(self.answer_text, self.view_type, self.runtime_node_id, self.workflow_params.get('chat_record_id'),
+                   {},
                    self.runtime_node_id, self.context.get('reasoning_content', '') if reasoning_content_enable else '')]

     def __init__(self, node, workflow_params, workflow_manage, up_node_id_list=None,
@@ -222,13 +243,14 @@ def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
         pass

     def get_flow_params_serializer_class(self) -> Type[serializers.Serializer]:
-        return FlowParamsSerializer
+        return self.workflow_manage.get_params_serializer_class()

     def get_write_error_context(self, e):
         self.status = 500
         self.answer_text = str(e)
         self.err_message = str(e)
-        self.context['run_time'] = time.time() - self.context['start_time']
+        current_time = time.time()
+        self.context['run_time'] = current_time - (self.context.get('start_time') or current_time)

         def write_error_context(answer, status=200):
             pass
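
Both `run_time` changes in this file guard against a context that has no `start_time`, which can apparently happen when a workflow runs outside a chat session. The two hunks spell the guard differently (a conditional expression in one place, `or` in the other). A minimal sketch of a single shared helper follows; `elapsed_since` is a hypothetical name that does not exist in the commit, and it tests `is None` explicitly, whereas the `or` form in the diff would also treat a `start_time` of `0` as missing.

```python
import time


def elapsed_since(start_time, now=None):
    """Return seconds elapsed since start_time, or 0 when start_time is missing."""
    current = time.time() if now is None else now
    if start_time is None:
        return 0
    return current - start_time


context = {}  # assumed: a context that never recorded 'start_time'
run_time_missing = elapsed_since(context.get('start_time'), now=100.0)
run_time_present = elapsed_since(90.0, now=100.0)
```

The optional `now` argument exists only so the example is deterministic; production callers would omit it.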
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+# coding=utf-8
+"""
+@project: maxkb
+@Author:虎
+@file: workflow_manage.py
+@date:2024/1/9 17:40
+@desc:
+"""
+from application.flow.i_step_node import KnowledgeFlowParamsSerializer
+from application.flow.loop_workflow_manage import LoopWorkflowManage
+
+
+class KnowledgeLoopWorkflowManage(LoopWorkflowManage):
+    def get_params_serializer_class(self):
+        return KnowledgeFlowParamsSerializer
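
The pattern here is that a node no longer hard-codes `FlowParamsSerializer`; it asks its workflow manager which serializer class to use, and knowledge-flavoured managers override that one method. The sketch below reproduces the shape of that delegation without Django REST framework. `ParamsSchema`, `AppParams`, `KnowledgeParams`, `Manage`, `KnowledgeManage` and `Node` are all hypothetical stand-ins, and the application schema lists only the `debug` field visible in the diff.

```python
class ParamsSchema:
    """Stand-in for a serializer class: a tuple of required keys plus a validator."""
    required = ()

    @classmethod
    def validate(cls, params):
        missing = [key for key in cls.required if key not in params]
        if missing:
            raise ValueError(f"missing params: {missing}")
        return params


class AppParams(ParamsSchema):
    required = ("debug",)  # assumption: the real serializer has more fields


class KnowledgeParams(ParamsSchema):
    required = ("knowledge_id", "workspace_id", "knowledge_action_id", "data_source")


class Manage:
    def get_params_serializer_class(self):
        return AppParams


class KnowledgeManage(Manage):
    def get_params_serializer_class(self):
        return KnowledgeParams


class Node:
    def __init__(self, manage, params):
        # The node delegates the schema choice to whichever manager owns it.
        self.params = manage.get_params_serializer_class().validate(params)
```

With this arrangement the same node class validates different parameter sets depending on the manager it runs under, which is roughly what lets existing nodes be reused in knowledge workflows.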
Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
+# coding=utf-8
+"""
+@project: MaxKB
+@Author:虎虎
+@file: Knowledge_workflow_manage.py
+@date:2025/11/13 19:02
+@desc:
+"""
+import traceback
+from concurrent.futures import ThreadPoolExecutor
+
+from django.db.models import QuerySet
+from django.utils.translation import get_language
+
+from application.flow.common import Workflow
+from application.flow.i_step_node import WorkFlowPostHandler, KnowledgeFlowParamsSerializer
+from application.flow.workflow_manage import WorkflowManage
+from common.handle.base_to_response import BaseToResponse
+from common.handle.impl.response.system_to_response import SystemToResponse
+from knowledge.models.knowledge_action import KnowledgeAction, State
+
+executor = ThreadPoolExecutor(max_workers=200)
+
+
+class KnowledgeWorkflowManage(WorkflowManage):
+
+    def __init__(self, flow: Workflow,
+                 params,
+                 work_flow_post_handler: WorkFlowPostHandler,
+                 base_to_response: BaseToResponse = SystemToResponse(),
+                 start_node_id=None,
+                 start_node_data=None, chat_record=None, child_node=None):
+        super().__init__(flow, params, work_flow_post_handler, base_to_response, None, None, None,
+                         None,
+                         None, None, start_node_id, start_node_data, chat_record, child_node)
+
+    def get_params_serializer_class(self):
+        return KnowledgeFlowParamsSerializer
+
+    def get_start_node(self):
+        start_node_list = [node for node in self.flow.nodes if
+                           self.params.get('data_source', {}).get('node_id') == node.id]
+        return start_node_list[0]
+
+    def run(self):
+        executor.submit(self._run)
+
+    def _run(self):
+        QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update(
+            state=State.STARTED)
+        language = get_language()
+        self.run_chain_async(self.start_node, None, language)
+        while self.is_run():
+            pass
+        self.work_flow_post_handler.handler(self)
+
+    @staticmethod
+    def get_node_details(current_node, node, index):
+        if current_node == node:
+            return {
+                'name': node.node.properties.get('stepName'),
+                "index": index,
+                'run_time': 0,
+                'type': node.type,
+                'status': 202,
+                'err_message': ""
+            }
+
+        return node.get_details(index)
+
+    def run_chain(self, current_node, node_result_future=None):
+        QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update(
+            details=self.get_runtime_details(lambda node, index: self.get_node_details(current_node, node, index)))
+        if node_result_future is None:
+            node_result_future = self.run_node_future(current_node)
+        try:
+            result = self.hand_node_result(current_node, node_result_future)
+            return result
+        except Exception as e:
+            traceback.print_exc()
+            return None
+
+    def hand_node_result(self, current_node, node_result_future):
+        try:
+            current_result = node_result_future.result()
+            result = current_result.write_context(current_node, self)
+            if result is not None:
+                # 阻塞获取结果
+                list(result)
+            return current_result
+        except Exception as e:
+            traceback.print_exc()
+            self.status = 500
+            current_node.get_write_error_context(e)
+            self.answer += str(e)
+            QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update(
+                details=self.get_runtime_details(),
+                state=State.FAILURE)
+        finally:
+            current_node.node_chunk.end()
+            QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update(
+                details=self.get_runtime_details())
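
`_run` marks the action as `STARTED`, kicks off the chain, and then spins in `while self.is_run(): pass` until the workflow finishes, which keeps a pool thread busy for the whole run. The sketch below shows one possible alternative shape under simplifying assumptions: node work is represented as futures and the runner blocks on `concurrent.futures.wait` instead of polling. `action_state`, `run_node` and `run_workflow` are hypothetical names, the in-memory dict stands in for the `KnowledgeAction` table, and two separate pools are used so a workflow waiting on its nodes cannot starve them of workers. It is not a drop-in replacement for the commit's code.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

workflow_executor = ThreadPoolExecutor(max_workers=2)  # runs whole workflows
node_executor = ThreadPoolExecutor(max_workers=4)      # runs individual nodes
action_state = {}  # hypothetical stand-in for the KnowledgeAction rows


def run_node(name):
    """Pretend to do some work for one node."""
    time.sleep(0.01)
    return f"{name} done"


def run_workflow(action_id, node_names):
    action_state[action_id] = "STARTED"
    try:
        futures = [node_executor.submit(run_node, name) for name in node_names]
        wait(futures)  # blocks without burning CPU in a polling loop
        results = [future.result() for future in futures]
        action_state[action_id] = "SUCCESS"
        return results
    except Exception:
        action_state[action_id] = "FAILURE"
        raise


# Fire the workflow in the background, as run() does, then collect the outcome.
job = workflow_executor.submit(run_workflow, "action-1", ["split", "write"])
results = job.result()
```

If the polling loop has to stay, sleeping briefly inside it would be a smaller change with a similar effect on CPU usage.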

apps/application/flow/loop_workflow_manage.py

Lines changed: 5 additions & 5 deletions
@@ -105,18 +105,18 @@ def get_node_cls_by_id(self, node_id, up_node_id_list=None,
                            get_node_params=lambda node: node.properties.get('node_data')):
         for node in self.flow.nodes:
             if node.id == node_id:
-                node_instance = get_node(node.type)(node,
-                                                    self.params, self, up_node_id_list,
-                                                    get_node_params,
-                                                    salt=self.get_index())
+                node_instance = get_node(node.type, self.flow.workflow_mode)(node,
+                                                                             self.params, self, up_node_id_list,
+                                                                             get_node_params,
+                                                                             salt=self.get_index())
                 return node_instance
         return None

     def stream(self):
         close_old_connections()
         language = get_language()
         self.run_chain_async(self.start_node, None, language)
-        return self.await_result()
+        return self.await_result(is_cleanup=False)

     def get_index(self):
         return self.loop_params.get('index')

apps/application/flow/step_node/__init__.py

Lines changed: 10 additions & 6 deletions
@@ -9,13 +9,16 @@
 from .ai_chat_step_node import *
 from .application_node import BaseApplicationNode
 from .condition_node import *
+from .data_source_local_node.impl.base_data_source_local_node import BaseDataSourceLocalNode
+from .data_source_web_node.impl.base_data_source_web_node import BaseDataSourceWebNode
 from .direct_reply_node import *
 from .document_extract_node import *
 from .form_node import *
 from .image_generate_step_node import *
 from .image_to_video_step_node import BaseImageToVideoNode
 from .image_understand_step_node import *
 from .intent_node import *
+from .knowledge_write_node.impl.base_knowledge_write_node import BaseKnowledgeWriteNode
 from .loop_break_node import BaseLoopBreakNode
 from .loop_continue_node import BaseLoopContinueNode
 from .loop_node import *
@@ -36,6 +39,7 @@
 from .variable_assign_node import BaseVariableAssignNode
 from .variable_splitting_node import BaseVariableSplittingNode
 from .video_understand_step_node import BaseVideoUnderstandNode
+from .document_split_node import BaseDocumentSplitNode

 node_list = [BaseStartStepNode, BaseChatNode, BaseSearchKnowledgeNode, BaseSearchDocumentNode, BaseQuestionNode,
              BaseConditionNode, BaseReplyNode,
@@ -46,11 +50,11 @@
              BaseVideoUnderstandNode,
              BaseIntentNode, BaseLoopNode, BaseLoopStartStepNode,
              BaseLoopContinueNode,
-             BaseLoopBreakNode, BaseVariableSplittingNode, BaseParameterExtractionNode, BaseVariableAggregationNode]
+             BaseLoopBreakNode, BaseVariableSplittingNode, BaseParameterExtractionNode, BaseVariableAggregationNode,
+             BaseDataSourceLocalNode, BaseDataSourceWebNode, BaseKnowledgeWriteNode, BaseDocumentSplitNode]

+node_map = {n.type: {w: n for w in n.support} for n in node_list}

-def get_node(node_type):
-    find_list = [node for node in node_list if node.type == node_type]
-    if len(find_list) > 0:
-        return find_list[0]
-    return None
+
+def get_node(node_type, workflow_model):
+    return node_map.get(node_type).get(workflow_model)
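
The new `node_map` is a two-level dictionary: node type first, then workflow mode, built from each node class's `type` and `support` attributes. The sketch below rebuilds that structure with two stub classes whose `type` and `support` values are copied from the diff (`ChatNode` and `ApplicationNode` are hypothetical stand-ins for the real classes). It differs from the commit in one respect: the lookup passes `{}` as the default for the outer `get`, so an unknown node type yields `None`, whereas `node_map.get(node_type).get(...)` as written in the diff would raise `AttributeError` in that case.

```python
from enum import Enum


class WorkflowMode(Enum):
    APPLICATION = "application"
    APPLICATION_LOOP = "application-loop"
    KNOWLEDGE = "knowledge"
    KNOWLEDGE_LOOP = "knowledge-loop"


class ChatNode:  # stub for illustration
    type = 'ai-chat-node'
    support = [WorkflowMode.APPLICATION, WorkflowMode.APPLICATION_LOOP,
               WorkflowMode.KNOWLEDGE_LOOP, WorkflowMode.KNOWLEDGE]


class ApplicationNode:  # stub for illustration
    type = 'application-node'
    support = [WorkflowMode.APPLICATION, WorkflowMode.APPLICATION_LOOP]


node_list = [ChatNode, ApplicationNode]
# type -> {mode -> class}; a class appears once per mode it supports.
node_map = {n.type: {w: n for w in n.support} for n in node_list}


def get_node(node_type, workflow_mode):
    """Return the node class, or None when the type or the mode is not registered."""
    return node_map.get(node_type, {}).get(workflow_mode)
```

Because the inner comprehension is keyed by mode, two classes that share a `type` but declare disjoint `support` lists could coexist in this structure, although the outer comprehension as written keeps only the last class for a repeated `type`.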

apps/application/flow/step_node/ai_chat_step_node/i_chat_node.py

Lines changed: 12 additions & 2 deletions
@@ -11,6 +11,7 @@
 from django.utils.translation import gettext_lazy as _
 from rest_framework import serializers

+from application.flow.common import WorkflowMode
 from application.flow.i_step_node import INode, NodeResult


@@ -34,22 +35,31 @@ class ChatNodeSerializer(serializers.Serializer):
     mcp_enable = serializers.BooleanField(required=False, label=_("Whether to enable MCP"))
     mcp_servers = serializers.JSONField(required=False, label=_("MCP Server"))
     mcp_tool_id = serializers.CharField(required=False, allow_blank=True, allow_null=True, label=_("MCP Tool ID"))
-    mcp_tool_ids = serializers.ListField(child=serializers.UUIDField(), required=False, allow_empty=True, label=_("MCP Tool IDs"), )
+    mcp_tool_ids = serializers.ListField(child=serializers.UUIDField(), required=False, allow_empty=True,
+                                         label=_("MCP Tool IDs"), )
     mcp_source = serializers.CharField(required=False, allow_blank=True, allow_null=True, label=_("MCP Source"))

     tool_enable = serializers.BooleanField(required=False, default=False, label=_("Whether to enable tools"))
     tool_ids = serializers.ListField(child=serializers.UUIDField(), required=False, allow_empty=True,
                                      label=_("Tool IDs"), )
     mcp_output_enable = serializers.BooleanField(required=False, default=True, label=_("Whether to enable MCP output"))

+
 class IChatNode(INode):
     type = 'ai-chat-node'
+    support = [WorkflowMode.APPLICATION, WorkflowMode.APPLICATION_LOOP, WorkflowMode.KNOWLEDGE_LOOP,
+               WorkflowMode.KNOWLEDGE]

     def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
         return ChatNodeSerializer

     def _run(self):
-        return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)
+        if [WorkflowMode.KNOWLEDGE, WorkflowMode.KNOWLEDGE_LOOP].__contains__(
+                self.workflow_manage.flow.workflow_mode):
+            return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data,
+                                **{'history_chat_record': [], 'stream': True, 'chat_id': None, 'chat_record_id': None})
+        else:
+            return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)

     def execute(self, model_id, system, prompt, dialogue_number, history_chat_record, stream, chat_id,
                 chat_record_id,
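
In knowledge modes the chat node supplies the chat-session arguments that `execute` still requires (`history_chat_record`, `stream`, `chat_id`, `chat_record_id`), since a knowledge workflow has no chat session to provide them. A minimal sketch of the same idea follows, using the `in` operator rather than calling `__contains__` directly. `KNOWLEDGE_MODES`, `chat_defaults` and `build_execute_kwargs` are hypothetical names introduced here. One behavioural difference is worth noting: merging dictionaries with `setdefault` lets a value from the flow parameters win, whereas unpacking the same key twice into a call, as the diff's `**` form would, raises `TypeError`.

```python
from enum import Enum


class WorkflowMode(Enum):
    APPLICATION = "application"
    APPLICATION_LOOP = "application-loop"
    KNOWLEDGE = "knowledge"
    KNOWLEDGE_LOOP = "knowledge-loop"


KNOWLEDGE_MODES = frozenset({WorkflowMode.KNOWLEDGE, WorkflowMode.KNOWLEDGE_LOOP})


def chat_defaults():
    """Fresh defaults per call so the empty history list is never shared."""
    return {'history_chat_record': [], 'stream': True, 'chat_id': None, 'chat_record_id': None}


def build_execute_kwargs(node_params, flow_params, mode):
    """Merge node and flow params; in knowledge modes fill in the chat-only keys."""
    kwargs = {**node_params, **flow_params}
    if mode in KNOWLEDGE_MODES:
        for key, value in chat_defaults().items():
            kwargs.setdefault(key, value)
    return kwargs


knowledge_kwargs = build_execute_kwargs({'model_id': 'm1'}, {'knowledge_id': 'k1'}, WorkflowMode.KNOWLEDGE)
application_kwargs = build_execute_kwargs({'model_id': 'm1'}, {'chat_id': 'c1'}, WorkflowMode.APPLICATION)
```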

apps/application/flow/step_node/application_node/i_application_node.py

Lines changed: 2 additions & 0 deletions
@@ -3,6 +3,7 @@

 from rest_framework import serializers

+from application.flow.common import WorkflowMode
 from application.flow.i_step_node import INode, NodeResult

 from django.utils.translation import gettext_lazy as _
@@ -25,6 +26,7 @@ class ApplicationNodeSerializer(serializers.Serializer):

 class IApplicationNode(INode):
     type = 'application-node'
+    support = [WorkflowMode.APPLICATION, WorkflowMode.APPLICATION_LOOP]

     def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
         return ApplicationNodeSerializer
