@@ -13,7 +13,7 @@ import (
1313 "sync/atomic"
1414 "time"
1515
16- "github.com/elastic/elastic-agent/internal/pkg/otel/configtranslate "
16+ "github.com/elastic/elastic-agent/internal/pkg/otel/translate "
1717
1818 "go.opentelemetry.io/collector/component/componentstatus"
1919
@@ -349,7 +349,7 @@ type managerChans struct {
349349 varsManagerUpdate <- chan []* transpiler.Vars
350350 varsManagerError <- chan error
351351
352- otelManagerUpdate <- chan * status.AggregateStatus
352+ otelManagerUpdate chan * status.AggregateStatus
353353 otelManagerError <- chan error
354354
355355 upgradeMarkerUpdate <- chan upgrade.UpdateMarker
@@ -463,7 +463,9 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
463463 c .managerChans .varsManagerError = varsMgr .Errors ()
464464 }
465465 if otelMgr != nil {
466- c .managerChans .otelManagerUpdate = otelMgr .Watch ()
466+ // The otel manager sends updates to the watchRuntimeComponents function, which extracts component status
467+ // and forwards the rest to this channel.
468+ c .managerChans .otelManagerUpdate = make (chan * status.AggregateStatus )
467469 c .managerChans .otelManagerError = otelMgr .Errors ()
468470 }
469471 if upgradeMgr != nil && upgradeMgr .MarkerWatcher () != nil {
@@ -656,77 +658,148 @@ func (c *Coordinator) watchRuntimeComponents(ctx context.Context) {
656658 state := make (map [string ]runtime.ComponentState )
657659
658660 var subChan <- chan runtime.ComponentComponentState
661+ var otelChan <- chan * status.AggregateStatus
659662 // A real Coordinator will always have a runtime manager, but unit tests
660663 // may not initialize all managers -- in that case we leave subChan nil,
661664 // and just idle until Coordinator shuts down.
662665 if c .runtimeMgr != nil {
663666 subChan = c .runtimeMgr .SubscribeAll (ctx ).Ch ()
664667 }
668+ if c .otelMgr != nil {
669+ otelChan = c .otelMgr .Watch ()
670+ }
665671 for {
666672 select {
667673 case <- ctx .Done ():
668674 return
669- case s := <- subChan :
670- oldState , ok := state [s .Component .ID ]
671- if ! ok {
672- componentLog := coordinatorComponentLog {
673- ID : s .Component .ID ,
674- State : s .State .State .String (),
675- }
676- logBasedOnState (c .logger , s .State .State , fmt .Sprintf ("Spawned new component %s: %s" , s .Component .ID , s .State .Message ), "component" , componentLog )
677- for ui , us := range s .State .Units {
678- unitLog := coordinatorUnitLog {
679- ID : ui .UnitID ,
680- Type : ui .UnitType .String (),
681- State : us .State .String (),
682- }
683- logBasedOnState (c .logger , us .State , fmt .Sprintf ("Spawned new unit %s: %s" , ui .UnitID , us .Message ), "component" , componentLog , "unit" , unitLog )
684- }
685- } else {
686- componentLog := coordinatorComponentLog {
687- ID : s .Component .ID ,
688- State : s .State .State .String (),
689- }
690- if oldState .State != s .State .State {
691- cl := coordinatorComponentLog {
692- ID : s .Component .ID ,
693- State : s .State .State .String (),
694- OldState : oldState .State .String (),
695- }
696- logBasedOnState (c .logger , s .State .State , fmt .Sprintf ("Component state changed %s (%s->%s): %s" , s .Component .ID , oldState .State .String (), s .State .State .String (), s .State .Message ), "component" , cl )
697- }
698- for ui , us := range s .State .Units {
699- oldUS , ok := oldState .Units [ui ]
700- if ! ok {
701- unitLog := coordinatorUnitLog {
702- ID : ui .UnitID ,
703- Type : ui .UnitType .String (),
704- State : us .State .String (),
705- }
706- logBasedOnState (c .logger , us .State , fmt .Sprintf ("Spawned new unit %s: %s" , ui .UnitID , us .Message ), "component" , componentLog , "unit" , unitLog )
707- } else if oldUS .State != us .State {
708- unitLog := coordinatorUnitLog {
709- ID : ui .UnitID ,
710- Type : ui .UnitType .String (),
711- State : us .State .String (),
712- OldState : oldUS .State .String (),
713- }
714- logBasedOnState (c .logger , us .State , fmt .Sprintf ("Unit state changed %s (%s->%s): %s" , ui .UnitID , oldUS .State .String (), us .State .String (), us .Message ), "component" , componentLog , "unit" , unitLog )
715- }
716- }
717- }
718- state [s .Component .ID ] = s .State
719- if s .State .State == client .UnitStateStopped {
720- delete (state , s .Component .ID )
721- }
675+ case componentState := <- subChan :
676+ logComponentStateChange (c .logger , state , & componentState )
722677 // Forward the final changes back to Coordinator, unless our context
723678 // has ended.
724679 select {
725- case c .managerChans .runtimeManagerUpdate <- s :
680+ case c .managerChans .runtimeManagerUpdate <- componentState :
726681 case <- ctx .Done ():
727682 return
728683 }
684+ case otelStatus := <- otelChan :
685+ // We don't break on errors here, because we want to forward the status
686+ // even if there was an error, and the rest of the code gracefully handles componentStates being nil
687+ componentStates , err := translate .GetAllComponentStates (otelStatus , c .componentModel )
688+ if err != nil {
689+ c .setOTelError (err )
690+ }
691+ err = translate .DropComponentStateFromOtelStatus (otelStatus )
692+ if err != nil {
693+ c .setOTelError (err )
694+ }
695+
696+ // forward the remaining otel status
697+ // TODO: Implement subscriptions for otel manager status to avoid the need for this
698+ select {
699+ case c .managerChans .otelManagerUpdate <- otelStatus :
700+ case <- ctx .Done ():
701+ return
702+ }
703+
704+ // drop component states which don't exist in the configuration anymore
705+ // we need to do this because we aren't guaranteed to receive a STOPPED state when the component is removed
706+ componentIds := make (map [string ]bool )
707+ for _ , componentState := range componentStates {
708+ componentIds [componentState .Component .ID ] = true
709+ }
710+ for id := range state {
711+ if _ , ok := componentIds [id ]; ! ok {
712+ // this component is not in the configuration anymore, emit a fake STOPPED state
713+ componentStates = append (componentStates , runtime.ComponentComponentState {
714+ Component : component.Component {
715+ ID : id ,
716+ },
717+ State : runtime.ComponentState {
718+ State : client .UnitStateStopped ,
719+ },
720+ })
721+ }
722+ }
723+ // now handle the component states
724+ for _ , componentState := range componentStates {
725+ logComponentStateChange (c .logger , state , & componentState )
726+ // Forward the final changes back to Coordinator, unless our context
727+ // has ended.
728+ select {
729+ case c .managerChans .runtimeManagerUpdate <- componentState :
730+ case <- ctx .Done ():
731+ return
732+ }
733+ }
734+ }
735+ }
736+ }
737+
738+ // logComponentStateChange emits a log message based on the new component state.
739+ func logComponentStateChange (
740+ logger * logger.Logger ,
741+ coordinatorState map [string ]runtime.ComponentState ,
742+ componentState * runtime.ComponentComponentState ) {
743+ oldState , ok := coordinatorState [componentState .Component .ID ]
744+ if ! ok {
745+ componentLog := coordinatorComponentLog {
746+ ID : componentState .Component .ID ,
747+ State : componentState .State .State .String (),
729748 }
749+ logMessage := fmt .Sprintf ("Spawned new component %s: %s" ,
750+ componentState .Component .ID ,
751+ componentState .State .Message )
752+ logBasedOnState (logger , componentState .State .State , logMessage , "component" , componentLog )
753+ for ui , us := range componentState .State .Units {
754+ unitLog := coordinatorUnitLog {
755+ ID : ui .UnitID ,
756+ Type : ui .UnitType .String (),
757+ State : us .State .String (),
758+ }
759+ unitLogMessage := fmt .Sprintf ("Spawned new unit %s: %s" , ui .UnitID , us .Message )
760+ logBasedOnState (logger , us .State , unitLogMessage , "component" , componentLog , "unit" , unitLog )
761+ }
762+ } else {
763+ componentLog := coordinatorComponentLog {
764+ ID : componentState .Component .ID ,
765+ State : componentState .State .State .String (),
766+ }
767+ if oldState .State != componentState .State .State {
768+ cl := coordinatorComponentLog {
769+ ID : componentState .Component .ID ,
770+ State : componentState .State .State .String (),
771+ OldState : oldState .State .String (),
772+ }
773+ logBasedOnState (logger , componentState .State .State ,
774+ fmt .Sprintf ("Component state changed %s (%s->%s): %s" ,
775+ componentState .Component .ID , oldState .State .String (),
776+ componentState .State .State .String (),
777+ componentState .State .Message ),
778+ "component" , cl )
779+ }
780+ for ui , us := range componentState .State .Units {
781+ oldUS , ok := oldState .Units [ui ]
782+ if ! ok {
783+ unitLog := coordinatorUnitLog {
784+ ID : ui .UnitID ,
785+ Type : ui .UnitType .String (),
786+ State : us .State .String (),
787+ }
788+ logBasedOnState (logger , us .State , fmt .Sprintf ("Spawned new unit %s: %s" , ui .UnitID , us .Message ), "component" , componentLog , "unit" , unitLog )
789+ } else if oldUS .State != us .State {
790+ unitLog := coordinatorUnitLog {
791+ ID : ui .UnitID ,
792+ Type : ui .UnitType .String (),
793+ State : us .State .String (),
794+ OldState : oldUS .State .String (),
795+ }
796+ logBasedOnState (logger , us .State , fmt .Sprintf ("Unit state changed %s (%s->%s): %s" , ui .UnitID , oldUS .State .String (), us .State .String (), us .Message ), "component" , componentLog , "unit" , unitLog )
797+ }
798+ }
799+ }
800+ coordinatorState [componentState .Component .ID ] = componentState .State
801+ if componentState .State .State == client .UnitStateStopped {
802+ delete (coordinatorState , componentState .Component .ID )
730803 }
731804}
732805
@@ -1231,8 +1304,8 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
12311304 c .processVars (ctx , vars )
12321305 }
12331306
1234- case collector := <- c .managerChans .otelManagerUpdate :
1235- c .state .Collector = collector
1307+ case collectorStatus := <- c .managerChans .otelManagerUpdate :
1308+ c .state .Collector = collectorStatus
12361309 c .stateNeedsRefresh = true
12371310
12381311 case ll := <- c .logLevelCh :
@@ -1469,7 +1542,7 @@ func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error {
14691542 if len (model .Components ) > 0 {
14701543 var err error
14711544 c .logger .With ("components" , model .Components ).Debug ("Updating otel manager model" )
1472- componentOtelCfg , err = configtranslate .GetOtelConfig (model , c .agentInfo , c .monitorMgr .ComponentMonitoringConfig )
1545+ componentOtelCfg , err = translate .GetOtelConfig (model , c .agentInfo , c .monitorMgr .ComponentMonitoringConfig )
14731546 if err != nil {
14741547 c .logger .Errorf ("failed to generate otel config: %v" , err )
14751548 }
0 commit comments