Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,7 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentInfo
"streams": streams,
}

// Make sure we don't set anything until the configuration is stable if the otel manager isn't enabled
if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager {
if b.config.C.RuntimeManager == monitoringCfg.OtelRuntimeManager {
input["_runtime_experimental"] = b.config.C.RuntimeManager
}

Expand Down Expand Up @@ -582,7 +581,7 @@ func (b *BeatsMonitor) injectMetricsInput(
}

// Make sure we don't set anything until the configuration is stable if the otel manager isn't enabled
if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager {
if b.config.C.RuntimeManager == monitoringCfg.OtelRuntimeManager {
for _, input := range inputs {
inputMap := input.(map[string]interface{})
inputMap["_runtime_experimental"] = b.config.C.RuntimeManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestMonitoringFull(t *testing.T) {
{
Name: "Default runtime manager",
RuntimeManager: monitoringcfg.DefaultRuntimeManager,
ExpectedConfigPath: filepath.Join(".", "testdata", "monitoring_config_full_process.yaml"),
ExpectedConfigPath: filepath.Join(".", "testdata", "monitoring_config_full_otel.yaml"),
},
{
Name: "Process runtime manager",
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/core/monitoring/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
DefaultHost = "localhost"
ProcessRuntimeManager = "process"
OtelRuntimeManager = "otel"
DefaultRuntimeManager = ProcessRuntimeManager
DefaultRuntimeManager = OtelRuntimeManager
)

// MonitoringConfig describes a configuration of a monitoring
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/otel/manager/execution_embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (r *embeddedExecution) startCollector(ctx context.Context, logger *logger.L
runErr := svc.Run(collectorCtx)
close(ctl.collectorDoneCh)
reportErr(ctx, errCh, runErr)
reportCollectorStatus(ctx, statusCh, nil)
}()
return ctl, nil
}
Expand Down
8 changes: 6 additions & 2 deletions internal/pkg/otel/manager/execution_subprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
if err != nil {
switch {
case errors.Is(err, context.Canceled):
r.reportSubprocessCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil))
r.reportSubprocessCollectorStatus(ctx, statusCh, nil)
return
}
} else {
Expand All @@ -182,7 +182,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger

select {
case <-procCtx.Done():
r.reportSubprocessCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil))
r.reportSubprocessCollectorStatus(ctx, statusCh, nil)
return
case <-healthCheckPollTimer.C:
healthCheckPollTimer.Reset(healthCheckPollDuration)
Expand Down Expand Up @@ -225,6 +225,10 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger

// cloneCollectorStatus creates a deep copy of the provided AggregateStatus.
func cloneCollectorStatus(aStatus *status.AggregateStatus) *status.AggregateStatus {
if aStatus == nil {
return nil
}

st := &status.AggregateStatus{
Event: aStatus.Event,
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/otel/manager/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewAgentStatusFactory(statusCh chan *status.AggregateStatus) extension.Fact

// Start implements the component.Component interface.
func (as *AgentStatusExtension) Start(ctx context.Context, host component.Host) error {
as.telemetry.Logger.Debug("Starting agent status extension")
as.telemetry.Logger.Debug("Starting agent collectorStatus extension")
as.host = host
return nil
}
Expand Down Expand Up @@ -104,7 +104,7 @@ func (as *AgentStatusExtension) ComponentStatusChanged(
source *componentstatus.InstanceID,
event *componentstatus.Event,
) {
// this extension is always force loaded and not by the user, so status
// this extension is always force loaded and not by the user, so collectorStatus
// information should be hidden as they didn't directly enable it
if source.ComponentID().String() == AgentStatusExtensionType.String() {
return
Expand Down
44 changes: 4 additions & 40 deletions internal/pkg/otel/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func NewOTelManager(
beatMonitoringConfigGetter: beatMonitoringConfigGetter,
errCh: make(chan error, 1), // holds at most one error
collectorStatusCh: make(chan *status.AggregateStatus, 1),
componentStateCh: make(chan []runtime.ComponentComponentState, 1),
componentStateCh: make(chan []runtime.ComponentComponentState),
updateCh: make(chan configUpdate, 1),
doneChan: make(chan struct{}),
execution: exec,
Expand Down Expand Up @@ -241,10 +241,6 @@ func (m *OTelManager) Run(ctx context.Context) error {
if m.proc != nil {
m.proc.Stop(m.stopTimeout)
m.proc = nil
updateErr := m.reportOtelStatusUpdate(ctx, nil)
if updateErr != nil {
reportErr(ctx, m.errCh, updateErr)
}
}

if m.mergedCollectorCfg == nil {
Expand Down Expand Up @@ -284,12 +280,6 @@ func (m *OTelManager) Run(ctx context.Context) error {
if m.proc != nil {
m.proc.Stop(m.stopTimeout)
m.proc = nil
// don't wait here for <-collectorRunErr, already occurred
// clear status, no longer running
updateErr := m.reportOtelStatusUpdate(ctx, nil)
if updateErr != nil {
err = errors.Join(err, updateErr)
}
}
// pass the error to the errCh so the coordinator, unless it's a cancel error
if !errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -435,24 +425,6 @@ func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh c
// our caller ctx is Done
return ctx.Err()
}
// drain the internal status update channel
// this status handling is normally done in the main loop, but in this case we want to ensure that we emit a
// nil status after the collector has stopped
select {
case statusCh := <-collectorStatusCh:
updateErr := m.reportOtelStatusUpdate(ctx, statusCh)
if updateErr != nil {
m.logger.Error("failed to update otel status", zap.Error(updateErr))
}
case <-ctx.Done():
// our caller ctx is Done
return ctx.Err()
default:
}
err := m.reportOtelStatusUpdate(ctx, nil)
if err != nil {
return err
}
}

if m.mergedCollectorCfg == nil {
Expand Down Expand Up @@ -625,18 +597,10 @@ func (m *OTelManager) maybeUpdateMergedConfig(mergedCfg *confmap.Conf) (updated
return !bytes.Equal(mergedCfgHash, previousConfigHash) || err != nil, err
}

// reportComponentStateUpdates sends component state updates to the component watch channel. It first drains
// the channel to ensure that only the most recent status is kept, as intermediate statuses can be safely discarded.
// This ensures the receiver always observes the latest reported status.
// reportComponentStateUpdates sends component state updates to the component watch channel. It is synchronous and
// blocking - the update must be received before this function returns. We are not allowed to drop older updates
// in favor of newer ones here, as the coordinator expected incremental updates.
func (m *OTelManager) reportComponentStateUpdates(ctx context.Context, componentUpdates []runtime.ComponentComponentState) {
select {
case <-ctx.Done():
// context is already done
return
case <-m.componentStateCh:
// drain the channel first
default:
}
select {
case m.componentStateCh <- componentUpdates:
case <-ctx.Done():
Expand Down
Loading
Loading