Skip to content

Commit 179c6cd

Browse files
fix: dedupe upgrade actions from fleetgateway actions, correctly handle the expiration of retried stored actions, and update upgrade details on retries
1 parent bd24f79 commit 179c6cd

File tree

2 files changed

+180
-20
lines changed

2 files changed

+180
-20
lines changed

internal/pkg/agent/application/dispatcher/dispatcher.go

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, detailsSetter details.
123123

124124
upgradeDetailsNeedUpdate := false
125125

126-
// remove any upgrade actions from the queue if there is an upgrade action in the actions
127-
ad.removeQueuedUpgrades(actions, &upgradeDetailsNeedUpdate)
126+
// remove duplicate upgrade actions from fleetgateway actions and remove all upgrade actions in the queue
127+
actions = ad.compactAndRemoveQueuedUpgrades(actions, &upgradeDetailsNeedUpdate)
128128

129129
// add any scheduled actions to the queue (we don't check the start time here, as we will check it later)
130130
// and remove them from the passed actions
@@ -137,6 +137,7 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, detailsSetter details.
137137
actions = ad.mergeWithQueuedActions(now, actions, &upgradeDetailsNeedUpdate)
138138

139139
if upgradeDetailsNeedUpdate {
140+
upgradeDetailsNeedUpdate = false
140141
ad.updateUpgradeDetails(now, detailsSetter)
141142
}
142143

@@ -166,7 +167,7 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, detailsSetter details.
166167
rAction, ok := action.(fleetapi.RetryableAction)
167168
if ok {
168169
rAction.SetError(err) // set the retryable action error to what the dispatcher returned
169-
ad.scheduleRetry(ctx, rAction, acker)
170+
ad.scheduleRetry(ctx, rAction, acker, &upgradeDetailsNeedUpdate)
170171
continue
171172
}
172173
ad.log.Errorf("Failed to dispatch action id %q of type %q, error: %+v", action.ID(), action.Type(), err)
@@ -180,6 +181,10 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, detailsSetter details.
180181
reportedErr = err
181182
}
182183

184+
if upgradeDetailsNeedUpdate {
185+
ad.updateUpgradeDetails(now, detailsSetter)
186+
}
187+
183188
if len(actions) > 0 {
184189
ad.errCh <- reportedErr
185190
}
@@ -287,7 +292,17 @@ func (ad *ActionDispatcher) mergeWithQueuedActions(ts time.Time, actions []fleet
287292
dequeuedActions := ad.queue.DequeueActions()
288293

289294
for _, action := range dequeuedActions {
290-
exp, _ := action.Expiration()
295+
exp, err := action.Expiration()
296+
if err != nil {
297+
if !errors.Is(err, fleetapi.ErrNoExpiration) {
298+
// the error is something other than ErrNoExpiration, e.g. there is a malformed expiration time set
299+
ad.log.Warnf("failed to get expiration time for scheduled action [id = %s]: %v", action.ID(), err)
300+
continue
301+
}
302+
// this is a non-expiring scheduled action
303+
actions = append(actions, action)
304+
continue
305+
}
291306
if ts.After(exp) {
292307
if action.Type() == fleetapi.ActionTypeUpgrade {
293308
// this is an expired upgrade action thus we need to recalculate the upgrade details
@@ -310,7 +325,7 @@ func (ad *ActionDispatcher) mergeWithQueuedActions(ts time.Time, actions []fleet
310325
// Put expired upgrade actions back onto the queue to persist them across restarts.
311326
// These are handled the same as non-expired upgrade scheduled actions and are only removed when:
312327
// 1) a cancel action is received (look at dispatchCancelActions func) or
313-
// 2) a newer upgrade action (scheduled or not) removes them. (look at removeQueuedUpgrades func)
328+
// 2) a newer upgrade action (scheduled or not) removes them. (look at compactAndRemoveQueuedUpgrades func)
314329
for _, expiredUpgradeAction := range expiredUpgradeActions {
315330
startTime, err := expiredUpgradeAction.StartTime()
316331
if err != nil {
@@ -322,31 +337,41 @@ func (ad *ActionDispatcher) mergeWithQueuedActions(ts time.Time, actions []fleet
322337
}
323338

324339
// if an upgrade action is included in the immediate dispatchable actions
325-
// mark upgradeDetailsNeedUpdate as false since the upgrade details will be set by it
340+
// mark upgradeDetailsNeedUpdate as true
326341
if slices.ContainsFunc(actions, func(action fleetapi.Action) bool {
327342
return action.Type() == fleetapi.ActionTypeUpgrade
328343
}) {
329-
*upgradeDetailsNeedUpdate = false
344+
*upgradeDetailsNeedUpdate = true
330345
}
331346

332347
return actions
333348
}
334349

335-
// removeQueuedUpgrades will scan the passed actions and if there is an upgrade action it will remove all upgrade actions
336-
// in the queue but not alter the passed list. This is done to try to only have the most recent upgrade action executed.
337-
// However, it does not eliminate duplicates in retrieved directly from the gateway. Also, if upgrade actions are removed
338-
// from the queue, upgradeDetailsNeedUpdate will be set to true.
339-
func (ad *ActionDispatcher) removeQueuedUpgrades(actions []fleetapi.Action, upgradeDetailsNeedUpdate *bool) {
340-
for _, action := range actions {
350+
// compactAndRemoveQueuedUpgrades deduplicates *upgrade* actions from the given fleetgateway actions by keeping only the
351+
// first encountered upgrade in input order (dropping any subsequent upgrades from the returned slice). If an upgrade
352+
// action is found, it also removes any queued upgrade actions from the queue. When queued upgrade actions are removed,
353+
// upgradeDetailsNeedUpdate is set to true.
354+
func (ad *ActionDispatcher) compactAndRemoveQueuedUpgrades(input []fleetapi.Action, upgradeDetailsNeedUpdate *bool) []fleetapi.Action {
355+
var actions []fleetapi.Action
356+
var upgradeAction fleetapi.Action
357+
for _, action := range input {
341358
if action.Type() == fleetapi.ActionTypeUpgrade {
359+
if upgradeAction == nil {
360+
upgradeAction = action
361+
} else {
362+
ad.log.Warnf("Found extra upgrade action in fleetgateway actions [id = %s]", action.ID())
363+
continue
364+
}
342365
if n := ad.queue.CancelType(fleetapi.ActionTypeUpgrade); n > 0 {
343366
ad.log.Debugw("New upgrade action retrieved from gateway, removing queued upgrade actions", "actions_found", n)
344367
// upgrade action(s) got removed from the queue so upgrade actions changed
345368
*upgradeDetailsNeedUpdate = true
346369
}
347-
return
348370
}
371+
actions = append(actions, action)
349372
}
373+
374+
return actions
350375
}
351376

352377
// updateUpgradeDetails will construct the upgrade details based on queue actions (assuming expired ones are still in the queue)
@@ -366,9 +391,11 @@ func (ad *ActionDispatcher) updateUpgradeDetails(ts time.Time, detailsSetter det
366391
}
367392

368393
// scheduleRetry will schedule a retry for the passed action. Note that this adjusts the start time of the action
369-
// but doesn't affect expiration time.
370-
func (ad *ActionDispatcher) scheduleRetry(ctx context.Context, action fleetapi.RetryableAction, acker acker.Acker) {
394+
// but doesn't affect expiration time. If the action is scheduled to be retried and it is an upgrade action,
395+
// upgradeDetailsNeedUpdate will be set to true.
396+
func (ad *ActionDispatcher) scheduleRetry(ctx context.Context, action fleetapi.RetryableAction, acker acker.Acker, upgradeDetailsNeedUpdate *bool) {
371397
attempt := action.RetryAttempt()
398+
ad.log.Warnf("Re-scheduling action id %q of type %q, because it failed to dispatch with error: %+v", action.ID(), action.Type(), action.GetError())
372399
d, err := ad.rt.GetWait(attempt)
373400
if err != nil {
374401
ad.log.Errorf("No more retries for action id %s: %v", action.ID(), err)
@@ -392,6 +419,11 @@ func (ad *ActionDispatcher) scheduleRetry(ctx context.Context, action fleetapi.R
392419
if err != nil {
393420
ad.log.Errorf("retry action id %s attempt %d failed to persist action_queue: %v", action.ID(), attempt, err)
394421
}
422+
423+
if action.Type() == fleetapi.ActionTypeUpgrade {
424+
*upgradeDetailsNeedUpdate = true
425+
}
426+
395427
if err := acker.Ack(ctx, action); err != nil {
396428
ad.log.Errorf("Unable to ack action retry (id %s) to fleet-server: %v", action.ID(), err)
397429
return
@@ -460,5 +492,9 @@ func GetScheduledUpgradeDetails(log *logger.Logger, actions []fleetapi.Scheduled
460492
details.StateScheduled,
461493
nextUpgradeActionID)
462494
upgradeDetails.Metadata.ScheduledAt = &nextUpgradeStartTime
495+
// scheduled upgrade actions can have errors if retried
496+
if err := nextUpgradeAction.GetError(); err != nil {
497+
upgradeDetails.Metadata.ErrorMsg = fmt.Sprintf("A prior dispatch attempt failed with: %v", err)
498+
}
463499
return upgradeDetails
464500
}

internal/pkg/agent/application/dispatcher/dispatcher_test.go

Lines changed: 128 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ func TestActionDispatcher(t *testing.T) {
297297

298298
action1 := &mockScheduledAction{}
299299
action1.On("StartTime").Return(time.Time{}, fleetapi.ErrNoStartTime)
300-
action1.On("Expiration").Return(time.Now().Add(time.Hour), fleetapi.ErrNoStartTime)
300+
action1.On("Expiration").Return(time.Now().Add(time.Hour), fleetapi.ErrNoExpiration)
301301
action1.On("Type").Return(fleetapi.ActionTypeCancel)
302302
action1.On("ID").Return("id")
303303

@@ -360,8 +360,9 @@ func TestActionDispatcher(t *testing.T) {
360360
})
361361

362362
t.Run("Dispatch of a retryable action returns an error", func(t *testing.T) {
363+
testError := errors.New("test error")
363364
def := &mockHandler{}
364-
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("test error")).Once()
365+
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(testError).Once()
365366

366367
saver := &mockSaver{}
367368
saver.On("Save").Return(nil).Times(2)
@@ -382,6 +383,7 @@ func TestActionDispatcher(t *testing.T) {
382383
action.On("RetryAttempt").Return(0).Once()
383384
action.On("SetRetryAttempt", 1).Once()
384385
action.On("SetStartTime", mock.Anything).Once()
386+
action.On("GetError").Return(testError).Once()
385387

386388
dispatchCtx, cancelFn := context.WithCancel(context.Background())
387389
defer cancelFn()
@@ -685,6 +687,118 @@ func TestActionDispatcher(t *testing.T) {
685687
}
686688
require.NoError(t, shouldNotSetDetails, "set upgrade details should not be called")
687689
})
690+
691+
t.Run("check that failed upgrade actions do not block newer ones from dispatching", func(t *testing.T) {
692+
// we start by dispatching a scheduled upgrade action
693+
ctx := t.Context()
694+
handlerErr := errors.New("text handler error")
695+
696+
def := &mockHandler{}
697+
def.On("Handle",
698+
mock.Anything, mock.Anything, mock.Anything).
699+
Return(handlerErr).Once()
700+
701+
saver := &mockSaver{}
702+
saver.On("Save").Return(nil).Times(4)
703+
saver.On("SetQueue", mock.Anything).Times(4)
704+
actionQueue, err := queue.NewActionQueue([]fleetapi.ScheduledAction{}, saver)
705+
require.NoError(t, err)
706+
707+
d, err := New(nil, t.TempDir(), def, actionQueue)
708+
require.NoError(t, err)
709+
710+
err = d.Register(&fleetapi.ActionUpgrade{}, def)
711+
require.NoError(t, err)
712+
713+
var gotDetails *details.Details
714+
detailsSetter := func(upgradeDetails *details.Details) {
715+
gotDetails = upgradeDetails
716+
}
717+
718+
initialScheduledUpgradeAction := &fleetapi.ActionUpgrade{
719+
ActionID: "scheduled-action-id",
720+
ActionType: fleetapi.ActionTypeUpgrade,
721+
ActionStartTime: time.Now().Add(1 * time.Hour).Format(time.RFC3339),
722+
Data: fleetapi.ActionUpgradeData{
723+
Version: "9.3.0",
724+
SourceURI: "https://test-uri.test.com",
725+
},
726+
}
727+
728+
dispatchDone := make(chan struct{})
729+
go func() {
730+
d.Dispatch(ctx, detailsSetter, ack, initialScheduledUpgradeAction)
731+
close(dispatchDone)
732+
}()
733+
select {
734+
case err := <-d.Errors():
735+
t.Fatalf("Unexpected error from Dispatch: %v", err)
736+
case <-dispatchDone:
737+
}
738+
739+
// make sure that the upgrade details reported are matching our expectations
740+
require.NotNilf(t, gotDetails, "upgrade details should not be nil")
741+
assert.Equal(t, initialScheduledUpgradeAction.ActionID, gotDetails.ActionID)
742+
assert.Equal(t, details.StateScheduled, gotDetails.State)
743+
assert.Equal(t, initialScheduledUpgradeAction.Data.Version, gotDetails.TargetVersion)
744+
assert.Empty(t, gotDetails.Metadata.ErrorMsg)
745+
746+
// manipulate the queue directly to get the dispatcher to actually dispatch our action
747+
removedItems := actionQueue.Cancel(initialScheduledUpgradeAction.ActionID)
748+
require.Equal(t, 1, removedItems)
749+
actionNewStartTime := time.Now().Add(-5 * time.Minute).UTC()
750+
initialScheduledUpgradeAction.ActionStartTime = actionNewStartTime.Format(time.RFC3339)
751+
actionQueue.Add(initialScheduledUpgradeAction, actionNewStartTime.Unix())
752+
753+
go func() {
754+
d.Dispatch(ctx, detailsSetter, ack)
755+
}()
756+
select {
757+
case err := <-d.Errors():
758+
if err != nil {
759+
t.Fatalf("Unexpected error from Dispatch: %v", err)
760+
}
761+
}
762+
763+
// make sure that upgrade details are still reported as scheduled but with a non-empty error
764+
require.NotNilf(t, gotDetails, "upgrade details should not be nil")
765+
assert.Equal(t, initialScheduledUpgradeAction.ActionID, gotDetails.ActionID)
766+
assert.Equal(t, details.StateScheduled, gotDetails.State)
767+
assert.Equal(t, initialScheduledUpgradeAction.Data.Version, gotDetails.TargetVersion)
768+
assert.NotEmpty(t, gotDetails.Metadata.ErrorMsg)
769+
770+
// issue a brand-new upgrade action
771+
newUpgradeAction := &fleetapi.ActionUpgrade{
772+
ActionID: "upgrade-action-id",
773+
ActionType: fleetapi.ActionTypeUpgrade,
774+
Data: fleetapi.ActionUpgradeData{
775+
Version: "9.3.0",
776+
SourceURI: "https://test-uri.test.com",
777+
},
778+
}
779+
def.On("Handle",
780+
mock.Anything, mock.Anything, mock.Anything).
781+
Return(nil).Once()
782+
783+
detailsSetter = func(upgradeDetails *details.Details) {
784+
gotDetails = upgradeDetails
785+
}
786+
go func() {
787+
d.Dispatch(ctx, detailsSetter, ack, newUpgradeAction)
788+
}()
789+
select {
790+
case err := <-d.Errors():
791+
if err != nil {
792+
t.Fatalf("Unexpected error from Dispatch: %v", err)
793+
}
794+
}
795+
require.Nil(t, gotDetails)
796+
797+
// make sure that the action queue doesn't have any actions
798+
assert.Empty(t, actionQueue.Actions())
799+
def.AssertExpectations(t)
800+
saver.AssertExpectations(t)
801+
})
688802
}
689803

690804
func Test_ActionDispatcher_scheduleRetry(t *testing.T) {
@@ -704,8 +818,13 @@ func Test_ActionDispatcher_scheduleRetry(t *testing.T) {
704818
action.On("ID").Return("id")
705819
action.On("RetryAttempt").Return(len(d.rt.steps)).Once()
706820
action.On("SetRetryAttempt", mock.Anything).Once()
821+
action.On("Type").Return(fleetapi.ActionTypeUpgrade).Once()
822+
action.On("GetError").Return(nil).Once()
823+
824+
upgradeDetailsNeedUpdate := false
825+
d.scheduleRetry(context.Background(), action, ack, &upgradeDetailsNeedUpdate)
826+
assert.False(t, upgradeDetailsNeedUpdate)
707827

708-
d.scheduleRetry(context.Background(), action, ack)
709828
saver.AssertExpectations(t)
710829
action.AssertExpectations(t)
711830
})
@@ -726,8 +845,13 @@ func Test_ActionDispatcher_scheduleRetry(t *testing.T) {
726845
action.On("RetryAttempt").Return(0).Once()
727846
action.On("SetRetryAttempt", 1).Once()
728847
action.On("SetStartTime", mock.Anything).Once()
848+
action.On("Type").Return(fleetapi.ActionTypeUpgrade).Twice()
849+
action.On("GetError").Return(nil).Once()
850+
851+
upgradeDetailsNeedUpdate := false
852+
d.scheduleRetry(context.Background(), action, ack, &upgradeDetailsNeedUpdate)
853+
assert.True(t, upgradeDetailsNeedUpdate)
729854

730-
d.scheduleRetry(context.Background(), action, ack)
731855
saver.AssertExpectations(t)
732856
action.AssertExpectations(t)
733857
})

0 commit comments

Comments
 (0)