Skip to content

Commit 0d00710

Browse files
Refactor the bulk checkin handler to use optional args (#5493)
* Refactor the bulk checkin handler to use optional args Refactor the bulk checkin handler to use optional args so the list is easier to read and so that we can more easily add policy ID/revision idx identifiers as a follow up. * Fix linter
1 parent bbfbaa3 commit 0d00710

File tree

4 files changed

+197
-91
lines changed

4 files changed

+197
-91
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: enhancement
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Refactor bulk checkin handler
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
description: Refactor the bulk checkin handler to allow for future extensions
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: fleet-server
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
pr: https://github.com/elastic/fleet-server/pull/5493
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
#issue: https://github.com/owner/repo/1234

internal/pkg/api/handleCheckin.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
327327
// Initial update on checkin, and any user fields that might have changed
328328
// Run a script to remove audit_unenrolled_* and unenrolled_at attributes if one is set on checkin.
329329
// 8.16.x releases would incorrectly set unenrolled_at
330-
err = ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, rawMeta, rawComponents, seqno, ver, unhealthyReason, agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != "")
330+
err = ct.bc.CheckIn(agent.Id, checkin.WithStatus(string(req.Status)), checkin.WithMessage(req.Message), checkin.WithMeta(rawMeta), checkin.WithComponents(rawComponents), checkin.WithSeqNo(seqno), checkin.WithVer(ver), checkin.WithUnhealthyReason(unhealthyReason), checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""))
331331
if err != nil {
332332
zlog.Error().Err(err).Str(ecs.AgentID, agent.Id).Msg("checkin failed")
333333
}
@@ -382,7 +382,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
382382
zlog.Trace().Msg("fire long poll")
383383
break LOOP
384384
case <-tick.C:
385-
err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason, false)
385+
err := ct.bc.CheckIn(agent.Id, checkin.WithStatus(string(req.Status)), checkin.WithMessage(req.Message), checkin.WithComponents(rawComponents), checkin.WithVer(ver), checkin.WithUnhealthyReason(unhealthyReason)) // FIXME If we change to properly handle empty strings we could stop passing optional args here.
386386
if err != nil {
387387
zlog.Error().Err(err).Str(ecs.AgentID, agent.Id).Msg("checkin failed")
388388
}

internal/pkg/checkin/bulk.go

Lines changed: 137 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,78 @@ func WithFlushInterval(d time.Duration) Opt {
4141
}
4242
}
4343

44+
// Option is the type for optional arguments for agent checkins.
45+
type Option func(*pendingT)
46+
47+
func WithStatus(status string) Option {
48+
return func(pending *pendingT) {
49+
pending.status = status
50+
}
51+
}
52+
53+
func WithMessage(message string) Option {
54+
return func(pending *pendingT) {
55+
pending.message = message
56+
}
57+
}
58+
59+
func WithUnhealthyReason(reason *[]string) Option {
60+
return func(pending *pendingT) {
61+
pending.unhealthyReason = reason
62+
}
63+
}
64+
65+
func WithMeta(meta []byte) Option {
66+
return func(pending *pendingT) {
67+
if pending.extra == nil {
68+
pending.extra = &extraT{}
69+
}
70+
pending.extra.meta = meta
71+
}
72+
}
73+
74+
func WithSeqNo(seqno sqn.SeqNo) Option {
75+
return func(pending *pendingT) {
76+
if !seqno.IsSet() {
77+
return
78+
}
79+
if pending.extra == nil {
80+
pending.extra = &extraT{}
81+
}
82+
pending.extra.seqNo = seqno
83+
}
84+
}
85+
86+
func WithVer(ver string) Option {
87+
return func(pending *pendingT) {
88+
if pending.extra == nil {
89+
pending.extra = &extraT{}
90+
}
91+
pending.extra.ver = ver
92+
}
93+
}
94+
95+
func WithComponents(components []byte) Option {
96+
return func(pending *pendingT) {
97+
if pending.extra == nil {
98+
pending.extra = &extraT{}
99+
}
100+
pending.extra.components = components
101+
}
102+
}
103+
104+
func WithDeleteAudit(del bool) Option {
105+
return func(pending *pendingT) {
106+
if !del {
107+
return
108+
}
109+
if pending.extra == nil {
110+
pending.extra = &extraT{}
111+
}
112+
pending.extra.deleteAudit = del
113+
}
114+
}
115+
44116
type extraT struct {
45117
meta []byte
46118
seqNo sqn.SeqNo
@@ -82,7 +154,6 @@ func NewBulk(bulker bulk.Bulk, opts ...Opt) *Bulk {
82154
}
83155

84156
func parseOpts(opts ...Opt) optionsT {
85-
86157
outOpts := optionsT{
87158
flushInterval: defaultFlushInterval,
88159
}
@@ -97,7 +168,6 @@ func parseOpts(opts ...Opt) optionsT {
97168
// Generate and cache timestamp on seconds change.
98169
// Avoid thousands of formats of an identical string.
99170
func (bc *Bulk) timestamp() string {
100-
101171
// WARNING: Expects mutex locked.
102172
now := time.Now()
103173
if now.Unix() != bc.unix {
@@ -112,31 +182,17 @@ func (bc *Bulk) timestamp() string {
112182
// The pending agents are sent to elasticsearch as a bulk update at each flush interval.
113183
// NOTE: If Checkin is called after Run has returned it will just add the entry to the pending map and not do any operations, this may occur when the fleet-server is shutting down.
114184
// WARNING: Bulk will take ownership of fields, so do not use after passing in.
115-
func (bc *Bulk) CheckIn(id string, status string, message string, meta []byte, components []byte, seqno sqn.SeqNo, newVer string, unhealthyReason *[]string, deleteAudit bool) error {
116-
// Separate out the extra data to minimize
117-
// the memory footprint of the 90% case of just
118-
// updating the timestamp.
119-
var extra *extraT
120-
if meta != nil || seqno.IsSet() || newVer != "" || components != nil || deleteAudit {
121-
extra = &extraT{
122-
meta: meta,
123-
seqNo: seqno,
124-
ver: newVer,
125-
components: components,
126-
deleteAudit: deleteAudit,
127-
}
128-
}
129-
185+
func (bc *Bulk) CheckIn(id string, opts ...Option) error {
130186
bc.mut.Lock()
187+
pending := pendingT{
188+
ts: bc.timestamp(),
189+
}
131190

132-
bc.pending[id] = pendingT{
133-
ts: bc.timestamp(),
134-
status: status,
135-
message: message,
136-
extra: extra,
137-
unhealthyReason: unhealthyReason,
191+
for _, opt := range opts {
192+
opt(&pending)
138193
}
139194

195+
bc.pending[id] = pending
140196
bc.mut.Unlock()
141197
return nil
142198
}
@@ -181,32 +237,28 @@ func (bc *Bulk) flush(ctx context.Context) error {
181237
var err error
182238
var needRefresh bool
183239
for id, pendingData := range pending {
184-
185-
// In the simple case, there are no fields and no seqNo.
186-
// When that is true, we can reuse an already generated
187-
// JSON body containing just the timestamp updates.
188240
var body []byte
189241
if pendingData.extra == nil {
242+
// agents that checkin without extra attributes are cachable
243+
// Cacheable agents can share the same status, message, and unhealthy reason. Timestamps are ignored.
244+
// This prevents an extra JSON serialization when agents have the same update body.
190245
var ok bool
191246
body, ok = simpleCache[pendingData]
192247
if !ok {
193-
fields := bulk.UpdateFields{
194-
dl.FieldLastCheckin: pendingData.ts,
195-
dl.FieldUpdatedAt: nowTimestamp,
196-
dl.FieldLastCheckinStatus: pendingData.status,
197-
dl.FieldLastCheckinMessage: pendingData.message,
198-
dl.FieldUnhealthyReason: pendingData.unhealthyReason,
199-
}
200-
if body, err = fields.Marshal(); err != nil {
248+
body, err = toUpdateBody(nowTimestamp, pendingData)
249+
if err != nil {
201250
return err
202251
}
203252
simpleCache[pendingData] = body
204253
}
205254
} else if pendingData.extra.deleteAudit {
255+
if pendingData.extra.seqNo.IsSet() {
256+
needRefresh = true
257+
}
206258
// Use a script instead of a partial doc to update if attributes need to be removed
207259
params, err := encodeParams(nowTimestamp, pendingData)
208260
if err != nil {
209-
return err
261+
return fmt.Errorf("unable to parse checkin details as params: %w", err)
210262
}
211263
action := &estypes.UpdateAction{
212264
Script: &estypes.Script{
@@ -220,48 +272,12 @@ func (bc *Bulk) flush(ctx context.Context) error {
220272
if err != nil {
221273
return fmt.Errorf("could not marshall script action: %w", err)
222274
}
223-
if pendingData.extra.seqNo.IsSet() {
224-
needRefresh = true
225-
}
226275
} else {
227-
fields := bulk.UpdateFields{
228-
dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp
229-
dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp
230-
dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status
231-
dl.FieldLastCheckinMessage: pendingData.message, // Set the status message
232-
dl.FieldUnhealthyReason: pendingData.unhealthyReason,
233-
}
234-
235-
// If the agent version is not empty it needs to be updated
236-
// Assuming the agent can by upgraded keeping the same id, but incrementing the version
237-
if pendingData.extra.ver != "" {
238-
fields[dl.FieldAgent] = map[string]interface{}{
239-
dl.FieldAgentVersion: pendingData.extra.ver,
240-
}
241-
}
242-
243-
// Update local metadata if provided
244-
if pendingData.extra.meta != nil {
245-
// Surprise: The json encodeer compacts this raw JSON during
246-
// the encode process, so there my be unexpected memory overhead:
247-
// https://github.com/golang/go/blob/go1.16.3/src/encoding/json/encode.go#L499
248-
fields[dl.FieldLocalMetadata] = json.RawMessage(pendingData.extra.meta)
249-
}
250-
251-
// Update components if provided
252-
if pendingData.extra.components != nil {
253-
fields[dl.FieldComponents] = json.RawMessage(pendingData.extra.components)
254-
}
255-
256-
// If seqNo changed, set the field appropriately
257276
if pendingData.extra.seqNo.IsSet() {
258-
fields[dl.FieldActionSeqNo] = pendingData.extra.seqNo
259-
260-
// Only refresh if seqNo changed; dropping metadata not important.
261277
needRefresh = true
262278
}
263-
264-
if body, err = fields.Marshal(); err != nil {
279+
body, err = toUpdateBody(nowTimestamp, pendingData)
280+
if err != nil {
265281
return err
266282
}
267283
}
@@ -290,19 +306,60 @@ func (bc *Bulk) flush(ctx context.Context) error {
290306
return err
291307
}
292308

309+
func toUpdateBody(now string, pending pendingT) ([]byte, error) {
310+
fields := bulk.UpdateFields{
311+
dl.FieldUpdatedAt: now, // Set "updated_at" to the current timestamp
312+
dl.FieldLastCheckin: pending.ts, // Set the checkin timestamp
313+
dl.FieldLastCheckinStatus: pending.status, // Set the pending status
314+
dl.FieldLastCheckinMessage: pending.message, // Set the status message
315+
dl.FieldUnhealthyReason: pending.unhealthyReason,
316+
}
317+
if pending.extra != nil {
318+
// If the agent version is not empty it needs to be updated
319+
// Assuming the agent can by upgraded keeping the same id, but incrementing the version
320+
if pending.extra.ver != "" {
321+
fields[dl.FieldAgent] = map[string]interface{}{
322+
dl.FieldAgentVersion: pending.extra.ver,
323+
}
324+
}
325+
326+
// Update local metadata if provided
327+
if pending.extra.meta != nil {
328+
// Surprise: The json encoder compacts this raw JSON during
329+
// the encode process, so there my be unexpected memory overhead:
330+
// https://github.com/golang/go/blob/de5d7eccb99088e3ab42c0d907da6852d8f9cebe/src/encoding/json/encode.go#L503-L507
331+
fields[dl.FieldLocalMetadata] = json.RawMessage(pending.extra.meta)
332+
}
333+
334+
// Update components if provided
335+
if pending.extra.components != nil {
336+
fields[dl.FieldComponents] = json.RawMessage(pending.extra.components)
337+
}
338+
339+
// If seqNo changed, set the field appropriately
340+
if pending.extra.seqNo.IsSet() {
341+
fields[dl.FieldActionSeqNo] = pending.extra.seqNo
342+
}
343+
}
344+
return fields.Marshal()
345+
}
346+
293347
func encodeParams(now string, data pendingT) (map[string]json.RawMessage, error) {
294348
var (
295-
tsNow json.RawMessage
296-
ts json.RawMessage
297-
status json.RawMessage
298-
message json.RawMessage
299-
reason json.RawMessage
349+
tsNow json.RawMessage
350+
ts json.RawMessage
351+
status json.RawMessage
352+
message json.RawMessage
353+
reason json.RawMessage
354+
355+
// optional attributes below
300356
ver json.RawMessage
301357
meta json.RawMessage
302358
components json.RawMessage
303359
isSet json.RawMessage
304360
seqNo json.RawMessage
305-
err error
361+
362+
err error
306363
)
307364
tsNow, err = json.Marshal(now)
308365
Err := errors.Join(err)

0 commit comments

Comments
 (0)