From 1263f242e193581c3192761bd7dae952f78e646f Mon Sep 17 00:00:00 2001
From: Panos Koutsovasilis
Date: Fri, 17 Oct 2025 14:17:44 +0300
Subject: [PATCH 1/9] fix: zombie processes during restart by extending shutdown timeout to 35s

---
 .../application/coordinator/coordinator.go    |  9 ++++-
 .../pkg/agent/application/reexec/manager.go   |  2 +
 .../pkg/otel/manager/execution_subprocess.go  | 22 +++++++----
 pkg/component/runtime/command.go              |  4 ++
 pkg/testing/fixture.go                        |  6 +++
 .../ess/metrics_monitoring_test.go            | 38 ++++++++++++++++++-
 6 files changed, 71 insertions(+), 10 deletions(-)

diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go
index 50383f8ff03..e7eb18ce58d 100644
--- a/internal/pkg/agent/application/coordinator/coordinator.go
+++ b/internal/pkg/agent/application/coordinator/coordinator.go
@@ -229,8 +229,13 @@ type VarsManager interface {
 type ComponentsModifier func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error)
 
 // managerShutdownTimeout is how long the coordinator will wait during shutdown
-// to receive termination states from its managers.
-const managerShutdownTimeout = time.Second * 5
+// to receive termination states from its managers. As the default stop timeout
+// for components spawned as subprocesses is 30 seconds (process.DefaultConfig()),
+// we need to wait a bit longer to ensure that all managers have had time to terminate.
+// Note: if this timeout doesn't accommodate the subprocess stop timeout,
+// the Wait of the subprocess might never be called, and we may end up with zombie processes
+// during restart on Unix systems.
+const managerShutdownTimeout = time.Second * 35
 
 type configReloader interface {
 	Reload(*config.Config) error
diff --git a/internal/pkg/agent/application/reexec/manager.go b/internal/pkg/agent/application/reexec/manager.go
index 3a595ed5491..b46c6a8687e 100644
--- a/internal/pkg/agent/application/reexec/manager.go
+++ b/internal/pkg/agent/application/reexec/manager.go
@@ -48,8 +48,10 @@ func NewManager(log *logger.Logger, exec string) ExecManager {
 
 func (m *manager) ReExec(shutdownCallback ShutdownCallbackFn, argOverrides ...string) {
 	go func() {
+		m.logger.Debug("Triggering manager shutdown")
 		close(m.trigger)
 		<-m.shutdown
+		m.logger.Debug("Manager shutdown complete")
 
 		if shutdownCallback != nil {
 			if err := shutdownCallback(); err != nil {
diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go
index 5c2f7f2f054..2e861d86a9f 100644
--- a/internal/pkg/otel/manager/execution_subprocess.go
+++ b/internal/pkg/otel/manager/execution_subprocess.go
@@ -203,6 +203,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
 
 	go func() {
 		procState, procErr := processInfo.Process.Wait()
+		logger.Debugf("wait for pid %d returned", processInfo.PID)
 		procCtxCancel()
 		<-healthCheckDone
 		close(ctl.processDoneCh)
@@ -309,19 +310,26 @@ func (s *procHandle) Stop(waitTime time.Duration) {
 	default:
 	}
 
+	s.log.Debugf("gracefully stopping pid %d", s.processInfo.PID)
 	if err := s.processInfo.Stop(); err != nil {
 		s.log.Warnf("failed to send stop signal to the supervised collector: %v", err)
 		// we failed to stop the process just kill it and return
-		_ = s.processInfo.Kill()
-		return
+	} else {
+		select {
+		case <-time.After(waitTime):
+			s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String())
+		case <-s.processDoneCh:
+			// process has 
already exited + return + } } + // since we are here this means that the process either got an error at stop or did not stop within the timeout, + // kill it and give one more mere second for the process wait to be called + _ = s.processInfo.Kill() select { - case <-time.After(waitTime): - s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String()) - // our caller ctx is Done; kill the process just in case - _ = s.processInfo.Kill() + case <-time.After(1 * time.Second): + s.log.Warnf("supervised collector subprocess didn't exit in time after killing it") case <-s.processDoneCh: - // process has already exited } } diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go index 68e7077e0aa..ef75269f253 100644 --- a/pkg/component/runtime/command.go +++ b/pkg/component/runtime/command.go @@ -433,9 +433,12 @@ func (c *commandRuntime) stop(ctx context.Context) error { return case <-t.C: // kill no matter what (might already be stopped) + c.log.Debugf("timeout waiting for pid %d, killing it", c.proc.PID) _ = info.Kill() } }(c.proc, cmdSpec.Timeouts.Stop) + + c.log.Debugf("gracefully stopping pid %d", c.proc.PID) return c.proc.Stop() } @@ -452,6 +455,7 @@ func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) { ch := info.Wait() s := <-ch + c.log.Debugf("wait for pid %d returned", info.PID) c.procCh <- procState{ proc: info, state: s, diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index 81010fbce3b..255680eab08 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -849,6 +849,12 @@ func (f *Fixture) ExecInspect(ctx context.Context, opts ...process.CmdOption) (A return inspect, err } +// ExecRestart executes the restart subcommand on the prepared Elastic Agent binary. +func (f *Fixture) ExecRestart(ctx context.Context, opts ...process.CmdOption) error { + _, err := f.Exec(ctx, []string{"restart"}, opts...) + return err +} + // ExecVersion executes the version subcommand on the prepared Elastic Agent binary // with '--binary-only'. It returns the parsed YAML output. 
func (f *Fixture) ExecVersion(ctx context.Context, opts ...process.CmdOption) (AgentVersionOutput, error) { diff --git a/testing/integration/ess/metrics_monitoring_test.go b/testing/integration/ess/metrics_monitoring_test.go index b8b86e98183..c4dfeb4575e 100644 --- a/testing/integration/ess/metrics_monitoring_test.go +++ b/testing/integration/ess/metrics_monitoring_test.go @@ -13,13 +13,17 @@ import ( "fmt" "net/http" "net/http/httputil" + "runtime" "testing" "time" "github.com/gofrs/uuid/v5" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/elastic/elastic-agent-system-metrics/metric/system/process" + "github.com/elastic/elastic-agent-libs/kibana" "github.com/elastic/elastic-agent-libs/testing/estools" otelMonitoring "github.com/elastic/elastic-agent/internal/pkg/otel/monitoring" @@ -207,7 +211,39 @@ func (runner *MetricsRunner) TestBeatsMetrics() { return false } return true - }, time.Minute*10, time.Second*10, "could not fetch metrics for all known components in default install: %v", componentIds) + }, time.Minute*10, time.Second*10, "could not fetch metrics for edot collector") + + if runtime.GOOS == "windows" { + return + } + + // restart the agent to try and cause + err = runner.agentFixture.ExecRestart(ctx) + require.NoError(t, err, "could not restart agent") + + require.Eventually(t, func() bool { + err = runner.agentFixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return true + }, 1*time.Minute, 1*time.Second) + + procStats := process.Stats{ + // filtering with '.*elastic-agent' or '^.*elastic-agent$' doesn't + // seem to work as expected, filtering is done in the for loop below + Procs: []string{".*"}, + } + err = procStats.Init() + require.NoError(t, err, "could not initialize process.Stats") + + pidMap, _, err := procStats.FetchPids() + require.NoError(t, err, "could not fetch pids") + + for _, state := range pidMap { + assert.NotEqualValuesf(t, process.Zombie, state.State, "process %d is in zombie state", state.Pid.ValueOr(0)) + } } func genESQuery(agentID string, requiredFields [][]string) map[string]interface{} { From c9fa8d8824861bb52b2d830666aa15309c59e502 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Mon, 20 Oct 2025 14:04:50 +0300 Subject: [PATCH 2/9] fix: linter QF1003 could use tagged switch on state --- pkg/component/runtime/command.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go index ef75269f253..a207b45c784 100644 --- a/pkg/component/runtime/command.go +++ b/pkg/component/runtime/command.go @@ -337,9 +337,10 @@ func (c *commandRuntime) forceCompState(state client.UnitState, msg string) { // compState updates just the component state not all the units. 
func (c *commandRuntime) compState(state client.UnitState) { msg := stateUnknownMessage - if state == client.UnitStateHealthy { + switch state { + case client.UnitStateHealthy: msg = fmt.Sprintf("Healthy: communicating with pid '%d'", c.proc.PID) - } else if state == client.UnitStateDegraded { + case client.UnitStateDegraded: if c.missedCheckins == 1 { msg = fmt.Sprintf("Degraded: pid '%d' missed 1 check-in", c.proc.PID) } else { From 0abf1dc3333151603c87ac79950d47046161537c Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Mon, 20 Oct 2025 14:08:02 +0300 Subject: [PATCH 3/9] fix: linter QF1012 --- pkg/component/runtime/command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go index a207b45c784..2abfb2254b8 100644 --- a/pkg/component/runtime/command.go +++ b/pkg/component/runtime/command.go @@ -447,7 +447,7 @@ func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) { go func() { err := comm.WriteStartUpInfo(info.Stdin) if err != nil { - _, _ = c.logErr.Write([]byte(fmt.Sprintf("Failed: failed to provide connection information to spawned pid '%d': %s", info.PID, err))) + _, _ = fmt.Fprintf(c.logErr, "Failed: failed to provide connection information to spawned pid '%d': %s", info.PID, err) // kill instantly _ = info.Kill() } else { From 82c10c0da905873de30eb1f527479b697cbca77b Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Tue, 21 Oct 2025 13:10:59 +0300 Subject: [PATCH 4/9] doc: add changelog --- ...ease-manager-timeout-to-prevent-defunct-subprocesses.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/fragments/1761041331-increase-manager-timeout-to-prevent-defunct-subprocesses.yaml diff --git a/changelog/fragments/1761041331-increase-manager-timeout-to-prevent-defunct-subprocesses.yaml b/changelog/fragments/1761041331-increase-manager-timeout-to-prevent-defunct-subprocesses.yaml new file mode 100644 index 00000000000..099bb933cc2 --- /dev/null +++ b/changelog/fragments/1761041331-increase-manager-timeout-to-prevent-defunct-subprocesses.yaml @@ -0,0 +1,5 @@ +kind: bug-fix +summary: increase manager timeout to prevent defunct subprocesses +component: elastic-agent +pr: https://github.com/elastic/elastic-agent/pull/10650 +issue: https://github.com/elastic/elastic-agent/issues/7756 From 49d4dfe6e68cf2c6887ff4b7761f5bd901c1f766 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Tue, 21 Oct 2025 16:16:56 +0300 Subject: [PATCH 5/9] doc: reword test code comments --- testing/integration/ess/metrics_monitoring_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/testing/integration/ess/metrics_monitoring_test.go b/testing/integration/ess/metrics_monitoring_test.go index c4dfeb4575e..d38d5d79f7a 100644 --- a/testing/integration/ess/metrics_monitoring_test.go +++ b/testing/integration/ess/metrics_monitoring_test.go @@ -217,7 +217,8 @@ func (runner *MetricsRunner) TestBeatsMetrics() { return } - // restart the agent to try and cause + // restart the agent to validate that this does not result in any agent-spawned subprocess + // becoming defunct err = runner.agentFixture.ExecRestart(ctx) require.NoError(t, err, "could not restart agent") @@ -232,7 +233,7 @@ func (runner *MetricsRunner) TestBeatsMetrics() { procStats := process.Stats{ // filtering with '.*elastic-agent' or '^.*elastic-agent$' doesn't - // seem to work as expected, filtering is done in the for loop below + // seem to work as expected Procs: []string{".*"}, } 
 	err = procStats.Init()

From f3eee75e72027bb811e225d2d10cae9f02b16276 Mon Sep 17 00:00:00 2001
From: Panos Koutsovasilis
Date: Wed, 22 Oct 2025 16:13:58 +0300
Subject: [PATCH 6/9] fix: make otel manager process stop timeout way shorter

---
 internal/pkg/agent/application/application.go             | 2 +-
 internal/pkg/agent/application/coordinator/coordinator.go | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go
index 825b078cceb..8e672304ae4 100644
--- a/internal/pkg/agent/application/application.go
+++ b/internal/pkg/agent/application/application.go
@@ -266,7 +266,7 @@ func New(
 		agentInfo,
 		cfg.Settings.Collector,
 		monitor.ComponentMonitoringConfig,
-		cfg.Settings.ProcessConfig.StopTimeout,
+		3*time.Second, // this needs to be shorter than 5 * time.Second (coordinator.managerShutdownTimeout) otherwise we might end up with defunct processes
 	)
 	if err != nil {
 		return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go
index e7eb18ce58d..9dbef7aacbf 100644
--- a/internal/pkg/agent/application/coordinator/coordinator.go
+++ b/internal/pkg/agent/application/coordinator/coordinator.go
@@ -235,7 +235,7 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa
 // Note: if this timeout doesn't accommodate the subprocess stop timeout,
 // the Wait of the subprocess might never be called, and we may end up with zombie processes
 // during restart on Unix systems.
-const managerShutdownTimeout = time.Second * 35
+const managerShutdownTimeout = time.Second * 5
 
 type configReloader interface {
 	Reload(*config.Config) error

From f66af911dbef379e9f010cbabec81e16868ab943 Mon Sep 17 00:00:00 2001
From: Panos Koutsovasilis
Date: Thu, 23 Oct 2025 21:16:02 +0300
Subject: [PATCH 7/9] doc: add more documentation

---
 internal/pkg/agent/application/application.go | 2 +-
 internal/pkg/otel/manager/execution.go        | 3 +++
 internal/pkg/otel/manager/manager.go          | 4 ++++
 3 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go
index 8e672304ae4..b9443140d7f 100644
--- a/internal/pkg/agent/application/application.go
+++ b/internal/pkg/agent/application/application.go
@@ -266,7 +266,7 @@ func New(
 		agentInfo,
 		cfg.Settings.Collector,
 		monitor.ComponentMonitoringConfig,
-		3*time.Second, // this needs to be shorter than 5 * time.Second (coordinator.managerShutdownTimeout) otherwise we might end up with defunct processes
+		otelmanager.CollectorStopTimeout,
 	)
 	if err != nil {
 		return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
diff --git a/internal/pkg/otel/manager/execution.go b/internal/pkg/otel/manager/execution.go
index 06085455d94..3b12d85940d 100644
--- a/internal/pkg/otel/manager/execution.go
+++ b/internal/pkg/otel/manager/execution.go
@@ -24,5 +24,8 @@ type collectorExecution interface {
 }
 
 type collectorHandle interface {
+	// Stop stops the collector and waits for it to exit gracefully within the given duration. Note that if the
+	// collector doesn't exit within that time, it will be killed, and Stop will wait up to one extra second to
+	// ensure it has really stopped.
Stop(waitTime time.Duration) } diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index 31c806acbf0..2753cdf2151 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -44,6 +44,10 @@ type ExecutionMode string const ( SubprocessExecutionMode ExecutionMode = "subprocess" EmbeddedExecutionMode ExecutionMode = "embedded" + + // CollectorStopTimeout is the duration to wait for the collector to stop. Note: this needs to be shorter + // than 5 * time.Second (coordinator.managerShutdownTimeout) otherwise we might end up with a defunct process. + CollectorStopTimeout = 3 * time.Second ) type collectorRecoveryTimer interface { From bdb676ee12d7e6de7515af1e80519d7b005fa8e5 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 24 Oct 2025 07:48:26 +0300 Subject: [PATCH 8/9] doc: remove changelog fragment --- ...ease-manager-timeout-to-prevent-defunct-subprocesses.yaml | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 changelog/fragments/1761041331-increase-manager-timeout-to-prevent-defunct-subprocesses.yaml diff --git a/changelog/fragments/1761041331-increase-manager-timeout-to-prevent-defunct-subprocesses.yaml b/changelog/fragments/1761041331-increase-manager-timeout-to-prevent-defunct-subprocesses.yaml deleted file mode 100644 index 099bb933cc2..00000000000 --- a/changelog/fragments/1761041331-increase-manager-timeout-to-prevent-defunct-subprocesses.yaml +++ /dev/null @@ -1,5 +0,0 @@ -kind: bug-fix -summary: increase manager timeout to prevent defunct subprocesses -component: elastic-agent -pr: https://github.com/elastic/elastic-agent/pull/10650 -issue: https://github.com/elastic/elastic-agent/issues/7756 From c8c8830bd41b820b7fe28ceadb642ce24387a567 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 24 Oct 2025 12:26:03 +0300 Subject: [PATCH 9/9] doc: reword managerShutdownTimeout comment --- .../pkg/agent/application/coordinator/coordinator.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 9dbef7aacbf..15797044c80 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -229,12 +229,12 @@ type VarsManager interface { type ComponentsModifier func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) // managerShutdownTimeout is how long the coordinator will wait during shutdown -// to receive termination states from its managers. As the default stop timeout -// for components spawned as subprocesses is 30 seconds (process.DefaultConfig()), -// we need to wait a bit longer to ensure that all managers have had time to terminate. -// Note: if this timeout doesn't accommodate for the subprocess stop timeout, -// the Wait of subprocess might never be called, and we may end up with Zombie processes -// during restart in Unix systemss. +// to receive termination states from its managers. +// Note: The current timeout (5s) is shorter than the default stop timeout for +// subprocess components (30s from process.DefaultConfig()). This means the +// coordinator may not wait for the subprocesses to finish terminating, preventing +// Wait() from being called on them. This will result in zombie processes +// during restart on Unix systems. const managerShutdownTimeout = time.Second * 5 type configReloader interface {
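
Background on the failure mode this series guards against: on Unix, a child process that exits or is killed remains in the process table as a zombie (defunct) until its parent reaps it via wait(2), which in Go happens inside (*os.Process).Wait / (*exec.Cmd).Wait. If the agent shuts down for re-exec before the goroutine that calls Wait on a spawned subprocess gets to run, the killed child is never reaped. The sketch below is illustrative only, not Elastic Agent code; the "sleep 600" child and the 30-second pause are arbitrary choices to make the zombie observable.

package main

import (
	"fmt"
	"os/exec"
	"time"
)

func main() {
	// Spawn a long-running child.
	cmd := exec.Command("sleep", "600")
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	fmt.Printf("child pid: %d\n", cmd.Process.Pid)

	// Kill the child but deliberately skip Wait, mimicking a parent that
	// shuts down before its reaper goroutine runs.
	_ = cmd.Process.Kill()

	// While this sleeps, `ps -o pid,stat,comm -p <child pid>` reports the
	// child in state Z (zombie/defunct): the kernel keeps its exit status
	// around until the parent reaps it.
	time.Sleep(30 * time.Second)

	// Reaping the child removes the zombie entry.
	_ = cmd.Wait()
}

This is also why patch 7 pins the collector stop timeout (CollectorStopTimeout, 3s) below the coordinator's managerShutdownTimeout (5s): the Stop/Kill path and the subsequent Wait on the collector subprocess must complete inside the coordinator's shutdown window, otherwise the killed collector can linger as a defunct child of the re-executed agent.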