Skip to content

Add otel-specific outputController to libbeat pipeline#50075

Merged
faec merged 2 commits intoelastic:mainfrom
faec:otel-output-controller
Apr 15, 2026
Merged

Add otel-specific outputController to libbeat pipeline#50075
faec merged 2 commits intoelastic:mainfrom
faec:otel-output-controller

Conversation

@faec
Copy link
Copy Markdown
Contributor

@faec faec commented Apr 10, 2026

Add an OTel-specific implementation of pipeline.outputController, and create it via a separate pipeline initialization API for use with Beats receivers. Most of the core logic is unchanged in this pass (in particular this should cause no change in behavior), but the code paths are shorter/simpler than the ones for the process runtime since there are many complications and corner cases that aren't relevant when running as a Beats receiver (in particular Beats receivers do not need to dynamically reload their output configuration).

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • I have added an entry in ./changelog/fragments using the changelog tool.

Related issues

@faec faec self-assigned this Apr 10, 2026
@faec faec added enhancement backport-skip Skip notification from the automated backport with mergify Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team skip-changelog labels Apr 10, 2026
@botelastic botelastic bot added needs_team Indicates that the issue/PR needs a Team:* label and removed needs_team Indicates that the issue/PR needs a Team:* label labels Apr 10, 2026
@github-actions
Copy link
Copy Markdown
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

@faec faec marked this pull request as ready for review April 10, 2026 19:15
@faec faec requested a review from a team as a code owner April 10, 2026 19:15
@faec faec requested review from leehinman and mauri870 April 10, 2026 19:15
@elasticmachine
Copy link
Copy Markdown
Contributor

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Apr 10, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 9e32f2ee-f298-4269-b4ef-deee1e272ff6

📥 Commits

Reviewing files that changed from the base of the PR and between 999d1de and 92a9166.

📒 Files selected for processing (5)
  • libbeat/otel/otelconsumer/config.go
  • libbeat/otel/otelconsumer/otelconsumer.go
  • libbeat/publisher/pipeline/output_otel.go
  • libbeat/publisher/pipeline/output_otel_test.go
  • libbeat/publisher/pipeline/pipeline.go
💤 Files with no reviewable changes (1)
  • libbeat/otel/otelconsumer/config.go
🚧 Files skipped from review as they are similar to previous changes (3)
  • libbeat/publisher/pipeline/output_otel_test.go
  • libbeat/publisher/pipeline/pipeline.go
  • libbeat/publisher/pipeline/output_otel.go

📝 Walkthrough

Walkthrough

The changes separate OTel receiver wiring from the standard libbeat pipeline. The otelconsumer package no longer auto-registers via a package init; it exposes MakeOtelConsumer for explicit construction. The publisher removed its blank import of otelconsumer. A new otelOutputController implements queue and worker wiring for OTel outputs; Pipeline gained NewForReceiver and an outputController abstraction to select process vs OTel implementations. The Beats receiver initialization now uses NewForReceiver, removing generic output factory/reloader setup.

🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)
Check name Status Explanation
Linked Issues check ✅ Passed PR fulfills both #49051 and #49479 requirements: adds NewForReceiver() pipeline entry point with explicit otelconsumer setup, abstracts outputController interface with separate OTel implementation.
Out of Scope Changes check ✅ Passed All changes directly support the stated objectives: otelconsumer refactor, outputController abstraction, receiver-specific pipeline initialization, and related cleanup.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • 🛠️ Update Documentation: Commit on current branch
  • 🛠️ Update Documentation: Create PR

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
libbeat/publisher/pipeline/pipeline.go (1)

197-230: Add doc comment and consider nil-safety for monitors.Metrics.

  1. NewForReceiver is exported but lacks documentation per coding guidelines.

  2. Unlike New() (lines 163, 171-173), this doesn't guard against monitors.Metrics being nil before calling newMetricsObserver. The current call site in beat.go always provides a non-nil registry, but as a public API, defensive handling would be safer.

Suggested changes
+// NewForReceiver creates a Pipeline for use by Beats receivers running under
+// the OTel Collector runtime. Unlike New(), it wires the otelconsumer output
+// directly without requiring output factory registration.
 func NewForReceiver(
 	beatInfo beat.Info,
 	monitors Monitors,
 	userQueueConfig conf.Namespace,
 	settings Settings,
 ) (*Pipeline, error) {
+	var obs observer = nilObserver
+	if monitors.Metrics != nil {
+		obs = newMetricsObserver(monitors.Metrics)
+	}
 	p := &Pipeline{
 		beatInfo:         beatInfo,
 		monitors:         monitors,
-		observer:         newMetricsObserver(monitors.Metrics),
+		observer:         obs,
 		waitCloseTimeout: settings.WaitClose,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libbeat/publisher/pipeline/pipeline.go` around lines 197 - 230, Public
function NewForReceiver lacks a doc comment and isn't defensive against a nil
monitors.Metrics; add a Go doc comment describing purpose/parameters/return for
NewForReceiver and update the function to check monitors.Metrics before calling
newMetricsObserver (e.g., if monitors != nil && monitors.Metrics != nil {
observer = newMetricsObserver(monitors.Metrics) } else { observer =
newMetricsObserver(defaultRegistryOrNil) }) to mirror the nil-safety used by
New(), ensuring observer is always initialized safely even when monitors or
monitors.Metrics is nil.
libbeat/otel/otelconsumer/otelconsumer.go (1)

60-60: Add doc comment for exported function.

Per coding guidelines, exported functions should be documented. A brief description would help callers understand when to use this vs the standard output factory pattern.

Suggested doc comment
+// MakeOtelConsumer creates an output group that bridges Beat events to an
+// OpenTelemetry Collector pipeline via the beat.LogConsumer.
 func MakeOtelConsumer(beat beat.Info, observer outputs.Observer, cfg *config.C, beatPaths *paths.Path) (outputs.Group, error) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libbeat/otel/otelconsumer/otelconsumer.go` at line 60, Add a Go doc comment
for the exported function MakeOtelConsumer describing its purpose and usage:
state that MakeOtelConsumer constructs an OpenTelemetry outputs.Group consumer
for Beats (accepting beat.Info, outputs.Observer, *config.C, *paths.Path) and
indicate when to prefer this factory over the standard output factory pattern
(e.g., for OTEL-specific telemetry/export behavior). Place the comment
immediately above the MakeOtelConsumer declaration and keep it concise, starting
with the function name per Go conventions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@libbeat/otel/otelconsumer/otelconsumer.go`:
- Line 60: Add a Go doc comment for the exported function MakeOtelConsumer
describing its purpose and usage: state that MakeOtelConsumer constructs an
OpenTelemetry outputs.Group consumer for Beats (accepting beat.Info,
outputs.Observer, *config.C, *paths.Path) and indicate when to prefer this
factory over the standard output factory pattern (e.g., for OTEL-specific
telemetry/export behavior). Place the comment immediately above the
MakeOtelConsumer declaration and keep it concise, starting with the function
name per Go conventions.

In `@libbeat/publisher/pipeline/pipeline.go`:
- Around line 197-230: Public function NewForReceiver lacks a doc comment and
isn't defensive against a nil monitors.Metrics; add a Go doc comment describing
purpose/parameters/return for NewForReceiver and update the function to check
monitors.Metrics before calling newMetricsObserver (e.g., if monitors != nil &&
monitors.Metrics != nil { observer = newMetricsObserver(monitors.Metrics) } else
{ observer = newMetricsObserver(defaultRegistryOrNil) }) to mirror the
nil-safety used by New(), ensuring observer is always initialized safely even
when monitors or monitors.Metrics is nil.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 91e8d4c1-fe5f-4ad8-b39f-9fbb457d81ab

📥 Commits

Reviewing files that changed from the base of the PR and between 03c6506 and 68aa758.

📒 Files selected for processing (8)
  • libbeat/otel/otelconsumer/otelconsumer.go
  • libbeat/publisher/includes/includes.go
  • libbeat/publisher/pipeline/output_otel.go
  • libbeat/publisher/pipeline/output_otel_test.go
  • libbeat/publisher/pipeline/output_process.go
  • libbeat/publisher/pipeline/output_process_test.go
  • libbeat/publisher/pipeline/pipeline.go
  • x-pack/libbeat/cmd/instance/beat.go
💤 Files with no reviewable changes (1)
  • libbeat/publisher/includes/includes.go

@faec faec requested a review from a team as a code owner April 10, 2026 19:52
@faec faec closed this Apr 10, 2026
@faec faec deleted the otel-output-controller branch April 10, 2026 20:11
@faec faec restored the otel-output-controller branch April 10, 2026 20:12
@faec faec reopened this Apr 10, 2026
@faec faec force-pushed the otel-output-controller branch from 97206de to 999d1de Compare April 10, 2026 20:20
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
libbeat/publisher/pipeline/pipeline.go (1)

160-170: ⚠️ Potential issue | 🟠 Major

WaitCloseMode no longer disables wait-close in the normal pipeline path.

Line 164 now copies settings.WaitClose unconditionally, while Line 169 only controls force-closing. With WaitCloseMode == NoWaitOnClose, Pipeline.Close() will still wait in outputController.waitClose(...) for up to WaitClose, which is the opposite of the documented mode semantics and changes shutdown behavior for non-receiver Beats.

Suggested fix
 	p := &Pipeline{
 		beatInfo:         beat,
 		monitors:         monitors,
 		observer:         nilObserver,
-		waitCloseTimeout: settings.WaitClose,
+		waitCloseTimeout: 0,
 		processors:       settings.Processors,
 		paths:            settings.Paths,
 	}
 
-	p.forceCloseQueue = settings.WaitCloseMode == WaitOnPipelineCloseThenForce
+	switch settings.WaitCloseMode {
+	case WaitOnPipelineClose, WaitOnPipelineCloseThenForce:
+		p.waitCloseTimeout = settings.WaitClose
+	}
+	p.forceCloseQueue = settings.WaitCloseMode == WaitOnPipelineCloseThenForce
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libbeat/publisher/pipeline/pipeline.go` around lines 160 - 170, The Pipeline
currently always sets waitCloseTimeout from settings.WaitClose while
forceCloseQueue is only set from settings.WaitCloseMode, which means
WaitCloseMode==NoWaitOnClose does not disable waiting; update the Pipeline
constructor (the Pipeline struct initialization and assignment of
waitCloseTimeout/forceCloseQueue) so that when settings.WaitCloseMode ==
NoWaitOnClose you set waitCloseTimeout to zero (or otherwise disable waiting)
and only set waitCloseTimeout from settings.WaitClose for modes that allow
waiting; adjust the logic around Pipeline.waitCloseTimeout,
Pipeline.forceCloseQueue, and any use in
Pipeline.Close/outputController.waitClose to reflect this conditional behavior
so NoWaitOnClose truly prevents waiting.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@libbeat/publisher/pipeline/pipeline.go`:
- Around line 160-170: The Pipeline currently always sets waitCloseTimeout from
settings.WaitClose while forceCloseQueue is only set from
settings.WaitCloseMode, which means WaitCloseMode==NoWaitOnClose does not
disable waiting; update the Pipeline constructor (the Pipeline struct
initialization and assignment of waitCloseTimeout/forceCloseQueue) so that when
settings.WaitCloseMode == NoWaitOnClose you set waitCloseTimeout to zero (or
otherwise disable waiting) and only set waitCloseTimeout from settings.WaitClose
for modes that allow waiting; adjust the logic around Pipeline.waitCloseTimeout,
Pipeline.forceCloseQueue, and any use in
Pipeline.Close/outputController.waitClose to reflect this conditional behavior
so NoWaitOnClose truly prevents waiting.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b0c0ba61-9dec-4429-82b9-364e5b36a4a2

📥 Commits

Reviewing files that changed from the base of the PR and between bb793e5 and 999d1de.

📒 Files selected for processing (8)
  • libbeat/otel/otelconsumer/otelconsumer.go
  • libbeat/publisher/includes/includes.go
  • libbeat/publisher/pipeline/output_otel.go
  • libbeat/publisher/pipeline/output_otel_test.go
  • libbeat/publisher/pipeline/output_process.go
  • libbeat/publisher/pipeline/output_process_test.go
  • libbeat/publisher/pipeline/pipeline.go
  • x-pack/libbeat/cmd/instance/beat.go
💤 Files with no reviewable changes (1)
  • libbeat/publisher/includes/includes.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • libbeat/publisher/pipeline/output_process.go

Comment thread libbeat/publisher/pipeline/output_otel.go Outdated
Copy link
Copy Markdown
Member

@cmacknz cmacknz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM pending Lee agreeing his comment is resolved.

@faec faec merged commit c073ffb into elastic:main Apr 15, 2026
203 of 206 checks passed
@faec faec deleted the otel-output-controller branch April 15, 2026 12:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport-skip Skip notification from the automated backport with mergify enhancement skip-changelog Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team

Projects

None yet

4 participants