@@ -11,6 +11,7 @@ import (
1111 "testing"
1212 "time"
1313
14+ "github.com/stretchr/testify/mock"
1415 "github.com/stretchr/testify/require"
1516
1617 "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
@@ -68,7 +69,11 @@ func (u *mockUpgradeManager) Upgrade(
6869 pgpBytes ... )
6970}
7071
71- func (u * mockUpgradeManager ) Ack (ctx context.Context , acker acker.Acker ) error {
72+ func (u * mockUpgradeManager ) Ack (_ context.Context , _ acker.Acker ) error {
73+ return nil
74+ }
75+
76+ func (u * mockUpgradeManager ) AckAction (_ context.Context , _ acker.Acker , _ fleetapi.Action ) error {
7277 return nil
7378}
7479
@@ -110,7 +115,7 @@ func TestUpgradeHandler(t *testing.T) {
110115 return nil , nil
111116 },
112117 },
113- nil , nil , nil , nil , nil , false , nil )
118+ nil , nil , nil , nil , nil , false , nil , nil )
114119 //nolint:errcheck // We don't need the termination state of the Coordinator
115120 go c .Run (ctx )
116121
@@ -169,7 +174,7 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
169174 return nil , err
170175 },
171176 },
172- nil , nil , nil , nil , nil , false , nil )
177+ nil , nil , nil , nil , nil , false , nil , nil )
173178 //nolint:errcheck // We don't need the termination state of the Coordinator
174179 go c .Run (ctx )
175180
@@ -190,6 +195,98 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
190195 }
191196}
192197
198+ func TestDuplicateActionsHandled (t * testing.T ) {
199+ // Create a cancellable context that will shut down the coordinator after
200+ // the test.
201+ ctx , cancel := context .WithCancel (context .Background ())
202+ defer cancel ()
203+
204+ log , _ := logger .New ("" , false )
205+ upgradeCalledChan := make (chan string )
206+
207+ agentInfo := & info.AgentInfo {}
208+ acker := & fakeAcker {}
209+
210+ // Create and start the Coordinator
211+ c := coordinator .New (
212+ log ,
213+ configuration .DefaultConfiguration (),
214+ logger .DefaultLogLevel ,
215+ agentInfo ,
216+ component.RuntimeSpecs {},
217+ nil ,
218+ & mockUpgradeManager {
219+ UpgradeFn : func (
220+ ctx context.Context ,
221+ version string ,
222+ sourceURI string ,
223+ action * fleetapi.ActionUpgrade ,
224+ details * details.Details ,
225+ skipVerifyOverride bool ,
226+ skipDefaultPgp bool ,
227+ pgpBytes ... string ) (reexec.ShutdownCallbackFn , error ) {
228+
229+ defer func () {
230+ upgradeCalledChan <- action .ActionID
231+ }()
232+
233+ return nil , nil
234+ },
235+ },
236+ nil , nil , nil , nil , nil , false , nil , acker )
237+ //nolint:errcheck // We don't need the termination state of the Coordinator
238+ go c .Run (ctx )
239+
240+ u := NewUpgrade (log , c )
241+ a1 := fleetapi.ActionUpgrade {
242+ ActionID : "action-8.5-1" ,
243+ Data : fleetapi.ActionUpgradeData {
244+ Version : "8.5.0" , SourceURI : "http://localhost" ,
245+ },
246+ }
247+ a2 := fleetapi.ActionUpgrade {
248+ ActionID : "action-8.5-2" ,
249+ Data : fleetapi.ActionUpgradeData {
250+ Version : "8.5.0" , SourceURI : "http://localhost" ,
251+ },
252+ }
253+
254+ checkMsg := func (c <- chan string , expected , errMsg string ) error {
255+ t .Helper ()
256+ // Make sure this test does not dead lock or wait for too long
257+ // For some reason < 1s sometimes makes the test fail.
258+ select {
259+ case <- time .Tick (1500 * time .Millisecond ):
260+ return errors .New ("timed out waiting for Upgrade to return" )
261+ case msg := <- c :
262+ require .Equal (t , expected , msg , errMsg )
263+ }
264+
265+ return nil
266+ }
267+
268+ acker .On ("Ack" , mock .Anything , mock .Anything ).Return (nil )
269+ acker .On ("Commit" , mock .Anything ).Return (nil )
270+
271+ t .Log ("First upgrade action should be processed" )
272+ require .NoError (t , u .Handle (ctx , & a1 , acker ))
273+ require .Nil (t , checkMsg (upgradeCalledChan , a1 .ActionID , "action was not processed" ))
274+ c .ClearOverrideState () // it's upgrading, normally we would restart
275+
276+ t .Log ("Action with different ID but same version should not be propagated to upgrader but acked" )
277+ require .NoError (t , u .Handle (ctx , & a2 , acker ))
278+ require .NotNil (t , checkMsg (upgradeCalledChan , a2 .ActionID , "action was not processed" ))
279+ acker .AssertCalled (t , "Ack" , ctx , & a2 )
280+ acker .AssertCalled (t , "Commit" , ctx )
281+
282+ c .ClearOverrideState () // it's upgrading, normally we would restart
283+
284+ t .Log ("Resending action with same ID should be skipped" )
285+ require .NoError (t , u .Handle (ctx , & a1 , acker ))
286+ require .NotNil (t , checkMsg (upgradeCalledChan , a1 .ActionID , "action was not processed" ))
287+ acker .AssertNotCalled (t , "Ack" , ctx , & a1 )
288+ }
289+
193290func TestUpgradeHandlerNewVersion (t * testing.T ) {
194291 // Create a cancellable context that will shut down the coordinator after
195292 // the test.
@@ -230,15 +327,23 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
230327 return nil , nil
231328 },
232329 },
233- nil , nil , nil , nil , nil , false , nil )
330+ nil , nil , nil , nil , nil , false , nil , nil )
234331 //nolint:errcheck // We don't need the termination state of the Coordinator
235332 go c .Run (ctx )
236333
237334 u := NewUpgrade (log , c )
238- a1 := fleetapi.ActionUpgrade {Data : fleetapi.ActionUpgradeData {
239- Version : "8.2.0" , SourceURI : "http://localhost" }}
240- a2 := fleetapi.ActionUpgrade {Data : fleetapi.ActionUpgradeData {
241- Version : "8.5.0" , SourceURI : "http://localhost" }}
335+ a1 := fleetapi.ActionUpgrade {
336+ ActionID : "action-8.2" ,
337+ Data : fleetapi.ActionUpgradeData {
338+ Version : "8.2.0" , SourceURI : "http://localhost" ,
339+ },
340+ }
341+ a2 := fleetapi.ActionUpgrade {
342+ ActionID : "action-8.5" ,
343+ Data : fleetapi.ActionUpgradeData {
344+ Version : "8.5.0" , SourceURI : "http://localhost" ,
345+ },
346+ }
242347 ack := noopacker .New ()
243348
244349 checkMsg := func (c <- chan string , expected , errMsg string ) {
@@ -262,3 +367,17 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
262367 require .NoError (t , err2 )
263368 checkMsg (upgradeCalledChan , "8.5.0" , "second call to Upgrade must be with version 8.5.0" )
264369}
370+
371+ type fakeAcker struct {
372+ mock.Mock
373+ }
374+
375+ func (f * fakeAcker ) Ack (ctx context.Context , action fleetapi.Action ) error {
376+ args := f .Called (ctx , action )
377+ return args .Error (0 )
378+ }
379+
380+ func (f * fakeAcker ) Commit (ctx context.Context ) error {
381+ args := f .Called (ctx )
382+ return args .Error (0 )
383+ }
0 commit comments