Skip to content

Commit 66d4aed

Browse files
authored
Fix array node empty task executions when using remote (flyteorg#3136)
* fix array node empty task executions Signed-off-by: Troy Chiu <y.troychiu@gmail.com> * lint Signed-off-by: Troy Chiu <y.troychiu@gmail.com> * nit Signed-off-by: Troy Chiu <y.troychiu@gmail.com> --------- Signed-off-by: Troy Chiu <y.troychiu@gmail.com>
1 parent 219996e commit 66d4aed

File tree

2 files changed

+24
-14
lines changed

2 files changed

+24
-14
lines changed

flytekit/remote/remote.py

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2552,7 +2552,7 @@ def sync_node_execution(
25522552
return execution
25532553

25542554
# If a node ran a static subworkflow or a dynamic subworkflow then the parent flag will be set.
2555-
if execution.metadata.is_parent_node or execution.metadata.is_array:
2555+
if execution.metadata.is_parent_node:
25562556
# We'll need to query child node executions regardless since this is a parent node
25572557
child_node_executions = iterate_node_executions(
25582558
self.client,
@@ -2606,23 +2606,32 @@ def sync_node_execution(
26062606
"not have inputs and outputs filled in"
26072607
)
26082608
return execution
2609-
elif execution._node.array_node is not None:
2610-
# if there's a task node underneath the array node, let's fetch the interface for it
2611-
if execution._node.array_node.node.task_node is not None:
2612-
tid = execution._node.array_node.node.task_node.reference_id
2613-
t = self.fetch_task(tid.project, tid.domain, tid.name, tid.version)
2614-
if t.interface:
2615-
execution._interface = t.interface
2616-
else:
2617-
logger.error(f"Fetched map task does not have an interface, skipping i/o {t}")
2618-
return execution
2619-
else:
2620-
logger.error(f"Array node not over task, skipping i/o {t}")
2621-
return execution
26222609
else:
26232610
logger.error(f"NE {execution} undeterminable, {type(execution._node)}, {execution._node}")
26242611
raise ValueError(f"Node execution undeterminable, entity has type {type(execution._node)}")
26252612

2613+
# Handle the case for array nodes
2614+
elif execution.metadata.is_array:
2615+
if execution._node.array_node is None:
2616+
logger.error("Array node not found")
2617+
return execution
2618+
# if there's a task node underneath the array node, let's fetch the interface for it
2619+
if execution._node.array_node.node.task_node is not None:
2620+
tid = execution._node.array_node.node.task_node.reference_id
2621+
t = self.fetch_task(tid.project, tid.domain, tid.name, tid.version)
2622+
execution._task_executions = [
2623+
self.sync_task_execution(FlyteTaskExecution.promote_from_model(task_execution), t)
2624+
for task_execution in iterate_task_executions(self.client, execution.id)
2625+
]
2626+
if t.interface:
2627+
execution._interface = t.interface
2628+
else:
2629+
logger.error(f"Fetched map task does not have an interface, skipping i/o {t}")
2630+
return execution
2631+
else:
2632+
logger.error("Array node not over task, skipping i/o")
2633+
return execution
2634+
26262635
# Handle the case for gate nodes
26272636
elif execution._node.gate_node is not None:
26282637
logger.info("Skipping gate node execution for now - gate nodes don't have inputs and outputs filled in")

tests/flytekit/integration/remote/test_remote.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,7 @@ def test_execute_workflow_with_maptask(register):
614614
wait=True,
615615
)
616616
assert execution.outputs["o0"] == [4, 5, 6]
617+
assert len(execution.node_executions["n0"].task_executions) == 1
617618

618619
def test_executes_nested_workflow_dictating_interruptible(register):
619620
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)

0 commit comments

Comments
 (0)