Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion x-pack/filebeat/fbreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ type filebeatReceiver struct {
}

func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error {
if err := fb.BeatReceiver.Setup(host); err != nil {
return err
}
fb.wg.Add(1)
go func() {
defer fb.wg.Done()
fb.Logger.Info("starting filebeat receiver")
if err := fb.BeatReceiver.Start(host); err != nil {
if err := fb.BeatReceiver.Run(); err != nil {
fb.Logger.Error("error starting filebeat receiver", zap.Error(err))
}
}()
Expand Down
24 changes: 24 additions & 0 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
Expand Down Expand Up @@ -722,6 +723,29 @@ func TestReceiverHook(t *testing.T) {
oteltest.TestReceiverHook(t, &cfg, NewFactoryWithSettings(Settings{Home: t.TempDir()}), receiverSettings, 3)
}

func TestStartShouldFailWhenStorageExtensionMissing(t *testing.T) {
cfg := &Config{Beatconfig: map[string]any{
"filebeat": map[string]any{"inputs": []map[string]any{{
"type": "benchmark", "enabled": true, "message": "x", "count": 1,
}}},
"storage": "elasticsearch_storage",
"path.home": t.TempDir(),
}}

factory := NewFactoryWithSettings(Settings{Home: t.TempDir()})
set := receiver.Settings{
ID: component.NewIDWithName(factory.Type(), "r1"),
TelemetrySettings: component.TelemetrySettings{Logger: zap.NewNop()},
}

r, err := factory.CreateLogs(t.Context(), set, cfg, consumertest.NewNop())
require.NoError(t, err)

err = r.Start(t.Context(), &oteltest.MockHost{})
require.Error(t, err, "expected Start to fail when storage extension is configured but missing")
_ = r.Shutdown(t.Context())
}

func hostFromSocket(socket string) string {
if runtime.GOOS == "windows" {
return "npipe:///" + filepath.Base(socket)
Expand Down
23 changes: 15 additions & 8 deletions x-pack/libbeat/cmd/instance/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type BeatReceiver struct {
Logger *logp.Logger
bridge *oteltelemetry.RegistryBridge
releaseSystemBridge func()
groupReporter otelstatus.RunnerReporter
}

// NewBeatReceiver creates a BeatReceiver. This will also create the beater and start the monitoring server if configured
Expand Down Expand Up @@ -114,12 +115,13 @@ func NewBeatReceiver(ctx context.Context, b *instance.Beat, creator beat.Creator
}, nil
}

// BeatReceiver.Start() starts the beat receiver.
func (br *BeatReceiver) Start(host component.Host) error {
var groupReporter otelstatus.RunnerReporter
// Setup initialises the beat receiver against the given host. It must be called before Run.
// Any configuration errors (e.g. missing storage extensions) are returned here so that the
// OTel collector can surface them as a hard startup failure instead of silently swallowing them.
func (br *BeatReceiver) Setup(host component.Host) error {
if w, ok := br.beater.(cfgfile.WithOtelFactoryWrapper); ok {
groupReporter = otelstatus.NewGroupStatusReporter(host)
w.WithOtelFactoryWrapper(otelstatus.StatusReporterFactory(groupReporter))
br.groupReporter = otelstatus.NewGroupStatusReporter(host)
w.WithOtelFactoryWrapper(otelstatus.StatusReporterFactory(br.groupReporter))
}

// We go through all extensions to find any that implement the DiagnosticExtension interface.
Expand Down Expand Up @@ -182,12 +184,17 @@ func (br *BeatReceiver) Start(host component.Host) error {
}
})

return nil
}

// Run starts the beat's main loop. Setup must be called before Run.
func (br *BeatReceiver) Run() error {
if err := br.beater.Run(&br.beat.Beat); err != nil {
// set beatreceiver status
groupReporter.UpdateStatus(status.Failed, err.Error())
if br.groupReporter != nil {
br.groupReporter.UpdateStatus(status.Failed, err.Error())
}
return fmt.Errorf("beat receiver run error: %w", err)
}

return nil
}

Expand Down
5 changes: 4 additions & 1 deletion x-pack/metricbeat/mbreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ type metricbeatReceiver struct {
}

func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) error {
if err := mb.BeatReceiver.Setup(host); err != nil {
return err
}
mb.wg.Add(1)
go func() {
defer mb.wg.Done()
mb.Logger.Info("starting metricbeat receiver")
if err := mb.BeatReceiver.Start(host); err != nil {
if err := mb.BeatReceiver.Run(); err != nil {
mb.Logger.Error("error starting metricbeat receiver", zap.Error(err))
}
}()
Expand Down
Loading