From feeb32b7f650eaac199d57418c28e66624ae8cc2 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 24 Oct 2025 15:27:23 +0300 Subject: [PATCH] fix: zombie processes during restart (#10650) * fix: zombie processes during restart by extending shutdown timeout to 35sRetry * fix: linter QF1003 could use tagged switch on state * fix: linter QF1012 * doc: add changelog * doc: reword test code comments * fix: make otel manager process stop timeout way shorter * doc: add more documentation * doc: remove changelog fragment * doc: reword managerShutdownTimeout comment (cherry picked from commit 9c001b07d322907da9070e4cb0efdd5752aa1549) # Conflicts: # internal/pkg/agent/application/application.go # internal/pkg/otel/manager/execution.go # internal/pkg/otel/manager/execution_subprocess.go # internal/pkg/otel/manager/manager.go # testing/integration/ess/metrics_monitoring_test.go --- internal/pkg/agent/application/application.go | 15 + .../application/coordinator/coordinator.go | 5 + .../pkg/agent/application/reexec/manager.go | 2 + internal/pkg/otel/manager/execution.go | 31 ++ .../pkg/otel/manager/execution_subprocess.go | 335 ++++++++++++++++++ internal/pkg/otel/manager/manager.go | 32 ++ pkg/component/runtime/command.go | 11 +- pkg/testing/fixture.go | 6 + .../ess/metrics_monitoring_test.go | 69 ++++ 9 files changed, 503 insertions(+), 3 deletions(-) create mode 100644 internal/pkg/otel/manager/execution.go create mode 100644 internal/pkg/otel/manager/execution_subprocess.go diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index a83e5d6ca94..bbabbee7692 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -240,7 +240,22 @@ func New( return nil, nil, nil, errors.New(err, "failed to initialize composable controller") } +<<<<<<< HEAD otelManager := otelmanager.NewOTelManager(log.Named("otel_manager"), baseLogger) +======= + otelManager, err := otelmanager.NewOTelManager( + log.Named("otel_manager"), + logLevel, baseLogger, + otelExecMode, + agentInfo, + cfg.Settings.Collector, + monitor.ComponentMonitoringConfig, + otelmanager.CollectorStopTimeout, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err) + } +>>>>>>> 9c001b07d (fix: zombie processes during restart (#10650)) coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, otelManager, actionAcker, initialUpgradeDetails, compModifiers...) if managed != nil { // the coordinator requires the config manager as well as in managed-mode the config manager requires the diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index c8ec393e004..c7ab0e72810 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -196,6 +196,11 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa // managerShutdownTimeout is how long the coordinator will wait during shutdown // to receive termination states from its managers. +// Note: The current timeout (5s) is shorter than the default stop timeout for +// subprocess components (30s from process.DefaultConfig()). This means the +// coordinator may not wait for the subprocesses to finish terminating, preventing +// Wait() from being called on them. 
This will result in zombie processes +// during restart on Unix systems. const managerShutdownTimeout = time.Second * 5 type configReloader interface { diff --git a/internal/pkg/agent/application/reexec/manager.go b/internal/pkg/agent/application/reexec/manager.go index 3a595ed5491..b46c6a8687e 100644 --- a/internal/pkg/agent/application/reexec/manager.go +++ b/internal/pkg/agent/application/reexec/manager.go @@ -48,8 +48,10 @@ func NewManager(log *logger.Logger, exec string) ExecManager { func (m *manager) ReExec(shutdownCallback ShutdownCallbackFn, argOverrides ...string) { go func() { + m.logger.Debug("Triggering manager shutdown") close(m.trigger) <-m.shutdown + m.logger.Debug("Manager shutdown complete") if shutdownCallback != nil { if err := shutdownCallback(); err != nil { diff --git a/internal/pkg/otel/manager/execution.go b/internal/pkg/otel/manager/execution.go new file mode 100644 index 00000000000..3b12d85940d --- /dev/null +++ b/internal/pkg/otel/manager/execution.go @@ -0,0 +1,31 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "context" + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" + "go.opentelemetry.io/collector/confmap" + + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +type collectorExecution interface { + // startCollector starts the otel collector with the given arguments, returning a handle allowing it to be stopped. + // Cancelling the context will stop all goroutines involved in the execution. + // The collector will report status events in the statusCh channel and errors on errCh in a non-blocking fashion, + // draining the channel before writing to it. + // After the collector exits, it will emit an error describing the exit status (nil if successful) and a nil status. + startCollector(ctx context.Context, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus) (collectorHandle, error) +} + +type collectorHandle interface { + // Stop stops and waits for collector to exit gracefully within the given duration. Note that if the collector + // doesn't exit within that time, it will be killed and then it will wait an extra second for it to ensure it's + // really stopped. + Stop(waitTime time.Duration) +} diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go new file mode 100644 index 00000000000..2e861d86a9f --- /dev/null +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -0,0 +1,335 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. 
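The collectorExecution contract above requires that status and errors be reported without blocking the collector goroutines: the writer drains any unread value before sending. Below is a minimal sketch of that drain-then-send pattern, assuming a buffered channel of capacity one and a single writer; the real reportErr / reportCollectorStatus helpers live elsewhere in the manager package and may differ in detail.

package manager // sketch only, shown standalone for clarity

import "context"

// reportErr (sketch): publish the latest error without ever blocking.
// Assumes errCh was created with a capacity of 1 and this is the only writer.
func reportErr(ctx context.Context, errCh chan error, err error) {
	// Drop a previously reported error that nobody has read yet, so the
	// latest result wins and the send below cannot block.
	select {
	case <-errCh:
	default:
	}
	select {
	case errCh <- err:
	case <-ctx.Done():
		// The caller is shutting down; the error is no longer interesting.
	}
}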
+ +package manager + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "os/exec" + "time" + + "github.com/gofrs/uuid/v5" + "go.opentelemetry.io/collector/component" + "gopkg.in/yaml.v3" + + componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" + "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/confmap" + "go.uber.org/zap/zapcore" + + "github.com/elastic/elastic-agent-libs/logp" + + "github.com/elastic/elastic-agent/internal/pkg/otel/monitoring" + runtimeLogger "github.com/elastic/elastic-agent/pkg/component/runtime" + "github.com/elastic/elastic-agent/pkg/core/logger" + "github.com/elastic/elastic-agent/pkg/core/process" +) + +const ( + OtelSetSupervisedFlagName = "supervised" + OtelSupervisedLoggingLevelFlagName = "supervised.logging.level" + OtelSupervisedMonitoringURLFlagName = "supervised.monitoring.url" +) + +// newSubprocessExecution creates a new execution which runs the otel collector in a subprocess. A metricsPort or +// healthCheckPort of 0 will result in a random port being used. +func newSubprocessExecution(logLevel logp.Level, collectorPath string, metricsPort int, healthCheckPort int) (*subprocessExecution, error) { + nsUUID, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("cannot generate UUID: %w", err) + } + componentType, err := component.NewType(healthCheckExtensionName) + if err != nil { + return nil, fmt.Errorf("cannot create component type: %w", err) + } + healthCheckExtensionID := component.NewIDWithName(componentType, nsUUID.String()).String() + + return &subprocessExecution{ + collectorPath: collectorPath, + collectorArgs: []string{ + "otel", + fmt.Sprintf("--%s", OtelSetSupervisedFlagName), + fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, logLevel.String()), + fmt.Sprintf("--%s=%s", OtelSupervisedMonitoringURLFlagName, monitoring.EDOTMonitoringEndpoint()), + }, + logLevel: logLevel, + healthCheckExtensionID: healthCheckExtensionID, + collectorMetricsPort: metricsPort, + collectorHealthCheckPort: healthCheckPort, + reportErrFn: reportErr, + }, nil +} + +type subprocessExecution struct { + collectorPath string + collectorArgs []string + logLevel logp.Level + healthCheckExtensionID string + collectorMetricsPort int + collectorHealthCheckPort int + reportErrFn func(ctx context.Context, errCh chan error, err error) // required for testing +} + +// startCollector starts a supervised collector and monitors its health. Process exit errors are sent to the +// processErrCh channel. Other run errors, such as not able to connect to the health endpoint, are sent to the runErrCh channel. 
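+// The collector configuration is passed to the subprocess over stdin, and a health check extension is injected
+// into it so the supervising agent can poll the collector's status over the health check port.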
+func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger.Logger, cfg *confmap.Conf, processErrCh chan error, statusCh chan *status.AggregateStatus) (collectorHandle, error) { + if cfg == nil { + // configuration is required + return nil, errors.New("no configuration provided") + } + + if r.collectorPath == "" { + // collector path is required + return nil, errors.New("no collector path provided") + } + + if _, err := os.Stat(r.collectorPath); err != nil { + // we cannot access the collector path + return nil, fmt.Errorf("cannot access collector path: %w", err) + } + + httpHealthCheckPort, collectorMetricsPort, err := r.getCollectorPorts() + if err != nil { + return nil, fmt.Errorf("could not find port for collector: %w", err) + } + + if err := injectHeathCheckV2Extension(cfg, r.healthCheckExtensionID, httpHealthCheckPort); err != nil { + return nil, fmt.Errorf("failed to inject health check extension: %w", err) + } + + confMap := cfg.ToStringMap() + confBytes, err := yaml.Marshal(confMap) + if err != nil { + return nil, fmt.Errorf("failed to marshal config to yaml: %w", err) + } + + stdOut := runtimeLogger.NewLogWriterWithDefaults(logger.Core(), zapcore.Level(r.logLevel)) + // info level for stdErr because by default collector writes to stderr + stdErr := runtimeLogger.NewLogWriterWithDefaults(logger.Core(), zapcore.Level(r.logLevel)) + + procCtx, procCtxCancel := context.WithCancel(ctx) + env := os.Environ() + // Set the environment variable for the collector metrics port. See comment at the constant definition for more information. + env = append(env, fmt.Sprintf("%s=%d", componentmonitoring.OtelCollectorMetricsPortEnvVarName, collectorMetricsPort)) + processInfo, err := process.Start(r.collectorPath, + process.WithArgs(r.collectorArgs), + process.WithEnv(env), + process.WithCmdOptions(func(c *exec.Cmd) error { + c.Stdin = bytes.NewReader(confBytes) + c.Stdout = stdOut + c.Stderr = stdErr + return nil + }), + ) + if err != nil { + // we failed to start the process + procCtxCancel() + return nil, fmt.Errorf("failed to start supervised collector: %w", err) + } + logger.Infof("supervised collector started with pid: %d and healthcheck port: %d", processInfo.Process.Pid, httpHealthCheckPort) + if processInfo.Process == nil { + // this should not happen but just in case + procCtxCancel() + return nil, fmt.Errorf("failed to start supervised collector: process is nil") + } + + ctl := &procHandle{ + processDoneCh: make(chan struct{}), + processInfo: processInfo, + log: logger, + } + + healthCheckDone := make(chan struct{}) + go func() { + defer func() { + close(healthCheckDone) + }() + currentStatus := aggregateStatus(componentstatus.StatusStarting, nil) + r.reportSubprocessCollectorStatus(ctx, statusCh, currentStatus) + + // specify a max duration of not being able to get the status from the collector + const maxFailuresDuration = 130 * time.Second + maxFailuresTimer := time.NewTimer(maxFailuresDuration) + defer maxFailuresTimer.Stop() + + // check the health of the collector every 1 second + const healthCheckPollDuration = 1 * time.Second + healthCheckPollTimer := time.NewTimer(healthCheckPollDuration) + defer healthCheckPollTimer.Stop() + for { + statuses, err := AllComponentsStatuses(procCtx, httpHealthCheckPort) + if err != nil { + switch { + case errors.Is(err, context.Canceled): + // after the collector exits, we need to report a nil status + r.reportSubprocessCollectorStatus(ctx, statusCh, nil) + return + } + } else { + maxFailuresTimer.Reset(maxFailuresDuration) 
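+ // the injected health check extension is an internal implementation detail; drop it from the reported status before comparing and publishing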
+ removeManagedHealthCheckExtensionStatus(statuses, r.healthCheckExtensionID) + if !compareStatuses(currentStatus, statuses) { + currentStatus = statuses + r.reportSubprocessCollectorStatus(procCtx, statusCh, statuses) + } + } + + select { + case <-procCtx.Done(): + // after the collector exits, we need to report a nil status + r.reportSubprocessCollectorStatus(ctx, statusCh, nil) + return + case <-healthCheckPollTimer.C: + healthCheckPollTimer.Reset(healthCheckPollDuration) + case <-maxFailuresTimer.C: + failedToConnectStatuses := aggregateStatus( + componentstatus.StatusRecoverableError, + errors.New("failed to connect to collector"), + ) + if !compareStatuses(currentStatus, failedToConnectStatuses) { + currentStatus = statuses + r.reportSubprocessCollectorStatus(procCtx, statusCh, statuses) + } + } + } + }() + + go func() { + procState, procErr := processInfo.Process.Wait() + logger.Debugf("wait for pid %d returned", processInfo.PID) + procCtxCancel() + <-healthCheckDone + close(ctl.processDoneCh) + // using ctx instead of procCtx in the reportErr functions below is intentional. This allows us to report + // errors to the caller through processErrCh and essentially discard any other errors that occurred because + // the process exited. + if procErr == nil { + if procState.Success() { + // report nil error so that the caller can be notified that the process has exited without error + r.reportErrFn(ctx, processErrCh, nil) + } else { + r.reportErrFn(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String())) + } + return + } + + r.reportErrFn(ctx, processErrCh, fmt.Errorf("failed to wait supervised collector process: %w", procErr)) + }() + + return ctl, nil +} + +// cloneCollectorStatus creates a deep copy of the provided AggregateStatus. +func cloneCollectorStatus(aStatus *status.AggregateStatus) *status.AggregateStatus { + if aStatus == nil { + return nil + } + + st := &status.AggregateStatus{ + Event: aStatus.Event, + } + + if len(aStatus.ComponentStatusMap) > 0 { + st.ComponentStatusMap = make(map[string]*status.AggregateStatus, len(aStatus.ComponentStatusMap)) + for k, cs := range aStatus.ComponentStatusMap { + st.ComponentStatusMap[k] = cloneCollectorStatus(cs) + } + } + + return st +} + +func (r *subprocessExecution) reportSubprocessCollectorStatus(ctx context.Context, statusCh chan *status.AggregateStatus, collectorStatus *status.AggregateStatus) { + // we need to clone the status to prevent any mutation on the receiver side + // affecting the original ref + clonedStatus := cloneCollectorStatus(collectorStatus) + reportCollectorStatus(ctx, statusCh, clonedStatus) +} + +// getCollectorPorts returns the ports used by the OTel collector. If the ports set in the execution struct are 0, +// random ports are returned instead. 
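+// The health check port is what the supervising agent polls for collector status; the metrics port is handed to
+// the subprocess through an environment variable.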
+func (r *subprocessExecution) getCollectorPorts() (healthCheckPort int, metricsPort int, err error) { + randomPorts := make([]*int, 0, 2) + // if the ports are defined (non-zero), use them + if r.collectorMetricsPort == 0 { + randomPorts = append(randomPorts, &metricsPort) + } else { + metricsPort = r.collectorMetricsPort + } + if r.collectorHealthCheckPort == 0 { + randomPorts = append(randomPorts, &healthCheckPort) + } else { + healthCheckPort = r.collectorHealthCheckPort + } + + if len(randomPorts) == 0 { + return healthCheckPort, metricsPort, nil + } + + // we need at least one random port, create it + ports, err := findRandomTCPPorts(len(randomPorts)) + if err != nil { + return 0, 0, err + } + for i, port := range ports { + *randomPorts[i] = port + } + return healthCheckPort, metricsPort, nil +} + +func removeManagedHealthCheckExtensionStatus(status *status.AggregateStatus, healthCheckExtensionID string) { + extensions, exists := status.ComponentStatusMap["extensions"] + if !exists { + return + } + + extensionID := "extension:" + healthCheckExtensionID + delete(extensions.ComponentStatusMap, extensionID) +} + +type procHandle struct { + processDoneCh chan struct{} + processInfo *process.Info + log *logger.Logger +} + +// Stop stops the process. If the process is already stopped, it does nothing. If the process does not stop within +// processKillAfter or due to an error, it will be killed. +func (s *procHandle) Stop(waitTime time.Duration) { + select { + case <-s.processDoneCh: + // process has already exited + return + default: + } + + s.log.Debugf("gracefully stopping pid %d", s.processInfo.PID) + if err := s.processInfo.Stop(); err != nil { + s.log.Warnf("failed to send stop signal to the supervised collector: %v", err) + // we failed to stop the process just kill it and return + } else { + select { + case <-time.After(waitTime): + s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String()) + case <-s.processDoneCh: + // process has already exited + return + } + } + + // since we are here this means that the process either got an error at stop or did not stop within the timeout, + // kill it and give one more mere second for the process wait to be called + _ = s.processInfo.Kill() + select { + case <-time.After(1 * time.Second): + s.log.Warnf("supervised collector subprocess didn't exit in time after killing it") + case <-s.processDoneCh: + } +} diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index a867b0fc993..443d0adb757 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -20,6 +20,38 @@ import ( "github.com/elastic/elastic-agent/pkg/core/logger" ) +<<<<<<< HEAD +======= +type ExecutionMode string + +const ( + SubprocessExecutionMode ExecutionMode = "subprocess" + EmbeddedExecutionMode ExecutionMode = "embedded" + + // CollectorStopTimeout is the duration to wait for the collector to stop. Note: this needs to be shorter + // than 5 * time.Second (coordinator.managerShutdownTimeout) otherwise we might end up with a defunct process. 
+ CollectorStopTimeout = 3 * time.Second +) + +type collectorRecoveryTimer interface { + // IsStopped returns true if the timer is stopped + IsStopped() bool + // Stop stops the timer + Stop() + // ResetInitial resets the timer to the initial interval + ResetInitial() time.Duration + // ResetNext resets the timer to the next interval + ResetNext() time.Duration + // C returns the timer channel + C() <-chan time.Time +} + +type configUpdate struct { + collectorCfg *confmap.Conf + components []component.Component +} + +>>>>>>> 9c001b07d (fix: zombie processes during restart (#10650)) // OTelManager is a manager that manages the lifecycle of the OTel collector inside of the Elastic Agent. type OTelManager struct { // baseLogger is the base logger for the otel collector, and doesn't include any agent-specific fields. diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go index 68e7077e0aa..2abfb2254b8 100644 --- a/pkg/component/runtime/command.go +++ b/pkg/component/runtime/command.go @@ -337,9 +337,10 @@ func (c *commandRuntime) forceCompState(state client.UnitState, msg string) { // compState updates just the component state not all the units. func (c *commandRuntime) compState(state client.UnitState) { msg := stateUnknownMessage - if state == client.UnitStateHealthy { + switch state { + case client.UnitStateHealthy: msg = fmt.Sprintf("Healthy: communicating with pid '%d'", c.proc.PID) - } else if state == client.UnitStateDegraded { + case client.UnitStateDegraded: if c.missedCheckins == 1 { msg = fmt.Sprintf("Degraded: pid '%d' missed 1 check-in", c.proc.PID) } else { @@ -433,9 +434,12 @@ func (c *commandRuntime) stop(ctx context.Context) error { return case <-t.C: // kill no matter what (might already be stopped) + c.log.Debugf("timeout waiting for pid %d, killing it", c.proc.PID) _ = info.Kill() } }(c.proc, cmdSpec.Timeouts.Stop) + + c.log.Debugf("gracefully stopping pid %d", c.proc.PID) return c.proc.Stop() } @@ -443,7 +447,7 @@ func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) { go func() { err := comm.WriteStartUpInfo(info.Stdin) if err != nil { - _, _ = c.logErr.Write([]byte(fmt.Sprintf("Failed: failed to provide connection information to spawned pid '%d': %s", info.PID, err))) + _, _ = fmt.Fprintf(c.logErr, "Failed: failed to provide connection information to spawned pid '%d': %s", info.PID, err) // kill instantly _ = info.Kill() } else { @@ -452,6 +456,7 @@ func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) { ch := info.Wait() s := <-ch + c.log.Debugf("wait for pid %d returned", info.PID) c.procCh <- procState{ proc: info, state: s, diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index bb9e45d771e..007f2aae20d 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -848,6 +848,12 @@ func (f *Fixture) ExecInspect(ctx context.Context, opts ...process.CmdOption) (A return inspect, err } +// ExecRestart executes the restart subcommand on the prepared Elastic Agent binary. +func (f *Fixture) ExecRestart(ctx context.Context, opts ...process.CmdOption) error { + _, err := f.Exec(ctx, []string{"restart"}, opts...) + return err +} + // ExecVersion executes the version subcommand on the prepared Elastic Agent binary // with '--binary-only'. It returns the parsed YAML output. 
func (f *Fixture) ExecVersion(ctx context.Context, opts ...process.CmdOption) (AgentVersionOutput, error) { diff --git a/testing/integration/ess/metrics_monitoring_test.go b/testing/integration/ess/metrics_monitoring_test.go index d14504f66a0..bc5ad4f77d8 100644 --- a/testing/integration/ess/metrics_monitoring_test.go +++ b/testing/integration/ess/metrics_monitoring_test.go @@ -10,13 +10,22 @@ import ( "context" "encoding/json" "fmt" +<<<<<<< HEAD +======= + "net/http" + "net/http/httputil" + "runtime" +>>>>>>> 9c001b07d (fix: zombie processes during restart (#10650)) "testing" "time" "github.com/gofrs/uuid/v5" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/elastic/elastic-agent-system-metrics/metric/system/process" + "github.com/elastic/elastic-agent-libs/kibana" "github.com/elastic/elastic-agent-libs/testing/estools" atesting "github.com/elastic/elastic-agent/pkg/testing" @@ -135,6 +144,66 @@ func (runner *MetricsRunner) TestBeatsMetrics() { } return true }, time.Minute*10, time.Second*10, "could not fetch metrics for all known components in default install: %v", componentIds) +<<<<<<< HEAD +======= + + // Add a policy overwrite to change the agent monitoring to use Otel runtime + runner.addMonitoringToOtelRuntimeOverwrite() + + // since the default execution mode of Otel runtime is sub-process we should see resource + // metrics for elastic-agent/collector component. + edotCollectorComponentID := otelMonitoring.EDOTComponentID + query = genESQuery(agentStatus.Info.ID, + [][]string{ + {"match", "component.id", edotCollectorComponentID}, + {"exists", "field", "system.process.cpu.total.value"}, + {"exists", "field", "system.process.memory.size"}, + }) + + require.Eventually(t, func() bool { + now = time.Now() + res, err := estools.PerformQueryForRawQuery(ctx, query, "metrics-elastic_agent*", runner.info.ESClient) + require.NoError(t, err) + t.Logf("Fetched metrics for %s, got %d hits", edotCollectorComponentID, res.Hits.Total.Value) + if res.Hits.Total.Value < 1 { + return false + } + return true + }, time.Minute*10, time.Second*10, "could not fetch metrics for edot collector") + + if runtime.GOOS == "windows" { + return + } + + // restart the agent to validate that this does not result in any agent-spawned subprocess + // becoming defunct + err = runner.agentFixture.ExecRestart(ctx) + require.NoError(t, err, "could not restart agent") + + require.Eventually(t, func() bool { + err = runner.agentFixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return true + }, 1*time.Minute, 1*time.Second) + + procStats := process.Stats{ + // filtering with '.*elastic-agent' or '^.*elastic-agent$' doesn't + // seem to work as expected + Procs: []string{".*"}, + } + err = procStats.Init() + require.NoError(t, err, "could not initialize process.Stats") + + pidMap, _, err := procStats.FetchPids() + require.NoError(t, err, "could not fetch pids") + + for _, state := range pidMap { + assert.NotEqualValuesf(t, process.Zombie, state.State, "process %d is in zombie state", state.Pid.ValueOr(0)) + } +>>>>>>> 9c001b07d (fix: zombie processes during restart (#10650)) } func genESQuery(agentID string, requiredFields [][]string) map[string]interface{} {
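For context on the defect itself: a child process becomes a zombie on Unix when it exits but its parent never calls wait(2) on it, which is what happened when the coordinator shut down before the subprocess runtimes finished terminating. The following is a small, self-contained sketch of the stop-then-kill-then-reap sequence the patch enforces, written against plain os/exec rather than the agent's internal process package (function and variable names here are illustrative; the agent itself sends its stop signal through its own process.Info.Stop helper).

package main

import (
	"fmt"
	"log"
	"os/exec"
	"syscall"
	"time"
)

// stopAndReap mirrors the shape of the fix: ask the child to stop, escalate to
// kill after a grace period, and always collect the exit status so the kernel
// can reap the process table entry. Skipping that final wait is what leaves a
// defunct process behind while the parent keeps running (e.g. across re-exec).
func stopAndReap(cmd *exec.Cmd, grace time.Duration) error {
	done := make(chan error, 1)
	go func() { done <- cmd.Wait() }() // the reap happens inside Wait

	_ = cmd.Process.Signal(syscall.SIGTERM) // ask nicely first

	select {
	case err := <-done:
		return err // exited within the grace period and has been reaped
	case <-time.After(grace):
	}

	_ = cmd.Process.Kill() // escalate
	select {
	case err := <-done:
		return err
	case <-time.After(time.Second):
		// Give up on reporting; Wait is still pending in the goroutine above
		// and will reap the child once it finally exits.
		return fmt.Errorf("child did not exit within %s of being killed", time.Second)
	}
}

func main() {
	cmd := exec.Command("sleep", "60")
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}
	if err := stopAndReap(cmd, 3*time.Second); err != nil {
		log.Printf("child exited: %v", err)
	}
}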