Skip to content

Commit 5ec2966

Browse files
Make otel runtime component updates synchronous (#10675) (#10692)
* Make otel runtime component updates synchronous
* Make the regression test runnable on main
* Add comments on final status in collector executions
* Add some more comments

(cherry picked from commit f950f1a)

Co-authored-by: Mikołaj Świątek <[email protected]>
1 parent 23b86be commit 5ec2966

File tree

5 files changed

+184
-87
lines changed

5 files changed

+184
-87
lines changed

internal/pkg/otel/manager/execution.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ import (
1515
)
1616

1717
type collectorExecution interface {
18+
// startCollector starts the otel collector with the given arguments, returning a handle allowing it to be stopped.
19+
// Cancelling the context will stop all goroutines involved in the execution.
20+
// The collector will report status events in the statusCh channel and errors on errCh in a non-blocking fashion,
21+
// draining the channel before writing to it.
22+
// After the collector exits, it will emit an error describing the exit status (nil if successful) and a nil status.
1823
startCollector(ctx context.Context, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus) (collectorHandle, error)
1924
}
2025

internal/pkg/otel/manager/execution_embedded.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ func (r *embeddedExecution) startCollector(ctx context.Context, logger *logger.L
8787
}
8888
runErr := svc.Run(collectorCtx)
8989
close(ctl.collectorDoneCh)
90+
// after the collector exits, we need to report the error and a nil status
9091
reportErr(ctx, errCh, runErr)
92+
reportCollectorStatus(ctx, statusCh, nil)
9193
}()
9294
return ctl, nil
9395
}

internal/pkg/otel/manager/execution_subprocess.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,8 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
168168
if err != nil {
169169
switch {
170170
case errors.Is(err, context.Canceled):
171-
r.reportSubprocessCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil))
171+
// after the collector exits, we need to report a nil status
172+
r.reportSubprocessCollectorStatus(ctx, statusCh, nil)
172173
return
173174
}
174175
} else {
@@ -182,7 +183,8 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
182183

183184
select {
184185
case <-procCtx.Done():
185-
r.reportSubprocessCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil))
186+
// after the collector exits, we need to report a nil status
187+
r.reportSubprocessCollectorStatus(ctx, statusCh, nil)
186188
return
187189
case <-healthCheckPollTimer.C:
188190
healthCheckPollTimer.Reset(healthCheckPollDuration)
@@ -225,6 +227,10 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
225227

226228
// cloneCollectorStatus creates a deep copy of the provided AggregateStatus.
227229
func cloneCollectorStatus(aStatus *status.AggregateStatus) *status.AggregateStatus {
230+
if aStatus == nil {
231+
return nil
232+
}
233+
228234
st := &status.AggregateStatus{
229235
Event: aStatus.Event,
230236
}

internal/pkg/otel/manager/manager.go

Lines changed: 11 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func NewOTelManager(
181181
beatMonitoringConfigGetter: beatMonitoringConfigGetter,
182182
errCh: make(chan error, 1), // holds at most one error
183183
collectorStatusCh: make(chan *status.AggregateStatus, 1),
184-
componentStateCh: make(chan []runtime.ComponentComponentState, 1),
184+
componentStateCh: make(chan []runtime.ComponentComponentState),
185185
updateCh: make(chan configUpdate, 1),
186186
doneChan: make(chan struct{}),
187187
execution: exec,
@@ -241,10 +241,6 @@ func (m *OTelManager) Run(ctx context.Context) error {
241241
if m.proc != nil {
242242
m.proc.Stop(m.stopTimeout)
243243
m.proc = nil
244-
updateErr := m.reportOtelStatusUpdate(ctx, nil)
245-
if updateErr != nil {
246-
reportErr(ctx, m.errCh, updateErr)
247-
}
248244
}
249245

250246
if m.mergedCollectorCfg == nil {
@@ -284,12 +280,6 @@ func (m *OTelManager) Run(ctx context.Context) error {
284280
if m.proc != nil {
285281
m.proc.Stop(m.stopTimeout)
286282
m.proc = nil
287-
// don't wait here for <-collectorRunErr, already occurred
288-
// clear status, no longer running
289-
updateErr := m.reportOtelStatusUpdate(ctx, nil)
290-
if updateErr != nil {
291-
err = errors.Join(err, updateErr)
292-
}
293283
}
294284
// pass the error to the errCh so the coordinator receives it, unless it's a cancel error
295285
if !errors.Is(err, context.Canceled) {
@@ -429,30 +419,19 @@ func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh c
429419
if m.proc != nil {
430420
m.proc.Stop(m.stopTimeout)
431421
m.proc = nil
422+
// We wait here for the collector to exit before possibly starting a new one. The execution indicates this
423+
// by sending an error over the appropriate channel. It will also send a nil status that we'll either process
424+
// after exiting from this function and going back to the main loop, or it will be overridden by the status
425+
// from the newly started collector.
426+
// This is the only blocking wait inside the main loop involving channels, so we need to be extra careful not to
427+
// deadlock.
428+
// TODO: Verify if we need to wait for the error at all. Stop() is already blocking.
432429
select {
433430
case <-collectorRunErr:
434431
case <-ctx.Done():
435432
// our caller ctx is Done
436433
return ctx.Err()
437434
}
438-
// drain the internal status update channel
439-
// this status handling is normally done in the main loop, but in this case we want to ensure that we emit a
440-
// nil status after the collector has stopped
441-
select {
442-
case statusCh := <-collectorStatusCh:
443-
updateErr := m.reportOtelStatusUpdate(ctx, statusCh)
444-
if updateErr != nil {
445-
m.logger.Error("failed to update otel status", zap.Error(updateErr))
446-
}
447-
case <-ctx.Done():
448-
// our caller ctx is Done
449-
return ctx.Err()
450-
default:
451-
}
452-
err := m.reportOtelStatusUpdate(ctx, nil)
453-
if err != nil {
454-
return err
455-
}
456435
}
457436

458437
if m.mergedCollectorCfg == nil {
@@ -625,18 +604,10 @@ func (m *OTelManager) maybeUpdateMergedConfig(mergedCfg *confmap.Conf) (updated
625604
return !bytes.Equal(mergedCfgHash, previousConfigHash) || err != nil, err
626605
}
627606

628-
// reportComponentStateUpdates sends component state updates to the component watch channel. It first drains
629-
// the channel to ensure that only the most recent status is kept, as intermediate statuses can be safely discarded.
630-
// This ensures the receiver always observes the latest reported status.
607+
// reportComponentStateUpdates sends component state updates to the component watch channel. It is synchronous and
608+
// blocking - the update must be received before this function returns. We are not allowed to drop older updates
609+
// in favor of newer ones here, as the coordinator expects incremental updates.
631610
func (m *OTelManager) reportComponentStateUpdates(ctx context.Context, componentUpdates []runtime.ComponentComponentState) {
632-
select {
633-
case <-ctx.Done():
634-
// context is already done
635-
return
636-
case <-m.componentStateCh:
637-
// drain the channel first
638-
default:
639-
}
640611
select {
641612
case m.componentStateCh <- componentUpdates:
642613
case <-ctx.Done():

0 commit comments

Comments
 (0)