Skip to content

Commit 305ac0b

Browse files
mergify[bot]belimawrAndersonQ
authored
[Filebeat/Journald] Fix flakiness from TestDoubleStarCanBeUsed (#46913) (#47175)
--------- (cherry picked from commit f13a0bd) Co-authored-by: Tiago Queiroz <[email protected]> Co-authored-by: Anderson Queiroz <[email protected]>
1 parent 35d827f commit 305ac0b

File tree

3 files changed

+100
-2
lines changed

3 files changed

+100
-2
lines changed

.buildkite/filebeat/filebeat-pipeline.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ steps:
8686
artifact_paths:
8787
- "filebeat/build/*.xml"
8888
- "filebeat/build/*.json"
89+
- "filebeat/build/input-test/**/*"
8990
plugins:
9091
- test-collector#v1.10.2:
9192
files: "filebeat/build/TEST-*.xml"
@@ -112,6 +113,7 @@ steps:
112113
artifact_paths:
113114
- "filebeat/build/*.xml"
114115
- "filebeat/build/*.json"
116+
- "filebeat/build/input-test/**/*"
115117
plugins:
116118
- test-collector#v1.10.2:
117119
files: "filebeat/build/TEST-*.xml"
@@ -138,6 +140,7 @@ steps:
138140
artifact_paths:
139141
- "filebeat/build/*.xml"
140142
- "filebeat/build/*.json"
143+
- "filebeat/build/input-test/**/*"
141144
plugins:
142145
- test-collector#v1.10.2:
143146
files: "filebeat/build/TEST-*.xml"
@@ -162,6 +165,7 @@ steps:
162165
artifact_paths:
163166
- "filebeat/build/*.xml"
164167
- "filebeat/build/*.json"
168+
- "filebeat/build/input-test/**/*"
165169
- "filebeat/build/integration-tests/*"
166170
- "filebeat/build/integration-tests/Test*/*"
167171
- "filebeat/build/integration-tests/Test*/data/**/*"
@@ -193,6 +197,7 @@ steps:
193197
artifact_paths:
194198
- "filebeat/build/*.xml"
195199
- "filebeat/build/*.json"
200+
- "filebeat/build/input-test/**/*"
196201
- "filebeat/build/integration-tests/*"
197202
- "filebeat/build/integration-tests/Test*/*"
198203
- "filebeat/build/integration-tests/Test*/data/**/*"
@@ -220,6 +225,7 @@ steps:
220225
artifact_paths:
221226
- "filebeat/build/*.xml"
222227
- "filebeat/build/*.json"
228+
- "filebeat/build/input-test/**/*"
223229
- "filebeat/build/integration-tests/*"
224230
- "filebeat/build/integration-tests/Test*/*"
225231
- "filebeat/build/integration-tests/Test*/data/**/*"
@@ -338,6 +344,7 @@ steps:
338344
artifact_paths:
339345
- "filebeat/build/*.xml"
340346
- "filebeat/build/*.json"
347+
- "filebeat/build/input-test/**/*"
341348
plugins:
342349
- test-collector#v1.10.2:
343350
files: "filebeat/build/TEST-*.xml"

filebeat/input/journald/environment_test.go

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,21 @@
2020
package journald
2121

2222
import (
23+
"bytes"
2324
"context"
2425
"fmt"
26+
"os"
27+
"path/filepath"
2528
"strings"
2629
"sync"
2730
"testing"
2831
"time"
2932

3033
"github.com/gofrs/uuid/v5"
3134
"github.com/stretchr/testify/require"
35+
"go.elastic.co/ecszap"
36+
"go.uber.org/zap"
37+
"go.uber.org/zap/zapcore"
3238

3339
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
3440
"github.com/elastic/beats/v7/libbeat/beat"
@@ -52,6 +58,9 @@ type inputTestingEnvironment struct {
5258
pluginInitOnce sync.Once
5359
plugin v2.Plugin
5460

61+
inputLogger *logp.Logger
62+
logBuffer *bytes.Buffer
63+
5564
wg sync.WaitGroup
5665
grp unison.TaskGroup
5766
}
@@ -92,6 +101,39 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{})
92101

93102
func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input) {
94103
e.wg.Add(1)
104+
t := e.t
105+
106+
e.inputLogger, e.logBuffer = newInMemoryJSON()
107+
e.t.Cleanup(func() {
108+
if t.Failed() {
109+
folder := filepath.Join("..", "..", "build", "input-test")
110+
if err := os.MkdirAll(folder, 0o750); err != nil {
111+
t.Logf("cannot create folder for error logs: %s", err)
112+
return
113+
}
114+
115+
cleanTestName := strings.Replace(t.Name(), "\\", "_", -1)
116+
117+
f, err := os.CreateTemp(folder, cleanTestName+"-*")
118+
if err != nil {
119+
t.Logf("cannot create file for error logs: %s", err)
120+
return
121+
}
122+
defer f.Close()
123+
fullLogPath, err := filepath.Abs(f.Name())
124+
if err != nil {
125+
t.Logf("cannot get full path from log file: %s", err)
126+
}
127+
128+
if _, err := f.Write(e.logBuffer.Bytes()); err != nil {
129+
t.Logf("cannot write to file: %s", err)
130+
return
131+
}
132+
133+
t.Logf("Test Failed, logs from input at %q", fullLogPath)
134+
}
135+
})
136+
95137
go func(wg *sync.WaitGroup, grp *unison.TaskGroup) {
96138
defer wg.Done()
97139
defer func() {
@@ -108,7 +150,7 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input)
108150
Cancelation: ctx,
109151
StatusReporter: e.statusReporter,
110152
MetricsRegistry: monitoring.NewRegistry(),
111-
Logger: logp.L(),
153+
Logger: e.inputLogger,
112154
}
113155
if err := inp.Run(inputCtx, e.pipeline); err != nil {
114156
e.t.Errorf("input 'Run' method returned an error: %s", err)
@@ -139,6 +181,23 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
139181
}, 5*time.Second, 10*time.Millisecond, &msg)
140182
}
141183

184+
// waitUntilEventCount waits until total count events arrive to the client.
185+
func (e *inputTestingEnvironment) waitUntilEventsPublished(published int) {
186+
e.t.Helper()
187+
msg := strings.Builder{}
188+
require.Eventually(e.t, func() bool {
189+
sum := len(e.pipeline.GetAllEvents())
190+
if sum >= published {
191+
return true
192+
}
193+
194+
msg.Reset()
195+
fmt.Fprintf(&msg, "too few events; expected: %d, actual: %d", published, sum)
196+
197+
return false
198+
}, 5*time.Second, 10*time.Millisecond, &msg)
199+
}
200+
142201
func (e *inputTestingEnvironment) RequireStatuses(expected []statusUpdate) {
143202
t := e.t
144203
t.Helper()
@@ -308,3 +367,24 @@ func (m *mockStatusReporter) GetUpdates() []statusUpdate {
308367
defer m.mutex.RUnlock()
309368
return append([]statusUpdate{}, m.updates...)
310369
}
370+
371+
func newInMemoryJSON() (*logp.Logger, *bytes.Buffer) {
372+
buff := bytes.Buffer{}
373+
encoderConfig := ecszap.ECSCompatibleEncoderConfig(logp.JSONEncoderConfig())
374+
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
375+
encoder := zapcore.NewJSONEncoder(encoderConfig)
376+
377+
core := zapcore.NewCore(
378+
encoder,
379+
zapcore.Lock(zapcore.AddSync(&buff)),
380+
zap.NewAtomicLevelAt(zap.DebugLevel))
381+
ecszap.ECSCompatibleEncoderConfig(logp.ConsoleEncoderConfig())
382+
383+
logger, _ := logp.NewDevelopmentLogger(
384+
"journald",
385+
zap.WrapCore(func(in zapcore.Core) zapcore.Core {
386+
return core
387+
}))
388+
389+
return logger, &buff
390+
}

filebeat/input/journald/input_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,18 @@ func TestDoubleStarCanBeUsed(t *testing.T) {
509509
}
510510

511511
env.startInput(ctx, inp)
512-
env.waitUntilEventCount(len(srcFiles) * 10)
512+
// Wait for at least 11 events, this means more than one journal file
513+
// has been read and ingested.
514+
//
515+
// When many small journal files are ingested, the journalctl process
516+
// may exit before the input has fully read its stdout, which makes us
517+
// discard the last few lines/entries.
518+
//
519+
// We still correctly track the cursor of events published to the output,
520+
// however the cursor returned by journalctl on this set of handcrafted
521+
// journal files leads to us skipping some events.
522+
// See https://github.com/elastic/beats/issues/46904.
523+
env.waitUntilEventsPublished(11)
513524
}
514525

515526
func decompress(t *testing.T, namegz string) string {

0 commit comments

Comments
 (0)