Skip to content

Commit 0f86f9c

Browse files
pkoutsovasilismergify[bot]
authored andcommitted
fix: scheduled upgrade details state (#9562)
* fix: persisting and reporting of upgrade details * ci: align and extend dispatcher unit-tests * ci: update coordinator and application new signatures in unit-tests * ci: add integration tests for scheduled upgrade details * doc: add changelog fragment * doc: reword existing and add more comments in code * feat: change queuedUpgradeActions inside dispatchCancelActions to have values of struct{} * fix: remove redundant continue * fix: dedupe upgrade actions from fleetgateway actions, handle correctly the expiration of retried stored actions, and update upgrade details on retries (cherry picked from commit ff80471) # Conflicts: # internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go # internal/pkg/agent/application/application.go # internal/pkg/agent/application/application_test.go # internal/pkg/agent/application/coordinator/coordinator.go # internal/pkg/agent/application/coordinator/coordinator_test.go # internal/pkg/agent/cmd/run.go
1 parent b281d6a commit 0f86f9c

File tree

14 files changed

+1165
-293
lines changed

14 files changed

+1165
-293
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
kind: bug-fix
2+
summary: fix reporting of scheduled upgrade details across restarts and cancels
3+
component: elastic-agent
4+
pr: https://github.com/elastic/elastic-agent/pull/9562
5+
issue: https://github.com/elastic/elastic-agent/issues/8778

internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,11 @@ func TestUpgradeHandler(t *testing.T) {
115115
return nil, nil
116116
},
117117
},
118+
<<<<<<< HEAD
118119
nil, nil, nil, nil, nil, false, nil)
120+
=======
121+
nil, nil, nil, nil, nil, false, nil, nil, nil)
122+
>>>>>>> ff8047180 (fix: scheduled upgrade details state (#9562))
119123
//nolint:errcheck // We don't need the termination state of the Coordinator
120124
go c.Run(ctx)
121125

@@ -174,7 +178,11 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
174178
return nil, err
175179
},
176180
},
181+
<<<<<<< HEAD
177182
nil, nil, nil, nil, nil, false, nil)
183+
=======
184+
nil, nil, nil, nil, nil, false, nil, nil, nil)
185+
>>>>>>> ff8047180 (fix: scheduled upgrade details state (#9562))
178186
//nolint:errcheck // We don't need the termination state of the Coordinator
179187
go c.Run(ctx)
180188

@@ -233,7 +241,11 @@ func TestDuplicateActionsHandled(t *testing.T) {
233241
return nil, nil
234242
},
235243
},
244+
<<<<<<< HEAD
236245
nil, nil, nil, nil, nil, false, acker)
246+
=======
247+
nil, nil, nil, nil, nil, false, nil, acker, nil)
248+
>>>>>>> ff8047180 (fix: scheduled upgrade details state (#9562))
237249
//nolint:errcheck // We don't need the termination state of the Coordinator
238250
go c.Run(ctx)
239251

@@ -327,7 +339,11 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
327339
return nil, nil
328340
},
329341
},
342+
<<<<<<< HEAD
330343
nil, nil, nil, nil, nil, false, nil)
344+
=======
345+
nil, nil, nil, nil, nil, false, nil, nil, nil)
346+
>>>>>>> ff8047180 (fix: scheduled upgrade details state (#9562))
331347
//nolint:errcheck // We don't need the termination state of the Coordinator
332348
go c.Run(ctx)
333349

internal/pkg/agent/application/application.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ import (
1818
"github.com/elastic/elastic-agent-libs/logp"
1919

2020
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
21+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher"
2122
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
2223
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
2324
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
2425
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
26+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
2527
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
2628
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
2729
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
@@ -34,6 +36,11 @@ import (
3436
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/lazy"
3537
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/retrier"
3638
fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
39+
<<<<<<< HEAD
40+
=======
41+
otelmanager "github.com/elastic/elastic-agent/internal/pkg/otel/manager"
42+
"github.com/elastic/elastic-agent/internal/pkg/queue"
43+
>>>>>>> ff8047180 (fix: scheduled upgrade details state (#9562))
3744
"github.com/elastic/elastic-agent/internal/pkg/release"
3845
"github.com/elastic/elastic-agent/pkg/component"
3946
"github.com/elastic/elastic-agent/pkg/component/runtime"
@@ -52,6 +59,11 @@ func New(
5259
testingMode bool,
5360
fleetInitTimeout time.Duration,
5461
disableMonitoring bool,
62+
<<<<<<< HEAD
63+
=======
64+
override CfgOverrider,
65+
initialUpgradeDetails *details.Details,
66+
>>>>>>> ff8047180 (fix: scheduled upgrade details state (#9562))
5567
modifiers ...component.PlatformModifier,
5668
) (*coordinator.Coordinator, coordinator.ConfigManager, composable.Controller, error) {
5769

@@ -131,7 +143,6 @@ func New(
131143
var compModifiers = []coordinator.ComponentsModifier{InjectAPMConfig}
132144
var composableManaged bool
133145
var isManaged bool
134-
135146
var actionAcker acker.Acker
136147
if testingMode {
137148
log.Info("Elastic Agent has been started in testing mode and is managed through the control protocol")
@@ -195,8 +206,19 @@ func New(
195206
batchedAcker := lazy.NewAcker(fleetAcker, log, lazy.WithRetrier(retrier))
196207
actionAcker = stateStore.NewStateStoreActionAcker(batchedAcker, stateStorage)
197208

209+
actionQueue, err := queue.NewActionQueue(stateStorage.Queue(), stateStorage)
210+
if err != nil {
211+
return nil, nil, nil, fmt.Errorf("unable to initialize action queue: %w", err)
212+
}
213+
214+
if initialUpgradeDetails == nil {
215+
// initial upgrade details are nil (normally the caller supplies the ones from the marker file at this point),
216+
// hence, extract any scheduled upgrade details from the action queue.
217+
initialUpgradeDetails = dispatcher.GetScheduledUpgradeDetails(log, actionQueue.Actions(), time.Now())
218+
}
219+
198220
// TODO: stop using global state
199-
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout, paths.Top(), client, fleetAcker, actionAcker, retrier, stateStorage, upgrader)
221+
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout, paths.Top(), client, fleetAcker, actionAcker, retrier, stateStorage, actionQueue, upgrader)
200222
if err != nil {
201223
return nil, nil, nil, err
202224
}
@@ -210,7 +232,15 @@ func New(
210232
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
211233
}
212234

235+
<<<<<<< HEAD
213236
coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, actionAcker, compModifiers...)
237+
=======
238+
otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig)
239+
if err != nil {
240+
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
241+
}
242+
coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, otelManager, actionAcker, initialUpgradeDetails, compModifiers...)
243+
>>>>>>> ff8047180 (fix: scheduled upgrade details state (#9562))
214244
if managed != nil {
215245
// the coordinator requires the config manager as well as in managed-mode the config manager requires the
216246
// coordinator, so it must be set here once the coordinator is created

internal/pkg/agent/application/application_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ func TestLimitsLog(t *testing.T) {
6363
true, // testingMode
6464
time.Millisecond, // fleetInitTimeout
6565
true, // disable monitoring
66+
<<<<<<< HEAD
67+
=======
68+
nil, // no configuration overrides
69+
nil,
70+
>>>>>>> ff8047180 (fix: scheduled upgrade details state (#9562))
6671
)
6772
require.NoError(t, err)
6873

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,29 @@ type UpdateComponentChange struct {
343343
}
344344

345345
// New creates a new coordinator.
346+
<<<<<<< HEAD
346347
func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.Level, agentInfo info.Agent, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capabilities, monitorMgr MonitorManager, isManaged bool, fleetAcker acker.Acker, modifiers ...ComponentsModifier) *Coordinator {
348+
=======
349+
func New(
350+
logger *logger.Logger,
351+
cfg *configuration.Configuration,
352+
logLevel logp.Level,
353+
agentInfo info.Agent,
354+
specs component.RuntimeSpecs,
355+
reexecMgr ReExecManager,
356+
upgradeMgr UpgradeManager,
357+
runtimeMgr RuntimeManager,
358+
configMgr ConfigManager,
359+
varsMgr VarsManager,
360+
caps capabilities.Capabilities,
361+
monitorMgr MonitorManager,
362+
isManaged bool,
363+
otelMgr OTelManager,
364+
fleetAcker acker.Acker,
365+
initialUpgradeDetails *details.Details,
366+
modifiers ...ComponentsModifier,
367+
) *Coordinator {
368+
>>>>>>> ff8047180 (fix: scheduled upgrade details state (#9562))
347369
var fleetState cproto.State
348370
var fleetMessage string
349371
if !isManaged {
@@ -352,11 +374,12 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
352374
fleetMessage = "Not enrolled into Fleet"
353375
}
354376
state := State{
355-
State: agentclient.Starting,
356-
Message: "Starting",
357-
FleetState: fleetState,
358-
FleetMessage: fleetMessage,
359-
LogLevel: logLevel,
377+
State: agentclient.Starting,
378+
Message: "Starting",
379+
FleetState: fleetState,
380+
FleetMessage: fleetMessage,
381+
LogLevel: logLevel,
382+
UpgradeDetails: initialUpgradeDetails,
360383
}
361384
c := &Coordinator{
362385
logger: logger,

internal/pkg/agent/application/coordinator/coordinator_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,7 +1016,11 @@ func createCoordinator(t testing.TB, ctx context.Context, opts ...CoordinatorOpt
10161016
acker = &fakeActionAcker{}
10171017
}
10181018

1019+
<<<<<<< HEAD
10191020
coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, upgradeManager, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed, acker)
1021+
=======
1022+
coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, upgradeManager, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed, otelMgr, acker, nil)
1023+
>>>>>>> ff8047180 (fix: scheduled upgrade details state (#9562))
10201024
return coord, cfgMgr, varsMgr
10211025
}
10221026

0 commit comments

Comments
 (0)