diff --git a/changelog/fragments/1773263871-beats-manager-shutdown.yaml b/changelog/fragments/1773263871-beats-manager-shutdown.yaml new file mode 100644 index 000000000000..f56b507bc840 --- /dev/null +++ b/changelog/fragments/1773263871-beats-manager-shutdown.yaml @@ -0,0 +1,45 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# REQUIRED for all kinds +# Change summary; a 80ish characters long description of the change. +summary: Fix an issue that could delay reporting shutdown of Agent components + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodates a description without length limits. +# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link to the PR that added the changeset. +# If not present, it is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. 
+# Please provide it if you are adding a fragment for a different PR. +# pr: https://github.com/owner/repo/1234 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present, it is automatically filled by the tooling with the issue linked to the PR number. +# issue: https://github.com/owner/repo/1234 diff --git a/changelog/fragments/1774984035-move-manager-start.yaml b/changelog/fragments/1774984035-move-manager-start.yaml new file mode 100644 index 000000000000..b01af32188ef --- /dev/null +++ b/changelog/fragments/1774984035-move-manager-start.yaml @@ -0,0 +1,45 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# REQUIRED for all kinds +# Change summary; a 80ish characters long description of the change. +summary: Fix Filebeat crash loop when running under Elastic Agent and taking too long to initialise + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodates a description without length limits. 
+# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: filebeat + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link to the PR that added the changeset. +# If not present, it is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +# pr: https://github.com/owner/repo/1234 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present, it is automatically filled by the tooling with the issue linked to the PR number. +# issue: https://github.com/owner/repo/1234 diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 7681cf5fd532..0bda2cd6b58e 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -288,6 +288,21 @@ func (fb *Filebeat) Run(b *beat.Beat) error { } finishedLogger := newFinishedLogger(wgEvents) + // Start the check-in loop, so Filebeat can respond to Elastic Agent, + // but it won't start any inputs/output + if err := b.Manager.PreInit(); err != nil { + return err + } + + // Ensure that we only call b.Manager.Stop out of order + // if Run has failed early/before b.Manager.PostInit() was called. 
+ managerEarlyStop := b.Manager.Stop + defer func() { + if managerEarlyStop != nil { + managerEarlyStop() + } + }() + registryMigrator := registrar.NewMigrator(config.Registry, fb.logger, b.Paths) if err := registryMigrator.Run(); err != nil { fb.logger.Errorf("Failed to migrate registry file: %+v", err) @@ -481,10 +496,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error { } adiscover.Start() - // We start the manager when all the subsystem are initialized and ready to received events. - if err := b.Manager.Start(); err != nil { - return err - } + b.Manager.PostInit() + managerEarlyStop = nil // Add done channel to wait for shutdown signal waitFinished.AddChan(fb.done) diff --git a/libbeat/beat/beat_test.go b/libbeat/beat/beat_test.go index 70678eb100c7..91baf6fad6c1 100644 --- a/libbeat/beat/beat_test.go +++ b/libbeat/beat/beat_test.go @@ -21,6 +21,7 @@ import ( "regexp" "strings" "testing" + "time" "github.com/stretchr/testify/require" @@ -38,6 +39,8 @@ type testManager struct { func (tm testManager) UpdateStatus(_ status.Status, _ string) {} func (tm testManager) Enabled() bool { return tm.isEnabled } func (tm testManager) Start() error { return nil } +func (tm testManager) PreInit() error { return nil } +func (tm testManager) PostInit() {} func (tm testManager) Stop() {} func (tm testManager) AgentInfo() management.AgentInfo { return management.AgentInfo{Unprivileged: tm.isUnpriv, ManagedMode: tm.mgmtMode} @@ -49,6 +52,10 @@ func (tm testManager) UnregisterAction(_ management.Action) {} func (tm testManager) SetPayload(_ map[string]interface{}) {} func (tm testManager) RegisterDiagnosticHook(_ string, _ string, _ string, _ string, _ management.DiagnosticHook) { } +func (tm testManager) WaitForStop(_ time.Duration) bool { + tm.Stop() + return true +} func TestUserAgentString(t *testing.T) { tests := []struct { diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index 23dbaf51a2e4..fcc8a9a99eb3 100644 --- 
a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -25,6 +25,7 @@ import ( "io/ioutil" "os" "testing" + "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" @@ -506,10 +507,16 @@ func (m mockManager) RegisterDiagnosticHook(name, description, filename, content func (m mockManager) SetPayload(payload map[string]interface{}) {} func (m mockManager) SetStopCallback(f func()) {} func (m mockManager) Start() error { return nil } +func (m mockManager) PreInit() error { return nil } +func (m mockManager) PostInit() {} func (m mockManager) Status() status.Status { return status.Status(-42) } func (m mockManager) Stop() {} func (m mockManager) UnregisterAction(action management.Action) {} func (m mockManager) UpdateStatus(status status.Status, msg string) {} +func (m mockManager) WaitForStop(_ time.Duration) bool { + m.Stop() + return true +} func TestManager(t *testing.T) { // set the mockManger factory. diff --git a/libbeat/management/management.go b/libbeat/management/management.go index 5a4cd65d00fa..0e404e4d7c34 100644 --- a/libbeat/management/management.go +++ b/libbeat/management/management.go @@ -19,6 +19,7 @@ package management import ( "sync" + "time" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/management/status" @@ -37,12 +38,25 @@ type Manager interface { // Enabled returns true if manager is enabled. Enabled() bool - // Start needs to invoked when the system is ready to receive an external configuration and + // PreInit starts the unitListen loop, so the manager can already + // check in with Elastic Agent, but no input/output will be + // started yet. Call [PostInit] to enable starting/stopping + // inputs/output. + PreInit() error + + // PostInit needs to be invoked when the system is ready to receive an external configuration and // also ready to start ingesting new events. 
The manager expects that all the reloadable and // reloadable list are fixed for the whole lifetime of the manager. // // Notes: Adding dynamically new reloadable hooks at runtime can lead to inconsistency in the // execution. + PostInit() + + // Start starts the manager. + // + // Deprecated: Use [PreInit] and [PostInit] instead + // + // For backwards compatibility, [Start] calls [PreInit] then [PostInit]. Start() error // Stop when this method is called, the manager will stop receiving new actions, no more action @@ -54,6 +68,11 @@ type Manager interface { // Note: Stop will not call 'UnregisterAction()' automatically. Stop() + // WaitForStop blocks until the manager has fully stopped, or timeout elapses. + // It returns true if the manager stopped before timeout, false otherwise. + // A non-positive timeout means wait indefinitely. + WaitForStop(timeout time.Duration) bool + // AgentInfo returns the information of the agent to which the manager is connected. AgentInfo() AgentInfo @@ -161,9 +180,15 @@ func (n *FallbackManager) Stop() { // the nilManager is still used for shutdown on some cases, // but that does not mean the Beat is being managed externally, // hence it will always return false. 
-func (n *FallbackManager) Enabled() bool { return false } -func (n *FallbackManager) AgentInfo() AgentInfo { return AgentInfo{} } -func (n *FallbackManager) Start() error { return nil } +func (n *FallbackManager) Enabled() bool { return false } +func (n *FallbackManager) AgentInfo() AgentInfo { return AgentInfo{} } +func (n *FallbackManager) PreInit() error { return nil } +func (n *FallbackManager) PostInit() {} +func (n *FallbackManager) Start() error { return nil } +func (n *FallbackManager) WaitForStop(_ time.Duration) bool { + n.Stop() + return true +} func (n *FallbackManager) CheckRawConfig(cfg *config.C) error { return nil } func (n *FallbackManager) RegisterAction(action Action) {} func (n *FallbackManager) UnregisterAction(action Action) {} diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go index 9c3ae73d3ebe..4ef6638b3645 100644 --- a/x-pack/filebeat/tests/integration/managerV2_test.go +++ b/x-pack/filebeat/tests/integration/managerV2_test.go @@ -283,7 +283,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) { Id: "input-unit", Type: proto.UnitType_INPUT, ConfigStateIdx: 1, - State: proto.State_STARTING, + State: proto.State_HEALTHY, LogLevel: proto.UnitLogLevel_DEBUG, Config: &proto.UnitExpectedConfig{ Id: "log-input", @@ -331,7 +331,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) { require.Eventually(t, func() bool { return finalStateReached.Load() - }, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy") + }, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy or input did not report health") t.Cleanup(server.Stop) } diff --git a/x-pack/filebeat/tests/integration/status_reporter_test.go b/x-pack/filebeat/tests/integration/status_reporter_test.go index ef79a9865a82..f301a103d2b3 100644 --- a/x-pack/filebeat/tests/integration/status_reporter_test.go +++ b/x-pack/filebeat/tests/integration/status_reporter_test.go @@ -103,7 
+103,7 @@ func TestLogStatusReporter(t *testing.T) { }, { proto.State_DEGRADED, - &inputStream, + &inputStreamIrregular, }, { proto.State_HEALTHY, @@ -159,7 +159,7 @@ func getInputStream(id string, path string, stateIdx int) proto.UnitExpected { return proto.UnitExpected{ Id: id, Type: proto.UnitType_INPUT, - ConfigStateIdx: uint64(stateIdx), + ConfigStateIdx: uint64(stateIdx), //nolint:gosec // stateIdx is always positive State: proto.State_HEALTHY, Config: &proto.UnitExpectedConfig{ Streams: []*proto.Stream{{ diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index a81194af740d..da7ff983091e 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -91,9 +91,15 @@ type BeatV2Manager struct { // sync channel for shutting down the manager after we get a stop from // either the agent or the beat stopChan chan struct{} + stopOnce sync.Once + // waits for manager goroutines started in PreInit() to exit + stopWait sync.WaitGroup isRunning bool + // Set to true when the Beat is ready to start inputs/output + beatIsReady bool + // set with the last applied output config // allows tracking if the configuration actually changed and if the // beat needs to restart if stopOnOutputReload is set @@ -117,7 +123,6 @@ type BeatV2Manager struct { // trying to reload the configuration after an input not finished error // happens forceReloadDebounce time.Duration - wg sync.WaitGroup } // ================================ @@ -212,7 +217,7 @@ func NewV2AgentManagerWithClient(config *Config, registry *reload.Registry, agen units: make(map[unitKey]*agentUnit), status: status.Running, message: "Healthy", - stopChan: make(chan struct{}, 1), + stopChan: make(chan struct{}), changeDebounce: time.Second, // forceReloadDebounce is greater than changeDebounce because it is only // used when an input has not reached its finished state, this means some events @@ -279,8 +284,11 @@ func (cm *BeatV2Manager) 
SetStopCallback(stopFunc func()) { cm.stopFunc = stopFunc } -// Start the config manager. -func (cm *BeatV2Manager) Start() error { +// PreInit starts the unitListen loop, so the manager can already +// check-in with Elastic Agent, but no input/output will be +// started yet. Call [PostInit] to enable starting/stopping +// inputs/output. +func (cm *BeatV2Manager) PreInit() error { if !cm.Enabled() { return fmt.Errorf("V2 Manager is disabled") } @@ -289,6 +297,7 @@ func (cm *BeatV2Manager) Start() error { cm.errCanceller = nil } + cm.logger.Debug("Manager starting") ctx := context.Background() err := cm.client.Start(ctx) if err != nil { @@ -297,7 +306,9 @@ func (cm *BeatV2Manager) Start() error { ctx, canceller := context.WithCancel(ctx) cm.errCanceller = canceller - go cm.watchErrChan(ctx) + cm.stopWait.Go(func() { + cm.watchErrChan(ctx) + }) cm.client.RegisterDiagnosticHook( "beat-rendered-config", "the rendered config used by the beat", @@ -305,14 +316,69 @@ func (cm *BeatV2Manager) Start() error { "application/yaml", cm.handleDebugYaml) - go cm.unitListen() + cm.UpdateStatus(status.Starting, "Starting") + + cm.stopWait.Go(cm.unitListen) cm.isRunning = true return nil } +// PostInit allows the manager to start/stop inputs/output. +func (cm *BeatV2Manager) PostInit() { + cm.UpdateStatus(status.Running, "Running") + + cm.mx.Lock() + defer cm.mx.Unlock() + + cm.beatIsReady = true + cm.logger.Debug("Manager ready to accept units.") +} + +// Start starts the manager. +// +// Deprecated: Use [PreInit] and [PostInit] instead +// +// For backwards compatibility, [Start] calls [PreInit] then [PostInit]. +func (cm *BeatV2Manager) Start() error { + if err := cm.PreInit(); err != nil { + return err + } + + cm.PostInit() + return nil +} + // Stop stops the current Manager and close the connection to Elastic Agent. 
func (cm *BeatV2Manager) Stop() { - cm.stopChan <- struct{}{} + cm.stopOnce.Do(func() { + close(cm.stopChan) + }) +} + +// WaitForStop blocks until the manager has fully stopped, or timeout elapses. +// It returns true if the manager stopped before timeout, false otherwise. +// A non-positive timeout means wait indefinitely. +func (cm *BeatV2Manager) WaitForStop(timeout time.Duration) bool { + cm.Stop() + done := make(chan struct{}) + go func() { + cm.stopWait.Wait() + close(done) + }() + + if timeout <= 0 { + <-done + return true + } + + t := time.NewTimer(timeout) + + select { + case <-done: + return true + case <-t.C: + return false + } } // CheckRawConfig is currently not implemented for V1. @@ -479,8 +545,6 @@ func (cm *BeatV2Manager) watchErrChan(ctx context.Context) { if !errors.Is(context.Canceled, err) { cm.logger.Errorf("elastic-agent-client error: %s", err) } - case <-cm.stopChan: - return } } } @@ -520,7 +584,7 @@ func (cm *BeatV2Manager) unitListen() { cm.logger.Infof( "BeatV2Manager.unitListen UnitChanged.ID(%s), UnitChanged.Type(%s), UnitChanged.Trigger(%d): %s/%s", change.Unit.ID(), - change.Type, int64(change.Triggers), change.Type, change.Triggers) + change.Type, int64(change.Triggers), change.Type, change.Triggers) //nolint:gosec // It's just logging switch change.Type { // Within the context of how we send config to beats, I'm not sure if there is a difference between @@ -541,10 +605,20 @@ func (cm *BeatV2Manager) unitListen() { cm.softDeleteUnit(change.Unit) } case <-t.C: + cm.mx.Lock() + + // If the Beat is not ready to accept configuration, do nothing + // and ensure the timer will fire again. + if !cm.beatIsReady { + cm.logger.Debug("Debounce timer fired, but Beat is not ready yet.") + cm.mx.Unlock() + t.Reset(cm.forceReloadDebounce) + continue + } + // a copy of the units is used for reload to prevent the holding of the `cm.mx`. 
// it could be possible that sending the configuration to reload could cause the `UpdateStatus` // to be called on the manager causing it to try and grab the `cm.mx` lock, causing a deadlock. - cm.mx.Lock() units := make(map[unitKey]*agentUnit, len(cm.units)) for k, u := range cm.units { if u.softDeleted { diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index 3651607540cb..720ed3acb562 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -48,7 +48,8 @@ func TestManagerV2(t *testing.T) { fqdnEnabled := atomic.Bool{} allStopped := atomic.Bool{} onObserved := func(observed *proto.CheckinObserved, currentIdx int) { - if currentIdx == 1 { + switch currentIdx { + case 1: oCfg := output.Config() iCfgs := inputs.Configs() apmCfg := apm.Config() @@ -56,7 +57,7 @@ func TestManagerV2(t *testing.T) { configsSet.Store(true) t.Log("output, inputs, and APM configuration set") } - } else if currentIdx == 2 { + case 2: oCfg := output.Config() iCfgs := inputs.Configs() apmCfg := apm.Config() @@ -65,7 +66,7 @@ func TestManagerV2(t *testing.T) { configsSet.Store(false) t.Log("output, inputs, and APM configuration cleared (should not happen)") } - } else { + default: oCfg := output.Config() iCfgs := inputs.Configs() apmCfg := apm.Config() @@ -259,9 +260,10 @@ func TestManagerV2(t *testing.T) { }, r, client, logptest.NewTestingLogger(t, "")) require.NoError(t, err) + //nolint:staticcheck // We want to ensure Start still has the same behaviour err = m.Start() require.NoError(t, err) - defer m.Stop() + defer stopManagerAndWait(t, m) require.Eventually(t, func() bool { return configsSet.Load() && configsCleared.Load() && logLevelSet.Load() && fqdnEnabled.Load() && allStopped.Load() @@ -395,7 +397,7 @@ func TestManagerV2_ReloadCount(t *testing.T) { err = m.Start() require.NoError(t, err) - defer m.Stop() + defer stopManagerAndWait(t, m) <-inputConfigUpdated assert.Equal(t, 1, 
output.reloadCount) // initial load @@ -403,6 +405,124 @@ func TestManagerV2_ReloadCount(t *testing.T) { assert.Equal(t, 0, apm.reloadCount) // no apm tracing config applied } +func TestManagerV2_PreInitAppliesBufferedUnitsAfterPostInit(t *testing.T) { + beatReady := atomic.Bool{} + + r := reload.NewRegistry() + output := &reloadable{} + r.MustRegisterOutput(output) + inputs := &reloadableList{} + r.MustRegisterInput(inputs) + + agentInfo := &proto.AgentInfo{ + Id: "elastic-agent-id", + Version: version.GetDefaultVersion(), + } + units := []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + }, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + Config: &proto.UnitExpectedConfig{ + Id: "filestream-unit-id", + Type: "filestream", + Name: "filestream", + Streams: []*proto.Stream{ + { + Id: "filestream-input-id", + Source: integration.RequireNewStruct(t, map[string]any{ + "id": "filestream-input-id", + "paths": []any{"/foo/bar"}, + }), + }, + }, + }, + }, + } + + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + if !beatReady.Load() { + for _, u := range observed.Units { + if u.State != proto.State_STARTING { + t.Errorf( + "Unit %q is not in %q state, got %q", + u.GetId(), proto.State_STARTING, + u.GetState().String()) + } + } + } + + return &proto.CheckinExpected{ + AgentInfo: agentInfo, + Units: units, + Features: nil, + FeaturesIdx: 1, + } + }, + ActionImpl: func(response *proto.ActionResponse) error { return nil }, + } + require.NoError(t, server.Start()) + defer server.Stop() + + client := client.NewV2( + fmt.Sprintf(":%d", server.Port), + "", + client.VersionInfo{}, + 
client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) + + m, err := NewV2AgentManagerWithClient( + &Config{ + Enabled: true, + }, + r, + client, + logp.NewNopLogger(), + ) + require.NoError(t, err) + defer stopManagerAndWait(t, m) + + mm, ok := m.(*BeatV2Manager) + require.True(t, ok, "NewV2AgentManagerWithClient must return a BeatV2Manager") + + mm.changeDebounce = 10 * time.Millisecond + mm.forceReloadDebounce = 20 * time.Millisecond + + require.NoError(t, m.PreInit()) + + // Ensure unit changes are received while the beat is not ready. + require.Eventually(t, func() bool { + mm.mx.Lock() + defer mm.mx.Unlock() + return len(mm.units) == len(units) + }, 5*time.Second, 10*time.Millisecond, "expected manager to receive units before PostInit") + + // Wait for debounce windows to pass while not ready; nothing should be reloaded. + time.Sleep(mm.changeDebounce + 2*mm.forceReloadDebounce) + assert.Nil(t, output.Config(), "output should not be reloaded before PostInit") + assert.Empty(t, inputs.Configs(), "inputs should not be reloaded before PostInit") + + // Once ready, previously buffered unit state should eventually be applied. 
+ m.PostInit() + beatReady.Store(true) + require.Eventually(t, func() bool { + return output.Config() != nil && len(inputs.Configs()) > 0 + }, 5*time.Second, 10*time.Millisecond, + "expected buffered units to be applied after PostInit without requiring new unit changes") +} + func TestOutputError(t *testing.T) { // Uncomment the line below to see the debug logs for this test // logp.DevelopmentSetup(logp.WithLevel(logp.DebugLevel), logp.WithSelectors("*")) @@ -524,7 +644,7 @@ func TestOutputError(t *testing.T) { if err := m.Start(); err != nil { t.Fatalf("could not start ManagerV2: %s", err) } - defer m.Stop() + defer stopManagerAndWait(t, m) require.Eventually(t, func() bool { return stateReached.Load() @@ -684,7 +804,7 @@ func TestErrorPerUnit(t *testing.T) { if err := m.Start(); err != nil { t.Fatalf("could not start ManagerV2: %s", err) } - defer m.Stop() + defer stopManagerAndWait(t, m) require.Eventually(t, func() bool { return stateReached.Load() @@ -780,3 +900,12 @@ func (m *mockReloadable) Configs() []*reload.ConfigWithMeta { defer m.mutex.Unlock() return m.ConfigsFn() } + +type stopAndWait interface { + WaitForStop(time.Duration) bool +} + +func stopManagerAndWait(t *testing.T, m stopAndWait) { + t.Helper() + require.True(t, m.WaitForStop(15*time.Second), "timed out waiting for manager shutdown") +} diff --git a/x-pack/otel/otelmanager/manager.go b/x-pack/otel/otelmanager/manager.go index 986c54a41916..4c20cbacc9ca 100644 --- a/x-pack/otel/otelmanager/manager.go +++ b/x-pack/otel/otelmanager/manager.go @@ -6,6 +6,7 @@ package otelmanager import ( "sync" + "time" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/management" @@ -59,6 +60,8 @@ func (n *OtelManager) Stop() { // Returning true might lead to side effects. 
func (n *OtelManager) Enabled() bool { return false } func (n *OtelManager) AgentInfo() management.AgentInfo { return management.AgentInfo{} } +func (n *OtelManager) PreInit() error { return nil } +func (n *OtelManager) PostInit() {} func (n *OtelManager) Start() error { return nil } func (n *OtelManager) CheckRawConfig(cfg *config.C) error { return nil } func (n *OtelManager) RegisterAction(action management.Action) {} @@ -73,3 +76,7 @@ func (n *OtelManager) SetDiagnosticExtension(receiverName string, ext Diagnostic n.ext = ext n.receiverName = receiverName } +func (n *OtelManager) WaitForStop(_ time.Duration) bool { + n.Stop() + return true +}