Skip to content

Commit 8bdf0bf

Browse files
authored
add command decoding benchmark (#1726)
* add command decoding benchmark Signed-off-by: monkey <[email protected]> * fix "call to (*B).Fatalf from a non-test goroutine" Signed-off-by: monkey <[email protected]> * Add a newline at the end of the file Signed-off-by: monkey <[email protected]> * Get cmdsInfoCache from redis-server Signed-off-by: monkey <[email protected]> * reinforce benchmark-decode Signed-off-by: monkey <[email protected]>
1 parent f7ac48d commit 8bdf0bf

File tree

1 file changed

+317
-0
lines changed

1 file changed

+317
-0
lines changed

benchmark_decode_test.go

Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/go-redis/redis/v8/internal/proto"
7+
"net"
8+
"testing"
9+
"time"
10+
)
11+
12+
var ctx = context.TODO()
13+
14+
type ConnStub struct {
15+
buff []byte
16+
idx int
17+
}
18+
19+
func (c *ConnStub) Read(b []byte) (n int, err error) {
20+
n = copy(b, c.buff[c.idx:])
21+
if len(c.buff) > c.idx+n {
22+
c.idx += n
23+
} else {
24+
c.idx = 0
25+
}
26+
return n, nil
27+
}
28+
func (c *ConnStub) Write(b []byte) (n int, err error) { return len(b), nil }
29+
func (c *ConnStub) Close() error { return nil }
30+
func (c *ConnStub) LocalAddr() net.Addr { return nil }
31+
func (c *ConnStub) RemoteAddr() net.Addr { return nil }
32+
func (c *ConnStub) SetDeadline(_ time.Time) error { return nil }
33+
func (c *ConnStub) SetReadDeadline(_ time.Time) error { return nil }
34+
func (c *ConnStub) SetWriteDeadline(_ time.Time) error { return nil }
35+
36+
type ClientStub struct {
37+
Cmdable
38+
name string
39+
buff []byte
40+
conn []*ConnStub
41+
}
42+
43+
func (stub *ClientStub) SetResponse(b []byte) {
44+
stub.buff = make([]byte, len(b))
45+
copy(stub.buff, b)
46+
47+
for _, c := range stub.conn {
48+
c.buff = make([]byte, len(b))
49+
copy(c.buff, b)
50+
}
51+
}
52+
53+
func (stub *ClientStub) AppendConn(c *ConnStub) {
54+
if len(stub.buff) > 0 {
55+
c.buff = make([]byte, len(stub.buff))
56+
copy(c.buff, stub.buff)
57+
}
58+
stub.conn = append(stub.conn, c)
59+
}
60+
61+
func (stub *ClientStub) Name() string {
62+
return stub.name
63+
}
64+
65+
func NewDecodeClient() *ClientStub {
66+
stub := &ClientStub{
67+
name: "Client",
68+
}
69+
stub.Cmdable = NewClient(&Options{
70+
PoolSize: 128,
71+
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
72+
conn := &ConnStub{}
73+
stub.AppendConn(conn)
74+
return conn, nil
75+
},
76+
})
77+
return stub
78+
}
79+
80+
func NewDecodeClusterClient() *ClientStub {
81+
stub := &ClientStub{
82+
name: "Cluster",
83+
}
84+
client := NewClusterClient(&ClusterOptions{
85+
PoolSize: 128,
86+
Addrs: []string{"127.0.0.1:6379"},
87+
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
88+
c := &ConnStub{}
89+
stub.AppendConn(c)
90+
return c, nil
91+
},
92+
ClusterSlots: func(_ context.Context) ([]ClusterSlot, error) {
93+
return []ClusterSlot{
94+
{
95+
Start: 0,
96+
End: 16383,
97+
Nodes: []ClusterNode{{Addr: "127.0.0.1:6379"}},
98+
},
99+
}, nil
100+
},
101+
})
102+
// init command.
103+
tmpClient := NewClient(&Options{Addr: ":6379"})
104+
cmdCache, err := tmpClient.Command(ctx).Result()
105+
_ = tmpClient.Close()
106+
client.cmdsInfoCache = newCmdsInfoCache(func(_ context.Context) (map[string]*CommandInfo, error) {
107+
return cmdCache, err
108+
})
109+
110+
stub.Cmdable = client
111+
return stub
112+
}
113+
114+
func BenchmarkDecode(b *testing.B) {
115+
stubs := []*ClientStub{
116+
NewDecodeClient(),
117+
NewDecodeClusterClient(),
118+
}
119+
120+
for _, stub := range stubs {
121+
b.Run(fmt.Sprintf("RespError-%s", stub.Name()), func(b *testing.B) {
122+
respError(b, stub)
123+
})
124+
b.Run(fmt.Sprintf("RespStatus-%s", stub.Name()), func(b *testing.B) {
125+
respStatus(b, stub)
126+
})
127+
b.Run(fmt.Sprintf("RespInt-%s", stub.Name()), func(b *testing.B) {
128+
respInt(b, stub)
129+
})
130+
b.Run(fmt.Sprintf("RespString-%s", stub.Name()), func(b *testing.B) {
131+
respString(b, stub)
132+
})
133+
b.Run(fmt.Sprintf("RespArray-%s", stub.Name()), func(b *testing.B) {
134+
respArray(b, stub)
135+
})
136+
b.Run(fmt.Sprintf("RespPipeline-%s", stub.Name()), func(b *testing.B) {
137+
respPipeline(b, stub)
138+
})
139+
b.Run(fmt.Sprintf("RespTxPipeline-%s", stub.Name()), func(b *testing.B) {
140+
respTxPipeline(b, stub)
141+
})
142+
143+
// goroutine
144+
b.Run(fmt.Sprintf("DynamicGoroutine-%s-pool=5", stub.Name()), func(b *testing.B) {
145+
dynamicGoroutine(b, stub, 5)
146+
})
147+
b.Run(fmt.Sprintf("DynamicGoroutine-%s-pool=20", stub.Name()), func(b *testing.B) {
148+
dynamicGoroutine(b, stub, 20)
149+
})
150+
b.Run(fmt.Sprintf("DynamicGoroutine-%s-pool=50", stub.Name()), func(b *testing.B) {
151+
dynamicGoroutine(b, stub, 50)
152+
})
153+
b.Run(fmt.Sprintf("DynamicGoroutine-%s-pool=100", stub.Name()), func(b *testing.B) {
154+
dynamicGoroutine(b, stub, 100)
155+
})
156+
157+
b.Run(fmt.Sprintf("StaticGoroutine-%s-pool=5", stub.Name()), func(b *testing.B) {
158+
staticGoroutine(b, stub, 5)
159+
})
160+
b.Run(fmt.Sprintf("StaticGoroutine-%s-pool=20", stub.Name()), func(b *testing.B) {
161+
staticGoroutine(b, stub, 20)
162+
})
163+
b.Run(fmt.Sprintf("StaticGoroutine-%s-pool=50", stub.Name()), func(b *testing.B) {
164+
staticGoroutine(b, stub, 50)
165+
})
166+
b.Run(fmt.Sprintf("StaticGoroutine-%s-pool=100", stub.Name()), func(b *testing.B) {
167+
staticGoroutine(b, stub, 100)
168+
})
169+
}
170+
}
171+
172+
func respError(b *testing.B, stub *ClientStub) {
173+
stub.SetResponse([]byte("-ERR test error\r\n"))
174+
respErr := proto.RedisError("ERR test error")
175+
var err error
176+
177+
b.ResetTimer()
178+
for i := 0; i < b.N; i++ {
179+
if err = stub.Get(ctx, "key").Err(); err != respErr {
180+
b.Fatalf("response error, got %q, want %q", err, respErr)
181+
}
182+
}
183+
}
184+
185+
func respStatus(b *testing.B, stub *ClientStub) {
186+
stub.SetResponse([]byte("+OK\r\n"))
187+
var val string
188+
189+
b.ResetTimer()
190+
for i := 0; i < b.N; i++ {
191+
if val = stub.Set(ctx, "key", "value", 0).Val(); val != "OK" {
192+
b.Fatalf("response error, got %q, want OK", val)
193+
}
194+
}
195+
}
196+
197+
func respInt(b *testing.B, stub *ClientStub) {
198+
stub.SetResponse([]byte(":10\r\n"))
199+
var val int64
200+
201+
b.ResetTimer()
202+
for i := 0; i < b.N; i++ {
203+
if val = stub.Incr(ctx, "key").Val(); val != 10 {
204+
b.Fatalf("response error, got %q, want 10", val)
205+
}
206+
}
207+
}
208+
209+
func respString(b *testing.B, stub *ClientStub) {
210+
stub.SetResponse([]byte("$5\r\nhello\r\n"))
211+
var val string
212+
213+
b.ResetTimer()
214+
for i := 0; i < b.N; i++ {
215+
if val = stub.Get(ctx, "key").Val(); val != "hello" {
216+
b.Fatalf("response error, got %q, want hello", val)
217+
}
218+
}
219+
}
220+
221+
func respArray(b *testing.B, stub *ClientStub) {
222+
stub.SetResponse([]byte("*3\r\n$5\r\nhello\r\n:10\r\n+OK\r\n"))
223+
var val []interface{}
224+
225+
b.ResetTimer()
226+
for i := 0; i < b.N; i++ {
227+
if val = stub.MGet(ctx, "key").Val(); len(val) != 3 {
228+
b.Fatalf("response error, got len(%d), want len(3)", len(val))
229+
}
230+
}
231+
}
232+
233+
func respPipeline(b *testing.B, stub *ClientStub) {
234+
stub.SetResponse([]byte("+OK\r\n$5\r\nhello\r\n:1\r\n"))
235+
var pipe Pipeliner
236+
237+
b.ResetTimer()
238+
for i := 0; i < b.N; i++ {
239+
pipe = stub.Pipeline()
240+
set := pipe.Set(ctx, "key", "value", 0)
241+
get := pipe.Get(ctx, "key")
242+
del := pipe.Del(ctx, "key")
243+
_, err := pipe.Exec(ctx)
244+
if err != nil {
245+
b.Fatalf("response error, got %q, want nil", err)
246+
}
247+
if set.Val() != "OK" || get.Val() != "hello" || del.Val() != 1 {
248+
b.Fatal("response error")
249+
}
250+
}
251+
}
252+
253+
func respTxPipeline(b *testing.B, stub *ClientStub) {
254+
stub.SetResponse([]byte("+OK\r\n+QUEUED\r\n+QUEUED\r\n+QUEUED\r\n*3\r\n+OK\r\n$5\r\nhello\r\n:1\r\n"))
255+
256+
b.ResetTimer()
257+
for i := 0; i < b.N; i++ {
258+
var set *StatusCmd
259+
var get *StringCmd
260+
var del *IntCmd
261+
_, err := stub.TxPipelined(ctx, func(pipe Pipeliner) error {
262+
set = pipe.Set(ctx, "key", "value", 0)
263+
get = pipe.Get(ctx, "key")
264+
del = pipe.Del(ctx, "key")
265+
return nil
266+
})
267+
if err != nil {
268+
b.Fatalf("response error, got %q, want nil", err)
269+
}
270+
if set.Val() != "OK" || get.Val() != "hello" || del.Val() != 1 {
271+
b.Fatal("response error")
272+
}
273+
}
274+
}
275+
276+
func dynamicGoroutine(b *testing.B, stub *ClientStub, concurrency int) {
277+
stub.SetResponse([]byte("$5\r\nhello\r\n"))
278+
c := make(chan struct{}, concurrency)
279+
280+
b.ResetTimer()
281+
for i := 0; i < b.N; i++ {
282+
c <- struct{}{}
283+
go func() {
284+
if val := stub.Get(ctx, "key").Val(); val != "hello" {
285+
panic(fmt.Sprintf("response error, got %q, want hello", val))
286+
}
287+
<-c
288+
}()
289+
}
290+
// Here no longer wait for all goroutines to complete, it will not affect the test results.
291+
close(c)
292+
}
293+
294+
func staticGoroutine(b *testing.B, stub *ClientStub, concurrency int) {
295+
stub.SetResponse([]byte("$5\r\nhello\r\n"))
296+
c := make(chan struct{}, concurrency)
297+
298+
b.ResetTimer()
299+
300+
for i := 0; i < concurrency; i++ {
301+
go func() {
302+
for {
303+
_, ok := <-c
304+
if !ok {
305+
return
306+
}
307+
if val := stub.Get(ctx, "key").Val(); val != "hello" {
308+
panic(fmt.Sprintf("response error, got %q, want hello", val))
309+
}
310+
}
311+
}()
312+
}
313+
for i := 0; i < b.N; i++ {
314+
c <- struct{}{}
315+
}
316+
close(c)
317+
}

0 commit comments

Comments
 (0)