Skip to content

Commit 73cd70b

Browse files
QuentinIdailinsubjam
authored andcommitted
Add a log debouncer to op-service.log package (#259)
(cherry picked from commit 788d28f)
1 parent 52bdc11 commit 73cd70b

File tree

3 files changed

+333
-2
lines changed

3 files changed

+333
-2
lines changed

op-service/log/cli.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,8 @@ func NewLogHandler(wr io.Writer, cfg CLIConfig) slog.Handler {
234234
// The log handler of the logger is a LvlSetter, i.e. the log level can be changed as needed.
235235
func NewLogger(wr io.Writer, cfg CLIConfig) log.Logger {
236236
h := NewLogHandler(wr, cfg)
237-
l := log.NewLogger(h)
237+
debounced := NewDebouncingHandler(h)
238+
l := log.NewLogger(debounced)
238239
if cfg.Pid {
239240
l = l.With("pid", os.Getpid())
240241
}
@@ -247,7 +248,8 @@ func NewLogger(wr io.Writer, cfg CLIConfig) log.Logger {
247248
// Geth and other components may use the global logger however,
248249
// and it is thus recommended to set the global log handler to catch these logs.
249250
func SetGlobalLogHandler(h slog.Handler) {
250-
l := log.NewLogger(h)
251+
debounced := NewDebouncingHandler(h)
252+
l := log.NewLogger(debounced)
251253
ctx := logfilter.AddLogAttrToContext(context.Background(), "global", true)
252254
l.SetContext(ctx)
253255
log.SetDefault(l)

op-service/log/debouncer.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package log
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"sync/atomic"
7+
"time"
8+
9+
lru "github.com/hashicorp/golang-lru/v2"
10+
)
11+
12+
const (
13+
// DebounceDuration is the time window during which duplicate messages are suppressed
14+
DebounceDuration = 100 * time.Millisecond
15+
// DebounceTickerInterval is how often we check and report debounced message counts
16+
DebounceTickerInterval = 5 * time.Second
17+
// DebounceWarningMessage is the message logged when messages have been debounced
18+
DebounceWarningMessage = "Some messages were debounced"
19+
)
20+
21+
type DebounchingHandler struct {
22+
handler slog.Handler
23+
messages *lru.Cache[string, time.Time]
24+
counter atomic.Uint64
25+
ticker *time.Ticker
26+
}
27+
28+
func NewDebouncingHandler(handler slog.Handler) *DebounchingHandler {
29+
messages, _ := lru.New[string, time.Time](1024)
30+
return &DebounchingHandler{
31+
handler: handler,
32+
messages: messages,
33+
ticker: time.NewTicker(DebounceTickerInterval),
34+
}
35+
}
36+
37+
func (h *DebounchingHandler) Enabled(ctx context.Context, lvl slog.Level) bool {
38+
return h.handler.Enabled(ctx, lvl)
39+
}
40+
41+
func (h *DebounchingHandler) Handle(ctx context.Context, record slog.Record) error {
42+
select {
43+
case <-h.ticker.C:
44+
cntr := h.counter.Load()
45+
h.counter.Store(0)
46+
47+
if cntr > 0 {
48+
warningRecord := slog.NewRecord(time.Now(), slog.LevelWarn, DebounceWarningMessage, 0)
49+
warningRecord.Add("nDebounced", cntr)
50+
err := h.handler.Handle(ctx, warningRecord)
51+
if err != nil {
52+
return err
53+
}
54+
}
55+
56+
default:
57+
}
58+
59+
if last, ok := h.messages.Get(record.Message); ok && time.Since(last) < DebounceDuration {
60+
h.counter.Add(1)
61+
return nil
62+
}
63+
h.messages.Add(record.Message, time.Now())
64+
65+
return h.handler.Handle(ctx, record)
66+
}
67+
68+
func (h *DebounchingHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
69+
return NewDebouncingHandler(h.handler.WithAttrs(attrs))
70+
}
71+
72+
func (h *DebounchingHandler) WithGroup(name string) slog.Handler {
73+
return NewDebouncingHandler(h.handler.WithGroup(name))
74+
}

op-service/log/debouncer_test.go

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
package log
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/ethereum/go-ethereum/log"
13+
)
14+
15+
// safeTestRecorder is a thread-safe version of testRecorder for concurrent tests
16+
type safeTestRecorder struct {
17+
mu sync.Mutex
18+
records []slog.Record
19+
}
20+
21+
func (r *safeTestRecorder) Enabled(context.Context, slog.Level) bool {
22+
return true
23+
}
24+
25+
func (r *safeTestRecorder) Handle(_ context.Context, rec slog.Record) error {
26+
r.mu.Lock()
27+
defer r.mu.Unlock()
28+
r.records = append(r.records, rec)
29+
return nil
30+
}
31+
32+
func (r *safeTestRecorder) WithAttrs([]slog.Attr) slog.Handler { return r }
33+
func (r *safeTestRecorder) WithGroup(string) slog.Handler { return r }
34+
35+
func (r *safeTestRecorder) GetRecords() []slog.Record {
36+
r.mu.Lock()
37+
defer r.mu.Unlock()
38+
// Return a copy to avoid race conditions
39+
result := make([]slog.Record, len(r.records))
40+
copy(result, r.records)
41+
return result
42+
}
43+
44+
func (r *safeTestRecorder) Len() int {
45+
r.mu.Lock()
46+
defer r.mu.Unlock()
47+
return len(r.records)
48+
}
49+
50+
func TestDebouncingHandler_Basic(t *testing.T) {
51+
h := new(testRecorder)
52+
d := NewDebouncingHandler(h)
53+
logger := log.NewLogger(d)
54+
55+
// First message should go through
56+
logger.Info("hello world")
57+
require.Len(t, h.records, 1)
58+
require.Equal(t, "hello world", h.records[0].Message)
59+
60+
// Same message within 100ms should be dropped
61+
logger.Info("hello world")
62+
require.Len(t, h.records, 1)
63+
64+
// Different message should go through
65+
logger.Info("different message")
66+
require.Len(t, h.records, 2)
67+
require.Equal(t, "different message", h.records[1].Message)
68+
69+
// Wait for debounce period to expire
70+
time.Sleep(DebounceDuration + 1*time.Millisecond)
71+
72+
// Same message should now go through again
73+
logger.Info("hello world")
74+
require.Len(t, h.records, 3)
75+
require.Equal(t, "hello world", h.records[2].Message)
76+
}
77+
78+
func TestDebouncingHandler_MultipleMessages(t *testing.T) {
79+
h := new(testRecorder)
80+
d := NewDebouncingHandler(h)
81+
logger := log.NewLogger(d)
82+
83+
// Send multiple different messages
84+
messages := []string{"msg1", "msg2", "msg3", "msg4", "msg5"}
85+
for _, msg := range messages {
86+
logger.Info(msg)
87+
}
88+
require.Len(t, h.records, len(messages))
89+
90+
// Try to resend them immediately - all should be dropped
91+
for _, msg := range messages {
92+
logger.Info(msg)
93+
}
94+
require.Len(t, h.records, len(messages))
95+
96+
// Wait for debounce period
97+
time.Sleep(DebounceDuration + 1*time.Millisecond)
98+
99+
// Now they should all go through again
100+
for _, msg := range messages {
101+
logger.Info(msg)
102+
}
103+
require.Len(t, h.records, 2*len(messages))
104+
}
105+
106+
func TestDebouncingHandler_CacheEviction(t *testing.T) {
107+
h := new(testRecorder)
108+
d := NewDebouncingHandler(h)
109+
logger := log.NewLogger(d)
110+
111+
// Generate more than 1024 unique messages to trigger LRU eviction
112+
const numMessages = 1100
113+
for i := range numMessages {
114+
logger.Info(slog.IntValue(i).String())
115+
}
116+
require.Len(t, h.records, numMessages)
117+
118+
// The earliest messages should have been evicted from cache
119+
// So they should go through again without waiting
120+
logger.Info(slog.IntValue(0).String())
121+
require.Len(t, h.records, numMessages+1)
122+
123+
// Recent messages should still be debounced
124+
logger.Info(slog.IntValue(numMessages - 1).String())
125+
require.Len(t, h.records, numMessages+1)
126+
}
127+
128+
func TestDebouncingHandler_SameMessageDifferentAttrs(t *testing.T) {
129+
h := new(testRecorder)
130+
d := NewDebouncingHandler(h)
131+
logger := log.NewLogger(d)
132+
133+
// Log message with one set of attributes
134+
logger.Info("same message", "key1", "value1", "key2", "value2")
135+
require.Len(t, h.records, 1)
136+
require.Equal(t, "same message", h.records[0].Message)
137+
138+
// Same message with different attributes should still be debounced
139+
logger.Info("same message", "key3", "value3", "key4", "value4")
140+
require.Len(t, h.records, 1)
141+
142+
// Same message with no attributes should still be debounced
143+
logger.Info("same message")
144+
require.Len(t, h.records, 1)
145+
146+
// Same message with partially overlapping attributes should still be debounced
147+
logger.Info("same message", "key1", "different_value", "key5", "value5")
148+
require.Len(t, h.records, 1)
149+
150+
// Wait for debounce period
151+
time.Sleep(DebounceDuration + 1*time.Millisecond)
152+
153+
// Now the same message with any attributes should go through
154+
logger.Info("same message", "totally", "new", "attrs", "here")
155+
require.Len(t, h.records, 2)
156+
require.Equal(t, "same message", h.records[1].Message)
157+
}
158+
159+
func TestDebouncingHandler_TickerWarning(t *testing.T) {
160+
h := new(testRecorder)
161+
d := NewDebouncingHandler(h)
162+
logger := log.NewLogger(d)
163+
164+
// Send initial message
165+
logger.Info("test message 1")
166+
require.Len(t, h.records, 1)
167+
require.Equal(t, "test message 1", h.records[0].Message)
168+
169+
// Trigger several debounced messages
170+
for i := 0; i < 10; i++ {
171+
logger.Info("test message 1")
172+
}
173+
// Still only the first message
174+
require.Len(t, h.records, 1)
175+
176+
// Send another unique message and debounce it
177+
logger.Info("test message 2")
178+
require.Len(t, h.records, 2)
179+
for i := 0; i < 5; i++ {
180+
logger.Info("test message 2")
181+
}
182+
183+
// Wait for ticker to fire (5 seconds)
184+
time.Sleep(DebounceTickerInterval + 100*time.Millisecond)
185+
186+
// Send a new message to trigger the ticker check
187+
logger.Info("trigger ticker check")
188+
189+
// Should have: original 2 messages, warning about debounced messages, and the trigger message
190+
require.Len(t, h.records, 4)
191+
require.Equal(t, "test message 1", h.records[0].Message)
192+
require.Equal(t, "test message 2", h.records[1].Message)
193+
require.Equal(t, DebounceWarningMessage, h.records[2].Message)
194+
require.Equal(t, "trigger ticker check", h.records[3].Message)
195+
196+
// Check that the warning record has the debounced count
197+
warningRecord := h.records[2]
198+
hasDebounceCount := false
199+
warningRecord.Attrs(func(attr slog.Attr) bool {
200+
if attr.Key == "nDebounced" {
201+
require.Equal(t, uint64(15), attr.Value.Uint64()) // 10 + 5 debounced messages
202+
hasDebounceCount = true
203+
}
204+
return true
205+
})
206+
require.True(t, hasDebounceCount, "Warning should contain nDebounced attribute")
207+
208+
// Counter should be reset, so debouncing more messages starts fresh
209+
for i := 0; i < 3; i++ {
210+
logger.Info("trigger ticker check")
211+
}
212+
// No new messages should be logged (they're debounced)
213+
require.Len(t, h.records, 4)
214+
215+
// Wait for ticker again
216+
time.Sleep(DebounceTickerInterval + 100*time.Millisecond)
217+
218+
// Trigger ticker check
219+
logger.Info("final message")
220+
221+
// Should have another warning for the 3 newly debounced messages
222+
require.Len(t, h.records, 6)
223+
require.Equal(t, DebounceWarningMessage, h.records[4].Message)
224+
require.Equal(t, "final message", h.records[5].Message)
225+
226+
// Check the second warning has count of 3
227+
secondWarning := h.records[4]
228+
secondWarning.Attrs(func(attr slog.Attr) bool {
229+
if attr.Key == "nDebounced" {
230+
require.Equal(t, uint64(3), attr.Value.Uint64())
231+
}
232+
return true
233+
})
234+
}
235+
236+
func TestDebouncingHandler_Concurrent(t *testing.T) {
237+
h := new(safeTestRecorder)
238+
d := NewDebouncingHandler(h)
239+
logger := log.NewLogger(d)
240+
241+
const numGoroutines = 10
242+
var wg sync.WaitGroup
243+
wg.Add(numGoroutines)
244+
245+
for i := range numGoroutines {
246+
go func(id int) {
247+
defer wg.Done()
248+
logger.Info("hello")
249+
}(i)
250+
}
251+
252+
wg.Wait()
253+
254+
require.Equal(t, h.Len(), 1, "Should have debounced duplicate messages")
255+
}

0 commit comments

Comments
 (0)