Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion x-pack/filebeat/fbreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ type filebeatReceiver struct {
}

func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error {
if err := fb.BeatReceiver.Setup(host); err != nil {
return err
}
fb.wg.Add(1)
go func() {
defer fb.wg.Done()
fb.Logger.Info("starting filebeat receiver")
if err := fb.BeatReceiver.Start(host); err != nil {
if err := fb.BeatReceiver.Run(); err != nil {
fb.Logger.Error("error starting filebeat receiver", zap.Error(err))
}
}()
Expand Down
24 changes: 24 additions & 0 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
Expand Down Expand Up @@ -722,6 +723,29 @@ func TestReceiverHook(t *testing.T) {
oteltest.TestReceiverHook(t, &cfg, NewFactoryWithSettings(Settings{Home: t.TempDir()}), receiverSettings, 3)
}

func TestStartShouldFailWhenStorageExtensionMissing(t *testing.T) {
cfg := &Config{Beatconfig: map[string]any{
"filebeat": map[string]any{"inputs": []map[string]any{{
"type": "benchmark", "enabled": true, "message": "x", "count": 1,
}}},
"storage": "elasticsearch_storage",
"path.home": t.TempDir(),
}}

factory := NewFactoryWithSettings(Settings{Home: t.TempDir()})
set := receiver.Settings{
ID: component.NewIDWithName(factory.Type(), "r1"),
TelemetrySettings: component.TelemetrySettings{Logger: zap.NewNop()},
}

r, err := factory.CreateLogs(t.Context(), set, cfg, consumertest.NewNop())
require.NoError(t, err)

err = r.Start(t.Context(), &oteltest.MockHost{})
require.Error(t, err, "expected Start to fail when storage extension is configured but missing")
_ = r.Shutdown(t.Context())
}

func hostFromSocket(socket string) string {
if runtime.GOOS == "windows" {
return "npipe:///" + filepath.Base(socket)
Expand Down
21 changes: 13 additions & 8 deletions x-pack/libbeat/cmd/instance/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type BeatReceiver struct {
Logger *logp.Logger
bridge *oteltelemetry.RegistryBridge
releaseSystemBridge func()
groupReporter otelstatus.RunnerReporter
}

// NewBeatReceiver creates a BeatReceiver. This will also create the beater and start the monitoring server if configured
Expand Down Expand Up @@ -114,12 +115,13 @@ func NewBeatReceiver(ctx context.Context, b *instance.Beat, creator beat.Creator
}, nil
}

// BeatReceiver.Start() starts the beat receiver.
func (br *BeatReceiver) Start(host component.Host) error {
var groupReporter otelstatus.RunnerReporter
// Setup initialises the beat receiver against the given host. It must be called before Run.
// Any configuration errors (e.g. missing storage extensions) are returned here so that the
// OTel collector can surface them as a hard startup failure instead of silently swallowing them.
func (br *BeatReceiver) Setup(host component.Host) error {
if w, ok := br.beater.(cfgfile.WithOtelFactoryWrapper); ok {
groupReporter = otelstatus.NewGroupStatusReporter(host)
w.WithOtelFactoryWrapper(otelstatus.StatusReporterFactory(groupReporter))
br.groupReporter = otelstatus.NewGroupStatusReporter(host)
w.WithOtelFactoryWrapper(otelstatus.StatusReporterFactory(br.groupReporter))
}

// We go through all extensions to find any that implement the DiagnosticExtension interface.
Expand Down Expand Up @@ -182,12 +184,15 @@ func (br *BeatReceiver) Start(host component.Host) error {
}
})

return nil
}

// Run starts the beat's main loop. Setup must be called before Run.
func (br *BeatReceiver) Run() error {
if err := br.beater.Run(&br.beat.Beat); err != nil {
// set beatreceiver status
groupReporter.UpdateStatus(status.Failed, err.Error())
br.groupReporter.UpdateStatus(status.Failed, err.Error())
return fmt.Errorf("beat receiver run error: %w", err)
}

return nil
}

Expand Down
5 changes: 4 additions & 1 deletion x-pack/metricbeat/mbreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ type metricbeatReceiver struct {
}

func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) error {
if err := mb.BeatReceiver.Setup(host); err != nil {
return err
}
mb.wg.Add(1)
go func() {
defer mb.wg.Done()
mb.Logger.Info("starting metricbeat receiver")
if err := mb.BeatReceiver.Start(host); err != nil {
if err := mb.BeatReceiver.Run(); err != nil {
mb.Logger.Error("error starting metricbeat receiver", zap.Error(err))
}
}()
Expand Down
Loading