@@ -0,0 +1,5 @@
kind: bug-fix
summary: increase manager timeout to prevent defunct subprocesses
component: elastic-agent
pr: https://github.com/elastic/elastic-agent/pull/10650
issue: https://github.com/elastic/elastic-agent/issues/7756
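
For context on the bug this changelog entry describes: on Unix, a child process that exits remains in the process table as a zombie (shown as <defunct> by `ps`) until its parent reaps it with wait(2), which in Go happens via `cmd.Wait()` / `os.Process.Wait()`. A minimal standalone sketch (not part of this PR) that reproduces a defunct child:

```go
package main

import (
	"fmt"
	"os/exec"
	"time"
)

func main() {
	// start a short-lived child
	cmd := exec.Command("sleep", "1")
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	// the child exits after ~1s, but until the parent calls cmd.Wait()
	// the kernel keeps its process-table entry and `ps` shows state Z
	time.Sleep(2 * time.Second)
	fmt.Printf("run `ps -o pid,stat,comm -p %d` now: the child is a zombie\n", cmd.Process.Pid)
	_ = cmd.Wait() // reaping the child clears the zombie entry
}
```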
9 changes: 7 additions & 2 deletions internal/pkg/agent/application/coordinator/coordinator.go
@@ -229,8 +229,13 @@ type VarsManager interface {
type ComponentsModifier func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error)

// managerShutdownTimeout is how long the coordinator will wait during shutdown
// to receive termination states from its managers. Since the default stop timeout
// for components spawned as subprocesses is 30 seconds (process.DefaultConfig()),
// we need to wait a bit longer to ensure that all managers have had time to terminate.
// Note: if this timeout doesn't accommodate the subprocess stop timeout, Wait may
// never be called on the subprocess, and we can end up with zombie processes
// during restart on Unix systems.
const managerShutdownTimeout = time.Second * 35

type configReloader interface {
Reload(*config.Config) error
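
The key invariant is that managerShutdownTimeout exceeds the 30-second subprocess stop timeout, so the coordinator keeps waiting long enough for each subprocess's Wait to run. A hedged sketch of that bounded wait (illustrative names, not the actual coordinator code):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const subprocessStopTimeout = 30 * time.Second                       // per process.DefaultConfig()
const managerShutdownTimeout = subprocessStopTimeout + 5*time.Second // headroom so Wait can reap

// waitForManagers blocks until done is closed or the timeout elapses.
func waitForManagers(done <-chan struct{}, timeout time.Duration) error {
	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		// giving up before the subprocess stop timeout elapses abandons the
		// goroutines calling Wait, which is what leaves zombies on Unix
		return errors.New("timeout waiting for managers to shut down")
	}
}

func main() {
	done := make(chan struct{})
	go func() { time.Sleep(100 * time.Millisecond); close(done) }()
	fmt.Println(waitForManagers(done, managerShutdownTimeout))
}
```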
2 changes: 2 additions & 0 deletions internal/pkg/agent/application/reexec/manager.go
@@ -48,8 +48,10 @@ func NewManager(log *logger.Logger, exec string) ExecManager {

func (m *manager) ReExec(shutdownCallback ShutdownCallbackFn, argOverrides ...string) {
go func() {
m.logger.Debug("Triggering manager shutdown")
close(m.trigger)
<-m.shutdown
m.logger.Debug("Manager shutdown complete")

if shutdownCallback != nil {
if err := shutdownCallback(); err != nil {
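
The debug lines added here bracket a close/wait handshake: closing m.trigger requests shutdown, and receiving from m.shutdown blocks until the run loop confirms it has finished. A minimal sketch of that pattern (illustrative names, not the actual manager):

```go
package main

import "fmt"

type manager struct {
	trigger  chan struct{} // closed to request shutdown
	shutdown chan struct{} // closed by the run loop once it has stopped
}

func (m *manager) run() {
	<-m.trigger // block until shutdown is requested
	// ... stop managers, reap subprocesses ...
	close(m.shutdown) // acknowledge: shutdown complete
}

func main() {
	m := &manager{trigger: make(chan struct{}), shutdown: make(chan struct{})}
	go m.run()
	close(m.trigger) // request shutdown
	<-m.shutdown     // block until the run loop confirms
	fmt.Println("manager shutdown complete")
}
```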
22 changes: 15 additions & 7 deletions internal/pkg/otel/manager/execution_subprocess.go
@@ -201,6 +201,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger

go func() {
procState, procErr := processInfo.Process.Wait()
logger.Debugf("wait for pid %d returned", processInfo.PID)
procCtxCancel()
<-healthCheckDone
close(ctl.processDoneCh)
@@ -303,19 +304,26 @@ func (s *procHandle) Stop(waitTime time.Duration) {
default:
}

	s.log.Debugf("gracefully stopping pid %d", s.processInfo.PID)
	if err := s.processInfo.Stop(); err != nil {
		s.log.Warnf("failed to send stop signal to the supervised collector: %v", err)
	} else {
		select {
		case <-time.After(waitTime):
			s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String())
		case <-s.processDoneCh:
			// process has already exited
			return
		}
	}

	// getting here means the process either returned an error on Stop or did not
	// stop within the timeout; kill it and give its Wait one more second to return
	_ = s.processInfo.Kill()
	select {
	case <-time.After(1 * time.Second):
		s.log.Warnf("supervised collector subprocess didn't exit in time after being killed")
	case <-s.processDoneCh:
		// process has already exited
	}
}
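
The reworked Stop is a standard escalation ladder: signal a graceful stop, wait up to waitTime, then kill, and in every path keep watching the channel closed by the Wait goroutine so the child is actually reaped. A hedged standalone sketch of the same ladder (Unix-only, since Signal(SIGTERM) is not implemented on Windows):

```go
package main

import (
	"fmt"
	"os/exec"
	"syscall"
	"time"
)

// stopWithEscalation signals SIGTERM, waits up to waitTime, then SIGKILLs,
// always leaving the reaping to the single goroutine that owns cmd.Wait().
func stopWithEscalation(cmd *exec.Cmd, done <-chan struct{}, waitTime time.Duration) {
	_ = cmd.Process.Signal(syscall.SIGTERM) // ask nicely first
	select {
	case <-done: // the Wait goroutine saw the child exit
		return
	case <-time.After(waitTime):
	}
	_ = cmd.Process.Kill() // escalate to SIGKILL
	select {
	case <-done:
	case <-time.After(time.Second):
		fmt.Println("child still not reaped after kill")
	}
}

func main() {
	cmd := exec.Command("sleep", "60")
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	done := make(chan struct{})
	go func() { _ = cmd.Wait(); close(done) }() // sole owner of Wait
	stopWithEscalation(cmd, done, 2*time.Second)
	fmt.Println("stopped")
}
```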
11 changes: 8 additions & 3 deletions pkg/component/runtime/command.go
@@ -337,9 +337,10 @@ func (c *commandRuntime) forceCompState(state client.UnitState, msg string) {
// compState updates just the component state not all the units.
func (c *commandRuntime) compState(state client.UnitState) {
msg := stateUnknownMessage
	switch state {
	case client.UnitStateHealthy:
		msg = fmt.Sprintf("Healthy: communicating with pid '%d'", c.proc.PID)
	case client.UnitStateDegraded:
		if c.missedCheckins == 1 {
			msg = fmt.Sprintf("Degraded: pid '%d' missed 1 check-in", c.proc.PID)
		} else {
@@ -433,17 +434,20 @@ func (c *commandRuntime) stop(ctx context.Context) error {
return
case <-t.C:
// kill no matter what (might already be stopped)
c.log.Debugf("timeout waiting for pid %d, killing it", c.proc.PID)
_ = info.Kill()
}
}(c.proc, cmdSpec.Timeouts.Stop)

c.log.Debugf("gracefully stopping pid %d", c.proc.PID)
return c.proc.Stop()
}

func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) {
go func() {
err := comm.WriteStartUpInfo(info.Stdin)
if err != nil {
_, _ = fmt.Fprintf(c.logErr, "Failed: failed to provide connection information to spawned pid '%d': %s", info.PID, err)
// kill instantly
_ = info.Kill()
} else {
@@ -452,6 +456,7 @@ func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) {

ch := info.Wait()
s := <-ch
c.log.Debugf("wait for pid %d returned", info.PID)
c.procCh <- procState{
proc: info,
state: s,
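
startWatcher is the single owner of Wait for the spawned process, and the new debug line makes it observable when that Wait actually returns. A minimal sketch (not the actual runtime code) of forwarding the reaped state over a channel so the child is reaped exactly once:

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
)

// watch owns cmd.Wait for the child and forwards the final state over a
// channel, so the process is reaped exactly once regardless of who stops it.
func watch(cmd *exec.Cmd) <-chan *os.ProcessState {
	ch := make(chan *os.ProcessState, 1)
	go func() {
		_ = cmd.Wait() // reaps the child; ProcessState is set even on non-zero exit
		ch <- cmd.ProcessState
	}()
	return ch
}

func main() {
	cmd := exec.Command("true")
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	state := <-watch(cmd)
	fmt.Println("child exited:", state)
}
```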
6 changes: 6 additions & 0 deletions pkg/testing/fixture.go
@@ -849,6 +849,12 @@ func (f *Fixture) ExecInspect(ctx context.Context, opts ...process.CmdOption) (A
return inspect, err
}

// ExecRestart executes the restart subcommand on the prepared Elastic Agent binary.
func (f *Fixture) ExecRestart(ctx context.Context, opts ...process.CmdOption) error {
_, err := f.Exec(ctx, []string{"restart"}, opts...)
return err
}

// ExecVersion executes the version subcommand on the prepared Elastic Agent binary
// with '--binary-only'. It returns the parsed YAML output.
func (f *Fixture) ExecVersion(ctx context.Context, opts ...process.CmdOption) (AgentVersionOutput, error) {
39 changes: 38 additions & 1 deletion testing/integration/ess/metrics_monitoring_test.go
@@ -13,13 +13,17 @@ import (
"fmt"
"net/http"
"net/http/httputil"
"runtime"
"testing"
"time"

"github.com/gofrs/uuid/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/elastic/elastic-agent-system-metrics/metric/system/process"

"github.com/elastic/elastic-agent-libs/kibana"
"github.com/elastic/elastic-agent-libs/testing/estools"
otelMonitoring "github.com/elastic/elastic-agent/internal/pkg/otel/monitoring"
@@ -207,7 +211,40 @@ func (runner *MetricsRunner) TestBeatsMetrics() {
return false
}
return true
}, time.Minute*10, time.Second*10, "could not fetch metrics for edot collector")

// zombie (defunct) processes are a Unix concept, so skip this check on Windows
if runtime.GOOS == "windows" {
return
}

// restart the agent to validate that this does not result in any agent-spawned subprocess
// becoming defunct
err = runner.agentFixture.ExecRestart(ctx)
require.NoError(t, err, "could not restart agent")

require.Eventually(t, func() bool {
err = runner.agentFixture.IsHealthy(ctx)
if err != nil {
t.Logf("waiting for agent healthy: %s", err.Error())
return false
}
return true
}, 1*time.Minute, 1*time.Second)

procStats := process.Stats{
// filtering with '.*elastic-agent' or '^.*elastic-agent$' doesn't
// seem to work as expected
Procs: []string{".*"},
}
err = procStats.Init()
require.NoError(t, err, "could not initialize process.Stats")

pidMap, _, err := procStats.FetchPids()
require.NoError(t, err, "could not fetch pids")

for _, state := range pidMap {
assert.NotEqualValuesf(t, process.Zombie, state.State, "process %d is in zombie state", state.Pid.ValueOr(0))
}
}
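
The assertion above relies on process.Stats reporting a Zombie state for defunct children. For reference, a Linux-only sketch of the same check without the library, reading the state field from /proc/<pid>/status:

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// isZombie reports whether pid is in Z (zombie/defunct) state on Linux by
// reading the "State:" line of /proc/<pid>/status, e.g. "State:\tZ (zombie)".
func isZombie(pid int) (bool, error) {
	data, err := os.ReadFile(fmt.Sprintf("/proc/%d/status", pid))
	if err != nil {
		return false, err
	}
	for _, line := range strings.Split(string(data), "\n") {
		if strings.HasPrefix(line, "State:") {
			fields := strings.Fields(line)
			return len(fields) > 1 && fields[1] == "Z", nil
		}
	}
	return false, fmt.Errorf("no State line for pid %d", pid)
}

func main() {
	z, err := isZombie(os.Getpid())
	fmt.Printf("self zombie? %v (err=%v)\n", z, err) // a live process reports false
}
```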

func genESQuery(agentID string, requiredFields [][]string) map[string]interface{} {