Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions api/core/workflow/nodes/variable_assigner/v1/node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Set, Tuple

from core.variables import SegmentType, Variable
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID
Expand Down Expand Up @@ -33,6 +33,15 @@ def __init__(
graph_runtime_state=graph_runtime_state,
)

def blocks_variable_output(self, variable_selectors: Set[Tuple[str, ...]]) -> bool:
"""
Check if this Variable Assigner node blocks the output of specific variables.

Returns True if this node updates any of the requested conversation variables.
"""
assigned_selector = self.node_data.assigned_variable_selector
return assigned_selector in variable_selectors
Comment on lines +42 to +43

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Type mismatch in blocks_variable_output causes comparison to always return False

The blocks_variable_output method in v1 VariableAssignerNode compares a Sequence[str] (list) directly with a set[tuple[str, ...]], which will always return False because a list is never equal to a tuple.

Click to expand

Bug Mechanism

The assigned_variable_selector is of type Sequence[str] (typically a list[str] from Pydantic parsing at node_data.py:14). The variable_selectors parameter is a set[tuple[str, ...]] (as defined in the base class at node.py:435 and populated in coordinator.py:172-175).

When checking assigned_selector in variable_selectors at line 43, Python compares a list with tuples in the set. Since ['conversation', 'conv_var'] != ('conversation', 'conv_var'), the comparison always returns False.

Comparison with v2 Implementation

The v2 implementation correctly handles this at v2/node.py:79:

item_selector_tuple = tuple(item.variable_selector)
if item_selector_tuple in variable_selectors:
    return True

Impact

The streaming response coordinator uses blocks_variable_output to determine if a node must complete before streaming can proceed. Because this method always returns False, the v1 Variable Assigner node will never block streaming output, potentially causing the Answer node to stream the old/default value of a conversation variable before the Variable Assigner has updated it.

Recommendation: Convert the assigned_selector to a tuple before comparison:

assigned_selector = tuple(self.node_data.assigned_variable_selector)
return assigned_selector in variable_selectors
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


@classmethod
def version(cls) -> str:
return "1"
Expand Down Expand Up @@ -89,10 +98,17 @@ def _run(self) -> NodeRunResult:
self.graph_runtime_state.variable_pool.add(assigned_variable_selector, updated_variable)

updated_variables = [common_helpers.variable_to_processed_data(assigned_variable_selector, updated_variable)]

# Prepare input value for result
if self.node_data.write_mode == WriteMode.CLEAR:
result_input_value = updated_variable.to_object()
else:
result_input_value = income_value.to_object()

return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={
"value": income_value.to_object(),
"value": result_input_value,
},
# NOTE(QuantumGhost): although only one variable is updated in `v1.VariableAssignerNode`,
# we still set `output_variables` as a list to ensure the schema of output is
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
app:
description: Validate v1 Variable Assigner blocks streaming until conversation variable is updated.
icon: 🤖
icon_background: '#FFEAD5'
mode: advanced-chat
name: test_streaming_conversation_variables_v1_overwrite
use_icon_as_answer_icon: false
dependencies: []
kind: app
version: 0.5.0
workflow:
conversation_variables:
- description: ''
id: 6ddf2d7f-3d1b-4bb0-9a5e-9b0c87c7b5e6
name: conv_var
selector:
- conversation
- conv_var
value: default
value_type: string
environment_variables: []
features:
file_upload:
allowed_file_extensions:
- .JPG
- .JPEG
- .PNG
- .GIF
- .WEBP
- .SVG
allowed_file_types:
- image
allowed_file_upload_methods:
- local_file
- remote_url
enabled: false
fileUploadConfig:
audio_file_size_limit: 50
batch_count_limit: 5
file_size_limit: 15
image_file_size_limit: 10
video_file_size_limit: 100
workflow_file_upload_limit: 10
image:
enabled: false
number_limits: 3
transfer_methods:
- local_file
- remote_url
number_limits: 3
opening_statement: ''
retriever_resource:
enabled: true
sensitive_word_avoidance:
enabled: false
speech_to_text:
enabled: false
suggested_questions: []
suggested_questions_after_answer:
enabled: false
text_to_speech:
enabled: false
language: ''
voice: ''
graph:
edges:
- data:
isInIteration: false
isInLoop: false
sourceType: start
targetType: assigner
id: start-source-assigner-target
source: start
sourceHandle: source
target: assigner
targetHandle: target
type: custom
zIndex: 0
- data:
isInLoop: false
sourceType: assigner
targetType: answer
id: assigner-source-answer-target
source: assigner
sourceHandle: source
target: answer
targetHandle: target
type: custom
zIndex: 0
nodes:
- data:
desc: ''
selected: false
title: Start
type: start
variables: []
height: 54
id: start
position:
x: 30
y: 253
positionAbsolute:
x: 30
y: 253
selected: false
sourcePosition: right
targetPosition: left
type: custom
width: 244
- data:
answer: 'Current Value Of `conv_var` is:{{#conversation.conv_var#}}'
desc: ''
selected: false
title: Answer
type: answer
variables: []
height: 106
id: answer
position:
x: 638
y: 253
positionAbsolute:
x: 638
y: 253
selected: true
sourcePosition: right
targetPosition: left
type: custom
width: 244
- data:
assigned_variable_selector:
- conversation
- conv_var
desc: ''
input_variable_selector:
- sys
- query
selected: false
title: Variable Assigner
type: assigner
write_mode: over-write
height: 84
id: assigner
position:
x: 334
y: 253
positionAbsolute:
x: 334
y: 253
selected: false
sourcePosition: right
targetPosition: left
type: custom
width: 244
viewport:
x: 0
y: 0
zoom: 0.7
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,33 @@ def test_streaming_conversation_variables():
runner = TableTestRunner()
result = runner.run_test_case(case)
assert result.success, f"Test failed: {result.error}"


def test_streaming_conversation_variables_v1_overwrite_waits_for_assignment():
fixture_name = "test_streaming_conversation_variables_v1_overwrite"
input_query = "overwrite-value"

case = WorkflowTestCase(
fixture_path=fixture_name,
use_auto_mock=False,
mock_config=MockConfigBuilder().build(),
query=input_query,
inputs={},
expected_outputs={"answer": f"Current Value Of `conv_var` is:{input_query}"},
)

runner = TableTestRunner()
result = runner.run_test_case(case)
assert result.success, f"Test failed: {result.error}"

events = result.events
conv_var_chunk_events = [
event
for event in events
if isinstance(event, NodeRunStreamChunkEvent) and event.selector == ["conversation", "conv_var"]
]

assert conv_var_chunk_events, "Expected conversation variable chunk events to be emitted"
assert all(event.chunk == input_query for event in conv_var_chunk_events), (
"Expected streamed conversation variable value to match the input query"
)