Skip to content

Commit a66e636

Browse files
committed
feat: Variable aggregation node
1 parent 4393db9 commit a66e636

File tree

10 files changed

+414
-11
lines changed

10 files changed

+414
-11
lines changed

apps/application/flow/step_node/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from .text_to_video_step_node.impl.base_text_to_video_node import BaseTextToVideoNode
3333
from .tool_lib_node import *
3434
from .tool_node import *
35+
from .variable_aggregation_node.impl.base_variable_aggregation_node import BaseVariableAggregationNode
3536
from .variable_assign_node import BaseVariableAssignNode
3637
from .variable_splitting_node import BaseVariableSplittingNode
3738
from .video_understand_step_node import BaseVideoUnderstandNode
@@ -45,7 +46,7 @@
4546
BaseVideoUnderstandNode,
4647
BaseIntentNode, BaseLoopNode, BaseLoopStartStepNode,
4748
BaseLoopContinueNode,
48-
BaseLoopBreakNode, BaseVariableSplittingNode, BaseParameterExtractionNode]
49+
BaseLoopBreakNode, BaseVariableSplittingNode, BaseParameterExtractionNode, BaseVariableAggregationNode]
4950

5051

5152
def get_node(node_type):

apps/application/flow/step_node/variable_aggregation_node/__init__.py

Whitespace-only changes.
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# coding=utf-8
2+
3+
from typing import Type
4+
5+
from django.utils.translation import gettext_lazy as _
6+
from rest_framework import serializers
7+
8+
from application.flow.i_step_node import INode, NodeResult
9+
10+
11+
12+
13+
class VariableListSerializer(serializers.Serializer):
14+
v_id = serializers.CharField(required=True, label=_("Variable id"))
15+
variable = serializers.ListField(required=True, label=_("Variable"))
16+
17+
18+
class VariableGroupSerializer(serializers.Serializer):
19+
id = serializers.CharField(required=True, label=_("Group id"))
20+
group_name = serializers.CharField(required=True, label=_("group_name"))
21+
variable_list = VariableListSerializer(many=True)
22+
23+
24+
class VariableAggregationNodeSerializer(serializers.Serializer):
25+
strategy = serializers.CharField(required=True, label=_("Strategy"))
26+
group_list = VariableGroupSerializer(many=True)
27+
28+
29+
class IVariableAggregation(INode):
30+
type = 'variable-aggregation-node'
31+
32+
33+
def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
34+
return VariableAggregationNodeSerializer
35+
36+
def _run(self):
37+
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)
38+
39+
def execute(self,strategy,group_list,**kwargs) -> NodeResult:
40+
pass
41+
42+

apps/application/flow/step_node/variable_aggregation_node/impl/__init__.py

Whitespace-only changes.
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#coding=utf-8
2+
"""
3+
@project: MaxKB
4+
@Author:虎²
5+
@file: base_variable_aggregation_node.py
6+
@date:2025/10/23 17:42
7+
@desc:
8+
"""
9+
from application.flow.i_step_node import NodeResult
10+
from application.flow.step_node.variable_aggregation_node.i_variable_aggregation_node import IVariableAggregation
11+
12+
13+
class BaseVariableAggregationNode(IVariableAggregation):
14+
15+
def save_context(self, details, workflow_manage):
16+
for key, value in details.get('result').items():
17+
self.context['key'] = value
18+
self.context['result'] = details.get('result')
19+
20+
def get_first_non_null(self, variable_list):
21+
22+
for variable in variable_list:
23+
v = self.workflow_manage.get_reference_field(
24+
variable.get('variable')[0],
25+
variable.get('variable')[1:])
26+
if v is not None:
27+
return v
28+
return None
29+
30+
def set_variable_to_json(self, variable_list):
31+
32+
return {variable.get('variable')[1:][0]: self.workflow_manage.get_reference_field(
33+
variable.get('variable')[0],
34+
variable.get('variable')[1:]) for variable in variable_list}
35+
36+
def execute(self,strategy,group_list,**kwargs) -> NodeResult:
37+
strategy_map = {'first_non_null':self.get_first_non_null,
38+
'variable_to_json': self.set_variable_to_json,
39+
}
40+
41+
result = { item.get('group_name'):strategy_map[strategy](item.get('variable_list')) for item in group_list}
42+
43+
return NodeResult({'result': result,**result},{})
44+
45+
def get_details(self, index: int, **kwargs):
46+
return {
47+
'name': self.node.properties.get('stepName'),
48+
"index": index,
49+
'run_time': self.context.get('run_time'),
50+
'type': self.node.type,
51+
'result': self.context.get('result'),
52+
'status': self.status,
53+
'err_message': self.err_message
54+
}

ui/src/locales/lang/en-US/views/application-workflow.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,15 @@ You are a master of problem optimization, adept at accurately inferring user int
263263
variableAggregationNode: {
264264
label: 'Variable Aggregation',
265265
text: 'Perform aggregation processing on the outputs of multiple branches',
266+
Strategy: 'Aggregation Strategy',
267+
placeholder: 'Return the first non-null value of each group',
268+
placeholder1: 'Structurally aggregate each group of variables',
269+
group: {
270+
placeholder: 'Please select a variable',
271+
noneError: 'Name cannot be empty',
272+
dupError: 'Name cannot be duplicated',
273+
},
274+
add: 'Add Group',
266275
},
267276
mcpNode: {
268277
label: 'MCP Node',

ui/src/locales/lang/zh-CN/views/application-workflow.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,15 @@ export default {
264264
variableAggregationNode: {
265265
label: '变量聚合',
266266
text: '对多个分支的输出进行聚合处理',
267+
Strategy: '聚合策略',
268+
placeholder: '返回每组的第一个非空值',
269+
placeholder1: '结构化聚合每组变量',
270+
group: {
271+
placeholder: '请选择变量',
272+
noneError: '名称不能为空',
273+
dupError: '名称不能重复',
274+
},
275+
add: '添加分组',
267276
},
268277
variableAssignNode: {
269278
label: '变量赋值',

ui/src/locales/lang/zh-Hant/views/application-workflow.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,15 @@ export default {
263263
variableAggregationNode: {
264264
label: '變量聚合',
265265
text: '對多個分支的輸出進行聚合處理',
266+
Strategy: '聚合策略',
267+
placeholder: '返回每組的第一個非空值',
268+
placeholder1: '結構化聚合每組變量',
269+
group: {
270+
placeholder: '請選擇變量',
271+
noneError: '名稱不能為空',
272+
dupError: '名稱不能重複',
273+
},
274+
add: '新增分組',
266275
},
267276
mcpNode: {
268277
label: 'MCP 調用',

ui/src/workflow/common/data.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,9 @@ export const variableAggregationNode = {
345345
height: 252,
346346
properties: {
347347
stepName: t('views.applicationWorkflow.nodes.variableAggregationNode.label'),
348-
config: {},
348+
config: {
349+
fields: [],
350+
},
349351
},
350352
}
351353

0 commit comments

Comments
 (0)