diff --git a/x-pack/filebeat/fbreceiver/receiver.go b/x-pack/filebeat/fbreceiver/receiver.go index 005277ecef53..1ae4eaba6cc8 100644 --- a/x-pack/filebeat/fbreceiver/receiver.go +++ b/x-pack/filebeat/fbreceiver/receiver.go @@ -21,11 +21,14 @@ type filebeatReceiver struct { } func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error { + if err := fb.BeatReceiver.Setup(host); err != nil { + return err + } fb.wg.Add(1) go func() { defer fb.wg.Done() fb.Logger.Info("starting filebeat receiver") - if err := fb.BeatReceiver.Start(host); err != nil { + if err := fb.BeatReceiver.Run(); err != nil { fb.Logger.Error("error starting filebeat receiver", zap.Error(err)) } }() diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index 503e881eea8d..a3d36c09fecd 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" @@ -722,6 +723,29 @@ func TestReceiverHook(t *testing.T) { oteltest.TestReceiverHook(t, &cfg, NewFactoryWithSettings(Settings{Home: t.TempDir()}), receiverSettings, 3) } +func TestStartShouldFailWhenStorageExtensionMissing(t *testing.T) { + cfg := &Config{Beatconfig: map[string]any{ + "filebeat": map[string]any{"inputs": []map[string]any{{ + "type": "benchmark", "enabled": true, "message": "x", "count": 1, + }}}, + "storage": "elasticsearch_storage", + "path.home": t.TempDir(), + }} + + factory := NewFactoryWithSettings(Settings{Home: t.TempDir()}) + set := receiver.Settings{ + ID: component.NewIDWithName(factory.Type(), "r1"), + TelemetrySettings: component.TelemetrySettings{Logger: zap.NewNop()}, + } + + r, err := factory.CreateLogs(t.Context(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + + err = r.Start(t.Context(), &oteltest.MockHost{}) + require.Error(t, err, "expected Start to fail when storage extension is configured but missing") + _ = r.Shutdown(t.Context()) +} + func hostFromSocket(socket string) string { if runtime.GOOS == "windows" { return "npipe:///" + filepath.Base(socket) diff --git a/x-pack/libbeat/cmd/instance/receiver.go b/x-pack/libbeat/cmd/instance/receiver.go index f8adbef1f617..e191e6b3ecf5 100644 --- a/x-pack/libbeat/cmd/instance/receiver.go +++ b/x-pack/libbeat/cmd/instance/receiver.go @@ -39,6 +39,7 @@ type BeatReceiver struct { Logger *logp.Logger bridge *oteltelemetry.RegistryBridge releaseSystemBridge func() + groupReporter otelstatus.RunnerReporter } // NewBeatReceiver creates a BeatReceiver. This will also create the beater and start the monitoring server if configured @@ -114,12 +115,13 @@ func NewBeatReceiver(ctx context.Context, b *instance.Beat, creator beat.Creator }, nil } -// BeatReceiver.Start() starts the beat receiver. -func (br *BeatReceiver) Start(host component.Host) error { - var groupReporter otelstatus.RunnerReporter +// Setup initialises the beat receiver against the given host. It must be called before Run. +// Any configuration errors (e.g. missing storage extensions) are returned here so that the +// OTel collector can surface them as a hard startup failure instead of silently swallowing them. +func (br *BeatReceiver) Setup(host component.Host) error { if w, ok := br.beater.(cfgfile.WithOtelFactoryWrapper); ok { - groupReporter = otelstatus.NewGroupStatusReporter(host) - w.WithOtelFactoryWrapper(otelstatus.StatusReporterFactory(groupReporter)) + br.groupReporter = otelstatus.NewGroupStatusReporter(host) + w.WithOtelFactoryWrapper(otelstatus.StatusReporterFactory(br.groupReporter)) } // We go through all extensions to find any that implement the DiagnosticExtension interface. @@ -182,12 +184,17 @@ func (br *BeatReceiver) Start(host component.Host) error { } }) + return nil +} + +// Run starts the beat's main loop. Setup must be called before Run. +func (br *BeatReceiver) Run() error { if err := br.beater.Run(&br.beat.Beat); err != nil { - // set beatreceiver status - groupReporter.UpdateStatus(status.Failed, err.Error()) + if br.groupReporter != nil { + br.groupReporter.UpdateStatus(status.Failed, err.Error()) + } return fmt.Errorf("beat receiver run error: %w", err) } - return nil } diff --git a/x-pack/metricbeat/mbreceiver/receiver.go b/x-pack/metricbeat/mbreceiver/receiver.go index 66a56259440e..f17b7840bdee 100644 --- a/x-pack/metricbeat/mbreceiver/receiver.go +++ b/x-pack/metricbeat/mbreceiver/receiver.go @@ -21,11 +21,14 @@ type metricbeatReceiver struct { } func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) error { + if err := mb.BeatReceiver.Setup(host); err != nil { + return err + } mb.wg.Add(1) go func() { defer mb.wg.Done() mb.Logger.Info("starting metricbeat receiver") - if err := mb.BeatReceiver.Start(host); err != nil { + if err := mb.BeatReceiver.Run(); err != nil { mb.Logger.Error("error starting metricbeat receiver", zap.Error(err)) } }()