Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apps/application/flow/i_step_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ def __init__(self, chat_info, knowledge_action_id):
self.knowledge_action_id = knowledge_action_id

def handler(self, workflow):
err = [True for key, value in workflow.get_runtime_details().items() if value.get('status') == 500]
QuerySet(KnowledgeAction).filter(id=self.knowledge_action_id).update(
state=State.SUCCESS)
state=State.FAILURE if any(err) else State.SUCCESS,
run_time=time.time() - workflow.context.get('start_time') if workflow.context.get(
'start_time') is not None else 0)


class NodeResult:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code has several irregularities and potential optimizations:

  1. Inconsistent Use of for Loop: In the constructor (__init__) and handler, there are different use cases for loops (if ... else if vs. list comprehension). This can be inconsistent and lead to bugs.

  2. Potential Null Value: The line at the end of the handler method uses workflow.context.get('start_time'), which could potentially return None. Using 0 when it's missing doesn't align with common practice (e.g., handling null values).

  3. Useless List Comprehension: The list comprehension [True for key, value in workflow.get_runtime_details().items() if value.get('status') == 500] creates a list without using its elements. It would be more efficient to just calculate the error flag directly without storing all individual True/False results.

  4. Redundant Time Calculation: Calculating time.time() - workflow.context.get('start_time') twice within an assignment statement is redundant. Only calculate it once if needed multiple times or store in variable first.

Optimized version:

class KnowledgeActionModel(models.Model):
    state = models.CharField(max_length=16)
    
    # Ensure this function exists or replace it with equivalent logic if necessary
    def update_state(self, workflow):
        err = any(value['status'] == 500 for value in workflow.get_runtime_details())
        
        # Calculate only once
        run_time_seconds = time.time() - (
            workflow.context.get('start_time', 0)
        )
        
        self.state = State.FAILURE if err else State.SUCCESS
        self.run_time = round(run_time_seconds, 2)  

Additional notes:

  • Assuming Django ORM usage (models.Model), ensure all referenced classes (KnowledgeAction, State). If these are custom classes elsewhere, adjust accordingly.
  • Error handling should typically occur before performing database operations, especially sensitive ones like updating state.
  • Consider refactoring to separate concerns better; e.g., move time calculation into utility functions or methods outside class definitions.

Expand Down
2 changes: 2 additions & 0 deletions apps/application/flow/knowledge_workflow_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
@date:2025/11/13 19:02
@desc:
"""
import time
import traceback
from concurrent.futures import ThreadPoolExecutor

Expand Down Expand Up @@ -43,6 +44,7 @@ def get_start_node(self):
return start_node_list[0]

def run(self):
self.context['start_time'] = time.time()
executor.submit(self._run)

def _run(self):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code looks mostly clean and follows Python best practices. However, there are a few areas that could be improved:

  1. Import Statement: The traceback module is not used anywhere in the code snippet you've shared. If this import was intended to handle exceptions differently, consider removing it unless necessary.

  2. Concurrency Initialization: While using ThreadPoolExecutor is good practice for parallel processing, ensure that all tasks are correctly submitted through executor.submit(). There might be some context handling required before submitting tasks.

  3. Context Management: Accessing variables like self.context['start_time'] should be done within methods where such variables are defined or updated. It could lead to errors if _init_context() returns an empty dictionary.

  4. Task Submission Handling: Currently, _run() is only being called once by a single thread when _run() itself submits further tasks to executor, but since each task calls back into _run(), care should be taken to prevent infinite recursion loops.

Here's a revised version of the method assuming proper usage conventions:

def run(self):
    # Initialize execution time
    self.context['start_time'] = time.time()

    # Submit initial task
    future = executor.submit(self._run)

    try:
        # Wait until all futures complete
        future.result(timeout=None)
    except Exception as e:
        print(f"An error occurred: {e}")

This change ensures that the initialization step and subsequent submission of tasks work together seamlessly without any logical inconsistencies.

Expand Down
Loading