Skip to content

Commit 575c0af

Browse files
authored
Fix: Correctly handle malformed dynamic workflows to avoid 'failed + succeeded + running' Schroedinger state (#6854)
1 parent d6df4be commit 575c0af

File tree

2 files changed

+205
-118
lines changed

2 files changed

+205
-118
lines changed

flytepropeller/pkg/controller/nodes/dynamic/handler.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,12 @@ func (d dynamicNodeTaskNodeHandler) Abort(ctx context.Context, nCtx interfaces.N
243243
if err != nil {
244244
if stdErrors.IsCausedBy(err, utils.ErrorCodeUser) {
245245
logger.Errorf(ctx, "failed to build dynamic workflow, user error: %s", err)
246+
// If the dynamic workflow cannot be built because of a user error, i.e. malformed dynamic workflow,
247+
// the error is deterministic and there are no nodes that need to be cleaned up as the dynamic workflow
248+
// was never able to launch sub nodes.
249+
// Returning an error on Abort would be treated as a transient system error that is retried as often as the
250+
// system retry budget allows, surfacing the underlying user error only after the system retry budget is exhausted.
251+
return nil
246252
}
247253
return err
248254
}
@@ -277,7 +283,15 @@ func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx interface
277283
logger.Infof(ctx, "Finalizing dynamic workflow RetryAttempt [%d]", nCtx.CurrentAttempt())
278284
dCtx, err := d.buildContextualDynamicWorkflow(ctx, nCtx)
279285
if err != nil {
280-
errs = append(errs, err)
286+
if !stdErrors.IsCausedBy(err, utils.ErrorCodeUser) {
287+
// If the dynamic workflow cannot be built because of a user error, i.e. malformed dynamic workflow,
288+
// the error is deterministic and there are no nodes that need to be cleaned up as the dynamic workflow
289+
// was never able to launch sub nodes.
290+
// Including the user error in the errors returned by Finalize would cause a transient system error
291+
// that is retried as often as the system retry budget allows, surfacing the underlying user error only
292+
// after the system retry budget is exhausted.
293+
errs = append(errs, err)
294+
}
281295
} else {
282296
if dCtx.isDynamic {
283297
if err := d.nodeExecutor.FinalizeHandler(ctx, dCtx.execContext, dCtx.subWorkflow, dCtx.nodeLookup, dCtx.subWorkflow.StartNode()); err != nil {

flytepropeller/pkg/controller/nodes/dynamic/handler_test.go

Lines changed: 190 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,119 @@ func createDynamicJobSpecWithLaunchPlans() *core.DynamicJobSpec {
822822
}
823823
}
824824

825+
func createNodeContext(t assert.TestingT, ttype string, finalOutput storage.DataReference) *nodeMocks.NodeExecutionContext {
826+
ctx := context.TODO()
827+
nodeID := "n1"
828+
wfExecID := &core.WorkflowExecutionIdentifier{
829+
Project: "project",
830+
Domain: "domain",
831+
Name: "name",
832+
}
833+
834+
nm := &nodeMocks.NodeExecutionMetadata{}
835+
nm.EXPECT().GetAnnotations().Return(map[string]string{})
836+
nm.EXPECT().GetNodeExecutionID().Return(&core.NodeExecutionIdentifier{
837+
ExecutionId: wfExecID,
838+
NodeId: nodeID,
839+
})
840+
nm.EXPECT().GetK8sServiceAccount().Return("service-account")
841+
nm.EXPECT().GetLabels().Return(map[string]string{})
842+
nm.EXPECT().GetNamespace().Return("namespace")
843+
nm.EXPECT().GetOwnerID().Return(types.NamespacedName{Namespace: "namespace", Name: "name"})
844+
nm.EXPECT().GetOwnerReference().Return(v1.OwnerReference{
845+
Kind: "sample",
846+
Name: "name",
847+
})
848+
849+
taskID := &core.Identifier{}
850+
tk := &core.TaskTemplate{
851+
Id: taskID,
852+
Type: "test",
853+
Metadata: &core.TaskMetadata{
854+
Discoverable: true,
855+
},
856+
Interface: &core.TypedInterface{
857+
Outputs: &core.VariableMap{
858+
Variables: map[string]*core.Variable{
859+
"x": {
860+
Type: &core.LiteralType{
861+
Type: &core.LiteralType_Simple{
862+
Simple: core.SimpleType_INTEGER,
863+
},
864+
},
865+
},
866+
},
867+
},
868+
},
869+
}
870+
tr := &nodeMocks.TaskReader{}
871+
tr.EXPECT().GetTaskID().Return(taskID)
872+
tr.EXPECT().GetTaskType().Return(ttype)
873+
tr.EXPECT().Read(ctx).Return(tk, nil)
874+
875+
n := &flyteMocks.ExecutableNode{}
876+
n.EXPECT().GetTaskID().Return(&tID)
877+
878+
dataStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
879+
assert.NoError(t, err)
880+
881+
ir := &ioMocks.InputReader{}
882+
nCtx := &nodeMocks.NodeExecutionContext{}
883+
nCtx.EXPECT().NodeExecutionMetadata().Return(nm)
884+
nCtx.EXPECT().Node().Return(n)
885+
nCtx.EXPECT().InputReader().Return(ir)
886+
nCtx.EXPECT().CurrentAttempt().Return(uint32(1))
887+
nCtx.EXPECT().TaskReader().Return(tr)
888+
nCtx.EXPECT().NodeID().Return(nodeID)
889+
nCtx.EXPECT().EnqueueOwnerFunc().Return(func() error { return nil })
890+
nCtx.EXPECT().DataStore().Return(dataStore)
891+
execContext := executorMocks.ExecutionContext{}
892+
execContext.EXPECT().GetEventVersion().Return(v1alpha1.EventVersion0)
893+
execContext.EXPECT().GetParentInfo().Return(nil)
894+
nCtx.EXPECT().ExecutionContext().Return(&execContext)
895+
896+
endNodeStatus := &flyteMocks.ExecutableNodeStatus{}
897+
endNodeStatus.EXPECT().GetDataDir().Return("end-node")
898+
endNodeStatus.EXPECT().GetOutputDir().Return("end-node")
899+
900+
subNs := &flyteMocks.ExecutableNodeStatus{}
901+
subNs.On("SetDataDir", mock.Anything).Return()
902+
subNs.On("SetOutputDir", mock.Anything).Return()
903+
subNs.On("ResetDirty").Return()
904+
subNs.EXPECT().GetOutputDir().Return(finalOutput)
905+
subNs.On("SetParentTaskID", mock.Anything).Return()
906+
subNs.On("SetParentNodeID", mock.Anything).Return()
907+
subNs.EXPECT().GetAttempts().Return(0)
908+
909+
dynamicNS := &flyteMocks.ExecutableNodeStatus{}
910+
dynamicNS.On("SetDataDir", mock.Anything).Return()
911+
dynamicNS.On("SetOutputDir", mock.Anything).Return()
912+
dynamicNS.On("SetParentTaskID", mock.Anything).Return()
913+
dynamicNS.On("SetParentNodeID", mock.Anything).Return()
914+
dynamicNS.EXPECT().GetNodeExecutionStatus(ctx, "n1-1-Node_1").Return(subNs)
915+
dynamicNS.EXPECT().GetNodeExecutionStatus(ctx, "n1-1-Node_2").Return(subNs)
916+
dynamicNS.EXPECT().GetNodeExecutionStatus(ctx, "n1-1-Node_3").Return(subNs)
917+
dynamicNS.EXPECT().GetNodeExecutionStatus(ctx, v1alpha1.EndNodeID).Return(endNodeStatus)
918+
919+
ns := &flyteMocks.ExecutableNodeStatus{}
920+
ns.EXPECT().GetDataDir().Return("data-dir")
921+
ns.EXPECT().GetOutputDir().Return("output-dir")
922+
ns.EXPECT().GetNodeExecutionStatus(ctx, dynamicNodeID).Return(dynamicNS)
923+
nCtx.EXPECT().NodeStatus().Return(ns)
924+
925+
w := &flyteMocks.ExecutableWorkflow{}
926+
ws := &flyteMocks.ExecutableWorkflowStatus{}
927+
ws.EXPECT().GetNodeExecutionStatus(ctx, nodeID).Return(ns)
928+
w.EXPECT().GetExecutionStatus().Return(ws)
929+
930+
r := &nodeMocks.NodeStateReader{}
931+
r.EXPECT().GetDynamicNodeState().Return(handler.DynamicNodeState{
932+
Phase: v1alpha1.DynamicNodePhaseExecuting,
933+
})
934+
nCtx.EXPECT().NodeStateReader().Return(r)
935+
return nCtx
936+
}
937+
825938
func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) {
826939
ctx := context.TODO()
827940

@@ -845,122 +958,8 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) {
845958
assert.NotZero(t, len(h.ExpectedCalls))
846959
assert.Equal(t, "Finalize", h.ExpectedCalls[0].Method)
847960
})
848-
849-
createNodeContext := func(ttype string, finalOutput storage.DataReference) *nodeMocks.NodeExecutionContext {
850-
ctx := context.TODO()
851-
nodeID := "n1"
852-
wfExecID := &core.WorkflowExecutionIdentifier{
853-
Project: "project",
854-
Domain: "domain",
855-
Name: "name",
856-
}
857-
858-
nm := &nodeMocks.NodeExecutionMetadata{}
859-
nm.EXPECT().GetAnnotations().Return(map[string]string{})
860-
nm.EXPECT().GetNodeExecutionID().Return(&core.NodeExecutionIdentifier{
861-
ExecutionId: wfExecID,
862-
NodeId: nodeID,
863-
})
864-
nm.EXPECT().GetK8sServiceAccount().Return("service-account")
865-
nm.EXPECT().GetLabels().Return(map[string]string{})
866-
nm.EXPECT().GetNamespace().Return("namespace")
867-
nm.EXPECT().GetOwnerID().Return(types.NamespacedName{Namespace: "namespace", Name: "name"})
868-
nm.EXPECT().GetOwnerReference().Return(v1.OwnerReference{
869-
Kind: "sample",
870-
Name: "name",
871-
})
872-
873-
taskID := &core.Identifier{}
874-
tk := &core.TaskTemplate{
875-
Id: taskID,
876-
Type: "test",
877-
Metadata: &core.TaskMetadata{
878-
Discoverable: true,
879-
},
880-
Interface: &core.TypedInterface{
881-
Outputs: &core.VariableMap{
882-
Variables: map[string]*core.Variable{
883-
"x": {
884-
Type: &core.LiteralType{
885-
Type: &core.LiteralType_Simple{
886-
Simple: core.SimpleType_INTEGER,
887-
},
888-
},
889-
},
890-
},
891-
},
892-
},
893-
}
894-
tr := &nodeMocks.TaskReader{}
895-
tr.EXPECT().GetTaskID().Return(taskID)
896-
tr.EXPECT().GetTaskType().Return(ttype)
897-
tr.EXPECT().Read(ctx).Return(tk, nil)
898-
899-
n := &flyteMocks.ExecutableNode{}
900-
n.EXPECT().GetTaskID().Return(&tID)
901-
902-
dataStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
903-
assert.NoError(t, err)
904-
905-
ir := &ioMocks.InputReader{}
906-
nCtx := &nodeMocks.NodeExecutionContext{}
907-
nCtx.EXPECT().NodeExecutionMetadata().Return(nm)
908-
nCtx.EXPECT().Node().Return(n)
909-
nCtx.EXPECT().InputReader().Return(ir)
910-
nCtx.EXPECT().CurrentAttempt().Return(uint32(1))
911-
nCtx.EXPECT().TaskReader().Return(tr)
912-
nCtx.EXPECT().NodeID().Return(nodeID)
913-
nCtx.EXPECT().EnqueueOwnerFunc().Return(func() error { return nil })
914-
nCtx.EXPECT().DataStore().Return(dataStore)
915-
execContext := executorMocks.ExecutionContext{}
916-
execContext.EXPECT().GetEventVersion().Return(v1alpha1.EventVersion0)
917-
execContext.EXPECT().GetParentInfo().Return(nil)
918-
nCtx.EXPECT().ExecutionContext().Return(&execContext)
919-
920-
endNodeStatus := &flyteMocks.ExecutableNodeStatus{}
921-
endNodeStatus.EXPECT().GetDataDir().Return("end-node")
922-
endNodeStatus.EXPECT().GetOutputDir().Return("end-node")
923-
924-
subNs := &flyteMocks.ExecutableNodeStatus{}
925-
subNs.On("SetDataDir", mock.Anything).Return()
926-
subNs.On("SetOutputDir", mock.Anything).Return()
927-
subNs.On("ResetDirty").Return()
928-
subNs.EXPECT().GetOutputDir().Return(finalOutput)
929-
subNs.On("SetParentTaskID", mock.Anything).Return()
930-
subNs.On("SetParentNodeID", mock.Anything).Return()
931-
subNs.EXPECT().GetAttempts().Return(0)
932-
933-
dynamicNS := &flyteMocks.ExecutableNodeStatus{}
934-
dynamicNS.On("SetDataDir", mock.Anything).Return()
935-
dynamicNS.On("SetOutputDir", mock.Anything).Return()
936-
dynamicNS.On("SetParentTaskID", mock.Anything).Return()
937-
dynamicNS.On("SetParentNodeID", mock.Anything).Return()
938-
dynamicNS.EXPECT().GetNodeExecutionStatus(ctx, "n1-1-Node_1").Return(subNs)
939-
dynamicNS.EXPECT().GetNodeExecutionStatus(ctx, "n1-1-Node_2").Return(subNs)
940-
dynamicNS.EXPECT().GetNodeExecutionStatus(ctx, "n1-1-Node_3").Return(subNs)
941-
dynamicNS.EXPECT().GetNodeExecutionStatus(ctx, v1alpha1.EndNodeID).Return(endNodeStatus)
942-
943-
ns := &flyteMocks.ExecutableNodeStatus{}
944-
ns.EXPECT().GetDataDir().Return("data-dir")
945-
ns.EXPECT().GetOutputDir().Return("output-dir")
946-
ns.EXPECT().GetNodeExecutionStatus(ctx, dynamicNodeID).Return(dynamicNS)
947-
nCtx.EXPECT().NodeStatus().Return(ns)
948-
949-
w := &flyteMocks.ExecutableWorkflow{}
950-
ws := &flyteMocks.ExecutableWorkflowStatus{}
951-
ws.EXPECT().GetNodeExecutionStatus(ctx, nodeID).Return(ns)
952-
w.EXPECT().GetExecutionStatus().Return(ws)
953-
954-
r := &nodeMocks.NodeStateReader{}
955-
r.EXPECT().GetDynamicNodeState().Return(handler.DynamicNodeState{
956-
Phase: v1alpha1.DynamicNodePhaseExecuting,
957-
})
958-
nCtx.EXPECT().NodeStateReader().Return(r)
959-
return nCtx
960-
}
961961
t.Run("dynamicnodephase-executing", func(t *testing.T) {
962-
963-
nCtx := createNodeContext("test", "x")
962+
nCtx := createNodeContext(t, "test", "x")
964963
f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetOutputDir(), "futures.pb")
965964
assert.NoError(t, err)
966965
dj := createDynamicJobSpec()
@@ -981,7 +980,7 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) {
981980

982981
t.Run("dynamicnodephase-executing-parenterror", func(t *testing.T) {
983982

984-
nCtx := createNodeContext("test", "x")
983+
nCtx := createNodeContext(t, "test", "x")
985984
f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetOutputDir(), "futures.pb")
986985
assert.NoError(t, err)
987986
dj := createDynamicJobSpec()
@@ -1002,7 +1001,7 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) {
10021001

10031002
t.Run("dynamicnodephase-executing-childerror", func(t *testing.T) {
10041003

1005-
nCtx := createNodeContext("test", "x")
1004+
nCtx := createNodeContext(t, "test", "x")
10061005
f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetOutputDir(), "futures.pb")
10071006
assert.NoError(t, err)
10081007
dj := createDynamicJobSpec()
@@ -1020,6 +1019,80 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) {
10201019
assert.NotZero(t, len(n.ExpectedCalls))
10211020
assert.Equal(t, "FinalizeHandler", n.ExpectedCalls[0].Method)
10221021
})
1022+
t.Run("dynamicnodephase-failing-malformed-dynamic-workflow", func(t *testing.T) {
1023+
1024+
nCtx := createNodeContext(t, "test-finalize", "y")
1025+
f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetOutputDir(), "futures.pb")
1026+
assert.NoError(t, err)
1027+
1028+
malformedDynJobSpec := &core.DynamicJobSpec{
1029+
Tasks: []*core.TaskTemplate{}, // No tasks defined
1030+
Nodes: []*core.Node{
1031+
{
1032+
Id: "Node_1",
1033+
Target: &core.Node_TaskNode{
1034+
TaskNode: &core.TaskNode{
1035+
Reference: &core.TaskNode_ReferenceId{
1036+
ReferenceId: &core.Identifier{Name: "non_existent_task"}, // References task that doesn't exist
1037+
},
1038+
},
1039+
},
1040+
},
1041+
},
1042+
}
1043+
1044+
assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, malformedDynJobSpec))
1045+
1046+
mockLPLauncher := &lpMocks.Reader{}
1047+
h := &mocks.TaskNodeHandler{}
1048+
h.EXPECT().Finalize(ctx, nCtx).Return(nil)
1049+
n := &nodeMocks.Node{}
1050+
n.EXPECT().FinalizeHandler(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("err"))
1051+
d := New(h, n, mockLPLauncher, eventConfig, promutils.NewTestScope())
1052+
1053+
// buildContextualDynamicWorkflow returning a user error (malformed dynamic workflow) should not lead
1054+
// to failure of Finalize
1055+
assert.NoError(t, d.Finalize(ctx, nCtx))
1056+
})
1057+
}
1058+
1059+
func TestDynamicNodeTaskNodeHandler_Abort(t *testing.T) {
1060+
t.Run("dynamicnodephase-failing-malformed-dynamic-workflow", func(t *testing.T) {
1061+
ctx := context.TODO()
1062+
1063+
nCtx := createNodeContext(t, "test-abort", "y")
1064+
f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetOutputDir(), "futures.pb")
1065+
assert.NoError(t, err)
1066+
1067+
malformedDynJobSpec := &core.DynamicJobSpec{
1068+
Tasks: []*core.TaskTemplate{}, // No tasks defined
1069+
Nodes: []*core.Node{
1070+
{
1071+
Id: "Node_1",
1072+
Target: &core.Node_TaskNode{
1073+
TaskNode: &core.TaskNode{
1074+
Reference: &core.TaskNode_ReferenceId{
1075+
ReferenceId: &core.Identifier{Name: "non_existent_task"}, // References task that doesn't exist
1076+
},
1077+
},
1078+
},
1079+
},
1080+
},
1081+
}
1082+
1083+
assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, malformedDynJobSpec))
1084+
1085+
mockLPLauncher := &lpMocks.Reader{}
1086+
h := &mocks.TaskNodeHandler{}
1087+
h.EXPECT().Abort(ctx, nCtx, "reason").Return(nil)
1088+
n := &nodeMocks.Node{}
1089+
n.EXPECT().AbortHandler(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "reason").Return(fmt.Errorf("err"))
1090+
d := New(h, n, mockLPLauncher, eventConfig, promutils.NewTestScope())
1091+
1092+
// buildContextualDynamicWorkflow returning a user error (malformed dynamic workflow) should not lead
1093+
// to failure of Abort
1094+
assert.NoError(t, d.Abort(ctx, nCtx, "reason"))
1095+
})
10231096
}
10241097

10251098
func init() {

0 commit comments

Comments
 (0)