diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go
index 825b078cceb..b9443140d7f 100644
--- a/internal/pkg/agent/application/application.go
+++ b/internal/pkg/agent/application/application.go
@@ -266,7 +266,7 @@ func New(
 		agentInfo,
 		cfg.Settings.Collector,
 		monitor.ComponentMonitoringConfig,
-		cfg.Settings.ProcessConfig.StopTimeout,
+		otelmanager.CollectorStopTimeout,
 	)
 	if err != nil {
 		return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go
index 50383f8ff03..15797044c80 100644
--- a/internal/pkg/agent/application/coordinator/coordinator.go
+++ b/internal/pkg/agent/application/coordinator/coordinator.go
@@ -230,6 +230,11 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa

 // managerShutdownTimeout is how long the coordinator will wait during shutdown
 // to receive termination states from its managers.
+// Note: The current timeout (5s) is shorter than the default stop timeout for
+// subprocess components (30s from process.DefaultConfig()). This means the
+// coordinator may not wait for the subprocesses to finish terminating, preventing
+// Wait() from being called on them. This will result in zombie processes
+// during restart on Unix systems.
 const managerShutdownTimeout = time.Second * 5

 type configReloader interface {
diff --git a/internal/pkg/agent/application/reexec/manager.go b/internal/pkg/agent/application/reexec/manager.go
index 3a595ed5491..b46c6a8687e 100644
--- a/internal/pkg/agent/application/reexec/manager.go
+++ b/internal/pkg/agent/application/reexec/manager.go
@@ -48,8 +48,10 @@ func NewManager(log *logger.Logger, exec string) ExecManager {

 func (m *manager) ReExec(shutdownCallback ShutdownCallbackFn, argOverrides ...string) {
 	go func() {
+		m.logger.Debug("Triggering manager shutdown")
 		close(m.trigger)
 		<-m.shutdown
+		m.logger.Debug("Manager shutdown complete")

 		if shutdownCallback != nil {
 			if err := shutdownCallback(); err != nil {
diff --git a/internal/pkg/otel/manager/execution.go b/internal/pkg/otel/manager/execution.go
index 06085455d94..3b12d85940d 100644
--- a/internal/pkg/otel/manager/execution.go
+++ b/internal/pkg/otel/manager/execution.go
@@ -24,5 +24,8 @@ type collectorExecution interface {
 }

 type collectorHandle interface {
+	// Stop stops the collector and waits for it to exit gracefully within the given duration. If the collector
+	// doesn't exit within that time, it is killed, and Stop then waits one extra second to ensure it has
+	// really stopped.
 	Stop(waitTime time.Duration)
 }
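// Illustrative sketch (not part of the patch): the stop-then-kill sequence that the
// collectorHandle.Stop comment above describes, shown in isolation. The names
// (sketch, stopWithTimeout, proc, done) are hypothetical; the real implementation
// lives in procHandle.Stop in execution_subprocess.go below.
package sketch

import (
	"os"
	"syscall"
	"time"
)

func stopWithTimeout(proc *os.Process, done <-chan struct{}, waitTime time.Duration) {
	// Ask the process to stop gracefully first (SIGTERM on Unix-like systems).
	_ = proc.Signal(syscall.SIGTERM)

	select {
	case <-done:
		// Exited gracefully within waitTime.
		return
	case <-time.After(waitTime):
		// Did not exit in time; fall through and kill it.
	}

	_ = proc.Kill()

	select {
	case <-done:
		// The killed process was reaped.
	case <-time.After(1 * time.Second):
		// Still not reaped after the extra second; nothing more this helper can do.
	}
}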
diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go
index 5c2f7f2f054..2e861d86a9f 100644
--- a/internal/pkg/otel/manager/execution_subprocess.go
+++ b/internal/pkg/otel/manager/execution_subprocess.go
@@ -203,6 +203,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger

 	go func() {
 		procState, procErr := processInfo.Process.Wait()
+		logger.Debugf("wait for pid %d returned", processInfo.PID)
 		procCtxCancel()
 		<-healthCheckDone
 		close(ctl.processDoneCh)
@@ -309,19 +310,26 @@ func (s *procHandle) Stop(waitTime time.Duration) {
 	default:
 	}

+	s.log.Debugf("gracefully stopping pid %d", s.processInfo.PID)
 	if err := s.processInfo.Stop(); err != nil {
 		s.log.Warnf("failed to send stop signal to the supervised collector: %v", err)
 		// we failed to stop the process just kill it and return
-		_ = s.processInfo.Kill()
-		return
+	} else {
+		select {
+		case <-time.After(waitTime):
+			s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String())
+		case <-s.processDoneCh:
+			// process has already exited
+			return
+		}
 	}

+	// if we get here, the process either returned an error from Stop or did not stop within the timeout;
+	// kill it and give its Wait call one more second to return
+	_ = s.processInfo.Kill()
 	select {
-	case <-time.After(waitTime):
-		s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String())
-		// our caller ctx is Done; kill the process just in case
-		_ = s.processInfo.Kill()
+	case <-time.After(1 * time.Second):
+		s.log.Warnf("supervised collector subprocess didn't exit in time after killing it")
 	case <-s.processDoneCh:
-		// process has already exited
 	}
 }
diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go
index 31c806acbf0..2753cdf2151 100644
--- a/internal/pkg/otel/manager/manager.go
+++ b/internal/pkg/otel/manager/manager.go
@@ -44,6 +44,10 @@ type ExecutionMode string
 const (
 	SubprocessExecutionMode ExecutionMode = "subprocess"
 	EmbeddedExecutionMode   ExecutionMode = "embedded"
+
+	// CollectorStopTimeout is the duration to wait for the collector to stop. Note: this needs to be shorter
+	// than 5 * time.Second (coordinator.managerShutdownTimeout), otherwise we might end up with a defunct process.
+	CollectorStopTimeout = 3 * time.Second
 )

 type collectorRecoveryTimer interface {
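// Illustrative sketch (not part of the patch): the timeout constraints above exist
// because, on Unix, a child process that exits or is killed remains in the process
// table as a zombie/defunct entry until its parent calls Wait on it. A minimal
// standalone illustration, assuming a Unix system with a `sleep` binary on PATH:
package main

import (
	"os/exec"
	"time"
)

func main() {
	cmd := exec.Command("sleep", "60")
	if err := cmd.Start(); err != nil {
		panic(err)
	}

	// Kill the child. Until Wait is called below, `ps` shows it as <defunct>.
	_ = cmd.Process.Kill()
	time.Sleep(2 * time.Second) // window in which the zombie entry is visible

	// Wait reaps the child and removes the zombie entry. If the parent never gets
	// here (for example because it re-execs first), the zombie lingers.
	_ = cmd.Wait()
}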
diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go
index 68e7077e0aa..2abfb2254b8 100644
--- a/pkg/component/runtime/command.go
+++ b/pkg/component/runtime/command.go
@@ -337,9 +337,10 @@ func (c *commandRuntime) forceCompState(state client.UnitState, msg string) {
 // compState updates just the component state not all the units.
 func (c *commandRuntime) compState(state client.UnitState) {
 	msg := stateUnknownMessage
-	if state == client.UnitStateHealthy {
+	switch state {
+	case client.UnitStateHealthy:
 		msg = fmt.Sprintf("Healthy: communicating with pid '%d'", c.proc.PID)
-	} else if state == client.UnitStateDegraded {
+	case client.UnitStateDegraded:
 		if c.missedCheckins == 1 {
 			msg = fmt.Sprintf("Degraded: pid '%d' missed 1 check-in", c.proc.PID)
 		} else {
@@ -433,9 +434,12 @@ func (c *commandRuntime) stop(ctx context.Context) error {
 			return
 		case <-t.C:
 			// kill no matter what (might already be stopped)
+			c.log.Debugf("timeout waiting for pid %d, killing it", c.proc.PID)
 			_ = info.Kill()
 		}
 	}(c.proc, cmdSpec.Timeouts.Stop)
+
+	c.log.Debugf("gracefully stopping pid %d", c.proc.PID)
 	return c.proc.Stop()
 }

@@ -443,7 +447,7 @@ func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) {
 	go func() {
 		err := comm.WriteStartUpInfo(info.Stdin)
 		if err != nil {
-			_, _ = c.logErr.Write([]byte(fmt.Sprintf("Failed: failed to provide connection information to spawned pid '%d': %s", info.PID, err)))
+			_, _ = fmt.Fprintf(c.logErr, "Failed: failed to provide connection information to spawned pid '%d': %s", info.PID, err)
 			// kill instantly
 			_ = info.Kill()
 		} else {
@@ -452,6 +456,7 @@ func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) {

 		ch := info.Wait()
 		s := <-ch
+		c.log.Debugf("wait for pid %d returned", info.PID)
 		c.procCh <- procState{
 			proc:  info,
 			state: s,
diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go
index 81010fbce3b..255680eab08 100644
--- a/pkg/testing/fixture.go
+++ b/pkg/testing/fixture.go
@@ -849,6 +849,12 @@ func (f *Fixture) ExecInspect(ctx context.Context, opts ...process.CmdOption) (A
 	return inspect, err
 }

+// ExecRestart executes the restart subcommand on the prepared Elastic Agent binary.
+func (f *Fixture) ExecRestart(ctx context.Context, opts ...process.CmdOption) error {
+	_, err := f.Exec(ctx, []string{"restart"}, opts...)
+	return err
+}
+
 // ExecVersion executes the version subcommand on the prepared Elastic Agent binary
 // with '--binary-only'. It returns the parsed YAML output.
 func (f *Fixture) ExecVersion(ctx context.Context, opts ...process.CmdOption) (AgentVersionOutput, error) {
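// Illustrative sketch (not part of the patch): hypothetical usage of the ExecRestart
// helper added above, mirroring the restart-then-wait-for-healthy flow in the test
// below. Fixture construction is elided and the atesting alias is an assumption.
package integration

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	atesting "github.com/elastic/elastic-agent/pkg/testing"
)

func restartAndWaitHealthy(ctx context.Context, t *testing.T, fixture *atesting.Fixture) {
	// Restart the running agent via the CLI, then poll until it reports healthy again.
	require.NoError(t, fixture.ExecRestart(ctx), "could not restart agent")
	require.Eventually(t, func() bool {
		return fixture.IsHealthy(ctx) == nil
	}, time.Minute, time.Second, "agent did not become healthy after restart")
}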
diff --git a/testing/integration/ess/metrics_monitoring_test.go b/testing/integration/ess/metrics_monitoring_test.go
index b8b86e98183..d38d5d79f7a 100644
--- a/testing/integration/ess/metrics_monitoring_test.go
+++ b/testing/integration/ess/metrics_monitoring_test.go
@@ -13,13 +13,17 @@ import (
 	"fmt"
 	"net/http"
 	"net/http/httputil"
+	"runtime"
 	"testing"
 	"time"

 	"github.com/gofrs/uuid/v5"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"

+	"github.com/elastic/elastic-agent-system-metrics/metric/system/process"
+
 	"github.com/elastic/elastic-agent-libs/kibana"
 	"github.com/elastic/elastic-agent-libs/testing/estools"
 	otelMonitoring "github.com/elastic/elastic-agent/internal/pkg/otel/monitoring"
@@ -207,7 +211,40 @@ func (runner *MetricsRunner) TestBeatsMetrics() {
 			return false
 		}
 		return true
-	}, time.Minute*10, time.Second*10, "could not fetch metrics for all known components in default install: %v", componentIds)
+	}, time.Minute*10, time.Second*10, "could not fetch metrics for edot collector")
+
+	if runtime.GOOS == "windows" {
+		return
+	}
+
+	// restart the agent to validate that this does not result in any agent-spawned subprocess
+	// becoming defunct
+	err = runner.agentFixture.ExecRestart(ctx)
+	require.NoError(t, err, "could not restart agent")
+
+	require.Eventually(t, func() bool {
+		err = runner.agentFixture.IsHealthy(ctx)
+		if err != nil {
+			t.Logf("waiting for agent healthy: %s", err.Error())
+			return false
+		}
+		return true
+	}, 1*time.Minute, 1*time.Second)
+
+	procStats := process.Stats{
+		// filtering with '.*elastic-agent' or '^.*elastic-agent$' doesn't
+		// seem to work as expected
+		Procs: []string{".*"},
+	}
+	err = procStats.Init()
+	require.NoError(t, err, "could not initialize process.Stats")
+
+	pidMap, _, err := procStats.FetchPids()
+	require.NoError(t, err, "could not fetch pids")
+
+	for _, state := range pidMap {
+		assert.NotEqualValuesf(t, process.Zombie, state.State, "process %d is in zombie state", state.Pid.ValueOr(0))
+	}
 }

 func genESQuery(agentID string, requiredFields [][]string) map[string]interface{} {