From f3c044b42433887ce63d8b7ed04fa9d6bb74399c Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Fri, 12 Sep 2025 14:03:50 -0700 Subject: [PATCH 1/5] Add agent_policy_id and policy_revision_idx to checkin requests Add the agent_policy_id and policy_revision_idx attributes to checkin requests. --- ...licy_revision_idx-to-checkin-requests.yaml | 35 ++++++++++ .../gateway/fleet/fleet_gateway.go | 59 ++++++++++++++-- .../gateway/fleet/fleet_gateway_test.go | 68 +++++++++++++++++++ internal/pkg/fleetapi/checkin_cmd.go | 14 ++-- 4 files changed, 164 insertions(+), 12 deletions(-) create mode 100644 changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml diff --git a/changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml b/changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml new file mode 100644 index 00000000000..bee580ca5b6 --- /dev/null +++ b/changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml @@ -0,0 +1,35 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. 
+summary: Add agent_policy_id and policy_revision_idx to checkin requests + +# Long description; in case the summary is not enough to describe the change +# this field accommodates a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: | + Add agent_policy_id and policy_revision_idx attributes to checkin requests. + These attributes are used to inform fleet-server of the policy id and revision that the agent is currently running. + Agents that use these policies no longer need to send acks for POLICY_CHANGE actions. + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. 
+issue: https://github.com/elastic/elastic-agent/issues/6446 diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go index 2fd60a79ae6..905ce4186db 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go @@ -71,6 +71,7 @@ type stateStore interface { AckToken() string SetAckToken(ackToken string) Save() error + Action() fleetapi.Action } type FleetGateway struct { @@ -356,15 +357,21 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, // Fix loglevel with the current log level used by coordinator ecsMeta.Elastic.Agent.LogLevel = state.LogLevel.String() + action := f.stateStore.Action() + agentPolicyID := getPolicyID(action) + policyRevisionIDX := getPolicyRevisionIDX(action) + // checkin cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client) req := &fleetapi.CheckinRequest{ - AckToken: ackToken, - Metadata: ecsMeta, - Status: agentStateToString(state.State), - Message: state.Message, - Components: components, - UpgradeDetails: state.UpgradeDetails, + AckToken: ackToken, + Metadata: ecsMeta, + Status: agentStateToString(state.State), + Message: state.Message, + Components: components, + UpgradeDetails: state.UpgradeDetails, + AgentPolicyID: agentPolicyID, + PolicyRevisionIDX: policyRevisionIDX, } resp, took, err := cmd.Execute(ctx, req) @@ -447,3 +454,43 @@ func RequestBackoff(done <-chan struct{}) backoff.Backoff { defaultFleetBackoffSettings.Max, ) } + +// getPolicyID will check that the passed action is a POLICY_CHANGE action and return the policy_id attribute of the policy as a string. 
+func getPolicyID(action fleetapi.Action) string { + policyChange, ok := action.(*fleetapi.ActionPolicyChange) + if !ok { + return "" + } + v, ok := policyChange.Data.Policy["policy_id"] + if !ok { + return "" + } + vv, ok := v.(string) + if !ok { + return "" + } + return vv +} + +// getPolicyRevisionIDX will check that the passed action is a POLICY_CHANGE action and return the policy_revision_idx attribute of the policy as an int64. +// The function will attempt to convert the attribute to int64 if int or float64 is used in order to prevent issues from serialization. +func getPolicyRevisionIDX(action fleetapi.Action) int64 { + policyChange, ok := action.(*fleetapi.ActionPolicyChange) + if !ok { + return 0 + } + v, ok := policyChange.Data.Policy["policy_revision_idx"] + if !ok { + return 0 + } + switch vv := v.(type) { + case int64: + return vv + case int: + return int64(vv) + case float64: + return int64(vv) + default: + return 0 + } +} diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go index 217ac1ca457..dfcd40a93b7 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go @@ -377,6 +377,74 @@ func TestFleetGateway(t *testing.T) { default: } }) + + t.Run("sends agent_policy_id and policy_revision_idx", func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + scheduler := scheduler.NewStepper() + client := newTestingClient() + + log, _ := loggertest.New("fleet_gateway") + + stateStore := newStateStore(t, log) + stateStore.SetAction(&fleetapi.ActionPolicyChange{ + ActionID: "test-action-id", + ActionType: fleetapi.ActionTypePolicyChange, + Data: fleetapi.ActionPolicyChangeData{ + Policy: map[string]interface{}{ + "policy_id": "test-policy-id", + "policy_revision_idx": 1, + }, + }, + }) + err := stateStore.Save() + require.NoError(t, err) + + 
gateway, err := newFleetGatewayWithScheduler( + log, + settings, + agentInfo, + client, + scheduler, + noop.New(), + emptyStateFetcher, + stateStore, + ) + require.NoError(t, err) + + waitFn := ackSeq( + client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) { + data, err := io.ReadAll(body) + require.NoError(t, err) + + var checkinRequest fleetapi.CheckinRequest + err = json.Unmarshal(data, &checkinRequest) + require.NoError(t, err) + + require.Equal(t, "test-policy-id", checkinRequest.AgentPolicyID) + require.Equal(t, int64(1), checkinRequest.PolicyRevisionIDX) + + resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`) + return resp, nil + }), + ) + + errCh := runFleetGateway(ctx, gateway) + + // Synchronize scheduler and acking of calls from the worker go routine. + scheduler.Next() + waitFn() + + cancel() + err = <-errCh + require.NoError(t, err) + select { + case actions := <-gateway.Actions(): + t.Errorf("Expected no actions, got %v", actions) + default: + } + }) } func TestRetriesOnFailures(t *testing.T) { diff --git a/internal/pkg/fleetapi/checkin_cmd.go b/internal/pkg/fleetapi/checkin_cmd.go index 16ce9afe671..fb204b6ad3a 100644 --- a/internal/pkg/fleetapi/checkin_cmd.go +++ b/internal/pkg/fleetapi/checkin_cmd.go @@ -41,12 +41,14 @@ type CheckinComponent struct { // CheckinRequest consists of multiple events reported to fleet ui. 
type CheckinRequest struct { - Status string `json:"status"` - AckToken string `json:"ack_token,omitempty"` - Metadata *info.ECSMeta `json:"local_metadata,omitempty"` - Message string `json:"message"` // V2 Agent message - Components []CheckinComponent `json:"components"` // V2 Agent components - UpgradeDetails *details.Details `json:"upgrade_details,omitempty"` + Status string `json:"status"` + AckToken string `json:"ack_token,omitempty"` + Metadata *info.ECSMeta `json:"local_metadata,omitempty"` + Message string `json:"message"` // V2 Agent message + Components []CheckinComponent `json:"components"` // V2 Agent components + UpgradeDetails *details.Details `json:"upgrade_details,omitempty"` + AgentPolicyID string `json:"agent_policy_id,omitempty"` + PolicyRevisionIDX int64 `json:"policy_revision_idx,omitempty"` } // SerializableEvent is a representation of the event to be send to the Fleet Server API via the checkin From 9535385b3f155028dd36c240351e1f41f5a19b48 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Fri, 12 Sep 2025 14:58:23 -0700 Subject: [PATCH 2/5] Remove policy change action acks --- .../handlers/handler_action_policy_change.go | 23 ++++++++----------- .../handler_action_policy_change_test.go | 5 ++-- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go index 634f2020e55..5d89511e758 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go @@ -473,7 +473,6 @@ type policyChange struct { cfg *config.Config action fleetapi.Action acker acker.Acker - commit bool ackWatcher chan struct{} } @@ -482,9 +481,9 @@ func newPolicyChange( config *config.Config, action fleetapi.Action, acker acker.Acker, - commit bool) *policyChange { + makeCh bool) *policyChange { var 
ackWatcher chan struct{} - if commit { + if makeCh { // we don't need it otherwise ackWatcher = make(chan struct{}) } @@ -493,7 +492,6 @@ func newPolicyChange( cfg: config, action: action, acker: acker, - commit: true, ackWatcher: ackWatcher, } } @@ -502,22 +500,21 @@ func (l *policyChange) Config() *config.Config { return l.cfg } +// Ack sends an ack for the associated action if the results are expected. +// An ack will not be sent for a POLICY_CHANGE action, but will be when this method is used by UNENROLL actions. func (l *policyChange) Ack() error { - if l.action == nil { + if l.action == nil || l.ackWatcher == nil { return nil } err := l.acker.Ack(l.ctx, l.action) if err != nil { return err } - if l.commit { - err := l.acker.Commit(l.ctx) - if l.ackWatcher != nil && err == nil { - close(l.ackWatcher) - } - return err + err = l.acker.Commit(l.ctx) + if err == nil { + close(l.ackWatcher) } - return nil + return err } // WaitAck waits for policy change to be acked. @@ -525,7 +522,7 @@ func (l *policyChange) Ack() error { // Caller is responsible to use any reasonable deadline otherwise // function call can be endlessly blocking. 
func (l *policyChange) WaitAck(ctx context.Context) { - if !l.commit || l.ackWatcher == nil { + if l.ackWatcher == nil { return } diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go index 9fdf7184e89..74c3dddfdc0 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go @@ -105,7 +105,7 @@ func TestPolicyAcked(t *testing.T) { agentInfo := &info.AgentInfo{} nullStore := &storage.NullStore{} - t.Run("Config change should ACK", func(t *testing.T) { + t.Run("Config change shouldn't ACK", func(t *testing.T) { ch := make(chan coordinator.ConfigChange, 1) tacker := &testAcker{} @@ -129,8 +129,7 @@ func TestPolicyAcked(t *testing.T) { require.NoError(t, change.Ack()) actions := tacker.Items() - assert.EqualValues(t, 1, len(actions)) - assert.Equal(t, actionID, actions[0]) + assert.Empty(t, actions) }) } From fe7ba4ed7f67d839eb49718b43ce0ce5f8f2e27b Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Mon, 22 Sep 2025 09:00:52 -0700 Subject: [PATCH 3/5] Add ForcePolicyChangeAcks feature flag --- .../handlers/handler_action_policy_change.go | 14 +++++++--- .../handler_action_policy_change_test.go | 28 +++++++++++++++++++ .../handlers/handler_action_unenroll.go | 2 +- pkg/features/features.go | 26 ++++++++++++++++- 4 files changed, 64 insertions(+), 6 deletions(-) diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go index 5d89511e758..3a8b1d2ec35 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go @@ -29,6 +29,7 @@ import ( 
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client" "github.com/elastic/elastic-agent/internal/pkg/remote" "github.com/elastic/elastic-agent/pkg/core/logger" + "github.com/elastic/elastic-agent/pkg/features" ) // PolicyChangeHandler is a handler for POLICY_CHANGE action. @@ -41,6 +42,7 @@ type PolicyChangeHandler struct { setters []actions.ClientSetter policyLogLevelSetter logLevelSetter coordinator *coordinator.Coordinator + forceAckFn func() bool // Disabled for 8.8.0 release in order to limit the surface // https://github.com/elastic/security-team/issues/6501 // // Last known valid signature validation key @@ -67,6 +69,7 @@ func NewPolicyChangeHandler( setters: setters, coordinator: coordinator, policyLogLevelSetter: policyLogLevelSetter, + forceAckFn: features.ForcePolicyChangeAcks, } } @@ -111,7 +114,7 @@ func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack return err } - h.ch <- newPolicyChange(ctx, c, a, acker, false) + h.ch <- newPolicyChange(ctx, c, a, acker, false, h.forceAckFn()) return nil } @@ -474,6 +477,7 @@ type policyChange struct { action fleetapi.Action acker acker.Acker ackWatcher chan struct{} + forceAck bool } func newPolicyChange( @@ -481,7 +485,8 @@ func newPolicyChange( config *config.Config, action fleetapi.Action, acker acker.Acker, - makeCh bool) *policyChange { + makeCh bool, + forceAck bool) *policyChange { var ackWatcher chan struct{} if makeCh { // we don't need it otherwise @@ -493,6 +498,7 @@ func newPolicyChange( action: action, acker: acker, ackWatcher: ackWatcher, + forceAck: forceAck, } } @@ -503,7 +509,7 @@ func (l *policyChange) Config() *config.Config { // Ack sends an ack for the associated action if the results are expected. // An ack will not be sent for a POLICY_CHANGE action, but will be when this method is used by UNENROLL actions. 
func (l *policyChange) Ack() error { - if l.action == nil || l.ackWatcher == nil { + if !l.forceAck || l.action == nil { return nil } err := l.acker.Ack(l.ctx, l.action) @@ -511,7 +517,7 @@ func (l *policyChange) Ack() error { return err } err = l.acker.Commit(l.ctx) - if err == nil { + if err == nil && l.ackWatcher != nil { close(l.ackWatcher) } return err diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go index 74c3dddfdc0..03b74df56a7 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go @@ -131,6 +131,34 @@ func TestPolicyAcked(t *testing.T) { actions := tacker.Items() assert.Empty(t, actions) }) + t.Run("Config change acks when forced", func(t *testing.T) { + ch := make(chan coordinator.ConfigChange, 1) + tacker := &testAcker{} + + config := map[string]interface{}{"hello": "world"} + actionID := "abc123" + action := &fleetapi.ActionPolicyChange{ + ActionID: actionID, + ActionType: "POLICY_CHANGE", + Data: fleetapi.ActionPolicyChangeData{ + Policy: config, + }, + } + + cfg := configuration.DefaultConfiguration() + handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{}) + handler.forceAckFn = func() bool { return true } + + err := handler.Handle(context.Background(), action, tacker) + require.NoError(t, err) + + change := <-ch + require.NoError(t, change.Ack()) + + actions := tacker.Items() + assert.Len(t, actions, 1) + assert.Equal(t, actionID, actions[0]) + }) } func TestPolicyChangeHandler_handlePolicyChange_FleetClientSettings(t *testing.T) { diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go b/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go index 3f5e0cfed99..fd7dd55afac 
100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go @@ -93,7 +93,7 @@ func (h *Unenroll) handle(ctx context.Context, a fleetapi.Action, acker acker.Ac } // Generate empty policy change, this removing all the running components - unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true) + unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true, true) h.ch <- unenrollPolicy // backup action for future start to avoid starting fleet gateway loop diff --git a/pkg/features/features.go b/pkg/features/features.go index 1e547e73a47..c80ea361b09 100644 --- a/pkg/features/features.go +++ b/pkg/features/features.go @@ -36,7 +36,8 @@ type Flags struct { fqdn bool fqdnCallbacks map[string]BoolValueOnChangeCallback - tamperProtection bool + tamperProtection bool + forcePolicyChangeAcks bool } type cfg struct { @@ -48,6 +49,9 @@ type cfg struct { TamperProtection *struct { Enabled bool `json:"enabled" yaml:"enabled" config:"enabled"` } `json:"tamper_protection,omitempty" yaml:"tamper_protection,omitempty" config:"tamper_protection,omitempty"` + ForcePolicyChangeAcks struct { + Enabled bool `json:"enabled" yaml:"enabled" config:"enabled"` + } `json:"force_policy_change_acks" toml: "force_policy_change_acks" config:"force_policy_change_acks"` } `json:"features" yaml:"features" config:"features"` } `json:"agent" yaml:"agent" config:"agent"` } @@ -66,6 +70,13 @@ func (f *Flags) TamperProtection() bool { return f.tamperProtection } +func (f *Flags) ForcePolicyChangeAcks() bool { + f.mu.RLock() + defer f.mu.RUnlock() + + return f.forcePolicyChangeAcks +} + func (f *Flags) AsProto() *proto.Features { return &proto.Features{ Fqdn: &proto.FQDNFeature{ @@ -121,6 +132,13 @@ func (f *Flags) setTamperProtection(newValue bool) { f.tamperProtection = newValue } +func (f *Flags) setForcePolicyChangeAcks(newValue bool) { + f.mu.Lock() + defer f.mu.Unlock() + + 
f.forcePolicyChangeAcks = newValue +} + // setSource sets the source from he given cfg. func (f *Flags) setSource(c cfg) error { // Use JSON marshalling-unmarshalling to convert cfg to mapstr @@ -208,6 +226,7 @@ func Apply(c *config.Config) error { current.setFQDN(parsed.FQDN()) current.setTamperProtection(parsed.TamperProtection()) + current.setForcePolicyChangeAcks(parsed.ForcePolicyChangeAcks()) return err } @@ -220,3 +239,8 @@ func FQDN() bool { func TamperProtection() bool { return current.TamperProtection() } + +// ForcePolicyChangeAcks reports if the agent should force sending an ACK for POLICY_CHANGE actions. +func ForcePolicyChangeAcks() bool { + return current.ForcePolicyChangeAcks() +} From b585e91cbb4a5917cbc7389696d78a03b0517d9f Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Mon, 22 Sep 2025 09:30:31 -0700 Subject: [PATCH 4/5] Change FF to explicitly disable acks --- ...licy_revision_idx-to-checkin-requests.yaml | 4 +-- .../handlers/handler_action_policy_change.go | 16 ++++----- .../handler_action_policy_change_test.go | 35 +++++++++++++++++-- .../handlers/handler_action_unenroll.go | 2 +- pkg/features/features.go | 34 +++++++++++------- 5 files changed, 65 insertions(+), 26 deletions(-) diff --git a/changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml b/changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml index bee580ca5b6..a267150c03b 100644 --- a/changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml +++ b/changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml @@ -19,7 +19,7 @@ summary: Add agent_policy_id and policy_revision_idx to checkin requests description: | Add agent_policy_id and policy_revision_idx attributes to checkin requests. These attributes are used to inform fleet-server of the policy id and revision that the agent is currently running. 
- Agents that use these policies no longer need to send acks for POLICY_CHANGE actions. + Add a feature flag to disable sending acks for POLICY_CHANGE actions in a future release. # Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. component: elastic-agent @@ -28,7 +28,7 @@ component: elastic-agent # If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. # NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. # Please provide it if you are adding a fragment for a different PR. -#pr: https://github.com/owner/repo/1234 +pr: https://github.com/elastic/elastic-agent/pull/9931 # Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). # If not present is automatically filled by the tooling with the issue linked to the PR number. diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go index 3a8b1d2ec35..9bfb90ad7ea 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go @@ -42,7 +42,7 @@ type PolicyChangeHandler struct { setters []actions.ClientSetter policyLogLevelSetter logLevelSetter coordinator *coordinator.Coordinator - forceAckFn func() bool + disableAckFn func() bool // Disabled for 8.8.0 release in order to limit the surface // https://github.com/elastic/security-team/issues/6501 // // Last known valid signature validation key @@ -69,7 +69,7 @@ func NewPolicyChangeHandler( setters: setters, coordinator: coordinator, policyLogLevelSetter: policyLogLevelSetter, - forceAckFn: features.ForcePolicyChangeAcks, + disableAckFn: features.DisablePolicyChangeAcks, } } @@ -114,7 +114,7 @@ func (h 
*PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack return err } - h.ch <- newPolicyChange(ctx, c, a, acker, false, h.forceAckFn()) + h.ch <- newPolicyChange(ctx, c, a, acker, false, h.disableAckFn()) return nil } @@ -477,7 +477,7 @@ type policyChange struct { action fleetapi.Action acker acker.Acker ackWatcher chan struct{} - forceAck bool + disableAck bool } func newPolicyChange( @@ -486,7 +486,7 @@ func newPolicyChange( action fleetapi.Action, acker acker.Acker, makeCh bool, - forceAck bool) *policyChange { + disableAck bool) *policyChange { var ackWatcher chan struct{} if makeCh { // we don't need it otherwise @@ -498,7 +498,7 @@ func newPolicyChange( action: action, acker: acker, ackWatcher: ackWatcher, - forceAck: forceAck, + disableAck: disableAck, } } @@ -507,9 +507,9 @@ func (l *policyChange) Config() *config.Config { } // Ack sends an ack for the associated action if the results are expected. -// An ack will not be sent for a POLICY_CHANGE action, but will be when this method is used by UNENROLL actions. +// An ack will be sent for UNENROLL actions, or by POLICY_CHANGE actions if it has not been explicitly disabled. 
func (l *policyChange) Ack() error { - if !l.forceAck || l.action == nil { + if l.disableAck || l.action == nil { return nil } err := l.acker.Ack(l.ctx, l.action) diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go index 03b74df56a7..dd671fb41ac 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go @@ -105,7 +105,7 @@ func TestPolicyAcked(t *testing.T) { agentInfo := &info.AgentInfo{} nullStore := &storage.NullStore{} - t.Run("Config change shouldn't ACK", func(t *testing.T) { + t.Run("Default: Config changes are ACKed", func(t *testing.T) { ch := make(chan coordinator.ConfigChange, 1) tacker := &testAcker{} @@ -119,6 +119,7 @@ func TestPolicyAcked(t *testing.T) { }, } + // Test default FF value cfg := configuration.DefaultConfiguration() handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{}) @@ -129,7 +130,8 @@ func TestPolicyAcked(t *testing.T) { require.NoError(t, change.Ack()) actions := tacker.Items() - assert.Empty(t, actions) + assert.Len(t, actions, 1) + assert.Equal(t, actionID, actions[0]) }) t.Run("Config change acks when forced", func(t *testing.T) { ch := make(chan coordinator.ConfigChange, 1) @@ -147,7 +149,7 @@ func TestPolicyAcked(t *testing.T) { cfg := configuration.DefaultConfiguration() handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{}) - handler.forceAckFn = func() bool { return true } + handler.disableAckFn = func() bool { return false } err := handler.Handle(context.Background(), action, tacker) require.NoError(t, err) @@ -159,6 +161,33 @@ func TestPolicyAcked(t *testing.T) { assert.Len(t, actions, 1) assert.Equal(t, actionID, actions[0]) }) + 
t.Run("Config change do not ack when disabled", func(t *testing.T) { + ch := make(chan coordinator.ConfigChange, 1) + tacker := &testAcker{} + + config := map[string]interface{}{"hello": "world"} + actionID := "abc123" + action := &fleetapi.ActionPolicyChange{ + ActionID: actionID, + ActionType: "POLICY_CHANGE", + Data: fleetapi.ActionPolicyChangeData{ + Policy: config, + }, + } + + cfg := configuration.DefaultConfiguration() + handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{}) + handler.disableAckFn = func() bool { return true } + + err := handler.Handle(context.Background(), action, tacker) + require.NoError(t, err) + + change := <-ch + require.NoError(t, change.Ack()) + + actions := tacker.Items() + assert.Empty(t, actions) + }) } func TestPolicyChangeHandler_handlePolicyChange_FleetClientSettings(t *testing.T) { diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go b/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go index fd7dd55afac..d05e325b68f 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go @@ -93,7 +93,7 @@ func (h *Unenroll) handle(ctx context.Context, a fleetapi.Action, acker acker.Ac } // Generate empty policy change, this removing all the running components - unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true, true) + unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true, false) h.ch <- unenrollPolicy // backup action for future start to avoid starting fleet gateway loop diff --git a/pkg/features/features.go b/pkg/features/features.go index c80ea361b09..bac9ec82c8c 100644 --- a/pkg/features/features.go +++ b/pkg/features/features.go @@ -21,6 +21,10 @@ import ( // 8.11+ - default is enabled const defaultTamperProtection = true +// The default value of the disable policy change acks 
flag if the flag is missing. +// 9.2 - disabled (acks are sent) +const defaultDisablePolicyChangeAcks = false + var ( current = Flags{ tamperProtection: defaultTamperProtection, @@ -36,8 +40,8 @@ type Flags struct { fqdn bool fqdnCallbacks map[string]BoolValueOnChangeCallback - tamperProtection bool - forcePolicyChangeAcks bool + tamperProtection bool + disablePolicyChangeAcks bool } type cfg struct { @@ -49,9 +53,9 @@ type cfg struct { TamperProtection *struct { Enabled bool `json:"enabled" yaml:"enabled" config:"enabled"` } `json:"tamper_protection,omitempty" yaml:"tamper_protection,omitempty" config:"tamper_protection,omitempty"` - ForcePolicyChangeAcks struct { + DisablePolicyChangeAcks struct { Enabled bool `json:"enabled" yaml:"enabled" config:"enabled"` - } `json:"force_policy_change_acks" toml: "force_policy_change_acks" config:"force_policy_change_acks"` + } `json:"disable_policy_change_acks" toml: "disable_policy_change_acks" config:"disable_policy_change_acks"` } `json:"features" yaml:"features" config:"features"` } `json:"agent" yaml:"agent" config:"agent"` } @@ -70,11 +74,11 @@ func (f *Flags) TamperProtection() bool { return f.tamperProtection } -func (f *Flags) ForcePolicyChangeAcks() bool { +func (f *Flags) DisablePolicyChangeAcks() bool { f.mu.RLock() defer f.mu.RUnlock() - return f.forcePolicyChangeAcks + return f.disablePolicyChangeAcks } func (f *Flags) AsProto() *proto.Features { @@ -132,11 +136,11 @@ func (f *Flags) setTamperProtection(newValue bool) { f.tamperProtection = newValue } -func (f *Flags) setForcePolicyChangeAcks(newValue bool) { +func (f *Flags) setDisablePolicyChangeAcks(newValue bool) { f.mu.Lock() defer f.mu.Unlock() - f.forcePolicyChangeAcks = newValue + f.disablePolicyChangeAcks = newValue } // setSource sets the source from he given cfg. 
@@ -204,6 +208,12 @@ func Parse(policy any) (*Flags, error) { flags.setTamperProtection(defaultTamperProtection) } + if parsedFlags.Agent.Features.DisablePolicyChangeAcks != nil { + flags.setDisablePolicyChangeAcks(parsedFlags.Agent.Features.DisablePolicyChangeAcks.Enabled) + } else { + flags.setDisablePolicyChangeAcks(defaultDisablePolicyChangeAcks) + } + if err := flags.setSource(parsedFlags); err != nil { return nil, fmt.Errorf("error creating feature flags source: %w", err) } @@ -226,7 +236,7 @@ func Apply(c *config.Config) error { current.setFQDN(parsed.FQDN()) current.setTamperProtection(parsed.TamperProtection()) - current.setForcePolicyChangeAcks(parsed.ForcePolicyChangeAcks()) + current.setDisablePolicyChangeAcks(parsed.DisablePolicyChangeAcks()) return err } @@ -240,7 +250,7 @@ func TamperProtection() bool { return current.TamperProtection() } -// ForcePolicyChangeAcks reports if the agent should force sending an ACK for POLICY_CHANGE actions. -func ForcePolicyChangeAcks() bool { - return current.ForcePolicyChangeAcks() +// DisablePolicyChangeAcks reports if the agent will stop using ACKs for POLICY_CHANGE actions. 
+func DisablePolicyChangeAcks() bool { + return current.DisablePolicyChangeAcks() } From 67a3b80c697c1c25a876d2d0187b02b54b47c6a9 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Mon, 22 Sep 2025 09:47:10 -0700 Subject: [PATCH 5/5] Fix feature flag --- pkg/features/features.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/features/features.go b/pkg/features/features.go index bac9ec82c8c..3d1ee3a0931 100644 --- a/pkg/features/features.go +++ b/pkg/features/features.go @@ -53,9 +53,9 @@ type cfg struct { TamperProtection *struct { Enabled bool `json:"enabled" yaml:"enabled" config:"enabled"` } `json:"tamper_protection,omitempty" yaml:"tamper_protection,omitempty" config:"tamper_protection,omitempty"` - DisablePolicyChangeAcks struct { + DisablePolicyChangeAcks *struct { Enabled bool `json:"enabled" yaml:"enabled" config:"enabled"` - } `json:"disable_policy_change_acks" toml: "disable_policy_change_acks" config:"disable_policy_change_acks"` + } `json:"disable_policy_change_acks" yaml:"disable_policy_change_acks" config:"disable_policy_change_acks"` } `json:"features" yaml:"features" config:"features"` } `json:"agent" yaml:"agent" config:"agent"` }