
Commit 4b939b1

twmb and claude committed
kgo: add ring logger and chaos connection test infrastructure
Add two test utilities for debugging produce-related issues:

ringLogger wraps a Logger, capturing all entries (including DEBUG) in a circular buffer while only forwarding INFO+ in real time. Call flush() to dump the full backlog when a test detects a problem, giving complete DEBUG context without the noise on every run. It also supports automatic flushing on pattern matches via shouldFlush (currently empty; add needle strings as needed for future investigations).

chaosDialer wraps connections with a random 500ms-1500ms lifetime, forcing connection deaths that stress reconnection, produce retries, and sequence number handling. It is enabled via KGO_TEST_CHAOS=1 on the TestGroupETL producer and the ETL consumers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4d5b7d9 commit 4b939b1
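
Neither utility has a call site wired in by this commit beyond the Dialer option below; the following is a minimal sketch of how a test might use ringLogger, assuming the existing testLogger and newTestClient helpers from helpers_test.go. The buffer size, the seen map, and the duplicate-offset check are illustrative only, and ctx/t are assumed to be in scope:

	// Hypothetical wiring, not part of this commit: wrap the test logger in a
	// ring buffer and flush the DEBUG backlog only when a problem is seen.
	rl := newRingLogger(testLogger(), 2048) // 2048 is an arbitrary size
	cl, _ := newTestClient(WithLogger(rl))
	defer cl.Close()

	seen := make(map[int64]bool) // illustrative; real code would key per partition
	cl.PollFetches(ctx).EachRecord(func(rec *Record) {
		if seen[rec.Offset] {
			rl.flush() // dump full DEBUG history leading up to the duplicate
			t.Errorf("duplicate offset %d", rec.Offset)
		}
		seen[rec.Offset] = true
	})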

File tree

2 files changed: +158 -2 lines changed


pkg/kgo/group_test.go

Lines changed: 9 additions & 2 deletions
@@ -38,12 +38,16 @@ func TestGroupETL(t *testing.T) {
 		////////////////////

 		go func() {
-			cl, _ := newTestClient(
+			producerOpts := []Opt{
 				WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)),
 				MaxBufferedRecords(10000),
 				MaxBufferedBytes(50000),
 				UnknownTopicRetries(-1), // see txn_test comment
-			)
+			}
+			if testChaos {
+				producerOpts = append(producerOpts, Dialer(chaosDialer{}.DialContext))
+			}
+			cl, _ := newTestClient(producerOpts...)
 			defer cl.Close()

 			offsets := make(map[int32]int64)
@@ -180,6 +184,9 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
 		ctx848 := context.WithValue(context.Background(), "opt_in_kafka_next_gen_balancer_beta", true) //nolint:revive,staticcheck // intentional string key for beta opt-in
 		opts = append(opts, WithContext(ctx848))
 	}
+	if testChaos {
+		opts = append(opts, Dialer(chaosDialer{}.DialContext))
+	}

 	cl, _ := newTestClient(opts...)
 	defer c.leaveGroupStatic(adm, myInstanceID)

pkg/kgo/helpers_test.go

Lines changed: 149 additions & 0 deletions
@@ -8,8 +8,10 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"math/rand/v2"
 	"net"
 	"os"
+	"slices"
 	"sort"
 	"strconv"
 	"strings"
@@ -27,6 +29,8 @@ import (
 
 var errSkipChecks848 = errors.New("848 stale commit; skip checks")
 
+var testChaos = os.Getenv("KGO_TEST_CHAOS") == "1"
+
 var (
 	adm    *Client
 	testrf = 3
@@ -260,6 +264,151 @@ func testLogger() Logger {
 	})
 }
 
+// ringLogger captures all log entries (including DEBUG) in a ring buffer but
+// only forwards INFO+ to the underlying logger in real time. Call flush() to
+// dump the entire backlog when a test detects a problem (e.g. duplicate
+// offset). This gives you full DEBUG context leading up to the event without
+// the noise of DEBUG logging on every run.
+//
+// shouldFlush can also trigger automatic flushing when specific log patterns
+// are detected. To enable, add needle strings to the shouldFlush method
+// (e.g. "OUT_OF_ORDER_SEQUENCE_NUMBER"). When a match is found, the entire
+// backlog is dumped first, then the triggering message, so the output shows
+// full context leading up to the event.
+type ringLogEntry struct {
+	level   LogLevel
+	msg     string
+	keyvals []any
+}
+
+type ringLogger struct {
+	mu   sync.Mutex
+	buf  []ringLogEntry
+	size int
+	pos  int  // next write position in circular buffer
+	full bool // whether the buffer has wrapped around
+	real Logger
+}
+
+func newRingLogger(real Logger, size int) *ringLogger {
+	return &ringLogger{buf: make([]ringLogEntry, size), size: size, real: real}
+}
+
+func (r *ringLogger) Level() LogLevel { return LogLevelDebug }
+
+func (r *ringLogger) Log(level LogLevel, msg string, keyvals ...any) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	// If this message triggers a flush, dump the entire backlog first
+	// so the historical context appears before the triggering message.
+	if r.shouldFlush(msg, keyvals) {
+		r.flushLocked()
+		r.real.Log(level, msg, keyvals...)
+		return
+	}
+
+	r.buf[r.pos] = ringLogEntry{level, msg, slices.Clone(keyvals)}
+	r.pos++
+	if r.pos >= r.size {
+		r.pos = 0
+		r.full = true
+	}
+
+	// Forward INFO+ to the real logger immediately.
+	if level <= r.real.Level() {
+		r.real.Log(level, msg, keyvals...)
+	}
+}
+
+func (r *ringLogger) shouldFlush(msg string, keyvals []any) bool {
+	// Add needle strings here to auto-flush on specific log patterns.
+	// Check both msg and keyvals, since error text can appear in either
+	// depending on the code path.
+	for _, needle := range []string{
+		// e.g. "OUT_OF_ORDER_SEQUENCE_NUMBER",
+	} {
+		if strings.Contains(msg, needle) {
+			return true
+		}
+		for _, kv := range keyvals {
+			if s, ok := kv.(fmt.Stringer); ok && strings.Contains(s.String(), needle) {
+				return true
+			}
+			if s, ok := kv.(error); ok && strings.Contains(s.Error(), needle) {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// flush dumps all buffered entries in order. Call this from test code when an
+// interesting event (e.g. duplicate offset) is detected.
+func (r *ringLogger) flush() {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.flushLocked()
+}
+
+func (r *ringLogger) flushLocked() {
+	if r.full {
+		for i := r.pos; i < r.size; i++ {
+			r.real.Log(r.buf[i].level, "[BACKLOG] "+r.buf[i].msg, r.buf[i].keyvals...)
+		}
+	}
+	for i := range r.pos {
+		r.real.Log(r.buf[i].level, "[BACKLOG] "+r.buf[i].msg, r.buf[i].keyvals...)
+	}
+	r.pos = 0
+	r.full = false
+}
+
+// chaosConn and chaosDialer inject random connection deaths into tests.
+// Each connection lives for 500ms-1500ms before reads/writes start returning
+// errors, forcing the client to reconnect. This stresses connection lifecycle
+// code paths: loadConnection races, broker object reuse, produce retries,
+// sequence number rewinding, and idempotent ordering guarantees.
+//
+// Enable with KGO_TEST_CHAOS=1:
+//
+//	KGO_TEST_CHAOS=1 go test -run TestGroupETL -count=50 -timeout 60m
+type chaosConn struct {
+	net.Conn
+	deadline time.Time
+	once     sync.Once
+}
+
+func (c *chaosConn) Read(p []byte) (int, error) {
+	if time.Now().After(c.deadline) {
+		c.once.Do(func() { c.Conn.Close() })
+		return 0, net.ErrClosed
+	}
+	return c.Conn.Read(p)
+}
+
+func (c *chaosConn) Write(p []byte) (int, error) {
+	if time.Now().After(c.deadline) {
+		c.once.Do(func() { c.Conn.Close() })
+		return 0, net.ErrClosed
+	}
+	return c.Conn.Write(p)
+}
+
+type chaosDialer struct{}
+
+func (chaosDialer) DialContext(ctx context.Context, network, host string) (net.Conn, error) {
+	c, err := (&net.Dialer{}).DialContext(ctx, network, host)
+	if err != nil {
+		return nil, err
+	}
+	lifetime := 500*time.Millisecond + time.Duration(rand.IntN(1000))*time.Millisecond
+	return &chaosConn{
+		Conn:     c,
+		deadline: time.Now().Add(lifetime),
+	}, nil
+}
+
 var randsha = func() func() string {
 	var mu sync.Mutex
 	last := time.Now().UnixNano()
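
For reference, a quick standalone way to sanity-check chaosConn in isolation. This test is a hypothetical sketch, not part of the commit: the loopback listener, the 1600ms sleep, the test name, and the extra io import are all illustrative.

	// Hypothetical sanity check: dial through chaosDialer and confirm writes
	// fail with net.ErrClosed once the random 500ms-1500ms lifetime has
	// certainly elapsed.
	func TestChaosConnDies(t *testing.T) {
		ln, err := net.Listen("tcp", "127.0.0.1:0")
		if err != nil {
			t.Fatal(err)
		}
		defer ln.Close()
		go func() {
			for {
				c, err := ln.Accept()
				if err != nil {
					return
				}
				go io.Copy(io.Discard, c) // drain whatever the client writes
			}
		}()

		c, err := chaosDialer{}.DialContext(context.Background(), "tcp", ln.Addr().String())
		if err != nil {
			t.Fatal(err)
		}
		if _, err := c.Write([]byte("ping")); err != nil {
			t.Fatalf("write within minimum lifetime should succeed: %v", err)
		}
		time.Sleep(1600 * time.Millisecond) // past the 1500ms maximum lifetime
		if _, err := c.Write([]byte("ping")); !errors.Is(err, net.ErrClosed) {
			t.Fatalf("expected net.ErrClosed after lifetime, got %v", err)
		}
	}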
