Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/application/flow/step_node/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from .text_to_video_step_node.impl.base_text_to_video_node import BaseTextToVideoNode
from .tool_lib_node import *
from .tool_node import *
from .variable_aggregation_node.impl.base_variable_aggregation_node import BaseVariableAggregationNode
from .variable_assign_node import BaseVariableAssignNode
from .variable_splitting_node import BaseVariableSplittingNode
from .video_understand_step_node import BaseVideoUnderstandNode
Expand All @@ -45,7 +46,7 @@
BaseVideoUnderstandNode,
BaseIntentNode, BaseLoopNode, BaseLoopStartStepNode,
BaseLoopContinueNode,
BaseLoopBreakNode, BaseVariableSplittingNode, BaseParameterExtractionNode]
BaseLoopBreakNode, BaseVariableSplittingNode, BaseParameterExtractionNode, BaseVariableAggregationNode]


def get_node(node_type):
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# coding=utf-8

from typing import Type

from django.utils.translation import gettext_lazy as _
from rest_framework import serializers

from application.flow.i_step_node import INode, NodeResult




class VariableListSerializer(serializers.Serializer):
v_id = serializers.CharField(required=True, label=_("Variable id"))
variable = serializers.ListField(required=True, label=_("Variable"))


class VariableGroupSerializer(serializers.Serializer):
id = serializers.CharField(required=True, label=_("Group id"))
group_name = serializers.CharField(required=True, label=_("group_name"))
variable_list = VariableListSerializer(many=True)


class VariableAggregationNodeSerializer(serializers.Serializer):
strategy = serializers.CharField(required=True, label=_("Strategy"))
group_list = VariableGroupSerializer(many=True)


class IVariableAggregation(INode):
type = 'variable-aggregation-node'


def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
return VariableAggregationNodeSerializer

def _run(self):
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)

def execute(self,strategy,group_list,**kwargs) -> NodeResult:
pass


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, the code looks well-structured and follows Django Rest Framework conventions. However, there are a few areas where improvements can be made:

  1. Docstring for Methods: You have docstrings in all classes but no documentation strings (like __doc__) for each method. It would be helpful to add brief descriptions for methods that describe what they do.

  2. Error Handling: While not explicitly shown here, it's generally good practice to implement error handling across different parts of your application to make it more robust against unexpected inputs or failures during execution.

  3. Type Hints on Arguments: The use of type hints is present within the serializer classes and elsewhere in the code, which is a good approach. Make sure that these are consistent throughout, including function parameters and class variables.

  4. Class Names: Class names follow best practices, such as using meaningful and descriptive names like IVariableAggregation instead of generic ones like INode.

  5. Method Naming: Method names should typically start with lowercase letters followed by underscores if they aren't functions (as they appear). Consider renaming _run to something more explicit, perhaps like process, depending on its intended functionality.

  6. Code Style: Ensure consistency in indentation and spacing across the file. This makes the code easier to read and maintain.

  7. Documentation: Add detailed comments at key points in the code, especially around complex logic or data flow. Use Markdown formatting within docstrings where appropriate.

With these considerations addressed, your code will be cleaner, safer, and easier to understand. Here’s an example of how some specific parts could benefit from additional context:

class VariableListSerializer(serializers.Serializer):
    """
    Serializer for individual variable information.
    
    Fields:
        v_id: Unique identifier for the variable.
        variable: List of values associated with this variable.
    """

    v_id = serializers.CharField(required=True, label=_("Variable id"))
    variable = serializers.ListField(required=True, label=_("Variable"))

class VariableGroupSerializer(serializers.Serializer):
    """Serializer for grouping variables together based on various criteria."""

    id = serializers.CharField(required=True, label=_("Group id"))
    group_name = serializers.CharField(required=True, label=_("group name"))
    variable_list = VariableListSerializer(many=True)

class VariableAggregationNodeSerializer(serializers.Serializer):
    """
    Serializer for aggregation configurations, allowing nodes to define grouping strategies and manage sets of variables.
    
    Fields:
        strategy: Type of aggregation method to apply.
        group_list: Collection of VariableGroupSerializer instances defining groups of variables.
    """

    strategy = serializers.CharField(required=True, label=_("Strategy"))
    group_list = VariableGroupSerializer(many=True)

class IVariableAggregation(INode):
    """
    Interface representing aggregation logic nodes.
    
    Attributes:
        strategy: Aggregation strategy used by the node.

    Methods:
        get_node_params_serializer_class: Returns the appropriate serialization object for this node's configuration.
        _run: Executes node-specific logic after receiving input.
        execute(strategy, group_list): Defines the core logic to perform variable aggregation.
    """

    type = 'variable-aggregation-node'

    def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
        """
        Return serializer class for configuring this aggregation node
        
        :return: VariableAggregationNodeSerializer instance
        """
        return VariableAggregationNodeSerializer

    def _run(self):
        """
        Runs the node asynchronously by processing its inputs and invoking the internal execute method.
        
        This method is abstract and must be implemented by subclasses.
        
        """
        print(f"Running {self.type} node...")  # Simplified logging; replace with actual implementation
        result = self.execute(**self.node_params_serializer().data(), **self.flow_params_serializer().data())
        return NodeResult(result=result)

    async def execute(
            self,
            strategy=None,
            group_list=None,
            **kwargs
    ) -> None:
        """
        Perform aggregation logic over the provided group definitions.
        
        This method receives necessary arguments and executes aggregating calculations.
        
        :param kwargs: Additional keyword arguments passed to the method
        :return: Nothing (results are stored internally)
        """

        # Placeholder implementation
        pass

Each class now has a comprehensive docstring describing its purpose, fields, and methods, enhancing understanding among developers who may work with or contribute to the codebase.

Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#coding=utf-8
"""
@project: MaxKB
@Author:虎²
@file: base_variable_aggregation_node.py
@date:2025/10/23 17:42
@desc:
"""
from application.flow.i_step_node import NodeResult
from application.flow.step_node.variable_aggregation_node.i_variable_aggregation_node import IVariableAggregation


class BaseVariableAggregationNode(IVariableAggregation):

def save_context(self, details, workflow_manage):
for key, value in details.get('result').items():
self.context['key'] = value
self.context['result'] = details.get('result')

def get_first_non_null(self, variable_list):

for variable in variable_list:
v = self.workflow_manage.get_reference_field(
variable.get('variable')[0],
variable.get('variable')[1:])
if v is not None:
return v
return None

def set_variable_to_json(self, variable_list):

return {variable.get('variable')[1:][0]: self.workflow_manage.get_reference_field(
variable.get('variable')[0],
variable.get('variable')[1:]) for variable in variable_list}

def execute(self,strategy,group_list,**kwargs) -> NodeResult:
strategy_map = {'first_non_null':self.get_first_non_null,
'variable_to_json': self.set_variable_to_json,
}

result = { item.get('group_name'):strategy_map[strategy](item.get('variable_list')) for item in group_list}

return NodeResult({'result': result,**result},{})

def get_details(self, index: int, **kwargs):
return {
'name': self.node.properties.get('stepName'),
"index": index,
'run_time': self.context.get('run_time'),
'type': self.node.type,
'result': self.context.get('result'),
'status': self.status,
'err_message': self.err_message
}
9 changes: 9 additions & 0 deletions ui/src/locales/lang/en-US/views/application-workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,15 @@ You are a master of problem optimization, adept at accurately inferring user int
variableAggregationNode: {
label: 'Variable Aggregation',
text: 'Perform aggregation processing on the outputs of multiple branches',
Strategy: 'Aggregation Strategy',
placeholder: 'Return the first non-null value of each group',
placeholder1: 'Structurally aggregate each group of variables',
group: {
placeholder: 'Please select a variable',
noneError: 'Name cannot be empty',
dupError: 'Name cannot be duplicated',
},
add: 'Add Group',
},
mcpNode: {
label: 'MCP Node',
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code snippet appears to be part of a configuration file for a software interface that manages different types of nodes used in processing data pipelines. Here's a brief review of the changes and potential optimizations:

  1. The Strategy property was added under variableAggregationNode, providing more context about what kind of aggregation strategy is available.
  2. Two placeholders (placeholder1 and placeholder) were introduced to enhance clarity, but they might benefit from better descriptions to match their intended use cases.
  3. A group object was created within variableAggregationNode, which defines placeholder instructions for selecting variables and error messages related to name validation.
  4. An "Add Group" button was included under the group object, suggesting it allows users to manage groups of aggregated variables.

Overall, these additions provide additional functionality and clarify the purpose behind each feature. However, there isn't enough information in the given code sample about how these features interact with other parts of the system or any specific performance requirements for efficiency.

If you're looking for further optimization suggestions based on typical practices for such interfaces, consider the following general tips:

  • Consistent Naming: Ensure consistent naming conventions across the configurations to maintain readability.
  • UI/UX Considerations: Analyze if adding new controls like "add" buttons impacts the usability of the interface. Sometimes, hidden complexity can hinder simplicity.
  • Scalability: If this is part of a larger application or platform, ensure that these new functionalities do not introduce scalability bottlenecks.
  • Testing: Implement unit tests to verify that all new features work as expected and perform correctly under various conditions.

For production-ready applications, integrating user feedback mechanisms and ensuring robust testing will also be important steps.

Expand Down
9 changes: 9 additions & 0 deletions ui/src/locales/lang/zh-CN/views/application-workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,15 @@ export default {
variableAggregationNode: {
label: '变量聚合',
text: '对多个分支的输出进行聚合处理',
Strategy: '聚合策略',
placeholder: '返回每组的第一个非空值',
placeholder1: '结构化聚合每组变量',
group: {
placeholder: '请选择变量',
noneError: '名称不能为空',
dupError: '名称不能重复',
},
add: '添加分组',
},
variableAssignNode: {
label: '变量赋值',
Expand Down
9 changes: 9 additions & 0 deletions ui/src/locales/lang/zh-Hant/views/application-workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,15 @@ export default {
variableAggregationNode: {
label: '變量聚合',
text: '對多個分支的輸出進行聚合處理',
Strategy: '聚合策略',
placeholder: '返回每組的第一個非空值',
placeholder1: '結構化聚合每組變量',
group: {
placeholder: '請選擇變量',
noneError: '名稱不能為空',
dupError: '名稱不能重複',
},
add: '新增分組',
},
mcpNode: {
label: 'MCP 調用',
Expand Down
4 changes: 3 additions & 1 deletion ui/src/workflow/common/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,9 @@ export const variableAggregationNode = {
height: 252,
properties: {
stepName: t('views.applicationWorkflow.nodes.variableAggregationNode.label'),
config: {},
config: {
fields: [],
},
},
}

Expand Down
Loading
Loading