
Conversation

@zhanweizhang7 (Contributor)

What this PR does / why we need it?

Summary of your change

Please indicate you've done the following:

  • Made sure tests are passing and test coverage is added if needed.
  • Made sure the commit message follows the Conventional Commits specification.
  • Considered the docs impact and opened a new docs issue or PR with docs changes if needed.


f2c-ci-robot bot commented Nov 14, 2025

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected; please follow our release note process to remove it.


Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@zhanweizhang7 merged commit f335098 into v2 on Nov 14, 2025
3 of 5 checks passed

f2c-ci-robot bot commented Nov 14, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@zhanweizhang7 deleted the pr@v2@perf_mem branch on November 14, 2025, 10:42
        self._cleanup()

    def run_chain_async(self, current_node, node_result_future, language='zh'):
        future = executor.submit(self.run_chain_manage, current_node, node_result_future, language)

The proposed code has several improvements:

  1. Exception Handling: Wrapped the main logic in a try-except block so that any unexpected errors during execution are caught and resources are still cleaned up.

  2. Cleanup Method: Created a _cleanup method specifically for cleaning up resources at the end of operations like running a chain asynchronously. This ensures all object references are cleared, which can prevent memory leaks.

  3. Docstring Updates: Updated comments throughout the methods to be more informative and consistent in format.

  4. Comment Removal: Removed redundant comments within functions; they did not enhance readability and could confuse readers.

These changes improve the robustness of the code by handling exceptions and reducing resource pollution due to stale references.

Suggested Cleanup Code

To further streamline the cleanup process, you could also consider defining an abstract base class (NodeContext) for managing context objects across different nodes, so each subclass handles its own specific clean-ups. Here's how it might look:

import abc
import threading


class NodeContext(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def clear(self):
        """Release every reference held by this context."""
        ...


# Example implementation using threading.local for thread-specific contexts
current_context = threading.local()


def setup_thread_specific_context():
    # Reset every per-thread reference so stale objects can be garbage-collected
    current_context.node_chunk_manage = None
    current_context.work_flow_post_handler = None
    current_context.flow = None
    current_context.start_node = None
    current_context.current_node = None
    current_context.current_result = None
    current_context.chat_record = None
    current_context.base_to_response = None
    current_context.params = None
    current_context.lock = None


setup_thread_specific_context()

This approach centralizes context management, making it easier to handle various stateful objects across threads or asynchronous tasks.

Final Cleaned-up Code Snippet:

@classmethod
def start_task(cls, flow_id: str) -> "Task":
    flow = Flow.objects.filter(id=flow_id).first()
    if not flow:
        raise ValueError(f"Flow with id {flow_id} does not exist.")

    return cls(flow)

async def run_async(self, async_loop=None, language="zh"):
    try:
        start = time.time()
        self._start_time = start
        task_type = "run_async"

        await self._set_start_info(async_loop, task_type, False)

        # Additional steps...

        # Wait until the workflow has stopped and every background future has finished
        while True:
            await asyncio.sleep(0.1)
            if not self.running and all(f.done() for f in getattr(self, 'future', [])):
                break

        result_text, info_dict = [], []

        # Handle intermediate messages (e.g., step-by-step outputs)

        # Generate final response
        resp = self._to_api_resp("final_response", "", "", {}, "")

        # Collect results from background tasks; the futures come from executor.submit
        # and are already done at this point, so result() does not block
        for f in getattr(self, 'future', []):
            f.result()

        print(f'Total execution time took {time.time() - start:.2f}s')

        return resp

    finally:
        self._end_time = round(time.time(), 4)
        total_cost = int((self._execution_total / len(self.output_queue)) * 1000) if self.output_queue else 0

        logger.info(f"{task_type.upper()}, cost:{total_cost}ms")

        task_manager.save_task(self.task)
        setattr(self.task, '_id', self.id)
        self.save()

        # Drop per-task references so they do not linger after the run
        self._clear_all_objects()


def _clear_all_objects(self):
    if hasattr(current_context, 'result'):
        del current_context.result

Note: The above cleaned-up code assumes the existence of relevant classes, modules, and globals such as Task, Flow, output_queue, logger, task_manager, and current_context, among others. These need to be implemented or referenced correctly based on your actual system architecture.

yield data


async def anext_async(agen):

The provided code has several improvements and optimizations:

Improvements/Enhancements:

  1. Global Event Loop Management:

    • Global event loop management is implemented using a thread-safe approach. This avoids creating multiple event loops, which can lead to resource leaks.
  2. Error Handling on Exit:

    • Proper error handling is added when the coroutine exits (finally block). The exception details are still logged.
  3. Concurrency Improvements:

    • Instead of creating a new event loop for each response, the function uses an existing global one managed by a separate thread. This reduces overhead associated with repeatedly spinning up and closing event loops.
  4. Queueing Results:

    • All asynchronous tasks that produce results now use a queue.Queue() instead of returning data synchronously. This keeps memory usage more efficient while maintaining high throughput.

Optimizations:

  1. Avoid Repeated Logging:

    • Ensure consistent logging format and possibly adjust logs based on their severity level.
  2. Thread Safety Enhancements:

    • Use context managers or explicit locking mechanisms where applicable across different threads to prevent race conditions.
  3. Memory Efficiency:

    • Minimize the time spent managing resources (like locks) during critical sections of execution. If possible, consider offloading computationally intensive parts to worker processes.
  4. Logging Contexts:

    • Consider adding additional contextual information to log messages such as request ID, user IP, etc., especially when logging errors for debugging.

By addressing these points, you improve both the performance and maintainability of the codebase. Always test thoroughly before deployment.
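
As a minimal sketch of the shared-event-loop and queue pattern described above (the names _loop, stream_results, the sentinel handling, and the logger are illustrative assumptions, not code from this PR):

import asyncio
import logging
import queue
import threading

logger = logging.getLogger(__name__)

# One shared event loop, driven by a dedicated background thread, so each
# response does not have to spin up and tear down its own loop.
_loop = asyncio.new_event_loop()
_loop_thread = threading.Thread(target=_loop.run_forever, daemon=True)
_loop_thread.start()


def stream_results(agen):
    """Drain an async generator on the shared loop, handing items to the caller via a queue."""
    result_queue = queue.Queue()
    sentinel = object()

    async def _consume():
        try:
            async for item in agen:
                result_queue.put(item)
        except Exception:
            # Error handling on exit: the exception details are still logged.
            logger.exception("async generator failed")
        finally:
            # The sentinel tells the synchronous consumer that the stream has ended.
            result_queue.put(sentinel)

    # Schedule the coroutine on the long-lived loop from this synchronous thread.
    asyncio.run_coroutine_threadsafe(_consume(), _loop)

    while True:
        item = result_queue.get()
        if item is sentinel:
            break
        yield item

The shared loop avoids paying the loop start-up cost per request, and the queue keeps the synchronous caller decoupled from the coroutine that produces the data.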

chat_info_dict = cache.get(Cache_Version.CHAT.get_key(key=chat_id), version=Cache_Version.CHAT_INFO.get_version())
if chat_info_dict:
    return ChatInfo.map_to_chat_info(chat_info_dict)
return None

The code looks generally clean and follows Python conventions. Here are some minor points for consideration:

Potential Issues:

  1. String Formatting in @dataclass Decorator: The string format is not compatible with dataclasses.dataclass. Use 'YYYY-MM-DD HH:MM' instead of the full date format.

  2. Empty Lists:

    exclude_document_id_list=self.exclude_document_id_list

    Ensure that exclude_document_id_list is properly initialized before passing it to the constructor.

  3. Static Method Parameters:
    The static method map_to_chat_record normally takes chat_record_dict as a required parameter; if an optional parameter is acceptable, it could be written like this:

    @staticmethod
    def map_to_chat_record(chat_record_dict=None):
        if chat_record_dict is None:
            chat_record_dict = {}
        return ChatRecord(
            id=chat_record_dict.get('id'),
            chat_id=chat_record_dict.get('chat_id'),
            vote_status=chat_record_dict.get('vote_status'),
            problem_text=chat_record_dict.get('problem_text'),
            answer_text=chat_record_dict.get('answer_text'),
            answer_text_list=chat_record_dict.get('answer_text_list'),
            message_tokens=chat_record_dict.get('message_tokens'),
            answer_tokens=chat_record_dict.get('answer_tokens'),
            const=chat_record_dict.get('const'),
            details=chat_record_dict.get('details'),
            improve_paragraph_id_list=chat_record_dict.get('improve_paragraph_id_list'),
            run_time=chat_record_dict.get('run_time'),
            index=chat_record_dict.get('index'))

Optimization Suggestions:

  1. Avoid Recursively Serializing Objects:
    In the case of nested objects (like chat_record_list), avoid serializing them recursively within each object by using a custom function. This can be done by modifying the serialization logic:

    def chat_record_to_dict(self, chat_record):
        return {
            'id': chat_record.id,
            # Serialize other fields here...
            'run_time': chat_record.run_time.strftime('%Y-%m-%d %H:%M:%S')  # Example format conversion
        }
    
    def serialize_data(self):
        data = self.to_dict()
        data['chat_record_list'] = [
            self.chat_record_to_dict(record) for record in self.chat_record_list
        ]
        return data
  2. Use Data Classes Instead of Regular Classes:
    Consider converting ChatRecord and ChatInfo into Pydantic models or FastAPI schemas for better type safety and additional features provided by these libraries.

  3. Consider Caching Strategy:
    If caching involves multiple layers (e.g., chat_info -> chat_records), ensure that the cache keys are appropriate and do not lead to conflicts between different caches.

By addressing these points, you can enhance both readability and performance of your code.
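
On the "Use Data Classes Instead of Regular Classes" point, a minimal sketch of ChatRecord as a Pydantic model might look like the following; the field names are taken from map_to_chat_record above, while the types, defaults, and optionality are assumptions:

from typing import Any, Dict, List, Optional

from pydantic import BaseModel


class ChatRecord(BaseModel):
    # Field names mirror map_to_chat_record above; types are assumed for illustration.
    id: str
    chat_id: str
    vote_status: Optional[str] = None
    problem_text: Optional[str] = None
    answer_text: Optional[str] = None
    answer_text_list: List[str] = []
    message_tokens: int = 0
    answer_tokens: int = 0
    const: Optional[Any] = None
    details: Dict[str, Any] = {}
    improve_paragraph_id_list: List[str] = []
    run_time: float = 0.0
    index: int = 0

    @classmethod
    def map_to_chat_record(cls, chat_record_dict: dict) -> "ChatRecord":
        # Pydantic validates and coerces the raw cache dict into a typed object.
        return cls(**chat_record_dict)

With a model like this, cls(**chat_record_dict) replaces the keyword-by-keyword constructor call, and missing required fields surface as validation errors instead of silently becoming None.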
