Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
kind: bug-fix
summary: fix reporting of scheduled upgrade details across restarts and cancels
component: elastic-agent
pr: https://github.com/elastic/elastic-agent/pull/9562
issue: https://github.com/elastic/elastic-agent/issues/8778
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestUpgradeHandler(t *testing.T) {
return nil, nil
},
},
nil, nil, nil, nil, nil, false, nil, nil)
nil, nil, nil, nil, nil, false, nil, nil, nil)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

Expand Down Expand Up @@ -174,7 +174,7 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
return nil, err
},
},
nil, nil, nil, nil, nil, false, nil, nil)
nil, nil, nil, nil, nil, false, nil, nil, nil)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

Expand Down Expand Up @@ -233,7 +233,7 @@ func TestDuplicateActionsHandled(t *testing.T) {
return nil, nil
},
},
nil, nil, nil, nil, nil, false, nil, acker)
nil, nil, nil, nil, nil, false, nil, acker, nil)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

Expand Down Expand Up @@ -327,7 +327,7 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
return nil, nil
},
},
nil, nil, nil, nil, nil, false, nil, nil)
nil, nil, nil, nil, nil, false, nil, nil, nil)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

Expand Down
20 changes: 17 additions & 3 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
Expand All @@ -34,6 +36,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/retrier"
fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
otelmanager "github.com/elastic/elastic-agent/internal/pkg/otel/manager"
"github.com/elastic/elastic-agent/internal/pkg/queue"
"github.com/elastic/elastic-agent/internal/pkg/release"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
Expand All @@ -59,6 +62,7 @@ func New(
fleetInitTimeout time.Duration,
disableMonitoring bool,
override CfgOverrider,
initialUpgradeDetails *details.Details,
modifiers ...component.PlatformModifier,
) (*coordinator.Coordinator, coordinator.ConfigManager, composable.Controller, error) {

Expand Down Expand Up @@ -143,7 +147,6 @@ func New(
var compModifiers = []coordinator.ComponentsModifier{InjectAPMConfig}
var composableManaged bool
var isManaged bool

var actionAcker acker.Acker
if testingMode {
log.Info("Elastic Agent has been started in testing mode and is managed through the control protocol")
Expand Down Expand Up @@ -212,8 +215,19 @@ func New(
batchedAcker := lazy.NewAcker(fleetAcker, log, lazy.WithRetrier(retrier))
actionAcker = stateStore.NewStateStoreActionAcker(batchedAcker, stateStorage)

actionQueue, err := queue.NewActionQueue(stateStorage.Queue(), stateStorage)
if err != nil {
return nil, nil, nil, fmt.Errorf("unable to initialize action queue: %w", err)
}

if initialUpgradeDetails == nil {
// initial upgrade details are nil (normally the caller supplies the ones from the marker file at this point),
// hence, extract any scheduled upgrade details from the action queue.
initialUpgradeDetails = dispatcher.GetScheduledUpgradeDetails(log, actionQueue.Actions(), time.Now())
}

// TODO: stop using global state
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout, paths.Top(), client, fleetAcker, actionAcker, retrier, stateStorage, upgrader)
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout, paths.Top(), client, fleetAcker, actionAcker, retrier, stateStorage, actionQueue, upgrader)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -230,7 +244,7 @@ func New(
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
}
coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, otelManager, actionAcker, compModifiers...)
coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, otelManager, actionAcker, initialUpgradeDetails, compModifiers...)
if managed != nil {
// the coordinator requires the config manager as well as in managed-mode the config manager requires the
// coordinator, so it must be set here once the coordinator is created
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/agent/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestLimitsLog(t *testing.T) {
time.Millisecond, // fleetInitTimeout
true, // disable monitoring
nil, // no configuration overrides
nil,
)
require.NoError(t, err)

Expand Down
12 changes: 7 additions & 5 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ func New(
isManaged bool,
otelMgr OTelManager,
fleetAcker acker.Acker,
initialUpgradeDetails *details.Details,
modifiers ...ComponentsModifier,
) *Coordinator {
var fleetState cproto.State
Expand All @@ -430,11 +431,12 @@ func New(
fleetMessage = "Not enrolled into Fleet"
}
state := State{
State: agentclient.Starting,
Message: "Starting",
FleetState: fleetState,
FleetMessage: fleetMessage,
LogLevel: logLevel,
State: agentclient.Starting,
Message: "Starting",
FleetState: fleetState,
FleetMessage: fleetMessage,
LogLevel: logLevel,
UpgradeDetails: initialUpgradeDetails,
}
c := &Coordinator{
logger: logger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ func createCoordinator(t testing.TB, ctx context.Context, opts ...CoordinatorOpt
acker = &fakeActionAcker{}
}

coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, upgradeManager, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed, otelMgr, acker)
coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, upgradeManager, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed, otelMgr, acker, nil)
return coord, cfgMgr, varsMgr
}

Expand Down
Loading
Loading