Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
1de5336
Add rollback field to UpgradeRequest
pchila Jun 30, 2025
191a9a5
introduce rollback parameter to upgrade
pchila Jun 30, 2025
d427690
manual rollback from CLI PoC
pchila Jul 1, 2025
623b65b
Concurrently retry taking over watcher
pchila Jul 1, 2025
fe5f321
Gracefully shutdown agent watcher
pchila Jul 7, 2025
27247d6
move desired outcome check before grace period evaluation
pchila Jul 9, 2025
bcb8c93
Add rollbacks available to upgrade marker
pchila Jul 11, 2025
8736b84
remove fakeAcker in favour of generated Acker mock
pchila Jul 14, 2025
ca3aa08
Introduce WatcherHelper
pchila Jul 15, 2025
f01bfe7
Add tests for available_rollbacks
pchila Jul 16, 2025
903e4bf
Add tests for takeOverWatcher
pchila Jul 16, 2025
25db8f9
add testlocker binary to sonar exclusions
pchila Jul 21, 2025
cf8defd
disable rollback window by default
pchila Jul 25, 2025
d0017d4
Add formal checks to manual rollback arguments
pchila Jul 28, 2025
c4a242a
rename forceRollbackToPreviousVersion
pchila Jul 28, 2025
e37bc91
test watchloop
pchila Jul 28, 2025
3bd7b10
Re-invoke watcher after takeover
pchila Aug 1, 2025
6037333
Add minimum version check for creating rollbacks entries in update ma…
pchila Aug 1, 2025
ba53d11
Add manual rollback integration test
pchila Aug 2, 2025
09da0d7
Create watcher subprocess with a new Console on windows
pchila Aug 6, 2025
ca2d073
Gracefully terminate watcher process on windows
pchila Aug 6, 2025
14ce684
Add watcher takedown tests
pchila Aug 7, 2025
8e401cd
Add in-process watcher grappler
pchila Aug 11, 2025
98727ac
WIP use in-process grappler
pchila Aug 11, 2025
e7e7d49
remove in-process grappler in favor of commandGrappler
pchila Aug 11, 2025
c4f24c0
Allow watcher to listen to signals only during watch loop
pchila Aug 11, 2025
05778eb
Add postWatchHook to watcher process start to keep race detector happy
pchila Aug 12, 2025
7e18b86
fix lint errors
pchila Aug 12, 2025
f1a0a99
Fix data races in unit tests
pchila Aug 12, 2025
e6cff5c
make watcher rollback only if the agent has not been already rolled back
pchila Aug 15, 2025
dac5bb2
fix lint
pchila Aug 15, 2025
2822486
Add a pre-restart hook to Rollback operation
pchila Aug 15, 2025
1f936ce
Update upgrade details metadata Equals() with new fields
pchila Aug 27, 2025
d04351a
Remove parent death signal for watcher on linux
pchila Aug 28, 2025
acb0c65
Refactor: move TakedownWatcher function to watch subcommand
pchila Aug 28, 2025
5d6d7f4
Move RollbacksAvailable struct out of upgrade details
pchila Aug 29, 2025
35a0aec
extract manifest PathMapper to its own package
pchila Aug 29, 2025
13eec08
add install descriptor during initial install
pchila Aug 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,24 @@ packages:
interfaces:
Agent:
github.com/elastic/elastic-agent/internal/pkg/agent/cmd:
config:
inpackage: True
with-expecter: True
dir: "{{.InterfaceDirRelative}}"
mockname: "{{.Mock}}{{.InterfaceName | firstUpper}}"
outpkg: "{{.PackageName}}"
filename: "{{.Mock | lower}}_{{.InterfaceName | lower}}_test.go"
interfaces:
agentWatcher:
config:
mockname: "AgentWatcher"
installationModifier:
config:
mockname: "InstallationModifier"
github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade:
config:
inpackage: True
with-expecter: True
dir: "{{.InterfaceDirRelative}}"
mockname: "{{.Mock}}{{.InterfaceName | firstUpper}}"
outpkg: "{{.PackageName}}"
filename: "{{.Mock | lower}}_{{.InterfaceName | lower}}_test.go"
interfaces:
WatcherHelper:
watcherGrappler:
2 changes: 1 addition & 1 deletion _meta/config/common.reference.p2.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ inputs:
# # rollback settings
# rollback:
# # duration in which an upgraded Agent may be manually rolled back.
# window: 168h
# window: 0

# agent.process:
# # timeout for creating new processes. when process is not successfully created by this timeout
Expand Down
3 changes: 3 additions & 0 deletions control_v2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ message UpgradeRequest {
//
// If provided Elastic Agent package embedded PGP key is not checked for signature during upgrade.
bool skipDefaultPgp = 5;

// If true it indicates that we wish to rollback the current/last upgrade
bool rollback = 6;
}

// A upgrade response message.
Expand Down
2 changes: 1 addition & 1 deletion elastic-agent.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ inputs:
# # rollback settings
# rollback:
# # duration in which an upgraded Agent may be manually rolled back.
# window: 168h
# window: 0

# agent.process:
# # timeout for creating new processes. when process is not successfully created by this timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker

go func() {
h.log.Infof("starting upgrade to version %s in background", action.Data.Version)
if err := h.coord.Upgrade(asyncCtx, action.Data.Version, action.Data.SourceURI, action, false, false); err != nil {
if err := h.coord.Upgrade(asyncCtx, action.Data.Version, false, action.Data.SourceURI, action, false, false); err != nil {
h.log.Errorf("upgrade to version %s failed: %v", action.Data.Version, err)
// If context is cancelled in getAsyncContext, the actions are acked there
if !errors.Is(asyncCtx.Err(), context.Canceled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,7 @@ func (u *mockUpgradeManager) Reload(rawConfig *config.Config) error {
return nil
}

func (u *mockUpgradeManager) Upgrade(
ctx context.Context,
version string,
sourceURI string,
action *fleetapi.ActionUpgrade,
details *details.Details,
skipVerifyOverride bool,
skipDefaultPgp bool,
pgpBytes ...string) (reexec.ShutdownCallbackFn, error) {
func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, rollback bool, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (reexec.ShutdownCallbackFn, error) {

return u.UpgradeFn(
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type actionCoordinator interface {

type upgradeCoordinator interface {
actionCoordinator
Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) error
Upgrade(ctx context.Context, version string, rollback bool, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) error
}

type performActionFunc func(context.Context, component.Component, component.Unit, string, map[string]interface{}) (map[string]interface{}, error)
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func New(

// monitoring is not supported in bootstrap mode https://github.com/elastic/elastic-agent/issues/1761
isMonitoringSupported := !disableMonitoring && cfg.Settings.V1MonitoringEnabled
upgrader, err := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, agentInfo)
upgrader, err := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, cfg.Settings.Upgrade, agentInfo, new(upgrade.AgentWatcherHelper))
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type UpgradeManager interface {
Reload(rawConfig *config.Config) error

// Upgrade upgrades running agent.
Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error)
Upgrade(ctx context.Context, version string, rollback bool, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error)

// Ack is used on startup to check if the agent has upgraded and needs to send an ack for the action
Ack(ctx context.Context, acker acker.Acker) error
Expand Down Expand Up @@ -695,7 +695,7 @@ func (c *Coordinator) Migrate(ctx context.Context, action *fleetapi.ActionMigrat

// Upgrade runs the upgrade process.
// Called from external goroutines.
func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) error {
func (c *Coordinator) Upgrade(ctx context.Context, version string, rollback bool, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) error {
// early check outside of upgrader before overriding the state
if !c.upgradeMgr.Upgradeable() {
return ErrNotUpgradable
Expand Down Expand Up @@ -735,7 +735,7 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str
det := details.NewDetails(version, details.StateRequested, actionID)
det.RegisterObserver(c.SetUpgradeDetails)

cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...)
cb, err := c.upgradeMgr.Upgrade(ctx, version, rollback, sourceURI, action, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...)
if err != nil {
c.ClearOverrideState()
if errors.Is(err, upgrade.ErrUpgradeSameVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func TestUpgradeSameErrorAcked(t *testing.T) {

acker.On("Ack", mock.Anything, actionUpgrade).Return(nil)

require.NoError(t, coord.Upgrade(t.Context(), "9.0", "http://localhost", actionUpgrade, true, true))
require.NoError(t, coord.Upgrade(t.Context(), "9.0", false, "http://localhost", actionUpgrade, true, true))

acker.AssertCalled(t, "Ack", mock.Anything, actionUpgrade)
}
Expand Down Expand Up @@ -917,7 +917,7 @@ func TestCoordinator_Upgrade(t *testing.T) {
require.NoError(t, err)
cfgMgr.Config(ctx, cfg)

err = coord.Upgrade(ctx, "9.0.0", "", nil, true, false)
err = coord.Upgrade(ctx, "9.0.0", false, "", nil, true, false)
require.ErrorIs(t, err, ErrNotUpgradable)
cancel()

Expand Down Expand Up @@ -954,7 +954,7 @@ func TestCoordinator_UpgradeDetails(t *testing.T) {
require.NoError(t, err)
cfgMgr.Config(ctx, cfg)

err = coord.Upgrade(ctx, "9.0.0", "", nil, true, false)
err = coord.Upgrade(ctx, "9.0.0", false, "", nil, true, false)
require.ErrorIs(t, expectedErr, err)
cancel()

Expand Down Expand Up @@ -1159,7 +1159,7 @@ func (f *fakeUpgradeManager) Reload(cfg *config.Config) error {
return nil
}

func (f *fakeUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
func (f *fakeUpgradeManager) Upgrade(ctx context.Context, version string, rollback bool, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
f.upgradeCalled = true
if f.upgradeErr != nil {
return nil, f.upgradeErr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,7 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) {
}
}()

upgradeMgr, err := upgrade.NewUpgrader(
log,
&artifact.Config{},
&info.AgentInfo{},
)
upgradeMgr, err := upgrade.NewUpgrader(log, &artifact.Config{}, nil, &info.AgentInfo{}, new(upgrade.AgentWatcherHelper))
require.NoError(t, err, "errored when creating a new upgrader")

// Channels have buffer length 1, so we don't have to run on multiple
Expand Down Expand Up @@ -1526,7 +1522,7 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) {
}

// Call upgrade and make sure the upgrade manager receives an Upgrade call
err := coord.Upgrade(ctx, "1.2.3", "", nil, false, false)
err := coord.Upgrade(ctx, "1.2.3", false, "", nil, false, false)
assert.True(t, upgradeMgr.upgradeCalled, "Coordinator Upgrade should call upgrade manager Upgrade")
assert.Equal(t, upgradeMgr.upgradeErr, err, "Upgrade should report upgrade manager error")

Expand Down
1 change: 1 addition & 0 deletions internal/pkg/agent/application/filelock/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (a *AppLocker) TryLock() error {
if !locked {
return ErrAppAlreadyRunning
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Ignore test binary
testlocker
64 changes: 64 additions & 0 deletions internal/pkg/agent/application/filelock/testlocker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

// This is a simple program that will lock an applocker using a file passed using the -lockfile option, used for testing file lock works properly.
// os.Interrupt or signal.SIGTERM will make the program release the lock and exit
package main

import (
"flag"
"log"
"os"
"os/signal"
"path/filepath"
"syscall"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/filelock"
)

const AcquiredLockLogFmt = "Acquired lock on file %s\n"

const lockFileFlagName = "lockfile"
const ignoreSignalFlagName = "ignoresignals"

var lockFile = flag.String(lockFileFlagName, "", "path to lock file")
var ignoreSignals = flag.Bool(ignoreSignalFlagName, false, "ignore signals")

func main() {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

flag.Parse()
if *lockFile == "" {
log.Fatalf("No lockfile specified. Please run %s -%s <path to lockfile>", os.Args[0], lockFileFlagName)
}

appLocker := filelock.NewAppLocker(filepath.Dir(*lockFile), filepath.Base(*lockFile))

err := appLocker.TryLock()
if err != nil {
log.Fatalf("Error locking %s: %s", *lockFile, err.Error())
}

defer func(aLocker *filelock.AppLocker) {

if unlockErr := aLocker.Unlock(); unlockErr != nil {
log.Printf("Error unlocking %s: %s", *lockFile, unlockErr.Error())
}
}(appLocker)

log.Printf(AcquiredLockLogFmt, *lockFile)

for {

s := <-signalChan
if *ignoreSignals {
log.Printf("Received signal %v , ignoring it...", s)
continue
}

log.Printf("Received signal %v , exiting...", s)
break
}
}
3 changes: 2 additions & 1 deletion internal/pkg/agent/application/upgrade/details/details.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ func (m Metadata) Equals(otherM Metadata) bool {
m.DownloadPercent == otherM.DownloadPercent &&
m.DownloadRate == otherM.DownloadRate &&
equalTimePointers(m.RetryUntil, otherM.RetryUntil) &&
m.RetryErrorMsg == otherM.RetryErrorMsg
m.RetryErrorMsg == otherM.RetryErrorMsg &&
m.Reason == otherM.Reason
}

func equalTimePointers(t, otherT *time.Time) bool {
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/agent/application/upgrade/details/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ const (
StateFailed State = "UPG_FAILED"

// List of well-known reasons for state transitions
ReasonWatchFailed = "watch failed"
ReasonWatchFailed = "watch failed"
ReasonManualRollback = "manual rollback requested"
)
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ details:
expectedDetails: &details.Details{
TargetVersion: "8.9.2",
State: details.StateRollback,
Metadata: details.Metadata{
Reason: details.ReasonWatchFailed,
},
},
},
"same_version_with_details_some_state": {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading