Skip to content

Commit 3f10c97

Browse files
mergify[bot]belimawrfaec
authored
[8.19](backport #49796) Split BeatV2Manager Start into two methods, so Beats can reply to check-in in parallel to its initialisation (#49848)
The Start method from BeatV2Manager is split into two methods: - PreInit: responsible for starting the Elastic Agent client and start replying to check-ins. - PostInit: responsible for setting the Beats status to 'Running' and start executing Unit changes. A new method, WaitForStop is also added. It stops the BeatV2Manager and waits until all goroutines have returned. Currently it is only used in tests that use `testing.T` as the logger output to ensure no panics happen because the logger was used after the test ended. Multiple lint warnings are fixed GenAI-Assisted: Yes Human-Reviewed: Yes Tool: Cursor-CLI, Model: GPT-5.3 Codex Extra High Fast (cherry picked from commit 034546f) # Conflicts: # x-pack/libbeat/management/managerV2.go # x-pack/osquerybeat/beater/osquerybeat_status_test.go * Fix race that can block managerV2 shutdown (#49414) As described in #49388, `BeatV2Manager` can miss the shutdown signal because its `Stop` method notifies the manager by sending to its signal channel `stopChan` rather than closing it, but there are two goroutines that both listen on that channel. This PR changes `Stop` to close the channel rather than just sending. It also removes the second `stopChan` listener in `watchErrChan`, since the main goroutine already calls the context canceler for that helper when `stopChan` unblocks (this isn't strictly necessary but it will keep error states visible for a little longer during shutdown, and is what was previously happening in the "good" path where the main worker received the stop signal first). (cherry picked from commit d39cb49) --------- Co-authored-by: Tiago Queiroz <tiago.queiroz@elastic.co> Co-authored-by: Fae Charlton <fae.charlton@elastic.co>
1 parent ae99183 commit 3f10c97

File tree

11 files changed

+382
-30
lines changed

11 files changed

+382
-30
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# REQUIRED
2+
# Kind can be one of:
3+
# - breaking-change: a change to previously-documented behavior
4+
# - deprecation: functionality that is being removed in a later release
5+
# - bug-fix: fixes a problem in a previous version
6+
# - enhancement: extends functionality but does not break or fix existing behavior
7+
# - feature: new functionality
8+
# - known-issue: problems that we are aware of in a given version
9+
# - security: impacts on the security of a product or a user’s deployment.
10+
# - upgrade: important information for someone upgrading from a prior version
11+
# - other: does not fit into any of the other categories
12+
kind: bug-fix
13+
14+
# REQUIRED for all kinds
15+
# Change summary; a 80ish characters long description of the change.
16+
summary: Fix an issue that could delay reporting shutdown of Agent components
17+
18+
# REQUIRED for breaking-change, deprecation, known-issue
19+
# Long description; in case the summary is not enough to describe the change
20+
# this field accommodate a description without length limits.
21+
# description:
22+
23+
# REQUIRED for breaking-change, deprecation, known-issue
24+
# impact:
25+
26+
# REQUIRED for breaking-change, deprecation, known-issue
27+
# action:
28+
29+
# REQUIRED for all kinds
30+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
31+
component: elastic-agent
32+
33+
# AUTOMATED
34+
# OPTIONAL to manually add other PR URLs
35+
# PR URL: A link the PR that added the changeset.
36+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
37+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
38+
# Please provide it if you are adding a fragment for a different PR.
39+
# pr: https://github.com/owner/repo/1234
40+
41+
# AUTOMATED
42+
# OPTIONAL to manually add other issue URLs
43+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
44+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
45+
# issue: https://github.com/owner/repo/1234
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# REQUIRED
2+
# Kind can be one of:
3+
# - breaking-change: a change to previously-documented behavior
4+
# - deprecation: functionality that is being removed in a later release
5+
# - bug-fix: fixes a problem in a previous version
6+
# - enhancement: extends functionality but does not break or fix existing behavior
7+
# - feature: new functionality
8+
# - known-issue: problems that we are aware of in a given version
9+
# - security: impacts on the security of a product or a user’s deployment.
10+
# - upgrade: important information for someone upgrading from a prior version
11+
# - other: does not fit into any of the other categories
12+
kind: bug-fix
13+
14+
# REQUIRED for all kinds
15+
# Change summary; a 80ish characters long description of the change.
16+
summary: Fix Filebeat crash loop when running under Elastic Agent and taking too long to initialise
17+
18+
# REQUIRED for breaking-change, deprecation, known-issue
19+
# Long description; in case the summary is not enough to describe the change
20+
# this field accommodate a description without length limits.
21+
# description:
22+
23+
# REQUIRED for breaking-change, deprecation, known-issue
24+
# impact:
25+
26+
# REQUIRED for breaking-change, deprecation, known-issue
27+
# action:
28+
29+
# REQUIRED for all kinds
30+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
31+
component: filebeat
32+
33+
# AUTOMATED
34+
# OPTIONAL to manually add other PR URLs
35+
# PR URL: A link the PR that added the changeset.
36+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
37+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
38+
# Please provide it if you are adding a fragment for a different PR.
39+
# pr: https://github.com/owner/repo/1234
40+
41+
# AUTOMATED
42+
# OPTIONAL to manually add other issue URLs
43+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
44+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
45+
# issue: https://github.com/owner/repo/1234

filebeat/beater/filebeat.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,21 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
288288
}
289289
finishedLogger := newFinishedLogger(wgEvents)
290290

291+
// Start the check-in loop, so Filebeat can respond to Elastic Agent,
292+
// but it won't start any inputs/output
293+
if err := b.Manager.PreInit(); err != nil {
294+
return err
295+
}
296+
297+
// Ensure that we only call b.Manager.Stop out of order
298+
// if Run has failed early/before b.Manager.PostInit() was called.
299+
managerEarlyStop := b.Manager.Stop
300+
defer func() {
301+
if managerEarlyStop != nil {
302+
managerEarlyStop()
303+
}
304+
}()
305+
291306
registryMigrator := registrar.NewMigrator(config.Registry, fb.logger, b.Paths)
292307
if err := registryMigrator.Run(); err != nil {
293308
fb.logger.Errorf("Failed to migrate registry file: %+v", err)
@@ -481,10 +496,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
481496
}
482497
adiscover.Start()
483498

484-
// We start the manager when all the subsystem are initialized and ready to received events.
485-
if err := b.Manager.Start(); err != nil {
486-
return err
487-
}
499+
b.Manager.PostInit()
500+
managerEarlyStop = nil
488501

489502
// Add done channel to wait for shutdown signal
490503
waitFinished.AddChan(fb.done)

libbeat/beat/beat_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"regexp"
2222
"strings"
2323
"testing"
24+
"time"
2425

2526
"github.com/stretchr/testify/require"
2627

@@ -38,6 +39,8 @@ type testManager struct {
3839
func (tm testManager) UpdateStatus(_ status.Status, _ string) {}
3940
func (tm testManager) Enabled() bool { return tm.isEnabled }
4041
func (tm testManager) Start() error { return nil }
42+
func (tm testManager) PreInit() error { return nil }
43+
func (tm testManager) PostInit() {}
4144
func (tm testManager) Stop() {}
4245
func (tm testManager) AgentInfo() management.AgentInfo {
4346
return management.AgentInfo{Unprivileged: tm.isUnpriv, ManagedMode: tm.mgmtMode}
@@ -49,6 +52,10 @@ func (tm testManager) UnregisterAction(_ management.Action) {}
4952
func (tm testManager) SetPayload(_ map[string]interface{}) {}
5053
func (tm testManager) RegisterDiagnosticHook(_ string, _ string, _ string, _ string, _ management.DiagnosticHook) {
5154
}
55+
func (tm testManager) WaitForStop(_ time.Duration) bool {
56+
tm.Stop()
57+
return true
58+
}
5259

5360
func TestUserAgentString(t *testing.T) {
5461
tests := []struct {

libbeat/cmd/instance/beat_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"io/ioutil"
2626
"os"
2727
"testing"
28+
"time"
2829

2930
"github.com/elastic/beats/v7/libbeat/beat"
3031
"github.com/elastic/beats/v7/libbeat/cfgfile"
@@ -506,10 +507,16 @@ func (m mockManager) RegisterDiagnosticHook(name, description, filename, content
506507
func (m mockManager) SetPayload(payload map[string]interface{}) {}
507508
func (m mockManager) SetStopCallback(f func()) {}
508509
func (m mockManager) Start() error { return nil }
510+
func (m mockManager) PreInit() error { return nil }
511+
func (m mockManager) PostInit() {}
509512
func (m mockManager) Status() status.Status { return status.Status(-42) }
510513
func (m mockManager) Stop() {}
511514
func (m mockManager) UnregisterAction(action management.Action) {}
512515
func (m mockManager) UpdateStatus(status status.Status, msg string) {}
516+
func (m mockManager) WaitForStop(_ time.Duration) bool {
517+
m.Stop()
518+
return true
519+
}
513520

514521
func TestManager(t *testing.T) {
515522
// set the mockManger factory.

libbeat/management/management.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package management
1919

2020
import (
2121
"sync"
22+
"time"
2223

2324
"github.com/elastic/beats/v7/libbeat/common/reload"
2425
"github.com/elastic/beats/v7/libbeat/management/status"
@@ -37,12 +38,25 @@ type Manager interface {
3738
// Enabled returns true if manager is enabled.
3839
Enabled() bool
3940

40-
// Start needs to invoked when the system is ready to receive an external configuration and
41+
// Starts the unitListen loop, so the manager can already
42+
// check-in with Elastic Agent, but no input/output will be
43+
// started yet. Call [PostInit] to enable starting/stopping
44+
// inputs/output.
45+
PreInit() error
46+
47+
// PostInit needs to be invoked when the system is ready to receive an external configuration and
4148
// also ready to start ingesting new events. The manager expects that all the reloadable and
4249
// reloadable list are fixed for the whole lifetime of the manager.
4350
//
4451
// Notes: Adding dynamically new reloadable hooks at runtime can lead to inconsistency in the
4552
// execution.
53+
PostInit()
54+
55+
// Start starts the manager.
56+
//
57+
// Deprecated: Use [PreInit] and [PostInit] instead
58+
//
59+
// For backwards compatibility, [Start] calls [PreInit] then [PostInit].
4660
Start() error
4761

4862
// Stop when this method is called, the manager will stop receiving new actions, no more action
@@ -54,6 +68,11 @@ type Manager interface {
5468
// Note: Stop will not call 'UnregisterAction()' automatically.
5569
Stop()
5670

71+
// WaitForStop blocks until the manager has fully stopped, or timeout elapses.
72+
// It returns true if the manager stopped before timeout, false otherwise.
73+
// A non-positive timeout means wait indefinitely.
74+
WaitForStop(timeout time.Duration) bool
75+
5776
// AgentInfo returns the information of the agent to which the manager is connected.
5877
AgentInfo() AgentInfo
5978

@@ -161,9 +180,15 @@ func (n *FallbackManager) Stop() {
161180
// the nilManager is still used for shutdown on some cases,
162181
// but that does not mean the Beat is being managed externally,
163182
// hence it will always return false.
164-
func (n *FallbackManager) Enabled() bool { return false }
165-
func (n *FallbackManager) AgentInfo() AgentInfo { return AgentInfo{} }
166-
func (n *FallbackManager) Start() error { return nil }
183+
func (n *FallbackManager) Enabled() bool { return false }
184+
func (n *FallbackManager) AgentInfo() AgentInfo { return AgentInfo{} }
185+
func (n *FallbackManager) PreInit() error { return nil }
186+
func (n *FallbackManager) PostInit() {}
187+
func (n *FallbackManager) Start() error { return nil }
188+
func (n *FallbackManager) WaitForStop(_ time.Duration) bool {
189+
n.Stop()
190+
return true
191+
}
167192
func (n *FallbackManager) CheckRawConfig(cfg *config.C) error { return nil }
168193
func (n *FallbackManager) RegisterAction(action Action) {}
169194
func (n *FallbackManager) UnregisterAction(action Action) {}

x-pack/filebeat/tests/integration/managerV2_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) {
283283
Id: "input-unit",
284284
Type: proto.UnitType_INPUT,
285285
ConfigStateIdx: 1,
286-
State: proto.State_STARTING,
286+
State: proto.State_HEALTHY,
287287
LogLevel: proto.UnitLogLevel_DEBUG,
288288
Config: &proto.UnitExpectedConfig{
289289
Id: "log-input",
@@ -331,7 +331,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) {
331331

332332
require.Eventually(t, func() bool {
333333
return finalStateReached.Load()
334-
}, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy")
334+
}, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy or input did not report health")
335335

336336
t.Cleanup(server.Stop)
337337
}

x-pack/filebeat/tests/integration/status_reporter_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func TestLogStatusReporter(t *testing.T) {
103103
},
104104
{
105105
proto.State_DEGRADED,
106-
&inputStream,
106+
&inputStreamIrregular,
107107
},
108108
{
109109
proto.State_HEALTHY,
@@ -159,7 +159,7 @@ func getInputStream(id string, path string, stateIdx int) proto.UnitExpected {
159159
return proto.UnitExpected{
160160
Id: id,
161161
Type: proto.UnitType_INPUT,
162-
ConfigStateIdx: uint64(stateIdx),
162+
ConfigStateIdx: uint64(stateIdx), //nolint:gosec // stateIdx is always positive
163163
State: proto.State_HEALTHY,
164164
Config: &proto.UnitExpectedConfig{
165165
Streams: []*proto.Stream{{

0 commit comments

Comments
 (0)