Skip to content

Commit 14a5997

Browse files
authored
Translate beat receiver status to component state (#8273)
* Add otel status translation
* Modify monitoring comparison integration test
  # Conflicts:
  # testing/integration/beat_receivers_test.go
* Add coordinator test
* Add more status unit tests
* Make messages more compatible with beats processes
* Check agent status in monitoring integration test
* Improve translation error handling
1 parent 5238f5e commit 14a5997

File tree

9 files changed

+1192
-100
lines changed

9 files changed

+1192
-100
lines changed

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 133 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16-
"github.com/elastic/elastic-agent/internal/pkg/otel/configtranslate"
16+
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
1717

1818
"go.opentelemetry.io/collector/component/componentstatus"
1919

@@ -349,7 +349,7 @@ type managerChans struct {
349349
varsManagerUpdate <-chan []*transpiler.Vars
350350
varsManagerError <-chan error
351351

352-
otelManagerUpdate <-chan *status.AggregateStatus
352+
otelManagerUpdate chan *status.AggregateStatus
353353
otelManagerError <-chan error
354354

355355
upgradeMarkerUpdate <-chan upgrade.UpdateMarker
@@ -463,7 +463,9 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
463463
c.managerChans.varsManagerError = varsMgr.Errors()
464464
}
465465
if otelMgr != nil {
466-
c.managerChans.otelManagerUpdate = otelMgr.Watch()
466+
// The otel manager sends updates to the watchRuntimeComponents function, which extracts component status
467+
// and forwards the rest to this channel.
468+
c.managerChans.otelManagerUpdate = make(chan *status.AggregateStatus)
467469
c.managerChans.otelManagerError = otelMgr.Errors()
468470
}
469471
if upgradeMgr != nil && upgradeMgr.MarkerWatcher() != nil {
@@ -656,77 +658,148 @@ func (c *Coordinator) watchRuntimeComponents(ctx context.Context) {
656658
state := make(map[string]runtime.ComponentState)
657659

658660
var subChan <-chan runtime.ComponentComponentState
661+
var otelChan <-chan *status.AggregateStatus
659662
// A real Coordinator will always have a runtime manager, but unit tests
660663
// may not initialize all managers -- in that case we leave subChan nil,
661664
// and just idle until Coordinator shuts down.
662665
if c.runtimeMgr != nil {
663666
subChan = c.runtimeMgr.SubscribeAll(ctx).Ch()
664667
}
668+
if c.otelMgr != nil {
669+
otelChan = c.otelMgr.Watch()
670+
}
665671
for {
666672
select {
667673
case <-ctx.Done():
668674
return
669-
case s := <-subChan:
670-
oldState, ok := state[s.Component.ID]
671-
if !ok {
672-
componentLog := coordinatorComponentLog{
673-
ID: s.Component.ID,
674-
State: s.State.State.String(),
675-
}
676-
logBasedOnState(c.logger, s.State.State, fmt.Sprintf("Spawned new component %s: %s", s.Component.ID, s.State.Message), "component", componentLog)
677-
for ui, us := range s.State.Units {
678-
unitLog := coordinatorUnitLog{
679-
ID: ui.UnitID,
680-
Type: ui.UnitType.String(),
681-
State: us.State.String(),
682-
}
683-
logBasedOnState(c.logger, us.State, fmt.Sprintf("Spawned new unit %s: %s", ui.UnitID, us.Message), "component", componentLog, "unit", unitLog)
684-
}
685-
} else {
686-
componentLog := coordinatorComponentLog{
687-
ID: s.Component.ID,
688-
State: s.State.State.String(),
689-
}
690-
if oldState.State != s.State.State {
691-
cl := coordinatorComponentLog{
692-
ID: s.Component.ID,
693-
State: s.State.State.String(),
694-
OldState: oldState.State.String(),
695-
}
696-
logBasedOnState(c.logger, s.State.State, fmt.Sprintf("Component state changed %s (%s->%s): %s", s.Component.ID, oldState.State.String(), s.State.State.String(), s.State.Message), "component", cl)
697-
}
698-
for ui, us := range s.State.Units {
699-
oldUS, ok := oldState.Units[ui]
700-
if !ok {
701-
unitLog := coordinatorUnitLog{
702-
ID: ui.UnitID,
703-
Type: ui.UnitType.String(),
704-
State: us.State.String(),
705-
}
706-
logBasedOnState(c.logger, us.State, fmt.Sprintf("Spawned new unit %s: %s", ui.UnitID, us.Message), "component", componentLog, "unit", unitLog)
707-
} else if oldUS.State != us.State {
708-
unitLog := coordinatorUnitLog{
709-
ID: ui.UnitID,
710-
Type: ui.UnitType.String(),
711-
State: us.State.String(),
712-
OldState: oldUS.State.String(),
713-
}
714-
logBasedOnState(c.logger, us.State, fmt.Sprintf("Unit state changed %s (%s->%s): %s", ui.UnitID, oldUS.State.String(), us.State.String(), us.Message), "component", componentLog, "unit", unitLog)
715-
}
716-
}
717-
}
718-
state[s.Component.ID] = s.State
719-
if s.State.State == client.UnitStateStopped {
720-
delete(state, s.Component.ID)
721-
}
675+
case componentState := <-subChan:
676+
logComponentStateChange(c.logger, state, &componentState)
722677
// Forward the final changes back to Coordinator, unless our context
723678
// has ended.
724679
select {
725-
case c.managerChans.runtimeManagerUpdate <- s:
680+
case c.managerChans.runtimeManagerUpdate <- componentState:
726681
case <-ctx.Done():
727682
return
728683
}
684+
case otelStatus := <-otelChan:
685+
// We don't break on errors here, because we want to forward the status
686+
// even if there was an error, and the rest of the code gracefully handles componentStates being nil
687+
componentStates, err := translate.GetAllComponentStates(otelStatus, c.componentModel)
688+
if err != nil {
689+
c.setOTelError(err)
690+
}
691+
err = translate.DropComponentStateFromOtelStatus(otelStatus)
692+
if err != nil {
693+
c.setOTelError(err)
694+
}
695+
696+
// forward the remaining otel status
697+
// TODO: Implement subscriptions for otel manager status to avoid the need for this
698+
select {
699+
case c.managerChans.otelManagerUpdate <- otelStatus:
700+
case <-ctx.Done():
701+
return
702+
}
703+
704+
// drop component states which don't exist in the configuration anymore
705+
// we need to do this because we aren't guaranteed to receive a STOPPED state when the component is removed
706+
componentIds := make(map[string]bool)
707+
for _, componentState := range componentStates {
708+
componentIds[componentState.Component.ID] = true
709+
}
710+
for id := range state {
711+
if _, ok := componentIds[id]; !ok {
712+
// this component is not in the configuration anymore, emit a fake STOPPED state
713+
componentStates = append(componentStates, runtime.ComponentComponentState{
714+
Component: component.Component{
715+
ID: id,
716+
},
717+
State: runtime.ComponentState{
718+
State: client.UnitStateStopped,
719+
},
720+
})
721+
}
722+
}
723+
// now handle the component states
724+
for _, componentState := range componentStates {
725+
logComponentStateChange(c.logger, state, &componentState)
726+
// Forward the final changes back to Coordinator, unless our context
727+
// has ended.
728+
select {
729+
case c.managerChans.runtimeManagerUpdate <- componentState:
730+
case <-ctx.Done():
731+
return
732+
}
733+
}
734+
}
735+
}
736+
}
737+
738+
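// logComponentStateChange emits log messages describing how the new component state (and its units) differs from the previously recorded one, then records the new state in coordinatorState, deleting the entry if the component is now STOPPED.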
739+
func logComponentStateChange(
740+
logger *logger.Logger,
741+
coordinatorState map[string]runtime.ComponentState,
742+
componentState *runtime.ComponentComponentState) {
743+
oldState, ok := coordinatorState[componentState.Component.ID]
744+
if !ok {
745+
componentLog := coordinatorComponentLog{
746+
ID: componentState.Component.ID,
747+
State: componentState.State.State.String(),
729748
}
749+
logMessage := fmt.Sprintf("Spawned new component %s: %s",
750+
componentState.Component.ID,
751+
componentState.State.Message)
752+
logBasedOnState(logger, componentState.State.State, logMessage, "component", componentLog)
753+
for ui, us := range componentState.State.Units {
754+
unitLog := coordinatorUnitLog{
755+
ID: ui.UnitID,
756+
Type: ui.UnitType.String(),
757+
State: us.State.String(),
758+
}
759+
unitLogMessage := fmt.Sprintf("Spawned new unit %s: %s", ui.UnitID, us.Message)
760+
logBasedOnState(logger, us.State, unitLogMessage, "component", componentLog, "unit", unitLog)
761+
}
762+
} else {
763+
componentLog := coordinatorComponentLog{
764+
ID: componentState.Component.ID,
765+
State: componentState.State.State.String(),
766+
}
767+
if oldState.State != componentState.State.State {
768+
cl := coordinatorComponentLog{
769+
ID: componentState.Component.ID,
770+
State: componentState.State.State.String(),
771+
OldState: oldState.State.String(),
772+
}
773+
logBasedOnState(logger, componentState.State.State,
774+
fmt.Sprintf("Component state changed %s (%s->%s): %s",
775+
componentState.Component.ID, oldState.State.String(),
776+
componentState.State.State.String(),
777+
componentState.State.Message),
778+
"component", cl)
779+
}
780+
for ui, us := range componentState.State.Units {
781+
oldUS, ok := oldState.Units[ui]
782+
if !ok {
783+
unitLog := coordinatorUnitLog{
784+
ID: ui.UnitID,
785+
Type: ui.UnitType.String(),
786+
State: us.State.String(),
787+
}
788+
logBasedOnState(logger, us.State, fmt.Sprintf("Spawned new unit %s: %s", ui.UnitID, us.Message), "component", componentLog, "unit", unitLog)
789+
} else if oldUS.State != us.State {
790+
unitLog := coordinatorUnitLog{
791+
ID: ui.UnitID,
792+
Type: ui.UnitType.String(),
793+
State: us.State.String(),
794+
OldState: oldUS.State.String(),
795+
}
796+
logBasedOnState(logger, us.State, fmt.Sprintf("Unit state changed %s (%s->%s): %s", ui.UnitID, oldUS.State.String(), us.State.String(), us.Message), "component", componentLog, "unit", unitLog)
797+
}
798+
}
799+
}
800+
coordinatorState[componentState.Component.ID] = componentState.State
801+
if componentState.State.State == client.UnitStateStopped {
802+
delete(coordinatorState, componentState.Component.ID)
730803
}
731804
}
732805

@@ -1231,8 +1304,8 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
12311304
c.processVars(ctx, vars)
12321305
}
12331306

1234-
case collector := <-c.managerChans.otelManagerUpdate:
1235-
c.state.Collector = collector
1307+
case collectorStatus := <-c.managerChans.otelManagerUpdate:
1308+
c.state.Collector = collectorStatus
12361309
c.stateNeedsRefresh = true
12371310

12381311
case ll := <-c.logLevelCh:
@@ -1469,7 +1542,7 @@ func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error {
14691542
if len(model.Components) > 0 {
14701543
var err error
14711544
c.logger.With("components", model.Components).Debug("Updating otel manager model")
1472-
componentOtelCfg, err = configtranslate.GetOtelConfig(model, c.agentInfo, c.monitorMgr.ComponentMonitoringConfig)
1545+
componentOtelCfg, err = translate.GetOtelConfig(model, c.agentInfo, c.monitorMgr.ComponentMonitoringConfig)
14731546
if err != nil {
14741547
c.logger.Errorf("failed to generate otel config: %v", err)
14751548
}

internal/pkg/agent/application/coordinator/coordinator_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1342,6 +1342,7 @@ type fakeOTelManager struct {
13421342
updateCallback func(*confmap.Conf) error
13431343
result error
13441344
errChan chan error
1345+
statusChan chan *status.AggregateStatus
13451346
}
13461347

13471348
func (f *fakeOTelManager) Run(ctx context.Context) error {
@@ -1365,7 +1366,7 @@ func (f *fakeOTelManager) Update(cfg *confmap.Conf) {
13651366
}
13661367

13671368
func (f *fakeOTelManager) Watch() <-chan *status.AggregateStatus {
1368-
return nil
1369+
return f.statusChan
13691370
}
13701371

13711372
// An implementation of the RuntimeManager interface for use in testing.

0 commit comments

Comments
 (0)