Skip to content

Commit f5dd0a4

Browse files
[APO-2731] Filter undefined values from subworkflow inputs
When optional subworkflow inputs have dependencies that resolve to undefined, the workflow should execute seamlessly using default values instead of raising INVALID_INPUTS errors. This change filters out undefined values in both SubworkflowDeploymentNode and InlineSubworkflowNode input compilation methods. Co-Authored-By: vargas@vellum.ai <vargas@vellum.ai>
1 parent adbd146 commit f5dd0a4

File tree

4 files changed

+108
-7
lines changed

4 files changed

+108
-7
lines changed

src/vellum/workflows/nodes/core/inline_subworkflow_node/node.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,15 @@ def _compile_subworkflow_inputs(self) -> InputsType:
152152
inputs_dict = {}
153153
for descriptor in inputs_class:
154154
if hasattr(self, descriptor.name):
155-
inputs_dict[descriptor.name] = getattr(self, descriptor.name)
155+
value = getattr(self, descriptor.name)
156+
# Filter out undefined values to allow optional inputs to use their defaults
157+
if value is not undefined:
158+
inputs_dict[descriptor.name] = value
156159
return inputs_class(**inputs_dict)
157160
elif isinstance(self.subworkflow_inputs, dict):
158-
return inputs_class(**self.subworkflow_inputs)
161+
# Filter out undefined values to allow optional inputs to use their defaults
162+
filtered_inputs = {k: v for k, v in self.subworkflow_inputs.items() if v is not undefined}
163+
return inputs_class(**filtered_inputs)
159164
elif isinstance(self.subworkflow_inputs, inputs_class):
160165
return self.subworkflow_inputs
161166
else:

src/vellum/workflows/nodes/core/inline_subworkflow_node/tests/test_node.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import pytest
22

3+
from vellum.workflows.constants import undefined
34
from vellum.workflows.errors.types import WorkflowErrorCode
45
from vellum.workflows.exceptions import NodeException
56
from vellum.workflows.inputs.base import BaseInputs
@@ -178,3 +179,38 @@ class Outputs(BaseWorkflow.Outputs):
178179

179180
assert initiated_events[0].body.workflow_definition.is_dynamic is False # Main workflow
180181
assert initiated_events[1].body.workflow_definition.is_dynamic is True # Inline workflow
182+
183+
184+
def test_inline_subworkflow_node__undefined_input_filtered():
185+
"""Test that undefined input values are filtered out when compiling subworkflow inputs."""
186+
187+
# GIVEN a subworkflow with a required input and an optional input with a default
188+
class SubworkflowInputs(BaseInputs):
189+
required_input: str
190+
optional_input: str = "default_value"
191+
192+
class InnerNode(BaseNode):
193+
class Outputs(BaseNode.Outputs):
194+
result = SubworkflowInputs.required_input
195+
196+
class TestSubworkflow(BaseWorkflow[SubworkflowInputs, BaseState]):
197+
graph = InnerNode
198+
199+
class Outputs(BaseWorkflow.Outputs):
200+
result = InnerNode.Outputs.result
201+
202+
# AND a node that passes an undefined value for the optional input
203+
class TestNode(InlineSubworkflowNode):
204+
subworkflow = TestSubworkflow
205+
subworkflow_inputs = {
206+
"required_input": "hello",
207+
"optional_input": undefined,
208+
}
209+
210+
# WHEN we run the node
211+
node = TestNode()
212+
events = list(node.run())
213+
214+
# THEN the node should complete successfully using the default value for optional_input
215+
assert events[-1].name == "result"
216+
assert events[-1].value == "hello"

src/vellum/workflows/nodes/displayable/subworkflow_deployment_node/node.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@ def _compile_subworkflow_inputs(self) -> List[WorkflowRequestInputRequest]:
8888
def _add_compiled_input(
8989
self, compiled_inputs: List[WorkflowRequestInputRequest], input_name: str, input_value: Any
9090
) -> None:
91-
# Exclude inputs that resolved to be null. This ensure that we don't pass input values
91+
# Exclude inputs that resolved to be null or undefined. This ensures that we don't pass input values
9292
# to optional subworkflow inputs whose values were unresolved.
93-
if input_value is None:
93+
if input_value is None or input_value is undefined:
9494
return
9595
if isinstance(input_value, str):
9696
compiled_inputs.append(
@@ -155,12 +155,12 @@ def _compile_subworkflow_inputs_for_direct_invocation(self, workflow: "BaseWorkf
155155
if isinstance(self.subworkflow_inputs, BaseInputs):
156156
inputs_dict = {}
157157
for input_descriptor, input_value in self.subworkflow_inputs:
158-
if input_value is not None:
158+
if input_value is not None and input_value is not undefined:
159159
inputs_dict[input_descriptor.name] = input_value
160160
return inputs_class(**inputs_dict)
161161
else:
162-
# Filter out None values for direct invocation
163-
filtered_inputs = {k: v for k, v in self.subworkflow_inputs.items() if v is not None}
162+
# Filter out None and undefined values for direct invocation
163+
filtered_inputs = {k: v for k, v in self.subworkflow_inputs.items() if v is not None and v is not undefined}
164164
return inputs_class(**filtered_inputs)
165165

166166
def _run_resolved_workflow(

src/vellum/workflows/nodes/displayable/subworkflow_deployment_node/tests/test_node.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
from vellum.client.types.workflow_request_chat_history_input_request import WorkflowRequestChatHistoryInputRequest
1515
from vellum.client.types.workflow_request_json_input_request import WorkflowRequestJsonInputRequest
1616
from vellum.client.types.workflow_request_number_input_request import WorkflowRequestNumberInputRequest
17+
from vellum.client.types.workflow_request_string_input_request import WorkflowRequestStringInputRequest
1718
from vellum.client.types.workflow_result_event import WorkflowResultEvent
1819
from vellum.client.types.workflow_stream_event import WorkflowStreamEvent
20+
from vellum.workflows.constants import undefined
1921
from vellum.workflows.context import execution_context
2022
from vellum.workflows.errors import WorkflowErrorCode
2123
from vellum.workflows.exceptions import NodeException
@@ -493,3 +495,61 @@ class ExampleSubworkflowDeploymentNode(SubworkflowDeploymentNode):
493495
# AND the error should indicate the missing required input
494496
assert exc_info.value.code == WorkflowErrorCode.INVALID_INPUTS
495497
assert exc_info.value.message == "Missing required input for 'my_var_1'"
498+
499+
500+
def test_run_workflow__undefined_input_filtered(vellum_client):
501+
"""Confirm that undefined input values are filtered out when compiling subworkflow inputs."""
502+
503+
# GIVEN a Subworkflow Deployment Node with an undefined input value
504+
class ExampleSubworkflowDeploymentNode(SubworkflowDeploymentNode):
505+
deployment = "example_subworkflow_deployment"
506+
subworkflow_inputs = {
507+
"required_input": "hello",
508+
"optional_input": undefined,
509+
}
510+
511+
# AND we know what the Subworkflow Deployment will respond with
512+
def generate_subworkflow_events(*args: Any, **kwargs: Any) -> Iterator[WorkflowStreamEvent]:
513+
execution_id = str(uuid4())
514+
expected_events: List[WorkflowStreamEvent] = [
515+
WorkflowExecutionWorkflowResultEvent(
516+
execution_id=execution_id,
517+
data=WorkflowResultEvent(
518+
id=str(uuid4()),
519+
state="INITIATED",
520+
ts=datetime.now(),
521+
),
522+
),
523+
WorkflowExecutionWorkflowResultEvent(
524+
execution_id=execution_id,
525+
data=WorkflowResultEvent(
526+
id=str(uuid4()),
527+
state="FULFILLED",
528+
ts=datetime.now(),
529+
outputs=[
530+
WorkflowOutputString(
531+
id=str(uuid4()),
532+
name="result",
533+
value="success",
534+
)
535+
],
536+
),
537+
),
538+
]
539+
yield from expected_events
540+
541+
vellum_client.execute_workflow_stream.side_effect = generate_subworkflow_events
542+
543+
# WHEN we run the node
544+
node = ExampleSubworkflowDeploymentNode()
545+
events = list(node.run())
546+
547+
# THEN the node should have completed successfully
548+
assert events[-1].name == "result"
549+
assert events[-1].value == "success"
550+
551+
# AND the undefined input should be filtered out from the API call
552+
call_kwargs = vellum_client.execute_workflow_stream.call_args.kwargs
553+
assert call_kwargs["inputs"] == [
554+
WorkflowRequestStringInputRequest(name="required_input", value="hello"),
555+
]

0 commit comments

Comments
 (0)