Skip to content

Commit e27f2fb

Browse files
[9.3](backport #49632) Fix check config and take over interaction (#49785)
When using Filestream's take_over feature with autodiscover, files were being re-ingested from the beginning instead of continuing from the offset recorded by the Log input. Autodiscover validates each rendered configuration by instantiating the input with a temporary, suffixed ID before starting it. Because take_over ran during input initialisation, states were migrated to the temporary ID rather than the real input ID. When the real input started, the Log input states had already been consumed, so all files appeared new. The fix moves the take_over migration step from input initialisation to input start. This ensures that config validation (CheckConfig) never triggers state migration, and only the input that actually runs performs the takeover. Additionally, the Log input state is no longer deleted from the registry after migration. Instead, Filestream checks whether it already holds a state for the file before migrating, skipping the takeover if a state is found. This makes the mechanism idempotent and removes reliance on the TTL=-2 heuristic that was used to detect previously-migrated states. Last, but not least, a few other issues in the TakeOver implementation are also fixed: - Incorrect resource release - ephemeralStore is now locked throughout the whole TakeOver duration GenAI-Assisted: Yes Human-Reviewed: Yes Tool: Claude-CLI, Model: Claude 4.6 Opus (Thinking) Tool: Cursor-CLI, Model: GPT-5.3 Codex Extra High --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> (cherry picked from commit 8a648cf) # Conflicts: # filebeat/input/filestream/internal/input-logfile/store.go --------- Co-authored-by: Tiago Queiroz <tiago.queiroz@elastic.co>
1 parent 0b819d1 commit e27f2fb

File tree

12 files changed

+774
-118
lines changed

12 files changed

+774
-118
lines changed

.buildkite/filebeat/filebeat-pipeline.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,9 @@ steps:
166166

167167
- label: ":ubuntu: Filebeat: Go Integration Tests"
168168
command: |
169+
source .buildkite/scripts/docker.sh
169170
cd filebeat
171+
SNAPSHOT=true PACKAGES="docker" PLATFORMS="linux/amd64" mage -v package
170172
mage goIntegTest
171173
retry:
172174
automatic:
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
kind: bug-fix
2+
3+
summary: Fix Filestream take_over causing file re-ingestion when used with autodiscover.
4+
5+
component: filebeat
6+
7+
issue: https://github.com/elastic/beats/issues/49579

filebeat/input/filestream/internal/input-logfile/input.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package input_logfile
1919

2020
import (
2121
"context"
22+
"fmt"
2223
"time"
2324

2425
"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task"
@@ -32,14 +33,15 @@ import (
3233
type managedInput struct {
3334
// id is the input ID, it is defined by setting 'id'
3435
// in the input configuration
35-
id string
36-
manager *InputManager
37-
ackCH *updateChan
38-
sourceIdentifier *SourceIdentifier
39-
prospector Prospector
40-
harvester Harvester
41-
cleanTimeout time.Duration
42-
harvesterLimit uint64
36+
id string
37+
manager *InputManager
38+
ackCH *updateChan
39+
sourceIdentifier *SourceIdentifier
40+
previousSrcIdentifiers []*SourceIdentifier
41+
prospector Prospector
42+
harvester Harvester
43+
cleanTimeout time.Duration
44+
harvesterLimit uint64
4345
}
4446

4547
// Name is required to implement the v2.Input interface
@@ -55,6 +57,10 @@ func (inp *managedInput) Run(
5557
ctx input.Context,
5658
pipeline beat.PipelineConnector,
5759
) (err error) {
60+
61+
// Notify the manager the input has stopped, currently that is used to
62+
// keep track of duplicated IDs
63+
defer inp.manager.StopInput(inp.id)
5864
ctx.UpdateStatus(status.Starting, "")
5965
groupStore := inp.manager.getRetainedStore()
6066
defer groupStore.Release()
@@ -86,18 +92,18 @@ func (inp *managedInput) Run(
8692

8793
prospectorStore := inp.manager.getRetainedStore()
8894
defer prospectorStore.Release()
89-
sourceStore := newSourceStore(prospectorStore, inp.sourceIdentifier, nil)
95+
sourceStore := newSourceStore(prospectorStore, inp.sourceIdentifier, inp.previousSrcIdentifiers)
96+
97+
if err := inp.prospector.TakeOver(sourceStore, inp.sourceIdentifier.ID); err != nil {
98+
return fmt.Errorf("prospector failed to take over states: %w", err)
99+
}
90100

91101
// Mark it as running for now.
92102
// Any errors encountered by harvester will change state to Degraded
93103
ctx.UpdateStatus(status.Running, "")
94104

95105
inp.prospector.Run(ctx, sourceStore, hg)
96106

97-
// Notify the manager the input has stopped, currently that is used to
98-
// keep track of duplicated IDs
99-
inp.manager.StopInput(inp.id)
100-
101107
return nil
102108
}
103109

filebeat/input/filestream/internal/input-logfile/manager.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -277,14 +277,15 @@ func (cim *InputManager) Create(config *conf.C) (inp v2.Input, retErr error) {
277277
}
278278

279279
return &managedInput{
280-
manager: cim,
281-
ackCH: cim.ackCH,
282-
id: settings.ID,
283-
prospector: prospector,
284-
harvester: harvester,
285-
sourceIdentifier: srcIdentifier,
286-
cleanTimeout: settings.CleanInactive,
287-
harvesterLimit: settings.HarvesterLimit,
280+
manager: cim,
281+
ackCH: cim.ackCH,
282+
id: settings.ID,
283+
prospector: prospector,
284+
harvester: harvester,
285+
sourceIdentifier: srcIdentifier,
286+
previousSrcIdentifiers: previousSrcIdentifiers,
287+
cleanTimeout: settings.CleanInactive,
288+
harvesterLimit: settings.HarvesterLimit,
288289
}, nil
289290
}
290291

filebeat/input/filestream/internal/input-logfile/manager_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (m noopProspector) Init(_, _ StoreUpdater, _ func(Source) string) error {
5353
return nil
5454
}
5555

56+
func (m noopProspector) TakeOver(_ StoreUpdater, _ func(Source) string) error {
57+
return nil
58+
}
59+
5660
func (m noopProspector) Run(_ v2.Context, _ StateMetadataUpdater, _ HarvesterGroup) {}
5761

5862
func (m noopProspector) Test() error {

filebeat/input/filestream/internal/input-logfile/prospector.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@ import (
2626
// It also updates the statestore with the meta data of the running harvesters.
2727
type Prospector interface {
2828
// Init updates the store before starting the prospector.
29-
// It cleans up the store, migrates file identities and takes over
30-
// states from log or other filestream inputs.
29+
// It cleans up the store and migrates file identities.
3130
// It receives two StoreUpdater: one global and another local
3231
// to this prospector instance.
3332
Init(local, global StoreUpdater, newID func(Source) string) error
33+
// TakeOver migrates states from other inputs (Log input or other Filestream
34+
// inputs) to this prospector's input. It must be called after Init and
35+
// before Run, so that it is not triggered during CheckConfig validation.
36+
TakeOver(local StoreUpdater, newID func(Source) string) error
3437
// Run starts the event loop and handles the incoming events
3538
// either by starting/stopping a harvester, or updating the statestore.
3639
Run(input.Context, StateMetadataUpdater, HarvesterGroup)

filebeat/input/filestream/internal/input-logfile/store.go

Lines changed: 26 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -297,16 +297,14 @@ func (s *sourceStore) UpdateIdentifiers(getNewID func(v Value) (string, any)) {
297297
// Filestream inputs or the Log input. fn should return the new registry ID
298298
// and new CursorMeta. If fn returns an empty string, the entry is skipped.
299299
//
300-
// When fn returns a valid ID, the old resource is removed from both,
301-
// the in-memory and persistent store. The operations are synchronous.
302-
//
303-
// If the resource migrated was from the Log input, `TakeOver` will
304-
// remove it from the persistent store, however the Log input registrar
305-
// will write it back when Filebeat is shutting down. However,
306-
// there is a mechanism in place to detect this situation and avoid
307-
// migrating the same state over and over again.
308-
// See the comments on this method for more details.
300+
// When fn returns a valid ID:
301+
// - If the old resource was from a Filestream input, it is removed from both
302+
// the in-memory and disk store.
303+
// - If it was from a Log input, it is left untouched.
309304
func (s *sourceStore) TakeOver(fn func(Value) (string, any)) {
305+
// Lock the ephemeral store so we can migrate the states in one go
306+
s.store.ephemeralStore.mu.Lock()
307+
defer s.store.ephemeralStore.mu.Unlock()
310308
matchPreviousFilestreamIDs := func(key string) bool {
311309
for _, identifier := range s.identifiersToTakeOver {
312310
if identifier.MatchesInput(key) {
@@ -353,44 +351,6 @@ func (s *sourceStore) TakeOver(fn func(Value) (string, any)) {
353351
s.store.log.Errorf("cannot read Log input state: %s", err)
354352
return true, nil
355353
}
356-
// That is a workaround for the problems with the
357-
// Log input Registrar (`filebeat/registrar`) and the way it
358-
// handles states.
359-
// There are two problems:
360-
// - 1. The Log input store/registrar does not have an API for
361-
// removing states
362-
// - 2. When `registrar.Registrar` starts, it copies all states
363-
// belonging to the Log input from the disk store into
364-
// memory and when the Registrar is shutting down, it
365-
// writes all states to the disk. This all happens even
366-
// if no Log input was ever started.
367-
// This means that no matter what we do here, the states from
368-
// the Log input are always re-written to disk.
369-
// See: filebeat/registrar/registrar.go, deferred statement on
370-
// `Registrar.Run`.
371-
//
372-
// However, there is a "reset state" code, that runs
373-
// during the Registrar initialisation and sets the
374-
// TTL to -2, once the Log input harvesting that file starts
375-
// the TTL is set to -1 (never expires) or the configured
376-
// value.
377-
// See: filebeat/registrar/registrar.go (readStatesFrom) and
378-
// filebeat/beater/filebeat.go (registrar.Start())
379-
//
380-
// This means that while the Log input is running and the file
381-
// has been active at any moment during the Filebeat's execution
382-
// the TTL is never going to be -2 during the shutdown.
383-
//
384-
// So, if TTL == -2, then in the previous run of Filebeat, there
385-
// was no Log input using this state, which likely means, it is
386-
// a state that has already been migrated to Filestream.
387-
//
388-
// The worst case that can happen is that we re-ingest the file
389-
// once, which is still better than copying an old state with
390-
// an incorrect offset every time Filebeat starts.
391-
if st.TTL == -2 {
392-
return true, nil
393-
}
394354
st.key = key
395355
fromLogInput[key] = st
396356
}
@@ -399,10 +359,6 @@ func (s *sourceStore) TakeOver(fn func(Value) (string, any)) {
399359
})
400360
}
401361

402-
// Lock the ephemeral store so we can migrate the states in one go
403-
s.store.ephemeralStore.mu.Lock()
404-
defer s.store.ephemeralStore.mu.Unlock()
405-
406362
// Migrate all states from the Filestream input
407363
for k := range fromFilestreamInput {
408364
res := s.store.ephemeralStore.unsafeFind(k, false)
@@ -418,13 +374,20 @@ func (s *sourceStore) TakeOver(fn func(Value) (string, any)) {
418374
continue
419375
}
420376

377+
// cleanup must be called on any "exit point" from this loop iteration.
378+
// It is responsible for correctly releasing the locked resource.
379+
cleanup := func() {
380+
res.Release()
381+
res.lock.Unlock()
382+
}
383+
421384
newKey, updatedMeta := fn(res)
422385
if len(newKey) > 0 {
423386
// If the new key already exists in the store, do nothing.
424387
// Release and unlock the resources, then continue with the next entry
425388
if res := s.store.ephemeralStore.unsafeFind(newKey, false); res != nil {
426389
res.Release()
427-
res.lock.Unlock()
390+
cleanup()
428391
continue
429392
}
430393

@@ -454,16 +417,24 @@ func (s *sourceStore) TakeOver(fn func(Value) (string, any)) {
454417
s.store.log.Infof("migrated entry in registry from '%s' to '%s'. Cursor: %v", k, newKey, r.cursor)
455418
}
456419

457-
res.Release()
458-
res.lock.Unlock()
420+
cleanup()
459421
}
460422

461423
// Migrate all states from the Log input
462424
for k, v := range fromLogInput {
463425
newKey, updatedMeta := fn(v)
464426
if len(newKey) > 0 {
465-
// Find or create a resource. It should always create a new one.
466427
res := s.store.ephemeralStore.unsafeFind(newKey, true)
428+
429+
// If the new key already exists in the store, the file has already
430+
// been taken over. Skip it to avoid overwriting a valid Filestream
431+
// state with a potentially stale Log input state.
432+
if !res.IsNew() {
433+
res.Release()
434+
s.store.log.Infof("state for '%s' already exists as '%s', skipping takeover from Log input", k, newKey)
435+
continue
436+
}
437+
467438
res.cursorMeta = updatedMeta
468439
// Convert the offset to the correct type
469440
res.cursor = struct {
@@ -478,13 +449,6 @@ func (s *sourceStore) TakeOver(fn func(Value) (string, any)) {
478449
// Update in-memory store
479450
s.store.ephemeralStore.table[newKey] = res
480451

481-
// "remove" from the disk store.
482-
// It will add a remove entry in the log file for this key, however
483-
// the Registrar used by the Log input will write to disk all states
484-
// it read when Filebeat was starting, thus "overriding" this delete.
485-
// We keep it here because when we remove the Log input we will ensure
486-
// the entry is actually removed from the disk store.
487-
_ = s.store.persistentStore.Remove(k)
488452
res.Release()
489453
s.store.log.Infof("migrated entry in registry from '%s' to '%s'. Cursor: %v", k, newKey, res.cursor)
490454
}

filebeat/input/filestream/internal/input-logfile/store_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/stretchr/testify/assert"
2929
"github.com/stretchr/testify/require"
3030

31+
inpFile "github.com/elastic/beats/v7/filebeat/input/file"
3132
input "github.com/elastic/beats/v7/filebeat/input/v2"
3233
"github.com/elastic/beats/v7/libbeat/statestore"
3334
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
@@ -456,6 +457,106 @@ func TestSourceStoreTakeOver(t *testing.T) {
456457
checkEqualStoreState(t, want, backend.snapshot())
457458
}
458459

460+
func TestSourceStoreTakeOverFromLogInput(t *testing.T) {
461+
const (
462+
logKey = "filebeat::logs::native::inode:device"
463+
filestreamNewKey = "filestream::input-id::native::inode:device"
464+
)
465+
466+
// takeover simulates what takeOverFn does: it maps the Log input key to a new Filestream key.
467+
takeover := func(v Value) (string, any) {
468+
if v.Key() == logKey {
469+
return filestreamNewKey, testMeta{IdentifierName: "native"}
470+
}
471+
return "", nil
472+
}
473+
474+
t.Run("log input state is left untouched", func(t *testing.T) {
475+
backend := createSampleStore(t, nil)
476+
require.NoError(t, backend.Store.Set(logKey, inpFile.State{
477+
Source: "/path/to/file",
478+
Offset: 1234,
479+
TTL: -1,
480+
IdentifierName: "native",
481+
}), "populating test store")
482+
483+
s := testOpenStore(t, "filestream", backend)
484+
defer s.Release()
485+
store := &sourceStore{
486+
identifier: &SourceIdentifier{"filestream::input-id::"},
487+
store: s,
488+
}
489+
490+
store.TakeOver(takeover)
491+
492+
assert.NotNil(t, s.Get(filestreamNewKey), "Log input state must have been migrated to Filestream")
493+
assert.NotNil(t, s.Get(logKey), "Log input state must left untouched")
494+
})
495+
496+
t.Run("state with TTL=-2 is migrated", func(t *testing.T) {
497+
backend := createSampleStore(t, nil)
498+
require.NoError(t, backend.Store.Set(logKey, inpFile.State{
499+
Source: "/path/to/file",
500+
Offset: 1234,
501+
TTL: -2, // previously this caused the state to be skipped
502+
IdentifierName: "native",
503+
}), "populating test store")
504+
505+
s := testOpenStore(t, "filestream", backend)
506+
defer s.Release()
507+
store := &sourceStore{
508+
identifier: &SourceIdentifier{"filestream::input-id::"},
509+
store: s,
510+
}
511+
512+
store.TakeOver(takeover)
513+
514+
s.ephemeralStore.mu.Lock()
515+
_, migrated := s.ephemeralStore.table[filestreamNewKey]
516+
s.ephemeralStore.mu.Unlock()
517+
518+
assert.True(t, migrated, "state with TTL=-2 must be migrated to Filestream")
519+
})
520+
521+
t.Run("state is skipped when Filestream key already exists", func(t *testing.T) {
522+
const existingOffset = int64(9999)
523+
524+
backend := createSampleStore(t, map[string]state{
525+
filestreamNewKey: {
526+
TTL: 60 * time.Second,
527+
Cursor: map[string]any{"offset": existingOffset},
528+
},
529+
})
530+
require.NoError(t, backend.Store.Set(logKey, inpFile.State{
531+
Source: "/path/to/file",
532+
Offset: 1234,
533+
TTL: -1,
534+
IdentifierName: "native",
535+
}), "populating test store")
536+
537+
s := testOpenStore(t, "filestream", backend)
538+
defer s.Release()
539+
store := &sourceStore{
540+
identifier: &SourceIdentifier{"filestream::input-id::"},
541+
store: s,
542+
}
543+
544+
store.TakeOver(takeover)
545+
546+
// The pre-existing Filestream state must be unchanged.
547+
s.ephemeralStore.mu.Lock()
548+
res, exists := s.ephemeralStore.table[filestreamNewKey]
549+
s.ephemeralStore.mu.Unlock()
550+
551+
require.True(t, exists, "Filestream key must still exist after TakeOver")
552+
var cur struct {
553+
Offset int64 `json:"offset"`
554+
}
555+
require.NoError(t, res.UnpackCursor(&cur), "unpacking cursor")
556+
assert.Equal(t, existingOffset, cur.Offset, "existing Filestream cursor must not be overwritten by Log input state")
557+
})
558+
}
559+
459560
//nolint:dupl // Test code won't be refactored on this commit
460561
func TestSourceStore_CleanIf(t *testing.T) {
461562
t.Run("entries are cleaned when function returns true", func(t *testing.T) {

0 commit comments

Comments
 (0)