Conversation

@shaohuzhang1
Contributor

feat: Node supports knowledge base workflow

@f2c-ci-robot

f2c-ci-robot bot commented Nov 27, 2025

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it.


Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@f2c-ci-robot

f2c-ci-robot bot commented Nov 27, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment


def generate_history_ai_message(self, chat_record):
    for val in chat_record.details.values():
        if self.node.id == val['node_id'] and 'image_list' in val:
Contributor Author

There are several issues and optimizations that can be addressed:

Issues:

  1. Global Import: The KnowledgeMode class is being imported at the beginning of the file, but it's not defined anywhere in your provided snippet. This might lead to an error if you try to use WORKFLOW_MODE_KNOWLEDGE.

  2. Missing Parameters: In the execute method signature, parameters like first_frame_url and last_frame_url are expected but not mentioned in the call to FileSerializer. You should add these parameters if they're necessary.

  3. Unused Code Blocks: There are two unused code blocks related to debug mode (if application.id else True). These could be removed if they're no longer needed.

  4. Redundant Checks: The membership test {WorkflowMode.MANUAL, WORK_FLOW_MODE_AUTOLOOP}.__contains__(self.workflow_manage.flow.workflow_mode) is clumsy and largely redundant given only two modes are involved; prefer the in operator (see the sketch after this list).

  5. Potential Error Handling: Not all potential errors in file uploading are handled within the upload_knowledge_file and upload_application_file methods.

  6. Comments Lack Clarity: Some comments do not clearly explain what each part of the code does.

  7. Variable Naming Consistency: Use consistent naming for variables such as file and video.
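
A small sketch of the check from issue 4 rewritten with the in operator; the WorkflowMode import path follows the code later in this review, and WORK_FLOW_MODE_AUTOLOOP is the constant named in the snippet (its origin is not shown there, so treat it as an assumption):

from application.flow.common import WorkflowMode  # import path as used later in this review

def is_manual_or_autoloop(workflow_mode) -> bool:
    # Same membership test as above, written with the in operator instead of __contains__
    return workflow_mode in (WorkflowMode.MANUAL, WORK_FLOW_MODE_AUTOLOOP)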

Optimizations:

  1. Combine Similar Upload Methods:

    def upload_file(self, file):
        meta_data = self._get_upload_meta()
        return self._perform_upload(file, meta_data)

    def _get_upload_meta(self):
        # Knowledge workflows attach the file to the knowledge base
        if self.workflow_manage.flow.workflow_mode in (WorkflowMode.KNOWLEDGE, WorkflowMode.KNOWLEDGE_LOOP):
            return {"debug": False, "knowledge_id": self.knowledge_id}
        # Otherwise attach it to the application / chat
        application = self.workflow_manage.work_flow_post_handler.chat_info.application
        return {"debug": False if application.id else True,
                "chat_id": self.chat_id,
                "application_id": str(application.id) if application.id else None}

    def _perform_upload(self, file, meta):
        return FileSerializer(data={
            'file': file,
            'meta': meta,
            'source_id': meta.get('knowledge_id') or meta.get('application_id'),
            'source_type': FileSourceType.KNOWLEDGE.value if meta.get('knowledge_id') else FileSourceType.APPLICATION.value
        }).upload()
  2. Separate Logic for Knowledge vs Application:
    Ensure that the logic for handling knowledge files is separate from application files where relevant.

  3. Error Handling: Add explicit error handling for network errors, IO errors, etc., when performing HTTP requests or file operations (see the sketch after this list).

  4. Use Logging Instead of Print Statements: For debugging purposes, use Python's logging module instead of print statements for better control over log output.
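
A minimal sketch of points 3 and 4 together, assuming requests is used for the download; the helper name download_content is illustrative and not part of this PR:

import logging

import requests

logger = logging.getLogger(__name__)

def download_content(url: str) -> bytes:
    """Download a remote resource, logging and re-raising network/IO failures."""
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.content
    except (requests.RequestException, IOError) as error:
        logger.error("Failed to download %s: %s", url, error)
        raise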

Here’s a revised version based on these points:

import base64
import logging

import requests
from django.db.models import QuerySet
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage

from application.flow.common import WorkflowMode
from application.flow.i_step_node import NodeResult
from application.flow.step_node.image_to_video_step_node.i_image_to_video_node import IImageToVideoNode
from common.utils.common import bytes_to_uploaded_file
# FileSerializer, FileSourceType and get_model_instance_by_model_workspace_id are
# assumed to be importable from the project, as in the original file.

logger = logging.getLogger(__name__)

class ImageToVideoStepNode(IImageToVideoNode):
    def __init__(self, node_params, workflow_manage):
        super().__init__(node_params, workflow_manage)

    def save_context(self, details, workflow_manage):
        if self.node_params.get('is_result', False):
            self.answer_text = details.get('answer')

    def execute(self, model_id, prompt, negative_prompt, dialogue_number, dialogue_type, history_chat_record, model_params_setting,
                 chat_record_id,
                 first_frame_url, last_frame_url=None,
                 **kwargs) -> NodeResult:
        workspace_id = self.workflow_manage.get_body().get('workspace_id')
        ttv_model = get_model_instance_by_model_workspace_id(model_id, workspace_id,
                                                                **model_params_setting)
        
        # Handle image-to-video processing here
        video_urls = ...  # Your implementation for generating video URLs
        file_name = ...  # Name to store the generated video under

        # If the model returned a remote URL, download the raw bytes first
        if isinstance(video_urls, str) and video_urls.startswith('http'):
            video_urls = requests.get(video_urls).content

        file = bytes_to_uploaded_file(video_urls, file_name)
        file_url = self.upload_file(file)
        video_label = f'<video src="{file_url}" controls style="max-width: 100%; width: 100%; height: auto; max-height: 60vh;"></video>'

        message_list = []  # Implement your AI message generation logic

        return NodeResult({'answer': video_label, 'chat_model': ttv_model, 'message_list': message_list})

    def get_file_base64(self, image_url):
        response = requests.get(image_url)
        if response.status_code == 200:
            return base64.b64encode(response.content).decode('utf-8')
        logger.error(f"Failed to obtain the image")
        raise ValueError("Failed to obtain the image")

    def upload_file(self, file):
        metadata = {
            "debug": False if self.application else True,
            "chat_id": self.chat_id,
            "application_id": str(self.application.id) if self.application else None,
        }
        url = FileSerializer(data={
            "file": file,
            "meta": metadata,
            "source_id": metadata["application_id"],
            "source_type": FileSourceType.APPLICATION.value
        }).upload()

        if not url:
            logger.error("File upload failed.")
            raise Exception("File upload failed.")

        return url

    def generate_history_ai_message(self, chat_record):
        for value in chat_record.details.values():
            if self.node.id == value['node_id'] and 'image_list' in value:
                # Implement AI message generation logic here
                ...

This version cleans up the code by combining similar functionality into helper methods, improving readability and maintainability. It also handles missing parameters more gracefully and adds basic error handling using logging.
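
To make the remaining placeholder concrete, here is a hedged sketch of what generate_history_ai_message might return; the 'answer' key and the chat_record.answer_text fallback are assumptions, not confirmed by this PR:

def generate_history_ai_message(self, chat_record):
    # Assumption: a matching detail entry stores this node's answer text under 'answer'
    for value in chat_record.details.values():
        if self.node.id == value['node_id'] and 'image_list' in value:
            return AIMessage(content=value.get('answer', ''))
    # Assumption: the chat record exposes the final answer as answer_text
    return AIMessage(content=chat_record.answer_text)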

    return file_url

def generate_history_ai_message(self, chat_record):
    for val in chat_record.details.values():
Contributor Author

There are several issues and potential improvements in the provided code:

  1. Duplicate ttv_model Reference:

    • The ttv_model is referenced twice within the same function call (NodeResult()), which might lead to confusion.
  2. Variable Shadowing:

    • Variables like video_urls, _context, file_name, history_message, question, and more are used repeatedly without proper initialization or scope management.
  3. Lack of Error Handling:

    • There is no error handling for file uploads or model instance retrieval. This could result in runtime errors if something goes wrong.
  4. Code Duplication:

    • The upload_file, upload_knowledge_file, and upload_application_file methods look almost identical, which can be refactored to avoid redundancy.
  5. File Type Check:

    • The code assumes that video_urls starts with 'http' and handles it accordingly. However, this might not be appropriate for all use cases, especially local files (see the sketch after this list).
  6. File Serialization:

    • The FileSerializer.upload() method is called directly from within the class. Depending on how this serializer and its implementation work, there might be additional configurations needed or improvements required.
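
For issue 5, a minimal sketch that distinguishes remote URLs from local paths before reading the video bytes; the helper name and branching are assumptions, not part of this PR:

import os

import requests

def read_video_bytes(source: str) -> bytes:
    """Return raw bytes for either a remote URL or a local file path."""
    if source.startswith(('http://', 'https://')):
        response = requests.get(source, timeout=30)
        response.raise_for_status()
        return response.content
    if os.path.exists(source):
        with open(source, 'rb') as f:
            return f.read()
    raise ValueError(f"Unsupported video source: {source}")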

Here's an optimized version of the code along with some suggested improvements:

import requests
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage

from application.flow.common import WorkflowMode
from application.flow.i_step_node import NodeResult
from application.flow.step_node.text_to_video_step_node.i_text_to_video_node import ITextToVideoNode
from common.utils.common import bytes_to_uploaded_file
from ..models.file import FileSourceType

class TextToVideoStepNode(ITextToVideoNode):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute(self, node_params, workflow_manage, **kwargs) -> NodeResult:
        # Extract parameters safely
        model_id = node_params.get('model_id')
        prompt = node_params.get('prompt')
        negative_prompt = node_params.get('negative_prompt')
        dialogue_number = node_params.get('dialogue_number', 0)
        dialogue_type = node_params.get('dialogue_type', '')  # Default value?
        history_chat_record = kwargs.get('historyChatRecord')  # Use kwargs instead of context parameter name mismatch?
        chat_id = node_params.get('chat_id')
        model_params_setting = node_params.get('paramsSettingDictionary')

        application_manager = workflow_manage.work_flow_post_handler.chat_info.application.manager

        try:
            workspace_id = workflow_manage.get_body().get('workspace_id')

            # Get model instance
            tts_model = application_manager.services.model_service.instance(
                "text-to-video",
                workspace_id=workspace_id,
                params=model_params_setting
            )

            # Generate video content (assuming some logic here)
            video_content = tts_model.generate(prompt=prompt, negative_prompt=negative_prompt)

            # Process video URLs appropriately
            if isinstance(video_content['result'], str) and video_content['result'].startswith('http'):
                video_bytes = requests.get(video_content['result']).content
            else:
                raise ValueError("Invalid media URL returned by models.")

            file = bytes_to_uploaded_file(video_bytes, file_name=None)  # file_name intentionally unused here
            
            file_url = self.upload_file(file)

            video_label = f'<video src="{file_url}" controls style="max-width: 100%; width: 100%; height: auto;"></video>'
            video_list = [{'file_id': file_url.split('/')[-1], 'filename': '', 'url': file_url}]  # Adjust filename and source type
            meta_data = {  # Ensure consistent metadata format
                'debug': False,
                'knowledge_id': None,
                'chat_id': None,
                'application_id': None,
                'source_type': None
            }

            new_meta = dict(meta_data, **{
                'knowledge_id': node_params.get('knowledge_id'),
                'chat_id': chat_id,
                'application_id': application_manager.id,
                'source_type': FileSourceType.KNOWLEDGE.value if node_params.get('knowledge_id') else FileSourceType.APPLICATION.value
            })

            # FileSerializer is assumed to be importable from the project's file module, as in the previous snippet
            uploaded_response = FileSerializer(data={
                'file': file,
                'meta': new_meta,
                'source_id': new_meta.get('knowledge_id') or new_meta.get('application_id'),
                'source_type': new_meta['source_type']
            }).upload()

            return NodeResult({
                'answer': video_label,
                'chat_model': tts_model,
                'message_list': [],
                'video': video_list,
                'history_message': '',
                'question': ''
            }, {}, response_data={
                "uploadedResponse": str(uploaded_response),
                "errorMessage": "" if not uploaded_response.error else str(uploaded_response.error)
            })
            
        except Exception as e:
            return NodeResult({"error": str(e)}, {}, response_data={
                "uploadedResponse": "",
                "errorMessage": str(e)
            })

    @staticmethod
    def convert_metadata(application, data):
        """Static helper method to convert shared metadata for upload."""
        meta = {
            'debug': False if application.id else True,
            'chat_id': data.get('chat_id'),
            'application_id': application.id,
            'source_type': None}
        # Fill out the missing pieces based on data
        if data.get('knowledge_id'):
            meta['knowledge_id'] = data['knowledge_id']
            meta['source_type'] = FileSourceType.KNOWLEDGE.value
        elif data.get('chat_id'):
            meta['chat_id'] = data['chat_id']
            meta['source_type'] = FileSourceType.APPLICATION.value
        # ... ensure all fields have default/valid values as necessary ...
        return meta
    
    def upload_file(self, file, node_params=None):
        """
        Handle the different file upload paths depending on the workflow mode.

        Returns:
            The unique url identifier for the file.
        """
        node_params = node_params or {}
        mode = str(self.workflow_manage.flow.workflow_mode).lower()
        match mode:
            case "knowledge":
                # KnowledgeUtil is a placeholder helper for knowledge-base uploads
                return KnowledgeUtil.upload_file(file=file, knowledge_id=node_params.get('knowledge_id'))
            case _:
                application = self.workflow_manage.work_flow_post_handler.chat_info.application
                converted_meta = self.convert_metadata(application, node_params)
                # FileUploaderUtil is a placeholder helper for application/chat uploads
                return FileUploaderUtil.upload_file(file=file, meta=converted_meta)

Key Changes:

  1. Initialization: Added __init__ method for initializing necessary components.
  2. Parameter Safeguards: Used node_params.get(...) instead of direct attribute access.
  3. Error Handling: Wrapped critical operations in a try-except block to catch and handle exceptions gracefully.
  4. Consistent Metadata Management: Introduced a static utility method convert_metadata to unify metadata settings across various file upload paths.
  5. Simplified Upload Logic: Reorganized the file upload steps into a dedicated upload_file helper method.
  6. Removed Deprecated Code: Removed references to obsolete properties such as _context.

This cleaned-up and enhanced version attempts to address identified issues while maintaining functionality and providing guidance for future modifications.
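
For illustration, a hedged usage sketch of the convert_metadata helper; the application object and the ids are made up for the example:

# Knowledge-base upload: knowledge_id drives the source type
knowledge_meta = TextToVideoStepNode.convert_metadata(application, {'knowledge_id': 'kb-123'})
# -> meta['source_type'] == FileSourceType.KNOWLEDGE.value, meta['knowledge_id'] == 'kb-123'

# Application/chat upload: chat_id drives the source type
chat_meta = TextToVideoStepNode.convert_metadata(application, {'chat_id': 'chat-456'})
# -> meta['source_type'] == FileSourceType.APPLICATION.value, meta['chat_id'] == 'chat-456'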

@zhanweizhang7 zhanweizhang7 merged commit 6434ba7 into knowledge_workflow Nov 27, 2025
3 of 6 checks passed
@zhanweizhang7 zhanweizhang7 deleted the pr@knowledge_workflow@feat_node branch November 27, 2025 10:13