Skip to content

Commit 79cef10

Browse files
authored
fix: stop loki.source.kubernetes discarding log lines with duplicate timestamps (#3959) (#4613)
* fix: stop skipping duplicated-by-timestamp kubernetes pod log lines (#3959) * test: adds tests to prove duplicated-by-timestamp kubernetes pod log lines are not skipped (#3959)
1 parent bd6d85c commit 79cef10

File tree

3 files changed

+188
-2
lines changed

3 files changed

+188
-2
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ Main (unreleased)
4949

5050
### Bugfixes
5151

52+
- Stop `loki.source.kubernetes` discarding log lines with duplicate timestamps. (@ciaranj)
53+
5254
- Fix direction of arrows for pyroscope components in UI graph. (@dehaansa)
5355

5456
- Only log EOF errors for syslog port investigations in `loki.source.syslog` as Debug, not Warn. (@dehaansa)

internal/component/loki/source/kubernetes/kubetail/tailer.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
kubetypes "k8s.io/apimachinery/pkg/types"
2121

2222
"github.com/grafana/alloy/internal/component/common/loki"
23+
"github.com/grafana/alloy/internal/component/common/loki/positions"
2324
"github.com/grafana/alloy/internal/runner"
2425
"github.com/grafana/alloy/internal/runtime/logging/level"
2526
)
@@ -262,9 +263,14 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
262263

263264
level.Info(t.log).Log("msg", "opened log stream", "start time", lastReadTime)
264265

266+
return t.processLogStream(ctx, stream, handler, lastReadTime, positionsEnt, calc)
267+
}
268+
269+
// processLogStream reads log lines from a reader and processes them.
270+
// It returns when the context is done, the stream ends, or an error occurs.
271+
func (t *tailer) processLogStream(ctx context.Context, stream io.ReadCloser, handler loki.EntryHandler, lastReadTime time.Time, positionsEnt positions.Entry, calc *rollingAverageCalculator) error {
265272
ch := handler.Chan()
266273
reader := bufio.NewReader(stream)
267-
268274
for {
269275
line, err := reader.ReadString('\n')
270276

@@ -274,7 +280,10 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
274280
calc.AddTimestamp(time.Now())
275281

276282
entryTimestamp, entryLine := parseKubernetesLog(line)
277-
if !entryTimestamp.After(lastReadTime) {
283+
// Skip only if the timestamp is strictly before lastReadTime.
284+
// This allows multiple log lines with the same timestamp to be processed,
285+
// which is common in Windows containers that log rapidly.
286+
if entryTimestamp.Before(lastReadTime) {
278287
continue
279288
}
280289
lastReadTime = entryTimestamp

internal/component/loki/source/kubernetes/kubetail/tailer_test.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,58 @@
11
package kubetail
22

33
import (
4+
"context"
5+
"strings"
46
"testing"
57
"time"
68

9+
"github.com/grafana/alloy/internal/component/common/loki"
10+
"github.com/grafana/alloy/internal/component/common/loki/positions"
11+
"github.com/prometheus/prometheus/model/labels"
712
"github.com/stretchr/testify/require"
813
)
914

15+
// mockPositions is a no-op implementation of positions.Positions for testing.
16+
type mockPositions struct{}
17+
18+
func (m *mockPositions) GetString(path, labels string) string { return "" }
19+
20+
func (m *mockPositions) Get(path, labels string) (int64, error) { return 0, nil }
21+
22+
func (m *mockPositions) PutString(path, labels, pos string) {}
23+
24+
func (m *mockPositions) Put(path, labels string, pos int64) {}
25+
26+
func (m *mockPositions) Remove(path, labels string) {}
27+
28+
func (m *mockPositions) Stop() {}
29+
30+
func (m *mockPositions) SyncPeriod() time.Duration { return 0 }
31+
32+
func (m *mockPositions) WatchConfig(cfg positions.Config) {}
33+
34+
// mockEntryHandler is a simple implementation of loki.EntryHandler for testing.
35+
type mockEntryHandler struct {
36+
ch chan loki.Entry
37+
}
38+
39+
func newMockEntryHandler() *mockEntryHandler {
40+
return &mockEntryHandler{
41+
ch: make(chan loki.Entry, 100),
42+
}
43+
}
44+
45+
func (m *mockEntryHandler) Chan() chan<- loki.Entry { return m.ch }
46+
47+
func (m *mockEntryHandler) Stop() {}
48+
49+
// mockReadCloser wraps a strings.Reader to provide io.ReadCloser interface.
50+
type mockReadCloser struct {
51+
*strings.Reader
52+
}
53+
54+
func (m *mockReadCloser) Close() error { return nil }
55+
1056
func Test_parseKubernetesLog(t *testing.T) {
1157
tt := []struct {
1258
inputLine string
@@ -42,3 +88,132 @@ func Test_parseKubernetesLog(t *testing.T) {
4288
})
4389
}
4490
}
91+
92+
func Test_processLogStream_duplicateTimestamps(t *testing.T) {
93+
baseTime := time.Date(2023, time.January, 23, 17, 0, 10, 0, time.UTC)
94+
95+
tt := []struct {
96+
name string
97+
logLines []string
98+
lastReadTime time.Time
99+
expectLines []string
100+
}{
101+
{
102+
name: "duplicate timestamps are not discarded",
103+
logLines: []string{
104+
"2023-01-23T17:00:10Z line1\n",
105+
"2023-01-23T17:00:10Z line2\n",
106+
"2023-01-23T17:00:10Z line3\n",
107+
},
108+
lastReadTime: baseTime.Add(-1 * time.Second), // Before all entries
109+
expectLines: []string{"line1\n", "line2\n", "line3\n"},
110+
},
111+
{
112+
name: "entries before lastReadTime are discarded",
113+
logLines: []string{
114+
"2023-01-23T17:00:09Z old_line\n",
115+
"2023-01-23T17:00:10Z line1\n",
116+
"2023-01-23T17:00:11Z line2\n",
117+
},
118+
lastReadTime: baseTime, // Equal to second entry
119+
expectLines: []string{"line1\n", "line2\n"},
120+
},
121+
{
122+
name: "entries equal to lastReadTime are included",
123+
logLines: []string{
124+
"2023-01-23T17:00:10Z line1\n",
125+
"2023-01-23T17:00:10Z line2\n",
126+
"2023-01-23T17:00:11Z line3\n",
127+
},
128+
lastReadTime: baseTime, // Equal to first two entries
129+
expectLines: []string{"line1\n", "line2\n", "line3\n"},
130+
},
131+
{
132+
name: "mixed timestamps with duplicates",
133+
logLines: []string{
134+
"2023-01-23T17:00:08Z old1\n",
135+
"2023-01-23T17:00:09Z old2\n",
136+
"2023-01-23T17:00:10Z line1\n",
137+
"2023-01-23T17:00:10Z line2\n",
138+
"2023-01-23T17:00:11Z line3\n",
139+
"2023-01-23T17:00:11Z line4\n",
140+
},
141+
lastReadTime: baseTime,
142+
expectLines: []string{"line1\n", "line2\n", "line3\n", "line4\n"},
143+
},
144+
{
145+
name: "all entries have same timestamp",
146+
logLines: []string{
147+
"2023-01-23T17:00:10Z line1\n",
148+
"2023-01-23T17:00:10Z line2\n",
149+
"2023-01-23T17:00:10Z line3\n",
150+
"2023-01-23T17:00:10Z line4\n",
151+
},
152+
lastReadTime: baseTime,
153+
expectLines: []string{"line1\n", "line2\n", "line3\n", "line4\n"},
154+
},
155+
}
156+
157+
for _, tc := range tt {
158+
t.Run(tc.name, func(t *testing.T) {
159+
// Create a mock tailer with minimal setup
160+
lset := labels.FromStrings(
161+
LabelPodNamespace, "default",
162+
LabelPodName, "test-pod",
163+
LabelPodContainerName, "test-container",
164+
LabelPodUID, "test-uid-123",
165+
"test", "value",
166+
)
167+
target := NewTarget(lset, lset)
168+
opts := &Options{
169+
Positions: &mockPositions{},
170+
}
171+
tailer := &tailer{
172+
target: target,
173+
lset: newLabelSet(target.Labels()),
174+
opts: opts,
175+
}
176+
177+
// Create a stream from the log lines
178+
logData := strings.Join(tc.logLines, "")
179+
stream := &mockReadCloser{strings.NewReader(logData)}
180+
181+
// Create a mock handler
182+
handler := newMockEntryHandler()
183+
184+
// Create a mock positions entry
185+
positionsEnt := positions.Entry{}
186+
187+
// Create a rolling average calculator
188+
calc := newRollingAverageCalculator(10000, 100, 2*time.Second, 1*time.Hour)
189+
190+
// Create a context with timeout
191+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
192+
defer cancel()
193+
194+
// Process the log stream in a goroutine
195+
go func() {
196+
_ = tailer.processLogStream(ctx, stream, handler, tc.lastReadTime, positionsEnt, calc)
197+
}()
198+
199+
// Collect all entries
200+
var receivedLines []string
201+
timeout := time.After(500 * time.Millisecond)
202+
203+
collectLoop:
204+
for {
205+
select {
206+
case entry := <-handler.ch:
207+
receivedLines = append(receivedLines, entry.Line)
208+
if len(receivedLines) == len(tc.expectLines) {
209+
break collectLoop
210+
}
211+
case <-timeout:
212+
break collectLoop
213+
}
214+
}
215+
216+
require.Equal(t, tc.expectLines, receivedLines, "received lines should match expected lines")
217+
})
218+
}
219+
}

0 commit comments

Comments
 (0)