Skip to content

Commit cabe8c7

Browse files
fix: dedupe upgrade actions from fleetgateway actions, correctly handle the expiration of retried stored actions, and update upgrade details on retries
1 parent 85f2944 commit cabe8c7

File tree

2 files changed

+174
-20
lines changed

2 files changed

+174
-20
lines changed

internal/pkg/agent/application/dispatcher/dispatcher.go

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, detailsSetter details.
123123

124124
upgradeDetailsNeedUpdate := false
125125

126-
// remove any upgrade actions from the queue if there is an upgrade action in the actions
127-
ad.removeQueuedUpgrades(actions, &upgradeDetailsNeedUpdate)
126+
// remove duplicate upgrade actions from fleetgateway actions and remove all upgrade actions in the queue
127+
actions = ad.compactAndRemoveQueuedUpgrades(actions, &upgradeDetailsNeedUpdate)
128128

129129
// add any scheduled actions to the queue (we don't check the start time here, as we will check it later)
130130
// and remove them from the passed actions
@@ -137,6 +137,7 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, detailsSetter details.
137137
actions = ad.mergeWithQueuedActions(now, actions, &upgradeDetailsNeedUpdate)
138138

139139
if upgradeDetailsNeedUpdate {
140+
upgradeDetailsNeedUpdate = false
140141
ad.updateUpgradeDetails(now, detailsSetter)
141142
}
142143

@@ -166,7 +167,7 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, detailsSetter details.
166167
rAction, ok := action.(fleetapi.RetryableAction)
167168
if ok {
168169
rAction.SetError(err) // set the retryable action error to what the dispatcher returned
169-
ad.scheduleRetry(ctx, rAction, acker)
170+
ad.scheduleRetry(ctx, rAction, acker, &upgradeDetailsNeedUpdate)
170171
continue
171172
}
172173
ad.log.Errorf("Failed to dispatch action id %q of type %q, error: %+v", action.ID(), action.Type(), err)
@@ -180,6 +181,10 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, detailsSetter details.
180181
reportedErr = err
181182
}
182183

184+
if upgradeDetailsNeedUpdate {
185+
ad.updateUpgradeDetails(now, detailsSetter)
186+
}
187+
183188
if len(actions) > 0 {
184189
ad.errCh <- reportedErr
185190
}
@@ -285,7 +290,17 @@ func (ad *ActionDispatcher) mergeWithQueuedActions(ts time.Time, actions []fleet
285290
dequeuedActions := ad.queue.DequeueActions()
286291

287292
for _, action := range dequeuedActions {
288-
exp, _ := action.Expiration()
293+
exp, err := action.Expiration()
294+
if err != nil {
295+
if !errors.Is(err, fleetapi.ErrNoExpiration) {
296+
// the error is something other than ErrNoExpiration (e.g. a malformed expiration time is set), so skip this action
297+
ad.log.Warnf("failed to get expiration time for scheduled action [id = %s]: %v", action.ID(), err)
298+
continue
299+
}
300+
// this is a non-expiring scheduled action
301+
actions = append(actions, action)
302+
continue
303+
}
289304
if ts.After(exp) {
290305
if action.Type() == fleetapi.ActionTypeUpgrade {
291306
// this is an expired upgrade action thus we need to recalculate the upgrade details
@@ -308,7 +323,7 @@ func (ad *ActionDispatcher) mergeWithQueuedActions(ts time.Time, actions []fleet
308323
// Put expired upgrade actions back onto the queue to persist them across restarts.
309324
// These are handled the same as non-expired upgrade scheduled actions and are only removed when:
310325
// 1) a cancel action is received (look at dispatchCancelActions func) or
311-
// 2) a newer upgrade action (scheduled or not) removes them. (look at removeQueuedUpgrades func)
326+
// 2) a newer upgrade action (scheduled or not) removes them. (look at compactAndRemoveQueuedUpgrades func)
312327
for _, expiredUpgradeAction := range expiredUpgradeActions {
313328
startTime, err := expiredUpgradeAction.StartTime()
314329
if err != nil {
@@ -320,31 +335,41 @@ func (ad *ActionDispatcher) mergeWithQueuedActions(ts time.Time, actions []fleet
320335
}
321336

322337
// if an upgrade action is included in the immediate dispatchable actions
323-
// mark upgradeDetailsNeedUpdate as false since the upgrade details will be set by it
338+
// mark upgradeDetailsNeedUpdate as true
324339
if slices.ContainsFunc(actions, func(action fleetapi.Action) bool {
325340
return action.Type() == fleetapi.ActionTypeUpgrade
326341
}) {
327-
*upgradeDetailsNeedUpdate = false
342+
*upgradeDetailsNeedUpdate = true
328343
}
329344

330345
return actions
331346
}
332347

333-
// removeQueuedUpgrades will scan the passed actions and if there is an upgrade action it will remove all upgrade actions
334-
// in the queue but not alter the passed list. This is done to try to only have the most recent upgrade action executed.
335-
// However, it does not eliminate duplicates in retrieved directly from the gateway. Also, if upgrade actions are removed
336-
// from the queue, upgradeDetailsNeedUpdate will be set to true.
337-
func (ad *ActionDispatcher) removeQueuedUpgrades(actions []fleetapi.Action, upgradeDetailsNeedUpdate *bool) {
338-
for _, action := range actions {
348+
// compactAndRemoveQueuedUpgrades deduplicates *upgrade* actions from the given fleetgateway actions by keeping only the
349+
// first encountered upgrade in input order (dropping any subsequent upgrades from the returned slice). If an upgrade
350+
// action is found, it also removes any queued upgrade actions from the queue. When queued upgrade actions are removed,
351+
// upgradeDetailsNeedUpdate is set to true.
352+
func (ad *ActionDispatcher) compactAndRemoveQueuedUpgrades(input []fleetapi.Action, upgradeDetailsNeedUpdate *bool) []fleetapi.Action {
353+
var actions []fleetapi.Action
354+
var upgradeAction fleetapi.Action
355+
for _, action := range input {
339356
if action.Type() == fleetapi.ActionTypeUpgrade {
357+
if upgradeAction == nil {
358+
upgradeAction = action
359+
} else {
360+
ad.log.Warnf("Found extra upgrade action in fleetgateway actions [id = %s]", action.ID())
361+
continue
362+
}
340363
if n := ad.queue.CancelType(fleetapi.ActionTypeUpgrade); n > 0 {
341364
ad.log.Debugw("New upgrade action retrieved from gateway, removing queued upgrade actions", "actions_found", n)
342365
// upgrade action(s) got removed from the queue so upgrade actions changed
343366
*upgradeDetailsNeedUpdate = true
344367
}
345-
return
346368
}
369+
actions = append(actions, action)
347370
}
371+
372+
return actions
348373
}
349374

350375
// updateUpgradeDetails will construct the upgrade details based on queue actions (assuming expired ones are still in the queue)
@@ -364,9 +389,11 @@ func (ad *ActionDispatcher) updateUpgradeDetails(ts time.Time, detailsSetter det
364389
}
365390

366391
// scheduleRetry will schedule a retry for the passed action. Note that this adjusts the start time of the action
367-
// but doesn't affect expiration time.
368-
func (ad *ActionDispatcher) scheduleRetry(ctx context.Context, action fleetapi.RetryableAction, acker acker.Acker) {
392+
// but doesn't affect expiration time. If the action is scheduled to be retried and it is an upgrade action,
393+
// upgradeDetailsNeedUpdate will be set to true.
394+
func (ad *ActionDispatcher) scheduleRetry(ctx context.Context, action fleetapi.RetryableAction, acker acker.Acker, upgradeDetailsNeedUpdate *bool) {
369395
attempt := action.RetryAttempt()
396+
ad.log.Warnf("Re-scheduling action id %q of type %q, because it failed to dispatch with error: %+v", action.ID(), action.Type(), action.GetError())
370397
d, err := ad.rt.GetWait(attempt)
371398
if err != nil {
372399
ad.log.Errorf("No more retries for action id %s: %v", action.ID(), err)
@@ -390,6 +417,11 @@ func (ad *ActionDispatcher) scheduleRetry(ctx context.Context, action fleetapi.R
390417
if err != nil {
391418
ad.log.Errorf("retry action id %s attempt %d failed to persist action_queue: %v", action.ID(), attempt, err)
392419
}
420+
421+
if action.Type() == fleetapi.ActionTypeUpgrade {
422+
*upgradeDetailsNeedUpdate = true
423+
}
424+
393425
if err := acker.Ack(ctx, action); err != nil {
394426
ad.log.Errorf("Unable to ack action retry (id %s) to fleet-server: %v", action.ID(), err)
395427
return
@@ -458,5 +490,9 @@ func GetScheduledUpgradeDetails(log *logger.Logger, actions []fleetapi.Scheduled
458490
details.StateScheduled,
459491
nextUpgradeActionID)
460492
upgradeDetails.Metadata.ScheduledAt = &nextUpgradeStartTime
493+
// scheduled upgrade actions can have errors if retried
494+
if err := nextUpgradeAction.GetError(); err != nil {
495+
upgradeDetails.Metadata.ErrorMsg = fmt.Sprintf("A prior dispatch attempt failed with: %v", err)
496+
}
461497
return upgradeDetails
462498
}

internal/pkg/agent/application/dispatcher/dispatcher_test.go

Lines changed: 122 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ func TestActionDispatcher(t *testing.T) {
297297

298298
action1 := &mockScheduledAction{}
299299
action1.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
300-
action1.On("Expiration").Return(time.Now().Add(time.Hour), fleetapi.ErrNoStartTime)
300+
action1.On("Expiration").Return(time.Now().Add(time.Hour), fleetapi.ErrNoExpiration)
301301
action1.On("Type").Return(fleetapi.ActionTypeCancel)
302302
action1.On("ID").Return("id")
303303

@@ -360,8 +360,9 @@ func TestActionDispatcher(t *testing.T) {
360360
})
361361

362362
t.Run("Dispatch of a retryable action returns an error", func(t *testing.T) {
363+
testError := errors.New("test error")
363364
def := &mockHandler{}
364-
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("test error")).Once()
365+
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(testError).Once()
365366

366367
saver := &mockSaver{}
367368
saver.On("Save").Return(nil).Times(2)
@@ -382,6 +383,7 @@ func TestActionDispatcher(t *testing.T) {
382383
action.On("RetryAttempt").Return(0).Once()
383384
action.On("SetRetryAttempt", 1).Once()
384385
action.On("SetStartTime", mock.Anything).Once()
386+
action.On("GetError").Return(testError).Once()
385387

386388
dispatchCtx, cancelFn := context.WithCancel(context.Background())
387389
defer cancelFn()
@@ -685,6 +687,112 @@ func TestActionDispatcher(t *testing.T) {
685687
}
686688
require.NoError(t, shouldNotSetDetails, "set upgrade details should not be called")
687689
})
690+
691+
t.Run("check that failed upgrade actions do not block newer ones from dispatching", func(t *testing.T) {
692+
// we start by dispatching a scheduled upgrade action
693+
ctx := t.Context()
694+
handlerErr := errors.New("text handler error")
695+
696+
def := &mockHandler{}
697+
def.On("Handle",
698+
mock.Anything, mock.Anything, mock.Anything).
699+
Return(handlerErr).Once()
700+
701+
saver := &mockSaver{}
702+
saver.On("Save").Return(nil).Times(4)
703+
saver.On("SetQueue", mock.Anything).Times(4)
704+
actionQueue, err := queue.NewActionQueue([]fleetapi.ScheduledAction{}, saver)
705+
require.NoError(t, err)
706+
707+
d, err := New(nil, t.TempDir(), def, actionQueue)
708+
require.NoError(t, err)
709+
710+
err = d.Register(&fleetapi.ActionUpgrade{}, def)
711+
require.NoError(t, err)
712+
713+
var gotDetails *details.Details
714+
detailsSetter := func(upgradeDetails *details.Details) {
715+
gotDetails = upgradeDetails
716+
}
717+
718+
initialScheduledUpgradeAction := &fleetapi.ActionUpgrade{
719+
ActionID: "scheduled-action-id",
720+
ActionType: fleetapi.ActionTypeUpgrade,
721+
ActionStartTime: time.Now().Add(1 * time.Hour).Format(time.RFC3339),
722+
Data: fleetapi.ActionUpgradeData{
723+
Version: "9.3.0",
724+
SourceURI: "https://test-uri.test.com",
725+
},
726+
}
727+
728+
dispatchDone := make(chan struct{})
729+
go func() {
730+
d.Dispatch(ctx, detailsSetter, ack, initialScheduledUpgradeAction)
731+
close(dispatchDone)
732+
}()
733+
select {
734+
case err := <-d.Errors():
735+
t.Fatalf("Unexpected error from Dispatch: %v", err)
736+
case <-dispatchDone:
737+
}
738+
739+
// make sure that the reported upgrade details match our expectations
740+
require.NotNilf(t, gotDetails, "upgrade details should not be nil")
741+
assert.Equal(t, initialScheduledUpgradeAction.ActionID, gotDetails.ActionID)
742+
assert.Equal(t, details.StateScheduled, gotDetails.State)
743+
assert.Equal(t, initialScheduledUpgradeAction.Data.Version, gotDetails.TargetVersion)
744+
assert.Empty(t, gotDetails.Metadata.ErrorMsg)
745+
746+
// manipulate the queue directly to get the dispatcher to actually dispatch our action
747+
removedItems := actionQueue.Cancel(initialScheduledUpgradeAction.ActionID)
748+
require.Equal(t, 1, removedItems)
749+
actionNewStartTime := time.Now().Add(-5 * time.Minute).UTC()
750+
initialScheduledUpgradeAction.ActionStartTime = actionNewStartTime.Format(time.RFC3339)
751+
actionQueue.Add(initialScheduledUpgradeAction, actionNewStartTime.Unix())
752+
753+
go func() {
754+
d.Dispatch(ctx, detailsSetter, ack)
755+
}()
756+
if err := <-d.Errors(); err != nil {
757+
t.Fatalf("Unexpected error from Dispatch: %v", err)
758+
}
759+
760+
// make sure that upgrade details are still reported as scheduled but with a non-empty error
761+
require.NotNilf(t, gotDetails, "upgrade details should not be nil")
762+
assert.Equal(t, initialScheduledUpgradeAction.ActionID, gotDetails.ActionID)
763+
assert.Equal(t, details.StateScheduled, gotDetails.State)
764+
assert.Equal(t, initialScheduledUpgradeAction.Data.Version, gotDetails.TargetVersion)
765+
assert.NotEmpty(t, gotDetails.Metadata.ErrorMsg)
766+
767+
// issue a brand-new upgrade action
768+
newUpgradeAction := &fleetapi.ActionUpgrade{
769+
ActionID: "upgrade-action-id",
770+
ActionType: fleetapi.ActionTypeUpgrade,
771+
Data: fleetapi.ActionUpgradeData{
772+
Version: "9.3.0",
773+
SourceURI: "https://test-uri.test.com",
774+
},
775+
}
776+
def.On("Handle",
777+
mock.Anything, mock.Anything, mock.Anything).
778+
Return(nil).Once()
779+
780+
detailsSetter = func(upgradeDetails *details.Details) {
781+
gotDetails = upgradeDetails
782+
}
783+
go func() {
784+
d.Dispatch(ctx, detailsSetter, ack, newUpgradeAction)
785+
}()
786+
if err := <-d.Errors(); err != nil {
787+
t.Fatalf("Unexpected error from Dispatch: %v", err)
788+
}
789+
require.Nil(t, gotDetails)
790+
791+
// make sure that the action queue doesn't have any actions
792+
assert.Empty(t, actionQueue.Actions())
793+
def.AssertExpectations(t)
794+
saver.AssertExpectations(t)
795+
})
688796
}
689797

690798
func Test_ActionDispatcher_scheduleRetry(t *testing.T) {
@@ -704,8 +812,13 @@ func Test_ActionDispatcher_scheduleRetry(t *testing.T) {
704812
action.On("ID").Return("id")
705813
action.On("RetryAttempt").Return(len(d.rt.steps)).Once()
706814
action.On("SetRetryAttempt", mock.Anything).Once()
815+
action.On("Type").Return(fleetapi.ActionTypeUpgrade).Once()
816+
action.On("GetError").Return(nil).Once()
817+
818+
upgradeDetailsNeedUpdate := false
819+
d.scheduleRetry(context.Background(), action, ack, &upgradeDetailsNeedUpdate)
820+
assert.False(t, upgradeDetailsNeedUpdate)
707821

708-
d.scheduleRetry(context.Background(), action, ack)
709822
saver.AssertExpectations(t)
710823
action.AssertExpectations(t)
711824
})
@@ -726,8 +839,13 @@ func Test_ActionDispatcher_scheduleRetry(t *testing.T) {
726839
action.On("RetryAttempt").Return(0).Once()
727840
action.On("SetRetryAttempt", 1).Once()
728841
action.On("SetStartTime", mock.Anything).Once()
842+
action.On("Type").Return(fleetapi.ActionTypeUpgrade).Twice()
843+
action.On("GetError").Return(nil).Once()
844+
845+
upgradeDetailsNeedUpdate := false
846+
d.scheduleRetry(context.Background(), action, ack, &upgradeDetailsNeedUpdate)
847+
assert.True(t, upgradeDetailsNeedUpdate)
729848

730-
d.scheduleRetry(context.Background(), action, ack)
731849
saver.AssertExpectations(t)
732850
action.AssertExpectations(t)
733851
})

0 commit comments

Comments
 (0)