Skip to content

Commit 0e78c27

Browse files
authored
fix: send actual EndTime in UpdateActionStatus to fix inflated durations (#7089)
Signed-off-by: Kevin Su <pingsutw@apache.org>
1 parent 3b803b9 commit 0e78c27

File tree

10 files changed

+401
-61
lines changed

10 files changed

+401
-61
lines changed

actions/k8s/client.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,8 @@ func (c *ActionsClient) notifyRunService(ctx context.Context, taskAction *execut
458458
// On ADDED: create the action record in the DB (deduplicated via bloom filter).
459459
if eventType == watch.Added {
460460
actionKey := []byte(buildTaskActionName(update.ActionID))
461-
if c.recordedFilter != nil && c.recordedFilter.Contains(ctx, actionKey) {
461+
isDuplicate := c.recordedFilter != nil && c.recordedFilter.Contains(ctx, actionKey)
462+
if isDuplicate {
462463
logger.Debugf(ctx, "Skipping duplicate RecordAction for %s", update.ActionID.Name)
463464
} else {
464465
recordReq := &workflow.RecordActionRequest{
@@ -498,6 +499,25 @@ func (c *ActionsClient) notifyRunService(ctx context.Context, taskAction *execut
498499
c.recordedFilter.Add(ctx, actionKey)
499500
}
500501
}
502+
503+
// When a child action appears, the parent must already be running (it
504+
// created the child). Promote the parent to RUNNING so the UI doesn't
505+
// stay stuck on INITIALIZING while children are executing.
506+
if !isDuplicate && update.ParentActionName != "" {
507+
parentID := &common.ActionIdentifier{
508+
Run: update.ActionID.Run,
509+
Name: update.ParentActionName,
510+
}
511+
parentStatusReq := &workflow.UpdateActionStatusRequest{
512+
ActionId: parentID,
513+
Status: &workflow.ActionStatus{
514+
Phase: common.ActionPhase_ACTION_PHASE_RUNNING,
515+
},
516+
}
517+
if _, err := c.runClient.UpdateActionStatus(ctx, connect.NewRequest(parentStatusReq)); err != nil {
518+
logger.Warnf(ctx, "Failed to promote parent action %s to RUNNING: %v", update.ParentActionName, err)
519+
}
520+
}
501521
}
502522

503523
if update.Phase != common.ActionPhase_ACTION_PHASE_UNSPECIFIED {
@@ -507,7 +527,7 @@ func (c *ActionsClient) notifyRunService(ctx context.Context, taskAction *execut
507527
Phase: update.Phase,
508528
Attempts: taskAction.Status.Attempts,
509529
CacheStatus: taskAction.Status.CacheStatus,
510-
},
530+
},
511531
}
512532
if _, err := c.runClient.UpdateActionStatus(ctx, connect.NewRequest(statusReq)); err != nil {
513533
logger.Warnf(ctx, "Failed to update action status in run service for %s: %v", update.ActionID.Name, err)

actions/k8s/client_test.go

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,3 +262,240 @@ func TestApplyRunSpecToTaskAction_ProjectsRuntimeSettings(t *testing.T) {
262262
assert.Equal(t, "platform", taskAction.Labels["team"])
263263
assert.Equal(t, "sdk", taskAction.Annotations["owner"])
264264
}
265+
266+
func TestNotifyRunService_ChildAddedPromotesParentToRunning(t *testing.T) {
267+
ctx := context.Background()
268+
269+
mockClient := runmocks.NewInternalRunServiceClient(t)
270+
c := &ActionsClient{
271+
runClient: mockClient,
272+
subscribers: make(map[string]map[chan *ActionUpdate]struct{}),
273+
}
274+
275+
runID := &common.RunIdentifier{
276+
Org: "org",
277+
Project: "proj",
278+
Domain: "dev",
279+
Name: "run1",
280+
}
281+
282+
ta := &executorv1.TaskAction{
283+
Spec: executorv1.TaskActionSpec{
284+
Org: runID.Org,
285+
Project: runID.Project,
286+
Domain: runID.Domain,
287+
RunName: runID.Name,
288+
ActionName: "child-1",
289+
},
290+
}
291+
update := &ActionUpdate{
292+
ActionID: &common.ActionIdentifier{
293+
Run: runID,
294+
Name: "child-1",
295+
},
296+
ParentActionName: "run1",
297+
}
298+
299+
// Expect RecordAction for the child
300+
mockClient.On("RecordAction", mock.Anything, mock.Anything).
301+
Return(&connect.Response[workflow.RecordActionResponse]{}, nil).Once()
302+
303+
// Expect UpdateActionStatus for the PARENT with RUNNING phase
304+
mockClient.On("UpdateActionStatus", mock.Anything, mock.MatchedBy(func(req *connect.Request[workflow.UpdateActionStatusRequest]) bool {
305+
return req.Msg.GetActionId().GetName() == "run1" &&
306+
req.Msg.GetStatus().GetPhase() == common.ActionPhase_ACTION_PHASE_RUNNING
307+
})).Return(&connect.Response[workflow.UpdateActionStatusResponse]{}, nil).Once()
308+
309+
c.notifyRunService(ctx, ta, update, watch.Added)
310+
311+
mockClient.AssertNumberOfCalls(t, "RecordAction", 1)
312+
mockClient.AssertNumberOfCalls(t, "UpdateActionStatus", 1)
313+
}
314+
315+
func TestNotifyRunService_SkipsTerminalAddedEventsOnlyWhenInBloomFilter(t *testing.T) {
316+
ctx := context.Background()
317+
318+
mockClient := runmocks.NewInternalRunServiceClient(t)
319+
filter, err := fastcheck.NewOppoBloomFilter(128, promutils.NewTestScope())
320+
require.NoError(t, err)
321+
322+
c := &ActionsClient{
323+
runClient: mockClient,
324+
recordedFilter: filter,
325+
subscribers: make(map[string]map[chan *ActionUpdate]struct{}),
326+
}
327+
328+
// Create a terminal TaskAction CRD (SUCCEEDED)
329+
ta := &executorv1.TaskAction{
330+
Spec: executorv1.TaskActionSpec{
331+
Org: "org",
332+
Project: "proj",
333+
Domain: "dev",
334+
RunName: "run1",
335+
ActionName: "completed-action",
336+
},
337+
Status: executorv1.TaskActionStatus{
338+
Conditions: []metav1.Condition{
339+
{Type: string(executorv1.ConditionTypeSucceeded), Status: "True"},
340+
},
341+
},
342+
}
343+
update := &ActionUpdate{
344+
ActionID: &common.ActionIdentifier{
345+
Run: &common.RunIdentifier{Org: "org", Project: "proj", Domain: "dev", Name: "run1"},
346+
Name: "completed-action",
347+
},
348+
Phase: common.ActionPhase_ACTION_PHASE_SUCCEEDED,
349+
}
350+
351+
// First ADDED event (cold start, not in bloom filter): should process normally
352+
mockClient.On("RecordAction", mock.Anything, mock.Anything).
353+
Return(&connect.Response[workflow.RecordActionResponse]{}, nil).Once()
354+
mockClient.On("UpdateActionStatus", mock.Anything, mock.Anything).
355+
Return(&connect.Response[workflow.UpdateActionStatusResponse]{}, nil)
356+
c.notifyRunService(ctx, ta, update, watch.Added)
357+
358+
mockClient.AssertNumberOfCalls(t, "RecordAction", 1)
359+
mockClient.AssertNumberOfCalls(t, "UpdateActionStatus", 1)
360+
361+
// Action should now be in the bloom filter
362+
actionKey := []byte(buildTaskActionName(update.ActionID))
363+
assert.True(t, filter.Contains(ctx, actionKey))
364+
365+
// Second ADDED event (reconnect, in bloom filter): should skip RecordAction
366+
// but still call UpdateActionStatus.
367+
c.notifyRunService(ctx, ta, update, watch.Added)
368+
mockClient.AssertNumberOfCalls(t, "RecordAction", 1) // no new RecordAction
369+
mockClient.AssertNumberOfCalls(t, "UpdateActionStatus", 2) // one more UpdateActionStatus
370+
}
371+
372+
func TestNotifyRunService_ProcessesNonTerminalAddedEvents(t *testing.T) {
373+
ctx := context.Background()
374+
375+
mockClient := runmocks.NewInternalRunServiceClient(t)
376+
filter, err := fastcheck.NewOppoBloomFilter(128, promutils.NewTestScope())
377+
require.NoError(t, err)
378+
379+
c := &ActionsClient{
380+
runClient: mockClient,
381+
recordedFilter: filter,
382+
subscribers: make(map[string]map[chan *ActionUpdate]struct{}),
383+
}
384+
385+
// Create a non-terminal TaskAction CRD (QUEUED)
386+
ta := &executorv1.TaskAction{
387+
Spec: executorv1.TaskActionSpec{
388+
Org: "org",
389+
Project: "proj",
390+
Domain: "dev",
391+
RunName: "run1",
392+
ActionName: "queued-action",
393+
},
394+
Status: executorv1.TaskActionStatus{
395+
Conditions: []metav1.Condition{
396+
{Type: string(executorv1.ConditionTypeProgressing), Status: "True", Reason: string(executorv1.ConditionReasonQueued)},
397+
},
398+
},
399+
}
400+
update := &ActionUpdate{
401+
ActionID: &common.ActionIdentifier{
402+
Run: &common.RunIdentifier{Org: "org", Project: "proj", Domain: "dev", Name: "run1"},
403+
Name: "queued-action",
404+
},
405+
Phase: common.ActionPhase_ACTION_PHASE_QUEUED,
406+
}
407+
408+
// Non-terminal ADDED events should be processed normally
409+
mockClient.On("RecordAction", mock.Anything, mock.Anything).
410+
Return(&connect.Response[workflow.RecordActionResponse]{}, nil).Once()
411+
mockClient.On("UpdateActionStatus", mock.Anything, mock.Anything).
412+
Return(&connect.Response[workflow.UpdateActionStatusResponse]{}, nil).Once()
413+
414+
c.notifyRunService(ctx, ta, update, watch.Added)
415+
416+
mockClient.AssertNumberOfCalls(t, "RecordAction", 1)
417+
mockClient.AssertNumberOfCalls(t, "UpdateActionStatus", 1)
418+
}
419+
420+
func TestNotifyRunService_DuplicateAddedSkipsRecordAction(t *testing.T) {
421+
ctx := context.Background()
422+
423+
mockClient := runmocks.NewInternalRunServiceClient(t)
424+
filter, err := fastcheck.NewOppoBloomFilter(128, promutils.NewTestScope())
425+
require.NoError(t, err)
426+
427+
c := &ActionsClient{
428+
runClient: mockClient,
429+
recordedFilter: filter,
430+
subscribers: make(map[string]map[chan *ActionUpdate]struct{}),
431+
}
432+
433+
ta, update := newTestActionUpdate("action-dup")
434+
update.Phase = common.ActionPhase_ACTION_PHASE_RUNNING
435+
436+
// First call — should process normally
437+
mockClient.On("RecordAction", mock.Anything, mock.Anything).
438+
Return(&connect.Response[workflow.RecordActionResponse]{}, nil).Once()
439+
mockClient.On("UpdateActionStatus", mock.Anything, mock.Anything).
440+
Return(&connect.Response[workflow.UpdateActionStatusResponse]{}, nil)
441+
c.notifyRunService(ctx, ta, update, watch.Added)
442+
443+
// Second call (duplicate ADDED) — should skip RecordAction but still call UpdateActionStatus
444+
c.notifyRunService(ctx, ta, update, watch.Added)
445+
446+
mockClient.AssertNumberOfCalls(t, "RecordAction", 1)
447+
mockClient.AssertNumberOfCalls(t, "UpdateActionStatus", 2)
448+
}
449+
450+
func TestNotifyRunService_TerminalDuplicateRepairsTimestamps(t *testing.T) {
451+
ctx := context.Background()
452+
453+
mockClient := runmocks.NewInternalRunServiceClient(t)
454+
filter, err := fastcheck.NewOppoBloomFilter(128, promutils.NewTestScope())
455+
require.NoError(t, err)
456+
457+
c := &ActionsClient{
458+
runClient: mockClient,
459+
recordedFilter: filter,
460+
subscribers: make(map[string]map[chan *ActionUpdate]struct{}),
461+
}
462+
463+
ta, update := newTestActionUpdate("action-terminal-dup")
464+
update.Phase = common.ActionPhase_ACTION_PHASE_SUCCEEDED
465+
466+
// First call — should process normally (RecordAction + UpdateActionStatus)
467+
mockClient.On("RecordAction", mock.Anything, mock.Anything).
468+
Return(&connect.Response[workflow.RecordActionResponse]{}, nil).Once()
469+
mockClient.On("UpdateActionStatus", mock.Anything, mock.Anything).
470+
Return(&connect.Response[workflow.UpdateActionStatusResponse]{}, nil).Times(2)
471+
c.notifyRunService(ctx, ta, update, watch.Added)
472+
473+
// Second call (terminal duplicate ADDED) — should skip RecordAction but
474+
// still call UpdateActionStatus to repair missing timestamps.
475+
c.notifyRunService(ctx, ta, update, watch.Added)
476+
477+
mockClient.AssertNumberOfCalls(t, "RecordAction", 1) // no new RecordAction
478+
mockClient.AssertNumberOfCalls(t, "UpdateActionStatus", 2) // one more UpdateActionStatus
479+
}
480+
481+
func TestNotifyRunService_RootActionAddedDoesNotPromoteParent(t *testing.T) {
482+
ctx := context.Background()
483+
484+
mockClient := runmocks.NewInternalRunServiceClient(t)
485+
c := &ActionsClient{
486+
runClient: mockClient,
487+
subscribers: make(map[string]map[chan *ActionUpdate]struct{}),
488+
}
489+
490+
// Root action has no parent
491+
ta, update := newTestActionUpdate("action-root")
492+
493+
mockClient.On("RecordAction", mock.Anything, mock.Anything).
494+
Return(&connect.Response[workflow.RecordActionResponse]{}, nil).Once()
495+
496+
c.notifyRunService(ctx, ta, update, watch.Added)
497+
498+
mockClient.AssertNumberOfCalls(t, "RecordAction", 1)
499+
// No UpdateActionStatus should be called for root (no parent to promote)
500+
mockClient.AssertNumberOfCalls(t, "UpdateActionStatus", 0)
501+
}

executor/pkg/controller/taskaction_controller.go

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,6 @@ func (r *TaskActionReconciler) buildActionEvent(
384384

385385
info := phaseInfo.Info()
386386
updatedTime := updatedTimestamp(taskAction.Status.PhaseHistory)
387-
reportedTime := reportedTimestamp(info)
388387

389388
event := &workflow.ActionEvent{
390389
Id: actionID,
@@ -396,7 +395,7 @@ func (r *TaskActionReconciler) buildActionEvent(
396395
Cluster: r.cluster,
397396
Outputs: outputRefs(taskAction.Spec.RunOutputBase, taskAction.Spec.ActionName),
398397
ClusterEvents: toClusterEvents(info, updatedTime),
399-
ReportedTime: reportedTime,
398+
ReportedTime: timestamppb.New(time.Now()),
400399
}
401400

402401
if info != nil {
@@ -430,13 +429,6 @@ func updatedTimestamp(history []flyteorgv1.PhaseTransition) *timestamppb.Timesta
430429
return timestamppb.Now()
431430
}
432431

433-
func reportedTimestamp(info *pluginsCore.TaskInfo) *timestamppb.Timestamp {
434-
if info != nil && info.ReportedAt != nil {
435-
return timestamppb.New(*info.ReportedAt)
436-
}
437-
return timestamppb.Now()
438-
}
439-
440432
func outputRefs(runOutputBase, actionName string) *task.OutputReferences {
441433
if runOutputBase == "" {
442434
return nil
@@ -517,7 +509,7 @@ func cacheStatusFromExternalResources(resources []*pluginsCore.ExternalResource)
517509
}
518510

519511
// taskActionStatusChanged reports whether any status field has changed between old and new,
520-
// covering plugin phase, state, state version, observability JSON, and conditions.
512+
// covering plugin phase, state, state version, observability JSON, conditions, and phase history.
521513
func taskActionStatusChanged(oldStatus, newStatus flyteorgv1.TaskActionStatus) bool {
522514
if oldStatus.StateJSON != newStatus.StateJSON ||
523515
oldStatus.PluginStateVersion != newStatus.PluginStateVersion ||
@@ -532,6 +524,10 @@ func taskActionStatusChanged(oldStatus, newStatus flyteorgv1.TaskActionStatus) b
532524
return true
533525
}
534526

527+
if len(oldStatus.PhaseHistory) != len(newStatus.PhaseHistory) {
528+
return true
529+
}
530+
535531
return !reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions)
536532
}
537533

@@ -596,7 +592,7 @@ func mapPhaseToConditions(ta *flyteorgv1.TaskAction, info pluginsCore.PhaseInfo)
596592
flyteorgv1.ConditionReasonAborted, msg)
597593
}
598594

599-
// Append to PhaseHistory if this is a new phase (dedup by checking last entry)
595+
// Append to PhaseHistory if this is a new phase (dedup by checking last entry).
600596
if phaseName != "" {
601597
n := len(ta.Status.PhaseHistory)
602598
if n == 0 || ta.Status.PhaseHistory[n-1].Phase != phaseName {

0 commit comments

Comments
 (0)