Skip to content

Commit 9aa2ba2

Browse files
Handle when agent checks in with revision idx that is too high
Handle the scenario when the agent checks in with a revision_idx value that is greater than the latest available policy in ES. Add E2E tests when using policy_id and revision_idx values in checkin.
1 parent 99f1c89 commit 9aa2ba2

File tree

6 files changed

+498
-14
lines changed

6 files changed

+498
-14
lines changed

internal/pkg/api/handleCheckin.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,7 +1144,7 @@ func calcPollDuration(zlog zerolog.Logger, pollDuration, setupDuration, jitterDu
11441144
}
11451145

11461146
// processPolicyDetails handles the agent_policy_id and revision_idx included in the checkin request.
1147-
// The API keys will be managed if the agent reports a new policy id from its last checkin, or if the revision is greater than what the last checkin reported.
1147+
// The API keys will be managed if the agent reports a new policy id from its last checkin, or if the revision is different than what the last checkin reported.
11481148
// It returns the revision idx that should be used when subscribing for new POLICY_CHANGE actons and optional args to use when doing the non-tick checkin.
11491149
func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) (int64, []checkin.Option, error) {
11501150
// no details specified
@@ -1162,7 +1162,7 @@ func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logge
11621162

11631163
// update agent doc if policy id or revision idx does not match
11641164
var opts []checkin.Option
1165-
if policyID != agent.PolicyID || revisionIDX != agent.PolicyRevisionIdx {
1165+
if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx {
11661166
opts = []checkin.Option{
11671167
checkin.WithAgentPolicyID(policyID),
11681168
checkin.WithPolicyRevisionIDX(revisionIDX),
@@ -1174,8 +1174,14 @@ func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logge
11741174
return 0, opts, nil
11751175
}
11761176

1177-
// Update API keys if the policy has changed, or if the revision increments.
1178-
if policyID != agent.AgentPolicyID || revisionIDX > agent.PolicyRevisionIdx {
1177+
// Check if the checkin revision_idx is greater than the latest available
1178+
latestRev := ct.pm.LatestRev(ctx, agent.PolicyID)
1179+
if latestRev != 0 && revisionIDX > latestRev {
1180+
return 0, opts, nil
1181+
}
1182+
1183+
// Update API keys if the policy has changed, or if the revision differs.
1184+
if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx {
11791185
for outputName, output := range agent.Outputs {
11801186
if output.Type != policy.OutputTypeElasticsearch {
11811187
continue

internal/pkg/api/handleCheckin_test.go

Lines changed: 80 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"github.com/elastic/fleet-server/v7/internal/pkg/es"
2727
"github.com/elastic/fleet-server/v7/internal/pkg/model"
2828
mockmonitor "github.com/elastic/fleet-server/v7/internal/pkg/monitor/mock"
29-
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
3029
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
3130
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
3231
testcache "github.com/elastic/fleet-server/v7/internal/pkg/testing/cache"
@@ -39,6 +38,30 @@ import (
3938
"github.com/stretchr/testify/require"
4039
)
4140

41+
type mockPolicyMonitor struct {
42+
mock.Mock
43+
}
44+
45+
func (m *mockPolicyMonitor) Run(ctx context.Context) error {
46+
args := m.Called(ctx)
47+
return args.Error(0)
48+
}
49+
50+
func (m *mockPolicyMonitor) Subscribe(agentID, policyID string, revIDX int64) (Subscription, error) {
51+
args := m.Called(agentID, policyID, revIDX)
52+
return args.Get(0).(Subscription), args.Error(1)
53+
}
54+
55+
func (m *mockPolicyMonitor) Unsubscribe(sub Subscription) error {
56+
args := m.Called(sub)
57+
return args.Error(0)
58+
}
59+
60+
func (m *mockPolicyMonitor) LatestRev(ctx context.Context, id string) int64 {
61+
args := m.Called(ctx, id)
62+
return args.Get(0).(int64)
63+
}
64+
4265
func TestConvertActionData(t *testing.T) {
4366
tests := []struct {
4467
name string
@@ -341,12 +364,13 @@ func TestResolveSeqNo(t *testing.T) {
341364
bc := checkin.NewBulk(nil)
342365
bulker := ftesting.NewMockBulk()
343366
pim := mockmonitor.NewMockMonitor()
344-
pm := policy.NewMonitor(bulker, pim, config.ServerLimits{PolicyLimit: config.Limit{Interval: 5 * time.Millisecond, Burst: 1}})
367+
pm := &mockPolicyMonitor{}
345368
ct, err := NewCheckinT(verCon, cfg, c, bc, pm, nil, nil, nil)
346369
assert.NoError(t, err)
347370

348371
resp, _ := ct.resolveSeqNo(ctx, logger, tc.req, tc.agent)
349372
assert.Equal(t, tc.resp, resp)
373+
pm.AssertExpectations(t)
350374
})
351375
}
352376

@@ -1123,18 +1147,22 @@ func TestProcessPolicyDetails(t *testing.T) {
11231147
policyID := "policy-id"
11241148
revIDX2 := int64(2)
11251149
tests := []struct {
1126-
name string
1127-
agent *model.Agent
1128-
req *CheckinRequest
1129-
revIDX int64
1130-
returnsOps bool
1131-
err error
1150+
name string
1151+
agent *model.Agent
1152+
req *CheckinRequest
1153+
getPolicyMonitor func() *mockPolicyMonitor
1154+
revIDX int64
1155+
returnsOps bool
1156+
err error
11321157
}{{
11331158
name: "request has no policy details",
11341159
agent: &model.Agent{
11351160
PolicyRevisionIdx: 1,
11361161
},
1137-
req: &CheckinRequest{},
1162+
req: &CheckinRequest{},
1163+
getPolicyMonitor: func() *mockPolicyMonitor {
1164+
return &mockPolicyMonitor{}
1165+
},
11381166
revIDX: 1,
11391167
returnsOps: false,
11401168
err: nil,
@@ -1152,6 +1180,9 @@ func TestProcessPolicyDetails(t *testing.T) {
11521180
AgentPolicyId: &policyID,
11531181
PolicyRevisionIdx: &revIDX2,
11541182
},
1183+
getPolicyMonitor: func() *mockPolicyMonitor {
1184+
return &mockPolicyMonitor{}
1185+
},
11551186
revIDX: 0,
11561187
returnsOps: true,
11571188
err: nil,
@@ -1169,9 +1200,36 @@ func TestProcessPolicyDetails(t *testing.T) {
11691200
AgentPolicyId: &policyID,
11701201
PolicyRevisionIdx: &revIDX2,
11711202
},
1203+
getPolicyMonitor: func() *mockPolicyMonitor {
1204+
pm := &mockPolicyMonitor{}
1205+
pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once()
1206+
return pm
1207+
},
11721208
revIDX: 2,
11731209
returnsOps: true,
11741210
err: nil,
1211+
}, {
1212+
name: "checkin revision is greater than the policy's latest revision",
1213+
agent: &model.Agent{
1214+
Agent: &model.AgentMetadata{
1215+
ID: "agent-id",
1216+
},
1217+
PolicyID: policyID,
1218+
AgentPolicyID: policyID,
1219+
PolicyRevisionIdx: 1,
1220+
},
1221+
req: &CheckinRequest{
1222+
AgentPolicyId: &policyID,
1223+
PolicyRevisionIdx: &revIDX2,
1224+
},
1225+
getPolicyMonitor: func() *mockPolicyMonitor {
1226+
pm := &mockPolicyMonitor{}
1227+
pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once()
1228+
return pm
1229+
},
1230+
revIDX: 0,
1231+
returnsOps: true,
1232+
err: nil,
11751233
}, {
11761234
name: "agent_policy_id has changed",
11771235
agent: &model.Agent{
@@ -1202,6 +1260,11 @@ func TestProcessPolicyDetails(t *testing.T) {
12021260
AgentPolicyId: &policyID,
12031261
PolicyRevisionIdx: &revIDX2,
12041262
},
1263+
getPolicyMonitor: func() *mockPolicyMonitor {
1264+
pm := &mockPolicyMonitor{}
1265+
pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once()
1266+
return pm
1267+
},
12051268
revIDX: 2,
12061269
returnsOps: false,
12071270
err: nil,
@@ -1219,6 +1282,11 @@ func TestProcessPolicyDetails(t *testing.T) {
12191282
AgentPolicyId: &policyID,
12201283
PolicyRevisionIdx: &revIDX2,
12211284
},
1285+
getPolicyMonitor: func() *mockPolicyMonitor {
1286+
pm := &mockPolicyMonitor{}
1287+
pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once()
1288+
return pm
1289+
},
12221290
revIDX: 2,
12231291
returnsOps: false,
12241292
err: nil,
@@ -1227,8 +1295,10 @@ func TestProcessPolicyDetails(t *testing.T) {
12271295
for _, tc := range tests {
12281296
t.Run(tc.name, func(t *testing.T) {
12291297
logger := testlog.SetLogger(t)
1298+
pm := tc.getPolicyMonitor()
12301299
checkin := &CheckinT{
12311300
bulker: ftesting.NewMockBulk(),
1301+
pm: pm,
12321302
}
12331303

12341304
revIDX, opts, err := checkin.processPolicyDetails(t.Context(), logger, tc.agent, tc.req)
@@ -1243,6 +1313,7 @@ func TestProcessPolicyDetails(t *testing.T) {
12431313
} else {
12441314
assert.NoError(t, err)
12451315
}
1316+
pm.AssertExpectations(t)
12461317
})
12471318
}
12481319
}

internal/pkg/policy/monitor.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ type Monitor interface {
6262

6363
// Unsubscribe removes the current subscription.
6464
Unsubscribe(sub Subscription) error
65+
66+
// LatestRev returns the latest revision idx for the specified policy.
67+
LatestRev(ctx context.Context, policyID string) int64
6568
}
6669

6770
type policyFetcher func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error)
@@ -557,3 +560,34 @@ func (m *monitorT) Unsubscribe(sub Subscription) error {
557560

558561
return nil
559562
}
563+
564+
// LatestRev returns the revision_idx for the passed policy ID.
565+
// If the policy does not exist in the map, then all policies are foribly reloaded.
566+
// On an error with the reload, or if the policy does not exist a 0 is returned.
567+
func (m *monitorT) LatestRev(ctx context.Context, id string) int64 {
568+
if id == "" {
569+
return 0
570+
}
571+
572+
m.mut.Lock()
573+
p, ok := m.policies[id]
574+
m.mut.Unlock()
575+
576+
if !ok {
577+
// We've not seen this policy before, force load.
578+
err := m.loadPolicies(ctx)
579+
if err != nil {
580+
m.log.Error().Err(err).Str(ecs.PolicyID, id).Msg("Unable to load policies.")
581+
return 0
582+
}
583+
584+
m.mut.Lock()
585+
p, ok = m.policies[id]
586+
m.mut.Unlock()
587+
if !ok {
588+
m.log.Warn().Str(ecs.PolicyID, id).Msg("Unable to find policy after load.")
589+
return 0
590+
}
591+
}
592+
return p.pp.Policy.RevisionIdx
593+
}

internal/pkg/policy/monitor_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package policy
99
import (
1010
"context"
1111
"encoding/json"
12+
"fmt"
1213
"sync"
1314
"testing"
1415
"time"
@@ -549,3 +550,76 @@ LOOP:
549550
ms.AssertExpectations(t)
550551
mm.AssertExpectations(t)
551552
}
553+
554+
func TestMonitor_LatestRev(t *testing.T) {
555+
t.Run("empty policy id", func(t *testing.T) {
556+
pm := &monitorT{}
557+
idx := pm.LatestRev(t.Context(), "")
558+
assert.Equal(t, int64(0), idx)
559+
})
560+
561+
t.Run("policy load error", func(t *testing.T) {
562+
bulker := ftesting.NewMockBulk()
563+
mm := mmock.NewMockMonitor()
564+
monitor := NewMonitor(bulker, mm, config.ServerLimits{})
565+
pm := monitor.(*monitorT)
566+
pm.policyF = func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error) {
567+
return nil, fmt.Errorf("policy fetch error")
568+
}
569+
570+
idx := pm.LatestRev(t.Context(), "test-id")
571+
assert.Equal(t, int64(0), idx)
572+
})
573+
574+
t.Run("policy not found", func(t *testing.T) {
575+
bulker := ftesting.NewMockBulk()
576+
mm := mmock.NewMockMonitor()
577+
monitor := NewMonitor(bulker, mm, config.ServerLimits{})
578+
pm := monitor.(*monitorT)
579+
pm.policyF = func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error) {
580+
return []model.Policy{}, nil
581+
}
582+
idx := pm.LatestRev(t.Context(), "test-id")
583+
assert.Equal(t, int64(0), idx)
584+
})
585+
586+
t.Run("policy found after load", func(t *testing.T) {
587+
bulker := ftesting.NewMockBulk()
588+
mm := mmock.NewMockMonitor()
589+
monitor := NewMonitor(bulker, mm, config.ServerLimits{})
590+
pm := monitor.(*monitorT)
591+
policyId := uuid.Must(uuid.NewV4()).String()
592+
rId := xid.New().String()
593+
policy := model.Policy{
594+
ESDocument: model.ESDocument{
595+
Id: rId,
596+
Version: 1,
597+
SeqNo: 1,
598+
},
599+
PolicyID: policyId,
600+
Data: policyDataDefault,
601+
RevisionIdx: 2,
602+
}
603+
pm.policyF = func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error) {
604+
return []model.Policy{policy}, nil
605+
}
606+
idx := pm.LatestRev(t.Context(), policyId)
607+
assert.Equal(t, int64(2), idx)
608+
})
609+
610+
t.Run("policy found", func(t *testing.T) {
611+
pm := &monitorT{
612+
policies: map[string]policyT{
613+
"test-id": policyT{
614+
pp: ParsedPolicy{
615+
Policy: model.Policy{
616+
RevisionIdx: 1,
617+
},
618+
},
619+
},
620+
},
621+
}
622+
idx := pm.LatestRev(t.Context(), "test-id")
623+
assert.Equal(t, int64(1), idx)
624+
})
625+
}

0 commit comments

Comments
 (0)