Skip to content

Commit de987cd

Browse files
authored
Restore done callback in log input plugins (#1739)
1 parent 4b3a200 commit de987cd

File tree

4 files changed

+26
-16
lines changed

4 files changed

+26
-16
lines changed

plugins/inputs/logfile/tailersrc.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func (le LogEvent) Time() time.Time {
4747
}
4848

4949
func (le LogEvent) Done() {
50+
le.RangeQueue().Enqueue(le.Range())
5051
}
5152

5253
func (le LogEvent) Range() state.Range {

plugins/inputs/logfile/tailersrc_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func TestTailerSrc(t *testing.T) {
156156
assert.Eventually(t, func() bool { return tail.OpenFileCount.Load() <= beforeCount }, 3*time.Second, time.Second)
157157
}
158158

159-
func TestStatefulLogEvent(t *testing.T) {
159+
func TestEventDoneCallback(t *testing.T) {
160160
original := multilineWaitPeriod
161161
defer resetState(original)
162162

@@ -217,7 +217,7 @@ func TestStatefulLogEvent(t *testing.T) {
217217
}
218218
sle, ok := evt.(logs.StatefulLogEvent)
219219
assert.True(t, ok)
220-
sle.RangeQueue().Enqueue(sle.Range())
220+
sle.Done()
221221
i++
222222
switch i {
223223
case 10:

plugins/inputs/windows_event_log/wineventlog/wineventlog.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -254,10 +254,6 @@ func (w *windowsEventLog) SetEventOffset(eventOffset uint64) {
254254
w.eventOffset = eventOffset
255255
}
256256

257-
func (w *windowsEventLog) Done(offset state.Range) {
258-
w.stateManager.Enqueue(offset)
259-
}
260-
261257
func (w *windowsEventLog) ResubscribeCh() chan struct{} {
262258
return w.resubscribeCh
263259
}
@@ -313,6 +309,7 @@ func (le LogEvent) Time() time.Time {
313309
}
314310

315311
func (le LogEvent) Done() {
312+
le.RangeQueue().Enqueue(le.Range())
316313
}
317314

318315
func (le LogEvent) Range() state.Range {

plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ func newMockEntityProvider(entity *cloudwatchlogs.Entity) *mockEntityProvider {
3333
return ep
3434
}
3535

36+
type mockDoneCallback struct {
37+
mock.Mock
38+
}
39+
40+
func (m *mockDoneCallback) Done() {
41+
m.Called()
42+
}
43+
3644
func TestLogEvent(t *testing.T) {
3745
now := time.Now()
3846
e := newLogEvent(now, "test message", nil)
@@ -165,10 +173,8 @@ func TestLogEventBatch(t *testing.T) {
165173
t.Run("WithStatefulLogEvents", func(t *testing.T) {
166174
batch := newLogEventBatch(Target{Group: "G", Stream: "S"}, nil)
167175

168-
callbackCalled := false
169-
callback := func() {
170-
callbackCalled = true
171-
}
176+
mdc1 := &mockDoneCallback{}
177+
mdc1.On("Done").Panic("should not be called")
172178

173179
mrq1 := &mockRangeQueue{}
174180
mrq1.On("ID").Return("test")
@@ -178,25 +184,31 @@ func TestLogEventBatch(t *testing.T) {
178184
mrq2.On("ID").Return("test2")
179185
mrq2.On("Enqueue", state.NewRange(5, 20)).Once()
180186

181-
event1 := newStatefulLogEvent(time.Now(), "Test", callback, &logEventState{
187+
event1 := newStatefulLogEvent(time.Now(), "Test", mdc1.Done, &logEventState{
182188
r: state.NewRange(20, 40),
183189
queue: mrq1,
184190
})
185-
event2 := newStatefulLogEvent(time.Now(), "Test2", callback, &logEventState{
191+
event2 := newStatefulLogEvent(time.Now(), "Test2", mdc1.Done, &logEventState{
186192
r: state.NewRange(5, 20),
187193
queue: mrq2,
188194
})
189-
event3 := newStatefulLogEvent(time.Now(), "Test3", callback, &logEventState{
195+
event3 := newStatefulLogEvent(time.Now(), "Test3", mdc1.Done, &logEventState{
190196
r: state.NewRange(40, 50),
191197
queue: mrq1,
192198
})
199+
200+
mdc2 := &mockDoneCallback{}
201+
mdc2.On("Done").Return().Once()
202+
event4 := newLogEvent(time.Now(), "Test2", mdc2.Done)
193203
batch.append(event1)
194204
batch.append(event2)
195205
batch.append(event3)
206+
batch.append(event4)
196207
batch.done()
197208

198-
mrq1.AssertNumberOfCalls(t, "Enqueue", 1)
199-
mrq2.AssertNumberOfCalls(t, "Enqueue", 1)
200-
assert.False(t, callbackCalled, "Done callback should not have been called")
209+
mrq1.AssertExpectations(t)
210+
mrq2.AssertExpectations(t)
211+
mdc1.AssertNotCalled(t, "Done")
212+
mdc2.AssertExpectations(t)
201213
})
202214
}

0 commit comments

Comments
 (0)