diff --git a/internal/pkg/agent/application/monitoring/component/v1_monitor.go b/internal/pkg/agent/application/monitoring/component/v1_monitor.go index 0ef6f5bb923..b46ed1af9f0 100644 --- a/internal/pkg/agent/application/monitoring/component/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/component/v1_monitor.go @@ -490,8 +490,7 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentInfo "streams": streams, } - // Make sure we don't set anything until the configuration is stable if the otel manager isn't enabled - if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager { + if b.config.C.RuntimeManager == monitoringCfg.OtelRuntimeManager { input["_runtime_experimental"] = b.config.C.RuntimeManager } @@ -582,7 +581,7 @@ func (b *BeatsMonitor) injectMetricsInput( } // Make sure we don't set anything until the configuration is stable if the otel manager isn't enabled - if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager { + if b.config.C.RuntimeManager == monitoringCfg.OtelRuntimeManager { for _, input := range inputs { inputMap := input.(map[string]interface{}) inputMap["_runtime_experimental"] = b.config.C.RuntimeManager diff --git a/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go index efcf104ba05..caf1f7391e5 100644 --- a/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go @@ -93,7 +93,7 @@ func TestMonitoringFull(t *testing.T) { { Name: "Default runtime manager", RuntimeManager: monitoringcfg.DefaultRuntimeManager, - ExpectedConfigPath: filepath.Join(".", "testdata", "monitoring_config_full_process.yaml"), + ExpectedConfigPath: filepath.Join(".", "testdata", "monitoring_config_full_otel.yaml"), }, { Name: "Process runtime manager", diff --git a/internal/pkg/core/monitoring/config/config.go 
b/internal/pkg/core/monitoring/config/config.go index cb0e19987d1..13a141de8b3 100644 --- a/internal/pkg/core/monitoring/config/config.go +++ b/internal/pkg/core/monitoring/config/config.go @@ -19,7 +19,7 @@ const ( DefaultHost = "localhost" ProcessRuntimeManager = "process" OtelRuntimeManager = "otel" - DefaultRuntimeManager = ProcessRuntimeManager + DefaultRuntimeManager = OtelRuntimeManager ) // MonitoringConfig describes a configuration of a monitoring diff --git a/internal/pkg/otel/manager/execution_embedded.go b/internal/pkg/otel/manager/execution_embedded.go index 02f02e1eca1..7f1694274de 100644 --- a/internal/pkg/otel/manager/execution_embedded.go +++ b/internal/pkg/otel/manager/execution_embedded.go @@ -85,6 +85,7 @@ func (r *embeddedExecution) startCollector(ctx context.Context, logger *logger.L runErr := svc.Run(collectorCtx) close(ctl.collectorDoneCh) reportErr(ctx, errCh, runErr) + reportCollectorStatus(ctx, statusCh, nil) }() return ctl, nil } diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go index b567cc2b122..a9438c4b353 100644 --- a/internal/pkg/otel/manager/execution_subprocess.go +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -168,7 +168,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger if err != nil { switch { case errors.Is(err, context.Canceled): - r.reportSubprocessCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil)) + r.reportSubprocessCollectorStatus(ctx, statusCh, nil) return } } else { @@ -182,7 +182,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger select { case <-procCtx.Done(): - r.reportSubprocessCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil)) + r.reportSubprocessCollectorStatus(ctx, statusCh, nil) return case <-healthCheckPollTimer.C: healthCheckPollTimer.Reset(healthCheckPollDuration) @@ -225,6 +225,10 @@ func (r 
*subprocessExecution) startCollector(ctx context.Context, logger *logger // cloneCollectorStatus creates a deep copy of the provided AggregateStatus. func cloneCollectorStatus(aStatus *status.AggregateStatus) *status.AggregateStatus { + if aStatus == nil { + return nil + } + st := &status.AggregateStatus{ Event: aStatus.Event, } diff --git a/internal/pkg/otel/manager/extension.go b/internal/pkg/otel/manager/extension.go index 560282ea4f1..5fec40e0c49 100644 --- a/internal/pkg/otel/manager/extension.go +++ b/internal/pkg/otel/manager/extension.go @@ -75,7 +75,7 @@ func NewAgentStatusFactory(statusCh chan *status.AggregateStatus) extension.Fact // Start implements the component.Component interface. func (as *AgentStatusExtension) Start(ctx context.Context, host component.Host) error { - as.telemetry.Logger.Debug("Starting agent status extension") + as.telemetry.Logger.Debug("Starting agent status extension") as.host = host return nil } @@ -104,7 +104,7 @@ func (as *AgentStatusExtension) ComponentStatusChanged( source *componentstatus.InstanceID, event *componentstatus.Event, ) { - // this extension is always force loaded and not by the user, so status + // this extension is always force loaded and not by the user, so status // information should be hidden as they didn't directly enable it if source.ComponentID().String() == AgentStatusExtensionType.String() { return diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index 2dd00d0b13c..df58e01f674 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -181,7 +181,7 @@ func NewOTelManager( beatMonitoringConfigGetter: beatMonitoringConfigGetter, errCh: make(chan error, 1), // holds at most one error collectorStatusCh: make(chan *status.AggregateStatus, 1), - componentStateCh: make(chan []runtime.ComponentComponentState, 1), + componentStateCh: make(chan []runtime.ComponentComponentState), updateCh: make(chan configUpdate, 
1), doneChan: make(chan struct{}), execution: exec, @@ -241,10 +241,6 @@ func (m *OTelManager) Run(ctx context.Context) error { if m.proc != nil { m.proc.Stop(m.stopTimeout) m.proc = nil - updateErr := m.reportOtelStatusUpdate(ctx, nil) - if updateErr != nil { - reportErr(ctx, m.errCh, updateErr) - } } if m.mergedCollectorCfg == nil { @@ -284,12 +280,6 @@ func (m *OTelManager) Run(ctx context.Context) error { if m.proc != nil { m.proc.Stop(m.stopTimeout) m.proc = nil - // don't wait here for <-collectorRunErr, already occurred - // clear status, no longer running - updateErr := m.reportOtelStatusUpdate(ctx, nil) - if updateErr != nil { - err = errors.Join(err, updateErr) - } } // pass the error to the errCh so the coordinator, unless it's a cancel error if !errors.Is(err, context.Canceled) { @@ -435,24 +425,6 @@ func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh c // our caller ctx is Done return ctx.Err() } - // drain the internal status update channel - // this status handling is normally done in the main loop, but in this case we want to ensure that we emit a - // nil status after the collector has stopped - select { - case statusCh := <-collectorStatusCh: - updateErr := m.reportOtelStatusUpdate(ctx, statusCh) - if updateErr != nil { - m.logger.Error("failed to update otel status", zap.Error(updateErr)) - } - case <-ctx.Done(): - // our caller ctx is Done - return ctx.Err() - default: - } - err := m.reportOtelStatusUpdate(ctx, nil) - if err != nil { - return err - } } if m.mergedCollectorCfg == nil { @@ -625,18 +597,10 @@ func (m *OTelManager) maybeUpdateMergedConfig(mergedCfg *confmap.Conf) (updated return !bytes.Equal(mergedCfgHash, previousConfigHash) || err != nil, err } -// reportComponentStateUpdates sends component state updates to the component watch channel. It first drains -// the channel to ensure that only the most recent status is kept, as intermediate statuses can be safely discarded. 
-// This ensures the receiver always observes the latest reported status. +// reportComponentStateUpdates sends component state updates to the component watch channel. It is synchronous and +// blocking - the update must be received before this function returns. We are not allowed to drop older updates +// in favor of newer ones here, as the coordinator expects incremental updates. func (m *OTelManager) reportComponentStateUpdates(ctx context.Context, componentUpdates []runtime.ComponentComponentState) { - select { - case <-ctx.Done(): - // context is already done - return - case <-m.componentStateCh: - // drain the channel first - default: - } select { case m.componentStateCh <- componentUpdates: case <-ctx.Done(): diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index dcf12430823..d66ed3d818e 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -132,12 +132,15 @@ func (e *mockExecution) startCollector( <-collectorCtx.Done() close(stopCh) reportErr(ctx, errCh, nil) + reportCollectorStatus(ctx, statusCh, nil) }() handle := &mockCollectorHandle{ stopCh: stopCh, cancel: collectorCancel, } - e.collectorStarted <- struct{}{} + if e.collectorStarted != nil { + e.collectorStarted <- struct{}{} + } return handle, nil } @@ -152,33 +155,42 @@ func (h *mockCollectorHandle) Stop(waitTime time.Duration) { h.cancel() select { case <-time.After(waitTime): - return case <-h.stopCh: } } // EventListener listens to the events from the OTelManager and stores the latest error and status. type EventListener struct { - mtx sync.Mutex - err *EventTime[error] - status *EventTime[*status.AggregateStatus] + mtx sync.Mutex + err *EventTime[error] + collectorStatus *EventTime[*status.AggregateStatus] + componentStates *EventTime[[]runtime.ComponentComponentState] } // Listen starts listening to the error and status channels. It updates the latest error and status in the // EventListener. 
-func (e *EventListener) Listen(ctx context.Context, errorCh <-chan error, statusCh <-chan *status.AggregateStatus) { +func (e *EventListener) Listen( + ctx context.Context, + errorCh <-chan error, + collectorStatusCh <-chan *status.AggregateStatus, + componentStateCh <-chan []runtime.ComponentComponentState, +) { for { select { case <-ctx.Done(): return - case c := <-statusCh: + case c := <-collectorStatusCh: e.mtx.Lock() - e.status = &EventTime[*status.AggregateStatus]{val: c, time: time.Now()} + e.collectorStatus = &EventTime[*status.AggregateStatus]{val: c, time: time.Now()} e.mtx.Unlock() case c := <-errorCh: e.mtx.Lock() e.err = &EventTime[error]{val: c, time: time.Now()} e.mtx.Unlock() + case componentStates := <-componentStateCh: + e.mtx.Lock() + e.componentStates = &EventTime[[]runtime.ComponentComponentState]{val: componentStates, time: time.Now()} + e.mtx.Unlock() } } } @@ -190,11 +202,11 @@ func (e *EventListener) getError() error { return e.err.Value() } -// getStatus retrieves the latest status from the EventListener. -func (e *EventListener) getStatus() *status.AggregateStatus { +// getCollectorStatus retrieves the latest collector status from the EventListener. +func (e *EventListener) getCollectorStatus() *status.AggregateStatus { e.mtx.Lock() defer e.mtx.Unlock() - return e.status.Value() + return e.collectorStatus.Value() } // EnsureHealthy ensures that the OTelManager is healthy by checking the latest error and status. 
@@ -202,7 +214,7 @@ func (e *EventListener) EnsureHealthy(t *testing.T, u time.Time) { assert.EventuallyWithT(t, func(collect *assert.CollectT) { e.mtx.Lock() latestErr := e.err - latestStatus := e.status + latestStatus := e.collectorStatus e.mtx.Unlock() // we expect to have a reported error which is nil and a reported status which is StatusOK @@ -221,7 +233,7 @@ func (e *EventListener) EnsureOffWithoutError(t *testing.T, u time.Time) { require.EventuallyWithT(t, func(collect *assert.CollectT) { e.mtx.Lock() latestErr := e.err - latestStatus := e.status + latestStatus := e.collectorStatus e.mtx.Unlock() // we expect to have a reported error which is nil and a reported status which is nil @@ -239,7 +251,7 @@ func (e *EventListener) EnsureOffWithError(t *testing.T, u time.Time) { require.EventuallyWithT(t, func(collect *assert.CollectT) { e.mtx.Lock() latestErr := e.err - latestStatus := e.status + latestStatus := e.collectorStatus e.mtx.Unlock() // we expect to have a reported error which is not nil and a reported status which is nil @@ -409,7 +421,7 @@ func TestOTelManager_Run(t *testing.T) { require.NotNil(t, execHandle, "execModeFn handle should not be nil") execHandle.Stop(waitTimeForStop) e.EnsureHealthy(t, updateTime) - assert.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 0") + assert.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getCollectorStatus()), "health check extension status count should be 0") // no configuration should stop the runner updateTime = time.Now() @@ -431,7 +443,7 @@ func TestOTelManager_Run(t *testing.T) { updateTime := time.Now() m.Update(cfg, nil) e.EnsureHealthy(t, updateTime) - assert.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 0") + assert.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getCollectorStatus()), "health check extension status count should be 0") var oldPHandle 
*procHandle // repeatedly kill the collector @@ -449,7 +461,7 @@ func TestOTelManager_Run(t *testing.T) { // the collector should restart and report healthy updateTime = time.Now() e.EnsureHealthy(t, updateTime) - assert.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 0") + assert.EqualValues(t, 0, countHealthCheckExtensionStatuses(e.getCollectorStatus()), "health check extension status count should be 0") } seenRecoveredTimes := m.recoveryRetries.Load() @@ -636,7 +648,7 @@ func TestOTelManager_Run(t *testing.T) { m.Update(cfg, nil) e.EnsureHealthy(t, updateTime) - assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getStatus()), "health check extension status count should be 1") + assert.EqualValues(t, 1, countHealthCheckExtensionStatuses(e.getCollectorStatus()), "health check extension status count should be 1") }, }, { @@ -752,7 +764,7 @@ func TestOTelManager_Run(t *testing.T) { return } t.Logf("latest received err: %s", eListener.getError()) - t.Logf("latest received status: %s", statusToYaml(eListener.getStatus())) + t.Logf("latest received status: %s", statusToYaml(eListener.getCollectorStatus())) for _, entry := range obs.All() { t.Logf("%+v", entry) } @@ -763,9 +775,9 @@ func TestOTelManager_Run(t *testing.T) { go func() { defer runWg.Done() if !tc.skipListeningErrors { - eListener.Listen(ctx, m.Errors(), m.WatchCollector()) + eListener.Listen(ctx, m.Errors(), m.WatchCollector(), m.WatchComponents()) } else { - eListener.Listen(ctx, nil, m.WatchCollector()) + eListener.Listen(ctx, nil, m.WatchCollector(), m.WatchComponents()) } }() @@ -940,6 +952,7 @@ func TestOTelManager_Ports(t *testing.T) { select { case colErr := <-m.Errors(): require.NoError(t, colErr, "otel manager should not return errors") + case <-m.WatchComponents(): // ensure we receive component updates case <-ctx.Done(): return } @@ -1055,6 +1068,17 @@ func TestOTelManager_PortConflict(t *testing.T) { assert.ErrorIs(t, 
err, context.Canceled, "otel manager should be cancelled") }() + go func() { + for { + select { + case <-m.Errors(): + case <-m.WatchComponents(): // ensure we receive component updates + case <-ctx.Done(): + return + } + } + }() + cfg := confmap.NewFromStringMap(testConfig) cfg.Delete("service::telemetry::metrics::level") // change this to default @@ -1462,9 +1486,7 @@ func TestOTelManagerEndToEnd(t *testing.T) { beatMonitoringConfigGetter := mockBeatMonitoringConfigGetter collectorStarted := make(chan struct{}) - execution := &mockExecution{ - collectorStarted: collectorStarted, - } + execution := &mockExecution{} // Create manager with test dependencies mgr := OTelManager{ @@ -1491,6 +1513,9 @@ func TestOTelManagerEndToEnd(t *testing.T) { assert.ErrorIs(t, err, context.Canceled) }() + eListener := &EventListener{} + go eListener.Listen(ctx, mgr.Errors(), mgr.WatchCollector(), mgr.WatchComponents()) // Listen blocks until ctx is done, so run it in the background + collectorCfg := confmap.NewFromStringMap(map[string]interface{}{ "receivers": map[string]interface{}{ "nop": map[string]interface{}{}, @@ -1510,17 +1535,13 @@ func TestOTelManagerEndToEnd(t *testing.T) { components := []component.Component{testComp} t.Run("collector config is passed down to the collector execution", func(t *testing.T) { + updateTime := time.Now() mgr.Update(collectorCfg, nil) - select { - case <-collectorStarted: - case <-ctx.Done(): - t.Fatal("timeout waiting for collector config update") - } + eListener.EnsureHealthy(t, updateTime) expectedCfg := confmap.NewFromStringMap(collectorCfg.ToStringMap()) assert.NoError(t, injectDiagnosticsExtension(expectedCfg)) assert.NoError(t, addCollectorMetricsReader(expectedCfg)) assert.Equal(t, expectedCfg, execution.cfg) - }) t.Run("collector status is passed up to the component manager", func(t *testing.T) { @@ -1534,18 +1555,15 @@ func TestOTelManagerEndToEnd(t *testing.T) { case execution.statusCh <- otelStatus: } - collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), 
mgr.Errors()) - require.NoError(t, err) - assert.Equal(t, otelStatus, collectorStatus) + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + assert.Equal(collect, otelStatus, eListener.getCollectorStatus()) // assert on collect so the check is retried until it passes + }, 10*time.Second, 1*time.Second) }) t.Run("component config is passed down to the otel manager", func(t *testing.T) { + updateTime := time.Now() mgr.Update(collectorCfg, components) - select { - case <-collectorStarted: - case <-ctx.Done(): - t.Fatal("timeout waiting for collector config update") - } + eListener.EnsureHealthy(t, updateTime) cfg := execution.cfg require.NotNil(t, cfg) receivers, err := cfg.Sub("receivers") @@ -1553,10 +1571,6 @@ func TestOTelManagerEndToEnd(t *testing.T) { require.NotNil(t, receivers) assert.True(t, receivers.IsSet("nop")) assert.True(t, receivers.IsSet("filebeatreceiver/_agent-component/test")) - - collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) - assert.Nil(t, err) - assert.Nil(t, collectorStatus) }) t.Run("empty collector config leaves the component config running", func(t *testing.T) {