Skip to content

Commit 175c77f

Browse files
authored
Merge pull request #1257 from twmb/examples
examples: add testing with kfake example
2 parents 6999fb8 + 84bc833 commit 175c77f

File tree

4 files changed

+326
-0
lines changed

4 files changed

+326
-0
lines changed

examples/README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,20 @@ Registers a schema, produces encoded records, and decodes them on consume using
146146
- `-topic` - topic to produce to and consume from
147147
- `-registry` - schema registry URL (default localhost:8081)
148148

149+
## testing_with_kfake
150+
151+
Demonstrates `kfake`, a fake in-memory Kafka cluster for testing without a
152+
real broker. Shows three control function patterns for intercepting, observing,
153+
and coordinating Kafka protocol requests. Unlike other examples, this one runs
154+
entirely in-process with no external dependencies.
155+
156+
- `-mode inject-error` - one-shot `ControlKey` to inject a produce error; kgo
157+
retries and succeeds because the control is consumed after one interception (default)
158+
- `-mode observe` - `KeepControl` to persistently count every request type
159+
during a produce-and-consume cycle without intercepting
160+
- `-mode sleep` - `SleepControl` with `SleepOutOfOrder` to delay a fetch until
161+
a produce arrives, coordinating between connections
162+
149163
## transactions
150164

151165
Demonstrates Kafka transactions in two modes.

examples/testing_with_kfake/go.mod

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
module testing_with_kfake
2+
3+
go 1.25.0
4+
5+
require (
6+
github.com/twmb/franz-go v1.20.6
7+
github.com/twmb/franz-go/pkg/kfake v0.0.0-20260218055430-fc72d8313608
8+
github.com/twmb/franz-go/pkg/kmsg v1.12.0
9+
)
10+
11+
require (
12+
github.com/klauspost/compress v1.18.4 // indirect
13+
github.com/pierrec/lz4/v4 v4.1.25 // indirect
14+
golang.org/x/crypto v0.48.0 // indirect
15+
)

examples/testing_with_kfake/go.sum

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
2+
github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
3+
github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0=
4+
github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4=
5+
github.com/twmb/franz-go v1.20.6 h1:TpQTt4QcixJ1cHEmQGPOERvTzo99s8jAutmS7rbSD6w=
6+
github.com/twmb/franz-go v1.20.6/go.mod h1:u+FzH2sInp7b9HNVv2cZN8AxdXy6y/AQ1Bkptu4c0FM=
7+
github.com/twmb/franz-go/pkg/kadm v1.17.1 h1:Bt02Y/RLgnFO2NP2HVP1kd2TFtGRiJZx+fSArjZDtpw=
8+
github.com/twmb/franz-go/pkg/kadm v1.17.1/go.mod h1:s4duQmrDbloVW9QTMXhs6mViTepze7JLG43xwPcAeTg=
9+
github.com/twmb/franz-go/pkg/kfake v0.0.0-20260218055430-fc72d8313608 h1:UlexSOBHuhhBdA6NNPzaFrLLvoe7qgo8/xI+2FZKuwQ=
10+
github.com/twmb/franz-go/pkg/kfake v0.0.0-20260218055430-fc72d8313608/go.mod h1:u6MCLKYQtF7DP1d3pFjohpY0G+dUEUSdmC2JZt9F84U=
11+
github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc=
12+
github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY=
13+
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
14+
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
// This example demonstrates kfake, a fake in-memory Kafka cluster for
2+
// testing without a real broker. kfake provides control functions that let
3+
// you intercept, observe, modify, or delay any Kafka protocol request.
4+
//
5+
// Three modes show different control function patterns:
6+
//
7+
// - inject-error: one-shot ControlKey to inject a produce error; kgo retries
8+
// and succeeds because the control is consumed after one interception
9+
// - observe: KeepControl to persistently count every request type flowing
10+
// through the cluster during a produce-and-consume cycle
11+
// - sleep: SleepControl with SleepOutOfOrder to coordinate between requests,
12+
// delaying a fetch response until a produce arrives
13+
//
14+
// Unlike other examples in this directory, this one does not require a running
15+
// Kafka cluster - everything runs in-process.
16+
package main
17+
18+
import (
19+
"context"
20+
"flag"
21+
"fmt"
22+
"os"
23+
"sort"
24+
"sync"
25+
"sync/atomic"
26+
"time"
27+
28+
"github.com/twmb/franz-go/pkg/kerr"
29+
"github.com/twmb/franz-go/pkg/kfake"
30+
"github.com/twmb/franz-go/pkg/kgo"
31+
"github.com/twmb/franz-go/pkg/kmsg"
32+
)
33+
34+
var mode = flag.String("mode", "inject-error", "demo mode: inject-error, observe, sleep")
35+
36+
func die(msg string, args ...any) {
37+
fmt.Fprintf(os.Stderr, msg+"\n", args...)
38+
os.Exit(1)
39+
}
40+
41+
func main() {
42+
flag.Parse()
43+
switch *mode {
44+
case "inject-error":
45+
demoInjectError()
46+
case "observe":
47+
demoObserve()
48+
case "sleep":
49+
demoSleep()
50+
default:
51+
die("unknown mode %q; use inject-error, observe, or sleep", *mode)
52+
}
53+
}
54+
55+
// demoInjectError uses a one-shot ControlKey to inject a retriable error on
56+
// the first produce request. Because KeepControl is NOT called, the control
57+
// is removed after handling one request. The client retries and the second
58+
// produce goes through normally.
59+
func demoInjectError() {
60+
c, err := kfake.NewCluster(
61+
kfake.NumBrokers(1),
62+
kfake.SeedTopics(1, "test-topic"),
63+
)
64+
if err != nil {
65+
die("NewCluster: %v", err)
66+
}
67+
defer c.Close()
68+
69+
// One-shot control: return NOT_LEADER_FOR_PARTITION on the first produce.
70+
// The control handles exactly one request and is then removed.
71+
var intercepted atomic.Bool
72+
c.ControlKey(int16(kmsg.Produce), func(kreq kmsg.Request) (kmsg.Response, error, bool) {
73+
intercepted.Store(true)
74+
fmt.Println(" [control] intercepted produce, returning NOT_LEADER_FOR_PARTITION")
75+
76+
req := kreq.(*kmsg.ProduceRequest)
77+
resp := req.ResponseKind().(*kmsg.ProduceResponse)
78+
for _, rt := range req.Topics {
79+
st := kmsg.NewProduceResponseTopic()
80+
st.Topic = rt.Topic
81+
for _, rp := range rt.Partitions {
82+
sp := kmsg.NewProduceResponseTopicPartition()
83+
sp.Partition = rp.Partition
84+
sp.ErrorCode = kerr.NotLeaderForPartition.Code
85+
st.Partitions = append(st.Partitions, sp)
86+
}
87+
resp.Topics = append(resp.Topics, st)
88+
}
89+
return resp, nil, true
90+
})
91+
92+
cl, err := kgo.NewClient(
93+
kgo.SeedBrokers(c.ListenAddrs()...),
94+
kgo.DefaultProduceTopic("test-topic"),
95+
)
96+
if err != nil {
97+
die("NewClient: %v", err)
98+
}
99+
defer cl.Close()
100+
101+
fmt.Println("inject-error: producing a record (first attempt will fail)...")
102+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
103+
defer cancel()
104+
105+
r := &kgo.Record{Value: []byte("hello")}
106+
if err := cl.ProduceSync(ctx, r).FirstErr(); err != nil {
107+
die("produce failed: %v", err)
108+
}
109+
110+
fmt.Printf(" produced to partition %d offset %d\n", r.Partition, r.Offset)
111+
fmt.Printf(" control intercepted first attempt: %v\n", intercepted.Load())
112+
fmt.Println(" kgo retried automatically and succeeded")
113+
}
114+
115+
// demoObserve uses KeepControl to persistently observe every request without
116+
// intercepting. The control function returns (nil, nil, false) - "not handled"
117+
// - so all requests are processed normally by the cluster.
118+
func demoObserve() {
119+
c, err := kfake.NewCluster(
120+
kfake.NumBrokers(1),
121+
kfake.SeedTopics(3, "test-topic"),
122+
)
123+
if err != nil {
124+
die("NewCluster: %v", err)
125+
}
126+
defer c.Close()
127+
128+
// Persistent observer using Control (matches all request keys).
129+
// KeepControl prevents the control from being removed after use.
130+
var mu sync.Mutex
131+
counts := make(map[string]int)
132+
c.Control(func(kreq kmsg.Request) (kmsg.Response, error, bool) {
133+
c.KeepControl()
134+
mu.Lock()
135+
counts[kmsg.NameForKey(kreq.Key())]++
136+
mu.Unlock()
137+
return nil, nil, false
138+
})
139+
140+
cl, err := kgo.NewClient(
141+
kgo.SeedBrokers(c.ListenAddrs()...),
142+
kgo.DefaultProduceTopic("test-topic"),
143+
kgo.ConsumeTopics("test-topic"),
144+
)
145+
if err != nil {
146+
die("NewClient: %v", err)
147+
}
148+
149+
fmt.Println("observe: producing 10 records...")
150+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
151+
defer cancel()
152+
153+
for i := range 10 {
154+
r := &kgo.Record{Value: []byte(fmt.Sprintf("record-%d", i))}
155+
if err := cl.ProduceSync(ctx, r).FirstErr(); err != nil {
156+
die("produce: %v", err)
157+
}
158+
}
159+
160+
fmt.Println("observe: consuming records...")
161+
fetches := cl.PollFetches(ctx)
162+
fmt.Printf(" consumed %d records\n", fetches.NumRecords())
163+
164+
cl.Close()
165+
166+
fmt.Println("\n request type counts:")
167+
mu.Lock()
168+
keys := make([]string, 0, len(counts))
169+
for k := range counts {
170+
keys = append(keys, k)
171+
}
172+
sort.Strings(keys)
173+
for _, k := range keys {
174+
fmt.Printf(" %-30s %d\n", k, counts[k])
175+
}
176+
mu.Unlock()
177+
}
178+
179+
// demoSleep uses SleepControl with SleepOutOfOrder to coordinate between
180+
// requests on different connections. A fetch control function sleeps until a
181+
// produce request arrives, guaranteeing the fetch returns data rather than
182+
// timing out empty.
183+
//
184+
// franz-go uses separate connections for produce and fetch requests, so
185+
// SleepOutOfOrder allows the produce to be processed while the fetch sleeps.
186+
func demoSleep() {
187+
c, err := kfake.NewCluster(
188+
kfake.NumBrokers(1),
189+
kfake.SeedTopics(1, "test-topic"),
190+
// SleepOutOfOrder allows requests on other connections to
191+
// proceed while a control function is sleeping.
192+
kfake.SleepOutOfOrder(),
193+
)
194+
if err != nil {
195+
die("NewCluster: %v", err)
196+
}
197+
defer c.Close()
198+
199+
// Signal channel: produce observer notifies when data has been written.
200+
produced := make(chan struct{}, 1)
201+
202+
// Observe produces: signal when one arrives. KeepControl keeps this
203+
// observer active for the entire test.
204+
c.ControlKey(int16(kmsg.Produce), func(kmsg.Request) (kmsg.Response, error, bool) {
205+
c.KeepControl()
206+
select {
207+
case produced <- struct{}{}:
208+
default:
209+
}
210+
return nil, nil, false
211+
})
212+
213+
// Sleep the first fetch until a produce is observed. SleepControl
214+
// yields so that the produce request (on a different connection) can
215+
// be processed while this fetch waits. DropControl removes this
216+
// control after it fires, so subsequent fetches run normally.
217+
c.ControlKey(int16(kmsg.Fetch), func(kmsg.Request) (kmsg.Response, error, bool) {
218+
fmt.Println(" [control] fetch arrived, sleeping until produce completes...")
219+
c.SleepControl(func() {
220+
<-produced
221+
})
222+
fmt.Println(" [control] produce observed, waking fetch")
223+
// DropControl removes this control even though we return
224+
// handled=false. Without it, the control would fire again
225+
// on the next fetch because unhandled controls are kept.
226+
c.DropControl()
227+
return nil, nil, false
228+
})
229+
230+
// Start a consumer - its first fetch will sleep in the control.
231+
consumer, err := kgo.NewClient(
232+
kgo.SeedBrokers(c.ListenAddrs()...),
233+
kgo.ConsumeTopics("test-topic"),
234+
)
235+
if err != nil {
236+
die("NewClient consumer: %v", err)
237+
}
238+
defer consumer.Close()
239+
240+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
241+
defer cancel()
242+
243+
// Poll in the background - the fetch will sleep until produce arrives.
244+
type result struct {
245+
n int
246+
err error
247+
}
248+
done := make(chan result, 1)
249+
go func() {
250+
f := consumer.PollFetches(ctx)
251+
var fetchErr error
252+
f.EachError(func(_ string, _ int32, err error) {
253+
fetchErr = err
254+
})
255+
done <- result{f.NumRecords(), fetchErr}
256+
}()
257+
258+
// Brief pause to let the consumer connect and issue its fetch.
259+
time.Sleep(100 * time.Millisecond)
260+
261+
// Produce a record - this wakes the sleeping fetch control.
262+
producer, err := kgo.NewClient(
263+
kgo.SeedBrokers(c.ListenAddrs()...),
264+
kgo.DefaultProduceTopic("test-topic"),
265+
)
266+
if err != nil {
267+
die("NewClient producer: %v", err)
268+
}
269+
defer producer.Close()
270+
271+
fmt.Println("sleep: producing a record (will wake the sleeping fetch)...")
272+
r := &kgo.Record{Value: []byte("coordinated")}
273+
if err := producer.ProduceSync(ctx, r).FirstErr(); err != nil {
274+
die("produce: %v", err)
275+
}
276+
fmt.Printf(" produced to partition %d offset %d\n", r.Partition, r.Offset)
277+
278+
res := <-done
279+
if res.err != nil {
280+
die("consume: %v", res.err)
281+
}
282+
fmt.Printf(" consumer fetched %d record(s) after being woken\n", res.n)
283+
}

0 commit comments

Comments
 (0)