Skip to content

Commit e747bc1

Browse files
Add cancellations to policy monitor dispatches (#5111)
Add support for canceling dispatching policies to agents. This occurs whenever a new policy is received by the policy monitor. This allows the monitor to stop sending revision N when N+1 is received.
1 parent fae7420 commit e747bc1

File tree

3 files changed

+195
-11
lines changed

3 files changed

+195
-11
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: enhancement
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Cancel policy dispatches when a new revision arrives
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodates a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
description: |
20+
The policy monitor may cancel dispatches to the pending queue when
21+
new output is received. This allows the cancellation of sending
22+
revision N to agents when N+1 is received.
23+
24+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
25+
component: fleet-server
26+
27+
# PR URL; optional; the PR number that added the changeset.
28+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
29+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
30+
# Please provide it if you are adding a fragment for a different PR.
31+
#pr: https://github.com/owner/repo/1234
32+
33+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
34+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
35+
issue: https://github.com/elastic/fleet-server/issues/3254

internal/pkg/policy/monitor.go

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ type monitorT struct {
8888
policiesIndex string
8989
limit *rate.Limiter
9090

91-
startCh chan struct{}
91+
startCh chan struct{}
92+
dispatchCh chan struct{}
9293
}
9394

9495
// NewMonitor creates the policy monitor for subscribing agents.
@@ -135,14 +136,17 @@ func (m *monitorT) Run(ctx context.Context) error {
135136

136137
close(m.startCh)
137138

138-
var iCtx context.Context
139+
// use a cancellable context so we can stop dispatching changes if a new hit is received.
140+
// the cancel func is manually called before return, or after policies have been dispatched.
141+
iCtx, iCancel := context.WithCancel(ctx)
139142
var trans *apm.Transaction
140143
LOOP:
141144
for {
142145
m.log.Trace().Msg("policy monitor loop start")
143-
iCtx = ctx
144146
select {
145147
case <-m.kickCh:
148+
cancelOnce(iCtx, iCancel)
149+
iCtx, iCancel = context.WithCancel(ctx)
146150
m.log.Trace().Msg("policy monitor kicked")
147151
if m.bulker.HasTracer() {
148152
trans = m.bulker.StartTransaction("initial policies", "policy_monitor")
@@ -151,20 +155,31 @@ LOOP:
151155

152156
if err := m.loadPolicies(iCtx); err != nil {
153157
endTrans(trans)
158+
cancelOnce(iCtx, iCancel)
154159
return err
155160
}
156-
m.dispatchPending(iCtx)
157-
endTrans(trans)
161+
go func(ctx context.Context, cancel context.CancelFunc, trans *apm.Transaction) {
162+
m.dispatchPending(ctx)
163+
endTrans(trans)
164+
cancelOnce(ctx, cancel)
165+
}(iCtx, iCancel, trans)
158166
case <-m.deployCh:
167+
cancelOnce(iCtx, iCancel)
168+
iCtx, iCancel = context.WithCancel(ctx)
159169
m.log.Trace().Msg("policy monitor deploy ch")
160170
if m.bulker.HasTracer() {
161171
trans = m.bulker.StartTransaction("forced policies", "policy_monitor")
162172
iCtx = apm.ContextWithTransaction(ctx, trans)
163173
}
164174

165-
m.dispatchPending(iCtx)
166-
endTrans(trans)
175+
go func(ctx context.Context, cancel context.CancelFunc, trans *apm.Transaction) {
176+
m.dispatchPending(ctx)
177+
endTrans(trans)
178+
cancelOnce(ctx, cancel)
179+
}(iCtx, iCancel, trans)
167180
case hits := <-s.Output(): // TODO would be nice to attach transaction IDs to hits, but would likely need a bigger refactor.
181+
cancelOnce(iCtx, iCancel)
182+
iCtx, iCancel = context.WithCancel(ctx)
168183
m.log.Trace().Int("hits", len(hits)).Msg("policy monitor hits from sub")
169184
if m.bulker.HasTracer() {
170185
trans = m.bulker.StartTransaction("output policies", "policy_monitor")
@@ -173,18 +188,33 @@ LOOP:
173188

174189
if err := m.processHits(iCtx, hits); err != nil {
175190
endTrans(trans)
191+
cancelOnce(iCtx, iCancel)
176192
return err
177193
}
178-
m.dispatchPending(iCtx)
179-
endTrans(trans)
194+
go func(ctx context.Context, cancel context.CancelFunc, trans *apm.Transaction) {
195+
m.dispatchPending(ctx)
196+
endTrans(trans)
197+
cancelOnce(ctx, cancel)
198+
}(iCtx, iCancel, trans)
180199
case <-ctx.Done():
181200
break LOOP
182201
}
183202
}
184203

204+
iCancel()
185205
return nil
186206
}
187207

208+
// cancelOnce calls cancel if the context is not done.
209+
func cancelOnce(ctx context.Context, cancel context.CancelFunc) {
210+
select {
211+
case <-ctx.Done():
212+
return
213+
default:
214+
cancel()
215+
}
216+
}
217+
188218
func unmarshalHits(hits []es.HitT) ([]model.Policy, error) {
189219
policies := make([]model.Policy, len(hits))
190220
for i, hit := range hits {
@@ -224,6 +254,14 @@ func (m *monitorT) waitStart(ctx context.Context) error {
224254
// dispatchPending will dispatch all pending policy changes to the subscriptions in the queue.
225255
// dispatches are rate limited by the monitor's limiter.
226256
func (m *monitorT) dispatchPending(ctx context.Context) {
257+
// dispatchCh is used in tests to be able to control when a dispatch execution proceeds
258+
if m.dispatchCh != nil {
259+
select {
260+
case <-m.dispatchCh:
261+
case <-ctx.Done():
262+
return
263+
}
264+
}
227265
span, ctx := apm.StartSpan(ctx, "dispatch pending", "dispatch")
228266
defer span.End()
229267
m.mut.Lock()
@@ -243,7 +281,10 @@ func (m *monitorT) dispatchPending(ctx context.Context) {
243281
// If too many (checkin) responses are written concurrently memory usage may explode due to allocating gzip writers.
244282
err := m.limit.Wait(ctx)
245283
if err != nil {
246-
m.log.Warn().Err(err).Msg("Policy limit error")
284+
m.pendingQ.pushFront(s) // context cancelled before sub is handled, put it back
285+
if !errors.Is(err, context.Canceled) {
286+
m.log.Warn().Err(err).Msg("Policy limit error")
287+
}
247288
return
248289
}
249290
// Lookup the latest policy for this subscription
@@ -257,6 +298,7 @@ func (m *monitorT) dispatchPending(ctx context.Context) {
257298

258299
select {
259300
case <-ctx.Done():
301+
m.pendingQ.pushFront(s) // context cancelled before sub is handled, put it back
260302
m.log.Debug().Err(ctx.Err()).Msg("context termination detected in policy dispatch")
261303
return
262304
case s.ch <- &policy.pp:

internal/pkg/policy/monitor_test.go

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,6 @@ func TestMonitor_SamePolicy(t *testing.T) {
226226
}
227227

228228
func TestMonitor_NewPolicyExists(t *testing.T) {
229-
230229
tests := []struct {
231230
name string
232231
delay time.Duration
@@ -442,3 +441,111 @@ LOOP:
442441
ms.AssertExpectations(t)
443442
mm.AssertExpectations(t)
444443
}
444+
445+
func Test_Monitor_cancel_pending(t *testing.T) {
446+
ctx, cancel := context.WithCancel(t.Context())
447+
defer cancel()
448+
ctx = testlog.SetLogger(t).WithContext(ctx)
449+
450+
chHitT := make(chan []es.HitT, 2)
451+
defer close(chHitT)
452+
ms := mmock.NewMockSubscription()
453+
ms.On("Output").Return((<-chan []es.HitT)(chHitT))
454+
mm := mmock.NewMockMonitor()
455+
mm.On("Subscribe").Return(ms).Once()
456+
mm.On("Unsubscribe", mock.Anything).Return().Once()
457+
bulker := ftesting.NewMockBulk()
458+
459+
monitor := NewMonitor(bulker, mm, config.ServerLimits{})
460+
pm := monitor.(*monitorT)
461+
pm.policyF = func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error) {
462+
return []model.Policy{}, nil
463+
}
464+
pm.dispatchCh = make(chan struct{}, 1)
465+
466+
agentId := uuid.Must(uuid.NewV4()).String()
467+
policyId := uuid.Must(uuid.NewV4()).String()
468+
469+
rId := xid.New().String()
470+
policy := model.Policy{
471+
ESDocument: model.ESDocument{
472+
Id: rId,
473+
Version: 1,
474+
SeqNo: 1,
475+
},
476+
PolicyID: policyId,
477+
Data: policyDataDefault,
478+
RevisionIdx: 1,
479+
}
480+
policyData, err := json.Marshal(&policy)
481+
require.NoError(t, err)
482+
policy2 := model.Policy{
483+
ESDocument: model.ESDocument{
484+
Id: rId,
485+
Version: 1,
486+
SeqNo: 1,
487+
},
488+
PolicyID: policyId,
489+
Data: policyDataDefault,
490+
RevisionIdx: 2,
491+
}
492+
policyData2, err := json.Marshal(&policy2)
493+
require.NoError(t, err)
494+
495+
// Send both revisions to monitor as separate hits
496+
chHitT <- []es.HitT{{
497+
ID: rId,
498+
SeqNo: 1,
499+
Version: 1,
500+
Source: policyData,
501+
}}
502+
chHitT <- []es.HitT{{
503+
ID: rId,
504+
SeqNo: 2,
505+
Version: 1,
506+
Source: policyData2,
507+
}}
508+
509+
// start monitor
510+
var merr error
511+
var mwg sync.WaitGroup
512+
mwg.Add(1)
513+
go func() {
514+
defer mwg.Done()
515+
merr = monitor.Run(ctx)
516+
}()
517+
err = monitor.(*monitorT).waitStart(ctx)
518+
require.NoError(t, err)
519+
520+
// subscribe with revision 0
521+
s, err := monitor.Subscribe(agentId, policyId, 0)
522+
defer monitor.Unsubscribe(s)
523+
require.NoError(t, err)
524+
525+
// This sleep allows the main run to call dispatch
526+
// but dispatch will not proceed until there is a signal from the dispatchCh
527+
time.Sleep(100 * time.Millisecond)
528+
pm.dispatchCh <- struct{}{}
529+
530+
tm := time.NewTimer(time.Second)
531+
policies := make([]*ParsedPolicy, 0, 2)
532+
LOOP:
533+
for {
534+
select {
535+
case p := <-s.Output():
536+
policies = append(policies, p)
537+
case <-tm.C:
538+
break LOOP
539+
}
540+
}
541+
542+
cancel()
543+
mwg.Wait()
544+
if merr != nil && merr != context.Canceled {
545+
t.Fatal(merr)
546+
}
547+
require.Len(t, policies, 1, "expected to recieve one revision")
548+
require.Equal(t, policies[0].Policy.RevisionIdx, int64(2))
549+
ms.AssertExpectations(t)
550+
mm.AssertExpectations(t)
551+
}

0 commit comments

Comments
 (0)