
Commit 9a1b42b

Introduce WatcherHelper

1 parent fe9cd16

9 files changed, +662 -360 lines


.mockery.yaml

Lines changed: 3 additions & 0 deletions
@@ -37,3 +37,6 @@ packages:
       installationModifier:
         config:
           mockname: "InstallationModifier"
+  github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade:
+    interfaces:
+      WatcherHelper:
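
The new entry instructs mockery to generate a testify mock for the WatcherHelper interface introduced in upgrade.go below, so Upgrader tests can verify interactions with the watcher without spawning real processes. As a rough sketch of the seam this opens up — the generated mock's package and constructor names depend on the repository's mockery settings, so a hypothetical hand-written double is shown instead, assuming the upgrade package's existing imports (context, exec, filepath, time, filelock, logger):

// fakeWatcherHelper is a hypothetical in-package test double; the
// mockery-generated mock plays the same role, adding recorded calls
// and expectations.
type fakeWatcherHelper struct {
	invoked []string // executables passed to InvokeWatcher
}

func (f *fakeWatcherHelper) InvokeWatcher(_ *logger.Logger, agentExecutable string) (*exec.Cmd, error) {
	f.invoked = append(f.invoked, agentExecutable)
	return &exec.Cmd{}, nil // the happy path never kills it, so a zero Cmd suffices
}

func (f *fakeWatcherHelper) SelectWatcherExecutable(topDir string, _, _ agentInstall) string {
	return filepath.Join(topDir, "elastic-agent")
}

func (f *fakeWatcherHelper) WaitForWatcher(context.Context, *logger.Logger, string, time.Duration) error {
	return nil // pretend the watcher reached the watching state immediately
}

func (f *fakeWatcherHelper) TakeOverWatcher(_ context.Context, _ *logger.Logger, topDir string) (*filelock.AppLocker, error) {
	return filelock.NewAppLocker(topDir, "watcher.lock"), nil
}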

internal/pkg/agent/application/application.go

Lines changed: 1 addition & 1 deletion
@@ -120,7 +120,7 @@ func New(

 	// monitoring is not supported in bootstrap mode https://github.com/elastic/elastic-agent/issues/1761
 	isMonitoringSupported := !disableMonitoring && cfg.Settings.V1MonitoringEnabled
-	upgrader, err := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, cfg.Settings.Upgrade, agentInfo)
+	upgrader, err := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, cfg.Settings.Upgrade, agentInfo, new(upgrade.AgentWatcherHelper))
 	if err != nil {
 		return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
 	}

internal/pkg/agent/application/coordinator/coordinator_unit_test.go

Lines changed: 1 addition & 1 deletion
@@ -461,7 +461,7 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) {
 		}
 	}()

-	upgradeMgr, err := upgrade.NewUpgrader(log, &artifact.Config{}, nil, &info.AgentInfo{})
+	upgradeMgr, err := upgrade.NewUpgrader(log, &artifact.Config{}, nil, &info.AgentInfo{}, new(upgrade.AgentWatcherHelper))
 	require.NoError(t, err, "errored when creating a new upgrader")

 	// Channels have buffer length 1, so we don't have to run on multiple

internal/pkg/agent/application/upgrade/step_download_test.go

Lines changed: 4 additions & 4 deletions
@@ -91,7 +91,7 @@ func TestDownloadWithRetries(t *testing.T) {
 			return &mockDownloader{expectedDownloadPath, nil}, nil
 		}

-		u, err := NewUpgrader(testLogger, &settings, nil, &info.AgentInfo{})
+		u, err := NewUpgrader(testLogger, &settings, nil, &info.AgentInfo{}, new(AgentWatcherHelper))
 		require.NoError(t, err)

 		parsedVersion, err := agtversion.ParseVersion("8.9.0")
@@ -141,7 +141,7 @@ func TestDownloadWithRetries(t *testing.T) {
 			return nil, nil
 		}

-		u, err := NewUpgrader(testLogger, &settings, nil, &info.AgentInfo{})
+		u, err := NewUpgrader(testLogger, &settings, nil, &info.AgentInfo{}, new(AgentWatcherHelper))
 		require.NoError(t, err)

 		parsedVersion, err := agtversion.ParseVersion("8.9.0")
@@ -196,7 +196,7 @@ func TestDownloadWithRetries(t *testing.T) {
 			return nil, nil
 		}

-		u, err := NewUpgrader(testLogger, &settings, nil, &info.AgentInfo{})
+		u, err := NewUpgrader(testLogger, &settings, nil, &info.AgentInfo{}, new(AgentWatcherHelper))
 		require.NoError(t, err)

 		parsedVersion, err := agtversion.ParseVersion("8.9.0")
@@ -241,7 +241,7 @@ func TestDownloadWithRetries(t *testing.T) {
 			return &mockDownloader{"", errors.New("download failed")}, nil
 		}

-		u, err := NewUpgrader(testLogger, &settings, nil, &info.AgentInfo{})
+		u, err := NewUpgrader(testLogger, &settings, nil, &info.AgentInfo{}, new(AgentWatcherHelper))
 		require.NoError(t, err)

 		parsedVersion, err := agtversion.ParseVersion("8.9.0")

internal/pkg/agent/application/upgrade/upgrade.go

Lines changed: 20 additions & 112 deletions
@@ -36,8 +36,6 @@ import (
 	"github.com/elastic/elastic-agent/pkg/control/v2/client"
 	"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
 	"github.com/elastic/elastic-agent/pkg/core/logger"
-	"github.com/elastic/elastic-agent/pkg/core/process"
-	"github.com/elastic/elastic-agent/pkg/utils"
 	agtversion "github.com/elastic/elastic-agent/pkg/version"
 	currentagtversion "github.com/elastic/elastic-agent/version"
 )
@@ -71,12 +69,21 @@ func init() {
 	}
 }

-// TODO substitute all the references to watcher with calls to the interface
+// WatcherHelper is an abstraction of the operations that Upgrader triggers on the elastic-agent watcher.
+// It is defined to help with Upgrader testing and to verify interactions with the elastic-agent watcher.
 type WatcherHelper interface {
+	// InvokeWatcher invokes an elastic-agent watcher using the agentExecutable passed as argument.
 	InvokeWatcher(log *logger.Logger, agentExecutable string) (*exec.Cmd, error)
-	SelectWatcherExecutable(topDir string, agentInstalls ...agentInstall) string
+	// SelectWatcherExecutable returns the path to the elastic-agent executable that will run the
+	// more recent watcher, chosen between the previous agent installation (the agent that started
+	// the upgrade) and the current one (the agent that will run after restart).
+	SelectWatcherExecutable(topDir string, previous agentInstall, current agentInstall) string
+	// WaitForWatcher listens for changes to the update marker, waiting for the elastic-agent watcher
+	// to set the UPG_WATCHING state in the upgrade details' metadata.
 	WaitForWatcher(ctx context.Context, log *logger.Logger, markerFilePath string, waitTime time.Duration) error
-	TakeOverWatcher(ctx context.Context, topDir string) (*filelock.AppLocker, error)
+	// TakeOverWatcher looks for watcher processes and terminates them while concurrently trying to
+	// acquire the watcher AppLocker. It returns once it has acquired the AppLocker, or with an error
+	// if the lock could not be acquired.
+	TakeOverWatcher(ctx context.Context, log *logger.Logger, topDir string) (*filelock.AppLocker, error)
 }

 // Upgrader performs an upgrade
@@ -88,6 +95,7 @@ type Upgrader struct {
 	upgradeable    bool
 	fleetServerURI string
 	markerWatcher  MarkerWatcher
+	watcherHelper  WatcherHelper
 }

 // IsUpgradeable when agent is installed and running as a service or flag was provided.
@@ -98,14 +106,15 @@ func IsUpgradeable() bool {
 }

 // NewUpgrader creates an upgrader which is capable of performing upgrade operation
-func NewUpgrader(log *logger.Logger, settings *artifact.Config, upgradeConfig *configuration.UpgradeConfig, agentInfo info.Agent) (*Upgrader, error) {
+func NewUpgrader(log *logger.Logger, settings *artifact.Config, upgradeConfig *configuration.UpgradeConfig, agentInfo info.Agent, watcherHelper WatcherHelper) (*Upgrader, error) {
 	return &Upgrader{
 		log:             log,
 		settings:        settings,
 		upgradeSettings: upgradeConfig,
 		agentInfo:       agentInfo,
 		upgradeable:     IsUpgradeable(),
 		markerWatcher:   newMarkerFileWatcher(markerFilePath(paths.Data()), log),
+		watcherHelper:   watcherHelper,
 	}, nil
 }

@@ -368,16 +377,16 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, rollback bool, s
 		return nil, goerrors.Join(err, rollbackErr)
 	}

-	watcherExecutable := selectWatcherExecutable(paths.Top(), previous, current)
+	watcherExecutable := u.watcherHelper.SelectWatcherExecutable(paths.Top(), previous, current)

 	var watcherCmd *exec.Cmd
-	if watcherCmd, err = InvokeWatcher(u.log, watcherExecutable); err != nil {
+	if watcherCmd, err = u.watcherHelper.InvokeWatcher(u.log, watcherExecutable); err != nil {
 		u.log.Errorw("Rolling back: starting watcher failed", "error.message", err)
 		rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
 		return nil, goerrors.Join(err, rollbackErr)
 	}

-	watcherWaitErr := waitForWatcher(ctx, u.log, markerFilePath(paths.Data()), watcherMaxWaitTime)
+	watcherWaitErr := u.watcherHelper.WaitForWatcher(ctx, u.log, markerFilePath(paths.Data()), watcherMaxWaitTime)
 	if watcherWaitErr != nil {
 		killWatcherErr := watcherCmd.Process.Kill()
 		rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
@@ -408,7 +417,7 @@ func (u *Upgrader) forceRollbackToPreviousVersion(ctx context.Context, topDir st
 	}

 	// Invoke watcher again
-	_, err = InvokeWatcher(u.log, paths.BinaryPath(paths.VersionedHome(topDir), agentName))
+	_, err = u.watcherHelper.InvokeWatcher(u.log, paths.BinaryPath(paths.VersionedHome(topDir), agentName))
 	if err != nil {
 		return nil, fmt.Errorf("invoking watcher: %w", err)
 	}
@@ -418,7 +427,7 @@ func (u *Upgrader) forceRollbackToPreviousVersion(ctx context.Context, topDir st
 }

 func (u *Upgrader) PersistManualRollback(ctx context.Context, topDir string) error {
-	watcherApplock, err := u.takeOverWatcher(ctx, topDir)
+	watcherApplock, err := u.watcherHelper.TakeOverWatcher(ctx, u.log, topDir)
 	if err != nil {
 		return fmt.Errorf("taking over watcher processes: %w", err)
 	}
@@ -443,107 +452,6 @@ func (u *Upgrader) PersistManualRollback(ctx context.Context, topDir string) err
 	return nil
 }

-func (u *Upgrader) takeOverWatcher(ctx context.Context, topDir string) (*filelock.AppLocker, error) {
-
-	takeoverCtx, takeoverCancel := context.WithTimeout(ctx, 30*time.Second)
-	defer takeoverCancel()
-	go func() {
-		killingTicker := time.NewTicker(500 * time.Millisecond)
-		defer killingTicker.Stop()
-		for {
-			select {
-			case <-takeoverCtx.Done():
-				return
-			case <-killingTicker.C:
-				pids, err := utils.GetWatcherPIDs()
-				if err != nil {
-					u.log.Errorf("error listing watcher processes: %s", err)
-					continue
-				}
-
-				// this should be run continuously and concurrently attempting to get the app locker
-				for _, pid := range pids {
-					u.log.Debugf("attempting to kill watcher process with PID: %d", pid)
-					watcherProcess, findProcErr := os.FindProcess(pid)
-					if findProcErr != nil {
-						u.log.Errorf("error finding process with PID: %d: %s", pid, findProcErr)
-						continue
-					}
-					killProcErr := process.Terminate(watcherProcess)
-					if killProcErr != nil {
-						u.log.Errorf("error killing process with PID: %d: %s", pid, killProcErr)
-					}
-					u.log.Debugf("killed watcher process with PID: %d", pid)
-				}
-			}
-		}
-	}()
-
-	// we should retry to take over the AppLocker for 30s, but AppLocker interface is limited
-	takeOverTicker := time.NewTicker(100 * time.Millisecond)
-	defer takeOverTicker.Stop()
-	for {
-		select {
-		case <-takeoverCtx.Done():
-			return nil, fmt.Errorf("timed out taking over watcher applocker")
-		case <-takeOverTicker.C:
-			locker := filelock.NewAppLocker(topDir, "watcher.lock")
-			err := locker.TryLock()
-			if err != nil {
-				u.log.Errorf("error locking watcher applocker: %s", err)
-				continue
-			}
-			return locker, nil
-		}
-	}
-}
-
-func selectWatcherExecutable(topDir string, previous agentInstall, current agentInstall) string {
-	// check if the upgraded version is less than the previous (currently installed) version
-	if current.parsedVersion.Less(*previous.parsedVersion) {
-		// use the current agent executable for watch, if downgrading the old agent doesn't understand the current agent's path structure.
-		return paths.BinaryPath(filepath.Join(topDir, previous.versionedHome), agentName)
-	} else {
-		// use the new agent executable as it should be able to parse the new update marker
-		return paths.BinaryPath(filepath.Join(topDir, current.versionedHome), agentName)
-	}
-}
-
-func waitForWatcher(ctx context.Context, log *logger.Logger, markerFilePath string, waitTime time.Duration) error {
-	return waitForWatcherWithTimeoutCreationFunc(ctx, log, markerFilePath, waitTime, context.WithTimeout)
-}
-
-type createContextWithTimeout func(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc)
-
-func waitForWatcherWithTimeoutCreationFunc(ctx context.Context, log *logger.Logger, markerFilePath string, waitTime time.Duration, createTimeoutContext createContextWithTimeout) error {
-	// Wait for the watcher to be up and running
-	watcherContext, cancel := createTimeoutContext(ctx, waitTime)
-	defer cancel()
-
-	markerWatcher := newMarkerFileWatcher(markerFilePath, log)
-	err := markerWatcher.Run(watcherContext)
-	if err != nil {
-		return fmt.Errorf("error starting update marker watcher: %w", err)
-	}
-
-	log.Infof("waiting up to %s for upgrade watcher to set %s state in upgrade marker", waitTime, details.StateWatching)
-
-	for {
-		select {
-		case updMarker := <-markerWatcher.Watch():
-			if updMarker.Details != nil && updMarker.Details.State == details.StateWatching {
-				// watcher started and it is watching, all good
-				log.Infof("upgrade watcher set %s state in upgrade marker: exiting wait loop", details.StateWatching)
-				return nil
-			}
-
-		case <-watcherContext.Done():
-			log.Errorf("upgrade watcher did not start watching within %s or context has expired", waitTime)
-			return goerrors.Join(ErrWatcherNotStarted, watcherContext.Err())
-		}
-	}
-}
-
 // Ack acks last upgrade action
 func (u *Upgrader) Ack(ctx context.Context, acker acker.Acker) error {
 	// get upgrade action
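
The four helpers removed above become methods on the concrete AgentWatcherHelper type that the production call sites construct with new(upgrade.AgentWatcherHelper). That type lives in one of the changed files not shown in this excerpt; the following is a minimal sketch of its expected shape, assuming the method bodies simply carry over the logic deleted from upgrade.go:

// Sketch only: the real bodies live outside this excerpt and are assumed
// to mirror the code removed above, relocated onto a stateless receiver
// so tests can substitute a mock through NewUpgrader's WatcherHelper
// parameter.
type AgentWatcherHelper struct{}

func (AgentWatcherHelper) InvokeWatcher(log *logger.Logger, agentExecutable string) (*exec.Cmd, error) {
	// Delegates to the pre-existing package-level InvokeWatcher.
	return InvokeWatcher(log, agentExecutable)
}

func (AgentWatcherHelper) SelectWatcherExecutable(topDir string, previous, current agentInstall) string {
	// Same rule as the deleted selectWatcherExecutable: on a downgrade the
	// newer (previous) agent's watcher is kept, because it understands the
	// newer update-marker format.
	if current.parsedVersion.Less(*previous.parsedVersion) {
		return paths.BinaryPath(filepath.Join(topDir, previous.versionedHome), agentName)
	}
	return paths.BinaryPath(filepath.Join(topDir, current.versionedHome), agentName)
}

// WaitForWatcher and TakeOverWatcher would wrap the removed waitForWatcher
// and takeOverWatcher loops in the same way, with TakeOverWatcher now taking
// the logger explicitly instead of reading it from the Upgrader receiver.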
