2 changes: 1 addition & 1 deletion internal/pkg/agent/application/application.go
@@ -266,7 +266,7 @@ func New(
agentInfo,
cfg.Settings.Collector,
monitor.ComponentMonitoringConfig,
cfg.Settings.ProcessConfig.StopTimeout,
otelmanager.CollectorStopTimeout,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
7 changes: 6 additions & 1 deletion internal/pkg/agent/application/coordinator/coordinator.go
@@ -229,7 +229,12 @@ type VarsManager interface {
type ComponentsModifier func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error)

// managerShutdownTimeout is how long the coordinator will wait during shutdown
// to receive termination states from its managers.
// to receive termination states from its managers. As the default stop timeout
// for components spawned as subprocesses is 30 seconds (process.DefaultConfig()),
// we need to wait a bit longer to ensure that all managers have had time to terminate.
// Note: if this timeout doesn't accommodate the subprocess stop timeout,
// the subprocess's Wait might never be called, and we may end up with zombie processes
// during restart on Unix systems.
const managerShutdownTimeout = time.Second * 5

type configReloader interface {
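For context on the zombie-process note above, here is a minimal standalone Go sketch (not part of this diff) showing why a child that exits before its parent calls Wait lingers as a defunct entry on Unix; the `sleep` command and the timings are illustrative only.

package main

import (
	"fmt"
	"os/exec"
	"time"
)

func main() {
	// Spawn a short-lived child; "sleep" stands in for any agent-spawned subprocess.
	cmd := exec.Command("sleep", "1")
	if err := cmd.Start(); err != nil {
		panic(err)
	}

	// The child exits after ~1s, but until Wait is called the kernel keeps its
	// entry in the process table, shown as <defunct> / state Z in ps.
	time.Sleep(2 * time.Second)
	fmt.Printf("child pid %d has exited but is not reaped yet; check `ps -o pid,stat,comm`\n", cmd.Process.Pid)

	// Calling Wait reaps the child and removes the zombie entry.
	_ = cmd.Wait()
	fmt.Println("child reaped")
}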
2 changes: 2 additions & 0 deletions internal/pkg/agent/application/reexec/manager.go
@@ -48,8 +48,10 @@ func NewManager(log *logger.Logger, exec string) ExecManager {

func (m *manager) ReExec(shutdownCallback ShutdownCallbackFn, argOverrides ...string) {
go func() {
m.logger.Debug("Triggering manager shutdown")
close(m.trigger)
<-m.shutdown
m.logger.Debug("Manager shutdown complete")

if shutdownCallback != nil {
if err := shutdownCallback(); err != nil {
3 changes: 3 additions & 0 deletions internal/pkg/otel/manager/execution.go
@@ -24,5 +24,8 @@ type collectorExecution interface {
}

type collectorHandle interface {
// Stop stops and waits for the collector to exit gracefully within the given duration. Note that if the
// collector doesn't exit within that time, it is killed, and Stop waits an extra second to ensure it has
// really stopped.
Stop(waitTime time.Duration)
}
22 changes: 15 additions & 7 deletions internal/pkg/otel/manager/execution_subprocess.go
@@ -203,6 +203,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger

go func() {
procState, procErr := processInfo.Process.Wait()
logger.Debugf("wait for pid %d returned", processInfo.PID)
procCtxCancel()
<-healthCheckDone
close(ctl.processDoneCh)
@@ -309,19 +310,26 @@ func (s *procHandle) Stop(waitTime time.Duration) {
default:
}

s.log.Debugf("gracefully stopping pid %d", s.processInfo.PID)
if err := s.processInfo.Stop(); err != nil {
s.log.Warnf("failed to send stop signal to the supervised collector: %v", err)
// we failed to stop the process, so just kill it and return
_ = s.processInfo.Kill()
return
} else {
select {
case <-time.After(waitTime):
s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String())
case <-s.processDoneCh:
// process has already exited
return
}
}

// since we are here, the process either returned an error on Stop or did not stop within the timeout;
// kill it and give the process Wait one more second to be called
_ = s.processInfo.Kill()
select {
case <-time.After(waitTime):
s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String())
// our caller ctx is Done; kill the process just in case
_ = s.processInfo.Kill()
case <-time.After(1 * time.Second):
s.log.Warnf("supervised collector subprocess didn't exit in time after killing it")
case <-s.processDoneCh:
// process has already exited
}
}
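As a reading aid (not part of the diff), the stop sequence above boils down to the following condensed Go sketch; procStopper and stopSequence are illustrative placeholders, not the real process.Info API.

package sketch

import (
	"log"
	"time"
)

// procStopper is a stand-in for the supervised process handle.
type procStopper interface {
	Stop() error // request a graceful stop (e.g. signal the process)
	Kill()       // force-kill the process
}

// stopSequence tries a graceful stop first, waits up to waitTime for the
// process-wait goroutine to report exit on done, then force-kills and allows
// one final second for the exit to be observed (so the child is reaped).
func stopSequence(p procStopper, done <-chan struct{}, waitTime time.Duration) {
	if err := p.Stop(); err == nil {
		select {
		case <-done:
			// process exited gracefully and has been waited on
			return
		case <-time.After(waitTime):
			log.Printf("timed out after %s waiting for graceful stop", waitTime)
		}
	}

	// Either Stop failed or the deadline passed: force-kill and give the
	// waiter goroutine a last second to observe the exit.
	p.Kill()
	select {
	case <-done:
	case <-time.After(time.Second):
		log.Printf("process did not exit even after being killed")
	}
}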
4 changes: 4 additions & 0 deletions internal/pkg/otel/manager/manager.go
@@ -44,6 +44,10 @@ type ExecutionMode string
const (
SubprocessExecutionMode ExecutionMode = "subprocess"
EmbeddedExecutionMode ExecutionMode = "embedded"

// CollectorStopTimeout is the duration to wait for the collector to stop. Note: this needs to be shorter
// than 5 * time.Second (coordinator.managerShutdownTimeout); otherwise we might end up with a defunct process.
CollectorStopTimeout = 3 * time.Second
)

type collectorRecoveryTimer interface {
11 changes: 8 additions & 3 deletions pkg/component/runtime/command.go
@@ -337,9 +337,10 @@ func (c *commandRuntime) forceCompState(state client.UnitState, msg string) {
// compState updates just the component state not all the units.
func (c *commandRuntime) compState(state client.UnitState) {
msg := stateUnknownMessage
if state == client.UnitStateHealthy {
switch state {
case client.UnitStateHealthy:
msg = fmt.Sprintf("Healthy: communicating with pid '%d'", c.proc.PID)
} else if state == client.UnitStateDegraded {
case client.UnitStateDegraded:
if c.missedCheckins == 1 {
msg = fmt.Sprintf("Degraded: pid '%d' missed 1 check-in", c.proc.PID)
} else {
@@ -433,17 +434,20 @@ func (c *commandRuntime) stop(ctx context.Context) error {
return
case <-t.C:
// kill no matter what (might already be stopped)
c.log.Debugf("timeout waiting for pid %d, killing it", c.proc.PID)
_ = info.Kill()
}
}(c.proc, cmdSpec.Timeouts.Stop)

c.log.Debugf("gracefully stopping pid %d", c.proc.PID)
return c.proc.Stop()
}

func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) {
go func() {
err := comm.WriteStartUpInfo(info.Stdin)
if err != nil {
_, _ = c.logErr.Write([]byte(fmt.Sprintf("Failed: failed to provide connection information to spawned pid '%d': %s", info.PID, err)))
_, _ = fmt.Fprintf(c.logErr, "Failed: failed to provide connection information to spawned pid '%d': %s", info.PID, err)
// kill instantly
_ = info.Kill()
} else {
@@ -452,6 +456,7 @@ func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) {

ch := info.Wait()
s := <-ch
c.log.Debugf("wait for pid %d returned", info.PID)
c.procCh <- procState{
proc: info,
state: s,
6 changes: 6 additions & 0 deletions pkg/testing/fixture.go
@@ -849,6 +849,12 @@ func (f *Fixture) ExecInspect(ctx context.Context, opts ...process.CmdOption) (A
return inspect, err
}

// ExecRestart executes the restart subcommand on the prepared Elastic Agent binary.
func (f *Fixture) ExecRestart(ctx context.Context, opts ...process.CmdOption) error {
_, err := f.Exec(ctx, []string{"restart"}, opts...)
return err
}

// ExecVersion executes the version subcommand on the prepared Elastic Agent binary
// with '--binary-only'. It returns the parsed YAML output.
func (f *Fixture) ExecVersion(ctx context.Context, opts ...process.CmdOption) (AgentVersionOutput, error) {
39 changes: 38 additions & 1 deletion testing/integration/ess/metrics_monitoring_test.go
@@ -13,13 +13,17 @@ import (
"fmt"
"net/http"
"net/http/httputil"
"runtime"
"testing"
"time"

"github.com/gofrs/uuid/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/elastic/elastic-agent-system-metrics/metric/system/process"

"github.com/elastic/elastic-agent-libs/kibana"
"github.com/elastic/elastic-agent-libs/testing/estools"
otelMonitoring "github.com/elastic/elastic-agent/internal/pkg/otel/monitoring"
@@ -207,7 +211,40 @@ func (runner *MetricsRunner) TestBeatsMetrics() {
return false
}
return true
}, time.Minute*10, time.Second*10, "could not fetch metrics for all known components in default install: %v", componentIds)
}, time.Minute*10, time.Second*10, "could not fetch metrics for edot collector")

if runtime.GOOS == "windows" {
return
}

// restart the agent to validate that this does not result in any agent-spawned subprocess
// becoming defunct
err = runner.agentFixture.ExecRestart(ctx)
require.NoError(t, err, "could not restart agent")

require.Eventually(t, func() bool {
err = runner.agentFixture.IsHealthy(ctx)
if err != nil {
t.Logf("waiting for agent healthy: %s", err.Error())
return false
}
return true
}, 1*time.Minute, 1*time.Second)

procStats := process.Stats{
// filtering with '.*elastic-agent' or '^.*elastic-agent$' doesn't
// seem to work as expected
Procs: []string{".*"},
}
err = procStats.Init()
require.NoError(t, err, "could not initialize process.Stats")

pidMap, _, err := procStats.FetchPids()
require.NoError(t, err, "could not fetch pids")

for _, state := range pidMap {
assert.NotEqualValuesf(t, process.Zombie, state.State, "process %d is in zombie state", state.Pid.ValueOr(0))
}
}

func genESQuery(agentID string, requiredFields [][]string) map[string]interface{} {