Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
kind: bug-fix
summary: fix reporting of scheduled upgrade details across restarts and cancels
component: elastic-agent
pr: https://github.com/elastic/elastic-agent/pull/9562
issue: https://github.com/elastic/elastic-agent/issues/8778
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestUpgradeHandler(t *testing.T) {
return nil, nil
},
},
nil, nil, nil, nil, nil, false, nil)
nil, nil, nil, nil, nil, false, nil, nil)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

Expand Down Expand Up @@ -174,7 +174,7 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
return nil, err
},
},
nil, nil, nil, nil, nil, false, nil)
nil, nil, nil, nil, nil, false, nil, nil)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

Expand Down Expand Up @@ -233,7 +233,7 @@ func TestDuplicateActionsHandled(t *testing.T) {
return nil, nil
},
},
nil, nil, nil, nil, nil, false, acker)
nil, nil, nil, nil, nil, false, acker, nil)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

Expand Down Expand Up @@ -327,7 +327,7 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
return nil, nil
},
},
nil, nil, nil, nil, nil, false, nil)
nil, nil, nil, nil, nil, false, nil, nil)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

Expand Down
20 changes: 17 additions & 3 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
Expand All @@ -34,6 +36,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/lazy"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/retrier"
fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
"github.com/elastic/elastic-agent/internal/pkg/queue"
"github.com/elastic/elastic-agent/internal/pkg/release"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
Expand All @@ -52,6 +55,7 @@ func New(
testingMode bool,
fleetInitTimeout time.Duration,
disableMonitoring bool,
initialUpgradeDetails *details.Details,
modifiers ...component.PlatformModifier,
) (*coordinator.Coordinator, coordinator.ConfigManager, composable.Controller, error) {

Expand Down Expand Up @@ -131,7 +135,6 @@ func New(
var compModifiers = []coordinator.ComponentsModifier{InjectAPMConfig}
var composableManaged bool
var isManaged bool

var actionAcker acker.Acker
if testingMode {
log.Info("Elastic Agent has been started in testing mode and is managed through the control protocol")
Expand Down Expand Up @@ -195,8 +198,19 @@ func New(
batchedAcker := lazy.NewAcker(fleetAcker, log, lazy.WithRetrier(retrier))
actionAcker = stateStore.NewStateStoreActionAcker(batchedAcker, stateStorage)

actionQueue, err := queue.NewActionQueue(stateStorage.Queue(), stateStorage)
if err != nil {
return nil, nil, nil, fmt.Errorf("unable to initialize action queue: %w", err)
}

if initialUpgradeDetails == nil {
// initial upgrade details are nil (normally the caller supplies the ones from the marker file at this point),
// hence, extract any scheduled upgrade details from the action queue.
initialUpgradeDetails = dispatcher.GetScheduledUpgradeDetails(log, actionQueue.Actions(), time.Now())
}

// TODO: stop using global state
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout, paths.Top(), client, fleetAcker, actionAcker, retrier, stateStorage, upgrader)
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout, paths.Top(), client, fleetAcker, actionAcker, retrier, stateStorage, actionQueue, upgrader)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -210,7 +224,7 @@ func New(
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
}

coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, actionAcker, compModifiers...)
coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, actionAcker, initialUpgradeDetails, compModifiers...)
if managed != nil {
// the coordinator requires the config manager as well as in managed-mode the config manager requires the
// coordinator, so it must be set here once the coordinator is created
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/agent/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestLimitsLog(t *testing.T) {
true, // testingMode
time.Millisecond, // fleetInitTimeout
true, // disable monitoring
nil,
)
require.NoError(t, err)

Expand Down
30 changes: 24 additions & 6 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,24 @@ type UpdateComponentChange struct {
}

// New creates a new coordinator.
func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.Level, agentInfo info.Agent, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capabilities, monitorMgr MonitorManager, isManaged bool, fleetAcker acker.Acker, modifiers ...ComponentsModifier) *Coordinator {
func New(
logger *logger.Logger,
cfg *configuration.Configuration,
logLevel logp.Level,
agentInfo info.Agent,
specs component.RuntimeSpecs,
reexecMgr ReExecManager,
upgradeMgr UpgradeManager,
runtimeMgr RuntimeManager,
configMgr ConfigManager,
varsMgr VarsManager,
caps capabilities.Capabilities,
monitorMgr MonitorManager,
isManaged bool,
fleetAcker acker.Acker,
initialUpgradeDetails *details.Details,
modifiers ...ComponentsModifier,
) *Coordinator {
var fleetState cproto.State
var fleetMessage string
if !isManaged {
Expand All @@ -352,11 +369,12 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
fleetMessage = "Not enrolled into Fleet"
}
state := State{
State: agentclient.Starting,
Message: "Starting",
FleetState: fleetState,
FleetMessage: fleetMessage,
LogLevel: logLevel,
State: agentclient.Starting,
Message: "Starting",
FleetState: fleetState,
FleetMessage: fleetMessage,
LogLevel: logLevel,
UpgradeDetails: initialUpgradeDetails,
}
c := &Coordinator{
logger: logger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ func createCoordinator(t testing.TB, ctx context.Context, opts ...CoordinatorOpt
acker = &fakeActionAcker{}
}

coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, upgradeManager, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed, acker)
coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, upgradeManager, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed, acker, nil)
return coord, cfgMgr, varsMgr
}

Expand Down
Loading
Loading