Skip to content

Commit f0dcf41

Browse files
Add agent_policy_id and policy_revision_idx to checkin attributes
Allow the agents to add their currently running policy_id and revision_idx attributes to the checkin request bodies. These attributes, if included and different from the agent doc will be used when updating the agent doc in the pre-poll checkin. If the agent's policy id does not match the expected policy id from the server a reassign is detected and a new policy change action will be sent. If the checkin ID is greater than what was previously recorded or the policy id changes from what was previously recoreded, then the api keys will be managed.
1 parent 5b5468a commit f0dcf41

File tree

11 files changed

+386
-150
lines changed

11 files changed

+386
-150
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Add agent_policy_id and policy_revision_idx to checkin requests
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
description: |
20+
Add the agent_policy_id and policy_revision_idx attributes to checkin
21+
request bodies so an agent is able to inform fleet-server of its exact
22+
policy. These details will replace the need for an ack on
23+
policy_change actions, and will be used to determine when to send a
24+
policy change when there is a new revision available, or when the
25+
agent is reassigned to a different policy.
26+
27+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
28+
component: fleet-server
29+
30+
# PR URL; optional; the PR number that added the changeset.
31+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
32+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
33+
# Please provide it if you are adding a fragment for a different PR.
34+
pr: https://github.com/elastic/fleet-server/pull/5501
35+
36+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
37+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
38+
issue: https://github.com/elastic/elastic-agent/issues/6446

internal/pkg/api/handleAck.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -442,19 +442,28 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
442442
agentID string,
443443
apiKeyID, permissionHash string,
444444
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, outputName string) error {
445-
bulk := ack.bulk
445+
return updateAPIKey(ctx, zlog, ack.bulk, agentID, apiKeyID, permissionHash, toRetireAPIKeyIDs, outputName)
446+
}
447+
448+
func updateAPIKey(ctx context.Context,
449+
zlog zerolog.Logger,
450+
bulk bulk.Bulk,
451+
agentID string,
452+
apiKeyID, permissionHash string,
453+
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, outputName string) error {
446454
// use output bulker if exists
455+
outBulk := bulk
447456
if outputName != "" {
448-
outputBulk := ack.bulk.GetBulker(outputName)
457+
outputBulk := bulk.GetBulker(outputName)
449458
if outputBulk != nil {
450459
zlog.Debug().Str(ecs.PolicyOutputName, outputName).Msg("Using output bulker in updateAPIKey")
451-
bulk = outputBulk
460+
outBulk = outputBulk
452461
}
453462
}
454463
if apiKeyID != "" {
455-
res, err := bulk.APIKeyRead(ctx, apiKeyID, true)
464+
res, err := outBulk.APIKeyRead(ctx, apiKeyID, true)
456465
if err != nil {
457-
if isAgentActive(ctx, zlog, ack.bulk, agentID) {
466+
if isAgentActive(ctx, zlog, outBulk, agentID) {
458467
zlog.Warn().
459468
Err(err).
460469
Str(LogAPIKeyID, apiKeyID).
@@ -480,7 +489,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
480489
Str(LogAPIKeyID, apiKeyID).
481490
Msg("Failed to cleanup roles")
482491
} else if removedRolesCount > 0 {
483-
if err := bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
492+
if err := outBulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
484493
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Str(ecs.PolicyOutputName, outputName).Msg("Failed to update API Key")
485494
} else {
486495
zlog.Debug().
@@ -493,7 +502,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
493502
}
494503
}
495504
}
496-
ack.invalidateAPIKeys(ctx, zlog, toRetireAPIKeyIDs, apiKeyID)
505+
invalidateAPIKeys(ctx, zlog, bulk, toRetireAPIKeyIDs, apiKeyID)
497506
}
498507

499508
return nil

internal/pkg/api/handleCheckin.go

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -278,16 +278,34 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
278278
return fmt.Errorf("failed to update upgrade_details: %w", err)
279279
}
280280

281+
initialOpts := []checkin.Option{
282+
checkin.WithStatus(string(req.Status)),
283+
checkin.WithMessage(req.Message),
284+
checkin.WithMeta(rawMeta),
285+
checkin.WithComponents(rawComponents),
286+
checkin.WithSeqNo(seqno),
287+
checkin.WithVer(ver),
288+
checkin.WithUnhealthyReason(unhealthyReason),
289+
checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""),
290+
}
291+
292+
revID, opts, err := ct.processPolicyDetails(r.Context(), zlog, agent, req)
293+
if err != nil {
294+
return fmt.Errorf("failed to update policy details: %w", err)
295+
}
296+
if len(opts) > 0 {
297+
initialOpts = append(initialOpts, opts...)
298+
}
299+
281300
// Subscribe to actions dispatcher
282301
aSub := ct.ad.Subscribe(zlog, agent.Id, seqno)
283302
defer ct.ad.Unsubscribe(zlog, aSub)
284303
actCh := aSub.Ch()
285304

286-
// use revision_idx=0 if the agent has a single output where no API key is defined
287-
// This will force the policy monitor to emit a new policy to regerate API keys
288-
revID := agent.PolicyRevisionIdx
289305
for _, output := range agent.Outputs {
290306
if output.APIKey == "" {
307+
// use revision_idx=0 if the agent has a single output where no API key is defined
308+
// This will force the policy monitor to emit a new policy to regerate API keys
291309
revID = 0
292310
break
293311
}
@@ -327,7 +345,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
327345
// Initial update on checkin, and any user fields that might have changed
328346
// Run a script to remove audit_unenrolled_* and unenrolled_at attributes if one is set on checkin.
329347
// 8.16.x releases would incorrectly set unenrolled_at
330-
err = ct.bc.CheckIn(agent.Id, checkin.WithStatus(string(req.Status)), checkin.WithMessage(req.Message), checkin.WithMeta(rawMeta), checkin.WithComponents(rawComponents), checkin.WithSeqNo(seqno), checkin.WithVer(ver), checkin.WithUnhealthyReason(unhealthyReason), checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""))
348+
err = ct.bc.CheckIn(agent.Id, initialOpts...)
331349
if err != nil {
332350
zlog.Error().Err(err).Str(ecs.AgentID, agent.Id).Msg("checkin failed")
333351
}
@@ -1123,3 +1141,49 @@ func calcPollDuration(zlog zerolog.Logger, pollDuration, setupDuration, jitterDu
11231141

11241142
return pollDuration, jitter
11251143
}
1144+
1145+
// processPolicyDetails handles the agent_policy_id and revision_idx included in the checkin request.
1146+
// The API keys will be managed if the agent reports a new policy id from its last checkin, or if the revision is greater than what the last checkin reported.
1147+
// It returns the revision idx that should be used when subscribing for new POLICY_CHANGE actons and optional args to use when doing the non-tick checkin.
1148+
func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) (int64, []checkin.Option, error) {
1149+
// no details specified
1150+
if req == nil || req.PolicyRevisionIdx == nil || req.AgentPolicyId == nil {
1151+
return agent.PolicyRevisionIdx, nil, nil
1152+
}
1153+
policyID := *req.AgentPolicyId
1154+
revisionIDX := *req.PolicyRevisionIdx
1155+
1156+
span, ctx := apm.StartSpan(ctx, "Process policy details", "process")
1157+
span.Context.SetLabel("agent_id", agent.Agent.ID)
1158+
span.Context.SetLabel(dl.FieldAgentPolicyID, policyID)
1159+
span.Context.SetLabel(dl.FieldPolicyRevisionIdx, revisionIDX)
1160+
defer span.End()
1161+
1162+
// update agent doc if policy id or revision idx does not match
1163+
var opts []checkin.Option
1164+
if policyID != agent.PolicyID || revisionIDX != agent.PolicyRevisionIdx {
1165+
opts = []checkin.Option{
1166+
checkin.WithAgentPolicyID(policyID),
1167+
checkin.WithPolicyRevisionIDX(revisionIDX),
1168+
}
1169+
}
1170+
// Policy reassign, subscribe to policy with revision 0
1171+
if policyID != agent.PolicyID {
1172+
zlog.Debug().Str(dl.FieldAgentPolicyID, policyID).Str("new_policy_id", agent.PolicyID).Msg("Policy ID mismatch detected, reassigning agent.")
1173+
return 0, opts, nil
1174+
}
1175+
1176+
// Update API keys if the policy has changed, or if the revision increments.
1177+
if policyID != agent.AgentPolicyID || revisionIDX > agent.PolicyRevisionIdx {
1178+
for outputName, output := range agent.Outputs {
1179+
if output.Type != policy.OutputTypeElasticsearch {
1180+
continue
1181+
}
1182+
if err := updateAPIKey(ctx, zlog, ct.bulker, agent.Id, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName); err != nil {
1183+
// Only returns ErrUpdatingInactiveAgent
1184+
return 0, nil, err
1185+
}
1186+
}
1187+
}
1188+
return revisionIDX, opts, nil
1189+
}

internal/pkg/api/handleCheckin_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,3 +1118,131 @@ func TestValidateCheckinRequest(t *testing.T) {
11181118
})
11191119
}
11201120
}
1121+
1122+
func TestProcessPolicyDetails(t *testing.T) {
1123+
policyID := "policy-id"
1124+
revIDX2 := int64(2)
1125+
tests := []struct {
1126+
name string
1127+
agent *model.Agent
1128+
req *CheckinRequest
1129+
revIDX int64
1130+
returnsOps bool
1131+
err error
1132+
}{{
1133+
name: "request has no policy details",
1134+
agent: &model.Agent{
1135+
PolicyRevisionIdx: 1,
1136+
},
1137+
req: &CheckinRequest{},
1138+
revIDX: 1,
1139+
returnsOps: false,
1140+
err: nil,
1141+
}, {
1142+
name: "policy reassign detected",
1143+
agent: &model.Agent{
1144+
Agent: &model.AgentMetadata{
1145+
ID: "agent-id",
1146+
},
1147+
PolicyID: "new-policy-id",
1148+
AgentPolicyID: policyID,
1149+
PolicyRevisionIdx: 2,
1150+
},
1151+
req: &CheckinRequest{
1152+
AgentPolicyId: &policyID,
1153+
PolicyRevisionIdx: &revIDX2,
1154+
},
1155+
revIDX: 0,
1156+
returnsOps: true,
1157+
err: nil,
1158+
}, {
1159+
name: "revision updated",
1160+
agent: &model.Agent{
1161+
Agent: &model.AgentMetadata{
1162+
ID: "agent-id",
1163+
},
1164+
PolicyID: policyID,
1165+
AgentPolicyID: policyID,
1166+
PolicyRevisionIdx: 1,
1167+
},
1168+
req: &CheckinRequest{
1169+
AgentPolicyId: &policyID,
1170+
PolicyRevisionIdx: &revIDX2,
1171+
},
1172+
revIDX: 2,
1173+
returnsOps: true,
1174+
err: nil,
1175+
}, {
1176+
name: "agent_policy_id has changed",
1177+
agent: &model.Agent{
1178+
Agent: &model.AgentMetadata{
1179+
ID: "agent-id",
1180+
},
1181+
PolicyID: policyID,
1182+
AgentPolicyID: "old-policy-id",
1183+
PolicyRevisionIdx: 1,
1184+
},
1185+
req: &CheckinRequest{
1186+
AgentPolicyId: &policyID,
1187+
PolicyRevisionIdx: &revIDX2,
1188+
},
1189+
revIDX: 2,
1190+
returnsOps: true,
1191+
err: nil,
1192+
}, {
1193+
name: "agent does not have agent_policy_id present",
1194+
agent: &model.Agent{
1195+
Agent: &model.AgentMetadata{
1196+
ID: "agent-id",
1197+
},
1198+
PolicyID: policyID,
1199+
PolicyRevisionIdx: 2,
1200+
},
1201+
req: &CheckinRequest{
1202+
AgentPolicyId: &policyID,
1203+
PolicyRevisionIdx: &revIDX2,
1204+
},
1205+
revIDX: 2,
1206+
returnsOps: false,
1207+
err: nil,
1208+
}, {
1209+
name: "details present with no changes from agent doc",
1210+
agent: &model.Agent{
1211+
Agent: &model.AgentMetadata{
1212+
ID: "agent-id",
1213+
},
1214+
AgentPolicyID: policyID,
1215+
PolicyID: policyID,
1216+
PolicyRevisionIdx: revIDX2,
1217+
},
1218+
req: &CheckinRequest{
1219+
AgentPolicyId: &policyID,
1220+
PolicyRevisionIdx: &revIDX2,
1221+
},
1222+
revIDX: 2,
1223+
returnsOps: false,
1224+
err: nil,
1225+
}}
1226+
1227+
for _, tc := range tests {
1228+
t.Run(tc.name, func(t *testing.T) {
1229+
logger := testlog.SetLogger(t)
1230+
checkin := &CheckinT{
1231+
bulker: ftesting.NewMockBulk(),
1232+
}
1233+
1234+
revIDX, opts, err := checkin.processPolicyDetails(t.Context(), logger, tc.agent, tc.req)
1235+
assert.Equal(t, tc.revIDX, revIDX)
1236+
if tc.returnsOps {
1237+
assert.NotEmpty(t, opts)
1238+
} else {
1239+
assert.Empty(t, opts)
1240+
}
1241+
if tc.err != nil {
1242+
assert.ErrorIs(t, tc.err, err)
1243+
} else {
1244+
assert.NoError(t, err)
1245+
}
1246+
})
1247+
}
1248+
}

internal/pkg/api/openapi.gen.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)