@@ -6,6 +6,8 @@ package fleet
66
77import (
88 "context"
9+ stderrors "errors"
10+ "sync"
911 "time"
1012
1113 "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
@@ -15,6 +17,7 @@ import (
1517 eaclient "github.com/elastic/elastic-agent-client/v7/pkg/client"
1618 "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
1719 "github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
20+ "github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
1821 "github.com/elastic/elastic-agent/internal/pkg/agent/errors"
1922 "github.com/elastic/elastic-agent/internal/pkg/core/backoff"
2023 "github.com/elastic/elastic-agent/internal/pkg/fleetapi"
@@ -83,8 +86,8 @@ type FleetGateway struct {
8386 acker acker.Acker
8487 unauthCounter int
8588 checkinFailCounter int
86- stateFetcher func () coordinator.State
8789 stateStore stateStore
90+ stateFetcher StateFetcher
8891 errCh chan error
8992 actionCh chan []fleetapi.Action
9093}
@@ -95,19 +98,22 @@ func New(
9598 agentInfo agentInfo ,
9699 client client.Sender ,
97100 acker acker.Acker ,
98- stateFetcher func () coordinator.State ,
99101 stateStore stateStore ,
102+ stateFetcher StateFetcher ,
103+ cfg configuration.FleetCheckin ,
100104) (* FleetGateway , error ) {
101105 scheduler := scheduler .NewPeriodicJitter (defaultGatewaySettings .Duration , defaultGatewaySettings .Jitter )
106+ st := defaultGatewaySettings
107+ st .Backoff = getBackoffSettings (cfg )
102108 return newFleetGatewayWithScheduler (
103109 log ,
104- defaultGatewaySettings ,
110+ st ,
105111 agentInfo ,
106112 client ,
107113 scheduler ,
108114 acker ,
109- stateFetcher ,
110115 stateStore ,
116+ stateFetcher ,
111117 )
112118}
113119
@@ -118,8 +124,8 @@ func newFleetGatewayWithScheduler(
118124 client client.Sender ,
119125 scheduler scheduler.Scheduler ,
120126 acker acker.Acker ,
121- stateFetcher func () coordinator.State ,
122127 stateStore stateStore ,
128+ stateFetcher StateFetcher ,
123129) (* FleetGateway , error ) {
124130 return & FleetGateway {
125131 log : log ,
@@ -144,7 +150,6 @@ func (f *FleetGateway) Run(ctx context.Context) error {
144150 if f .settings .Backoff == nil {
145151 requestBackoff = RequestBackoff (ctx .Done ())
146152 } else {
147- // this is only used in tests
148153 requestBackoff = backoff .NewEqualJitterBackoff (
149154 ctx .Done (),
150155 f .settings .Backoff .Init ,
@@ -193,11 +198,20 @@ func (f *FleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
193198 f .log .Debugf ("Checking started" )
194199 resp , took , err := f .execute (ctx )
195200 if err != nil {
196- f .checkinFailCounter ++
201+ becauseOfStateChanged := errors .Is (err , errComponentStateChanged )
202+
203+ // don't count that as failed attempt
204+ if ! becauseOfStateChanged {
205+ f .checkinFailCounter ++
206+ }
197207
208+ warnMsg := "Possible transient error during checkin with fleet-server, retrying"
209+ if becauseOfStateChanged {
210+ warnMsg = "Check in cancelled because of state change, retrying"
211+ }
198212 // Report the first two failures at warn level as they may be recoverable with retries.
199213 if f .checkinFailCounter <= 2 {
200- f .log .Warnw ("Possible transient error during checkin with fleet-server, retrying" ,
214+ f .log .Warnw (warnMsg ,
201215 "error.message" , err , "request_duration_ns" , took , "failed_checkins" , f .checkinFailCounter ,
202216 "retry_after_ns" , bo .NextWait ())
203217 } else {
@@ -348,7 +362,7 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
348362 }
349363
350364 // get current state
351- state := f .stateFetcher ( )
365+ state , stateCtx := f .stateFetcher . FetchState ( ctx )
352366
353367 // convert components into checkin components structure
354368 components := f .convertToCheckinComponents (state .Components , state .Collector )
@@ -374,7 +388,8 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
374388 PolicyRevisionIDX : policyRevisionIDX ,
375389 }
376390
377- resp , took , err := cmd .Execute (ctx , req )
391+ resp , took , err := cmd .Execute (stateCtx , req )
392+ f .stateFetcher .Done ()
378393 if isUnauth (err ) {
379394 f .unauthCounter ++
380395 if f .shouldUseLongSched () {
@@ -390,6 +405,9 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
390405
391406 f .unauthCounter = 0
392407 if err != nil {
408+ if errors .Is (err , context .Canceled ) && errors .Is (context .Cause (stateCtx ), errComponentStateChanged ) {
409+ return nil , took , stderrors .Join (err , errComponentStateChanged )
410+ }
393411 return nil , took , err
394412 }
395413
@@ -494,3 +512,118 @@ func getPolicyRevisionIDX(action fleetapi.Action) int64 {
494512 return 0
495513 }
496514}
515+
516+ var errComponentStateChanged = errors .New ("error component state changed" )
517+
518+ type StateFetcher interface {
519+ // FetchState returns the current state and a context that is valid as long as the returned state is valid to use.
520+ FetchState (ctx context.Context ) (coordinator.State , context.Context )
521+ // Done should be called once the checkin call is complete.
522+ Done ()
523+ StartStateWatch (ctx context.Context ) error
524+ }
525+
526+ type FastCheckinStateFetcher struct {
527+ log * logger.Logger
528+ fetcher func () coordinator.State
529+ stateChan chan coordinator.State
530+
531+ cancel context.CancelCauseFunc
532+ mutex sync.Mutex
533+ }
534+
535+ func NewFastCheckinStateFetcher (log * logger.Logger , fetcher func () coordinator.State , stateChan chan coordinator.State ) * FastCheckinStateFetcher {
536+ return & FastCheckinStateFetcher {
537+ log : log ,
538+ fetcher : fetcher ,
539+ stateChan : stateChan ,
540+ cancel : nil ,
541+ mutex : sync.Mutex {},
542+ }
543+ }
544+
545+ // Fetch wraps the state fetching to send in the check-in request under the checkin state mutex.
546+ // After the state is fetched the checkin cancellation function has be initialized and the new context
547+ // is returned.
548+ func (s * FastCheckinStateFetcher ) FetchState (ctx context.Context ) (coordinator.State , context.Context ) {
549+ s .mutex .Lock ()
550+ defer s .mutex .Unlock ()
551+
552+ if s .cancel != nil {
553+ s .cancel (nil ) // ensure ctx cleanup
554+ }
555+
556+ ctx2 , ctxCancel := context .WithCancelCause (ctx )
557+ state := s .fetcher ()
558+ s .cancel = ctxCancel
559+ return state , ctx2
560+ }
561+
562+ func (s * FastCheckinStateFetcher ) Done () {
563+ s .mutex .Lock ()
564+ defer s .mutex .Unlock ()
565+
566+ if s .cancel != nil {
567+ s .cancel (nil ) // ensure ctx cleanup
568+ s .cancel = nil
569+ }
570+ }
571+
572+ func (s * FastCheckinStateFetcher ) invalidateState () {
573+ s .mutex .Lock ()
574+ defer s .mutex .Unlock ()
575+
576+ if s .cancel != nil {
577+ s .cancel (errComponentStateChanged )
578+ s .cancel = nil
579+ }
580+ }
581+
582+ func (s * FastCheckinStateFetcher ) StartStateWatch (ctx context.Context ) error {
583+ s .log .Info ("FleetGateway state watching started" )
584+ for {
585+ select {
586+ case <- ctx .Done ():
587+ s .log .Info ("FleetGateway state watching stopped" )
588+ return ctx .Err ()
589+ case _ , isOpen := <- s .stateChan :
590+ if ! isOpen {
591+ s .log .Info ("FleetGateway state watching channel closed, stopping loop." )
592+ return nil
593+ }
594+ // TODO: consider check for specific changes e.g. degraded?
595+ s .invalidateState ()
596+ }
597+ }
598+ }
599+
600+ // CheckinStateFetcher implements the simple state fetching without any invalidation or fast checkin logic.
601+ type CheckinStateFetcher struct {
602+ fetcher func () coordinator.State
603+ }
604+
605+ func NewCheckinStateFetcher (fetcher func () coordinator.State ) * CheckinStateFetcher {
606+ return & CheckinStateFetcher {fetcher : fetcher }
607+ }
608+
609+ // FetchState returns the current state and the given ctx because the current state is always valid to use.
610+ func (s * CheckinStateFetcher ) FetchState (ctx context.Context ) (coordinator.State , context.Context ) {
611+ state := s .fetcher ()
612+ return state , ctx
613+ }
614+
615+ func (s * CheckinStateFetcher ) Done () {}
616+ func (s * CheckinStateFetcher ) StartStateWatch (ctx context.Context ) error { return nil }
617+
618+ func getBackoffSettings (cfg configuration.FleetCheckin ) * backoffSettings {
619+ bo := defaultFleetBackoffSettings
620+
621+ if cfg .RequestBackoffInit > 0 {
622+ bo .Init = cfg .RequestBackoffInit
623+ }
624+ if cfg .RequestBackoffMax > 0 {
625+ bo .Max = cfg .RequestBackoffMax
626+ }
627+
628+ return & bo
629+ }
0 commit comments