Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions changelog/fragments/1774984035-move-manager-start.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# REQUIRED
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# REQUIRED for all kinds
# Change summary; a 80ish characters long description of the change.
summary: Fix Filebeat crash loop when running under Elastic Agent and taking too long to initialise

# REQUIRED for breaking-change, deprecation, known-issue
# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# description:

# REQUIRED for breaking-change, deprecation, known-issue
# impact:

# REQUIRED for breaking-change, deprecation, known-issue
# action:

# REQUIRED for all kinds
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: filebeat

# AUTOMATED
# OPTIONAL to manually add other PR URLs
# PR URL: A link the PR that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
# pr: https://github.com/owner/repo/1234

# AUTOMATED
# OPTIONAL to manually add other issue URLs
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
# issue: https://github.com/owner/repo/1234
21 changes: 17 additions & 4 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,21 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}
finishedLogger := newFinishedLogger(wgEvents)

// Start the check-in loop, so Filebeat can respond to Elastic Agent,
// but it won't start any inputs/output
if err := b.Manager.PreInit(); err != nil {
return err
}

// Ensure that we only call b.Manager.Stop out of order
// if Run has failed early/before b.Manager.PostInit() was called.
managerEarlyStop := b.Manager.Stop
defer func() {
if managerEarlyStop != nil {
managerEarlyStop()
}
}()

registryMigrator := registrar.NewMigrator(config.Registry, fb.logger, b.Paths)
if err := registryMigrator.Run(); err != nil {
fb.logger.Errorf("Failed to migrate registry file: %+v", err)
Expand Down Expand Up @@ -481,10 +496,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}
adiscover.Start()

// We start the manager when all the subsystem are initialized and ready to received events.
if err := b.Manager.Start(); err != nil {
return err
}
b.Manager.PostInit()
managerEarlyStop = nil

// Add done channel to wait for shutdown signal
waitFinished.AddChan(fb.done)
Expand Down
7 changes: 7 additions & 0 deletions libbeat/beat/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"regexp"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand All @@ -38,6 +39,8 @@ type testManager struct {
func (tm testManager) UpdateStatus(_ status.Status, _ string) {}
func (tm testManager) Enabled() bool { return tm.isEnabled }
func (tm testManager) Start() error { return nil }
func (tm testManager) PreInit() error { return nil }
func (tm testManager) PostInit() {}
func (tm testManager) Stop() {}
func (tm testManager) AgentInfo() management.AgentInfo {
return management.AgentInfo{Unprivileged: tm.isUnpriv, ManagedMode: tm.mgmtMode}
Expand All @@ -49,6 +52,10 @@ func (tm testManager) UnregisterAction(_ management.Action) {}
func (tm testManager) SetPayload(_ map[string]interface{}) {}
func (tm testManager) RegisterDiagnosticHook(_ string, _ string, _ string, _ string, _ management.DiagnosticHook) {
}
func (tm testManager) WaitForStop(_ time.Duration) bool {
tm.Stop()
return true
}

func TestUserAgentString(t *testing.T) {
tests := []struct {
Expand Down
7 changes: 7 additions & 0 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
Expand Down Expand Up @@ -506,10 +507,16 @@ func (m mockManager) RegisterDiagnosticHook(name, description, filename, content
func (m mockManager) SetPayload(payload map[string]interface{}) {}
func (m mockManager) SetStopCallback(f func()) {}
func (m mockManager) Start() error { return nil }
func (m mockManager) PreInit() error { return nil }
func (m mockManager) PostInit() {}
func (m mockManager) Status() status.Status { return status.Status(-42) }
func (m mockManager) Stop() {}
func (m mockManager) UnregisterAction(action management.Action) {}
func (m mockManager) UpdateStatus(status status.Status, msg string) {}
func (m mockManager) WaitForStop(_ time.Duration) bool {
m.Stop()
return true
}

func TestManager(t *testing.T) {
// set the mockManger factory.
Expand Down
24 changes: 23 additions & 1 deletion libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package management

import (
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/management/status"
Expand All @@ -37,12 +38,25 @@ type Manager interface {
// Enabled returns true if manager is enabled.
Enabled() bool

// Start needs to invoked when the system is ready to receive an external configuration and
// Starts the unitListen loop, so the manager can already
// check-in with Elastic Agent, but no input/output will be
// started yet. Call [PostInit] to enable starting/stopping
// inputs/output.
PreInit() error

// PostInit needs to be invoked when the system is ready to receive an external configuration and
// also ready to start ingesting new events. The manager expects that all the reloadable and
// reloadable list are fixed for the whole lifetime of the manager.
//
// Notes: Adding dynamically new reloadable hooks at runtime can lead to inconsistency in the
// execution.
PostInit()

// Start starts the manager.
//
// Deprecated: Use [PreInit] and [PostInit] instead
//
// For backwards compatibility, [Start] calls [PreInit] then [PostInit].
Start() error

// Stop when this method is called, the manager will stop receiving new actions, no more action
Expand All @@ -54,6 +68,11 @@ type Manager interface {
// Note: Stop will not call 'UnregisterAction()' automatically.
Stop()

// WaitForStop blocks until the manager has fully stopped, or timeout elapses.
// It returns true if the manager stopped before timeout, false otherwise.
// A non-positive timeout means wait indefinitely.
WaitForStop(timeout time.Duration) bool

// AgentInfo returns the information of the agent to which the manager is connected.
AgentInfo() AgentInfo

Expand Down Expand Up @@ -163,7 +182,10 @@ func (n *FallbackManager) Stop() {
// hence it will always return false.
func (n *FallbackManager) Enabled() bool { return false }
func (n *FallbackManager) AgentInfo() AgentInfo { return AgentInfo{} }
func (n *FallbackManager) PreInit() error { return nil }
func (n *FallbackManager) PostInit() {}
func (n *FallbackManager) Start() error { return nil }
func (n *FallbackManager) WaitForStop(_ time.Duration) bool { return true }
func (n *FallbackManager) CheckRawConfig(cfg *config.C) error { return nil }
func (n *FallbackManager) RegisterAction(action Action) {}
func (n *FallbackManager) UnregisterAction(action Action) {}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/tests/integration/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) {
Id: "input-unit",
Type: proto.UnitType_INPUT,
ConfigStateIdx: 1,
State: proto.State_STARTING,
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_DEBUG,
Config: &proto.UnitExpectedConfig{
Id: "log-input",
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) {

require.Eventually(t, func() bool {
return finalStateReached.Load()
}, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy")
}, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy or input did not report health")

t.Cleanup(server.Stop)
}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/tests/integration/status_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestLogStatusReporter(t *testing.T) {
},
{
proto.State_DEGRADED,
&inputStream,
&inputStreamIrregular,
},
{
proto.State_HEALTHY,
Expand Down Expand Up @@ -159,7 +159,7 @@ func getInputStream(id string, path string, stateIdx int) proto.UnitExpected {
return proto.UnitExpected{
Id: id,
Type: proto.UnitType_INPUT,
ConfigStateIdx: uint64(stateIdx),
ConfigStateIdx: uint64(stateIdx), //nolint:gosec // stateIdx is always positive
State: proto.State_HEALTHY,
Config: &proto.UnitExpectedConfig{
Streams: []*proto.Stream{{
Expand Down
87 changes: 81 additions & 6 deletions x-pack/libbeat/management/managerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,15 @@
// sync channel for shutting down the manager after we get a stop from
// either the agent or the beat
stopChan chan struct{}
stopOnce sync.Once

Check failure on line 94 in x-pack/libbeat/management/managerV2.go

View workflow job for this annotation

GitHub Actions / lint (windows-latest)

field stopOnce is unused (unused)

Check failure on line 94 in x-pack/libbeat/management/managerV2.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

field stopOnce is unused (unused)

Check failure on line 94 in x-pack/libbeat/management/managerV2.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

field stopOnce is unused (unused)
// waits for manager goroutines started in PreInit() to exit
stopWait sync.WaitGroup

isRunning bool

// Set to true when the Beat is ready to start inputs/output
beatIsReady bool

// set with the last applied output config
// allows tracking if the configuration actually changed and if the
// beat needs to restart if stopOnOutputReload is set
Expand All @@ -117,7 +123,7 @@
// trying to reload the configuration after an input not finished error
// happens
forceReloadDebounce time.Duration
wg sync.WaitGroup

Check failure on line 126 in x-pack/libbeat/management/managerV2.go

View workflow job for this annotation

GitHub Actions / lint (windows-latest)

field wg is unused (unused)

Check failure on line 126 in x-pack/libbeat/management/managerV2.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

field wg is unused (unused)

Check failure on line 126 in x-pack/libbeat/management/managerV2.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

field wg is unused (unused)
}

// ================================
Expand Down Expand Up @@ -279,8 +285,11 @@
cm.stopFunc = stopFunc
}

// Start the config manager.
func (cm *BeatV2Manager) Start() error {
// PreInit starts the unitListen loop, so the manager can already
// check-in with Elastic Agent, but no input/output will be
// started yet. Call [PostInit] to enable starting/stopping
// inputs/output.
func (cm *BeatV2Manager) PreInit() error {
if !cm.Enabled() {
return fmt.Errorf("V2 Manager is disabled")
}
Expand All @@ -289,6 +298,7 @@
cm.errCanceller = nil
}

cm.logger.Debug("Manager starting")
ctx := context.Background()
err := cm.client.Start(ctx)
if err != nil {
Expand All @@ -297,24 +307,79 @@
ctx, canceller := context.WithCancel(ctx)
cm.errCanceller = canceller

go cm.watchErrChan(ctx)
cm.stopWait.Go(func() {
cm.watchErrChan(ctx)
})
cm.client.RegisterDiagnosticHook(
"beat-rendered-config",
"the rendered config used by the beat",
"beat-rendered-config.yml",
"application/yaml",
cm.handleDebugYaml)

go cm.unitListen()
cm.UpdateStatus(status.Starting, "Starting")

cm.stopWait.Go(cm.unitListen)
cm.isRunning = true
return nil
}

// PostInit allows the manager to start/stop inputs/output.
func (cm *BeatV2Manager) PostInit() {
cm.UpdateStatus(status.Running, "Running")

cm.mx.Lock()
defer cm.mx.Unlock()

cm.beatIsReady = true
cm.logger.Debug("Manager ready to accept units.")
}

// Start starts the manager.
//
// Deprecated: Use [PreInit] and [PostInit] instead
//
// For backwards compatibility, [Start] calls [PreInit] then [PostInit].
func (cm *BeatV2Manager) Start() error {
if err := cm.PreInit(); err != nil {
return err
}

cm.PostInit()
return nil
}

// Stop stops the current Manager and close the connection to Elastic Agent.
func (cm *BeatV2Manager) Stop() {
cm.stopChan <- struct{}{}
}

// WaitForStop blocks until the manager has fully stopped, or timeout elapses.
// It returns true if the manager stopped before timeout, false otherwise.
// A non-positive timeout means wait indefinitely.
func (cm *BeatV2Manager) WaitForStop(timeout time.Duration) bool {
cm.Stop()
done := make(chan struct{})
go func() {
cm.stopWait.Wait()
close(done)
}()

if timeout <= 0 {
<-done
return true
}

t := time.NewTimer(timeout)

select {
case <-done:
return true
case <-t.C:
return false
}
}

// CheckRawConfig is currently not implemented for V1.
func (cm *BeatV2Manager) CheckRawConfig(_ *conf.C) error {
// This does not do anything on V1 or V2, but here we are
Expand Down Expand Up @@ -476,7 +541,7 @@
return
case err := <-cm.client.Errors():
// Don't print the context canceled errors that happen normally during shutdown, restart, etc
if !errors.Is(context.Canceled, err) {

Check failure on line 544 in x-pack/libbeat/management/managerV2.go

View workflow job for this annotation

GitHub Actions / lint (windows-latest)

SA1032: arguments have the wrong order (staticcheck)

Check failure on line 544 in x-pack/libbeat/management/managerV2.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

SA1032: arguments have the wrong order (staticcheck)

Check failure on line 544 in x-pack/libbeat/management/managerV2.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA1032: arguments have the wrong order (staticcheck)
cm.logger.Errorf("elastic-agent-client error: %s", err)
}
case <-cm.stopChan:
Expand Down Expand Up @@ -520,7 +585,7 @@
cm.logger.Infof(
"BeatV2Manager.unitListen UnitChanged.ID(%s), UnitChanged.Type(%s), UnitChanged.Trigger(%d): %s/%s",
change.Unit.ID(),
change.Type, int64(change.Triggers), change.Type, change.Triggers)
change.Type, int64(change.Triggers), change.Type, change.Triggers) //nolint:gosec // It's just logging

switch change.Type {
// Within the context of how we send config to beats, I'm not sure if there is a difference between
Expand All @@ -541,10 +606,20 @@
cm.softDeleteUnit(change.Unit)
}
case <-t.C:
cm.mx.Lock()

// If the Beat is not ready to accept configuration, do nothing
// and ensure the timer will fire again.
if !cm.beatIsReady {
cm.logger.Debug("Debounce timer fired, but Beat is not ready yet.")
cm.mx.Unlock()
t.Reset(cm.forceReloadDebounce)
continue
}

// a copy of the units is used for reload to prevent the holding of the `cm.mx`.
// it could be possible that sending the configuration to reload could cause the `UpdateStatus`
// to be called on the manager causing it to try and grab the `cm.mx` lock, causing a deadlock.
cm.mx.Lock()
units := make(map[unitKey]*agentUnit, len(cm.units))
for k, u := range cm.units {
if u.softDeleted {
Expand Down
Loading
Loading