Skip to content

Commit e5f6398

Browse files
authored
client: pre-encode request before sending to batch-send-loop (tikv#1841)
Signed-off-by: zyguan <zhongyangguan@gmail.com>
1 parent 30119fc commit e5f6398

File tree

4 files changed

+309
-1
lines changed

4 files changed

+309
-1
lines changed

internal/client/client_async.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikv
8282
return
8383
}
8484

85+
if err := encodeRequestCmd(batchReq); err != nil {
86+
cb.Invoke(nil, err)
87+
return
88+
}
89+
8590
var (
8691
entry = &batchCommandsEntry{
8792
ctx: ctx,

internal/client/client_batch.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,79 @@ import (
6464
"google.golang.org/grpc/metadata"
6565
)
6666

67+
// globalEncodedMsgDataPool is used to pool pre-encoded message data for batch commands.
68+
var globalEncodedMsgDataPool = grpc.NewSharedBufferPool()
69+
70+
type encodedBatchCmd struct {
71+
// implement isBatchCommandsRequest_Request_Cmd
72+
tikvpb.BatchCommandsRequest_Request_Empty
73+
// pre-encoded message data
74+
data *[]byte
75+
}
76+
77+
func (p *encodedBatchCmd) MarshalTo(data []byte) (int, error) {
78+
if p.data == nil {
79+
return 0, errors.New("message data has been released")
80+
} else if len(data) < len(*p.data) {
81+
return 0, errors.New("no enough space to marshal message")
82+
}
83+
n := copy(data, *p.data)
84+
return n, nil
85+
}
86+
87+
func (p *encodedBatchCmd) Size() int {
88+
if p.data == nil {
89+
return 0
90+
}
91+
return len(*p.data)
92+
}
93+
94+
// encodeRequestCmd encodes the `req.Cmd` into a `encodedBatchCmd` and updates the `req.Cmd` to it in place.
95+
//
96+
// SAFETY: This function is called just after `tikvrpc.Request.ToBatchCommandsRequest()` and before constructing
97+
// `batchCommandsEntry`. Note that the `req` argument is the output of `ToBatchCommandsRequest`, which is a new object
98+
// of `tikvpb.BatchCommandsRequest_Request`. So `req.Cmd` can be only modified inside the batch client by
99+
// `reuseRequestData` after the `req` has been sent. So there should be no data race on `req.Cmd` via public API
100+
// (`SendRequest` and `SendRequestAsync`). When writing unit tests, please make sure it's only called once for one `req`
101+
// (do NOT reuse `req`).
102+
func encodeRequestCmd(req *tikvpb.BatchCommandsRequest_Request) error {
103+
if req.Cmd == nil {
104+
// req.Cmd might be nil in unit tests.
105+
return nil
106+
}
107+
if encoded, ok := req.Cmd.(*encodedBatchCmd); ok {
108+
if encoded.data == nil {
109+
return errors.New("request has already been encoded and sent")
110+
}
111+
return nil
112+
}
113+
data := globalEncodedMsgDataPool.Get(req.Cmd.Size())
114+
n, err := req.Cmd.MarshalTo(data)
115+
if err != nil {
116+
globalEncodedMsgDataPool.Put(&data)
117+
return errors.WithStack(err)
118+
} else if n != len(data) {
119+
globalEncodedMsgDataPool.Put(&data)
120+
return errors.Errorf("unexpected marshaled size: got %d, want %d", n, len(data))
121+
}
122+
req.Cmd = &encodedBatchCmd{data: &data}
123+
return nil
124+
}
125+
126+
// reuseRequestData puts back all pre-encoded message data in the request to the `globalEncodedMsgDataPool`. The
127+
// returned count is used for testing.
128+
func reuseRequestData(req *tikvpb.BatchCommandsRequest) int {
129+
count := 0
130+
for _, r := range req.Requests {
131+
if cmd, ok := r.Cmd.(*encodedBatchCmd); ok && cmd.data != nil {
132+
globalEncodedMsgDataPool.Put(cmd.data)
133+
cmd.data = nil
134+
count++
135+
}
136+
}
137+
return count
138+
}
139+
67140
type batchCommandsEntry struct {
68141
ctx context.Context
69142
req *tikvpb.BatchCommandsRequest_Request
@@ -509,6 +582,8 @@ func (c *batchCommandsClient) available() int64 {
509582
}
510583

511584
func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchCommandsRequest) {
585+
defer reuseRequestData(req)
586+
512587
err := c.initBatchClient(forwardedHost)
513588
if err != nil {
514589
logutil.BgLogger().Warn(
@@ -834,6 +909,9 @@ func sendBatchRequest(
834909
timeout time.Duration,
835910
priority uint64,
836911
) (*tikvrpc.Response, error) {
912+
if err := encodeRequestCmd(req); err != nil {
913+
return nil, err
914+
}
837915
entry := &batchCommandsEntry{
838916
ctx: ctx,
839917
req: req,
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
// Copyright 2026 TiKV Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package client
16+
17+
import (
18+
"sync"
19+
"sync/atomic"
20+
"testing"
21+
22+
"github.com/pingcap/kvproto/pkg/kvrpcpb"
23+
"github.com/pingcap/kvproto/pkg/tikvpb"
24+
"github.com/stretchr/testify/require"
25+
)
26+
27+
func TestEncodedBatchCmd_SizeAndMarshalTo(t *testing.T) {
28+
data := []byte{0x01, 0x02, 0x03, 0x04, 0x05}
29+
cmd := &encodedBatchCmd{data: &data}
30+
31+
// Verify Size returns correct length
32+
require.Equal(t, 5, cmd.Size())
33+
34+
// Verify MarshalTo copies data correctly
35+
buf := make([]byte, 5)
36+
n, err := cmd.MarshalTo(buf)
37+
require.NoError(t, err)
38+
require.Equal(t, 5, n)
39+
require.Equal(t, data, buf)
40+
}
41+
42+
func TestEncodeRequestCmd_Basic(t *testing.T) {
43+
cmd := &tikvpb.BatchCommandsRequest_Request_Get{
44+
Get: &kvrpcpb.GetRequest{
45+
Key: []byte("test-key"),
46+
},
47+
}
48+
batch1 := &tikvpb.BatchCommandsRequest{Requests: []*tikvpb.BatchCommandsRequest_Request{{Cmd: cmd}}}
49+
batch2 := &tikvpb.BatchCommandsRequest{Requests: []*tikvpb.BatchCommandsRequest_Request{{Cmd: cmd}}}
50+
51+
err := encodeRequestCmd(batch1.Requests[0])
52+
require.NoError(t, err)
53+
54+
// Verify req.Cmd is now a encodedBatchCmd
55+
_, ok := batch1.Requests[0].Cmd.(*encodedBatchCmd)
56+
require.True(t, ok, "req.Cmd should be converted to *encodedBatchCmd")
57+
58+
// Verify two batches have the same size after encoding
59+
require.Equal(t, batch1.Size(), batch2.Size())
60+
61+
// Verify marshaled data matches
62+
data1 := make([]byte, batch1.Size())
63+
n1, err := batch1.MarshalTo(data1)
64+
require.NoError(t, err)
65+
require.Equal(t, batch1.Size(), n1)
66+
67+
data2 := make([]byte, batch2.Size())
68+
n2, err := batch2.MarshalTo(data2)
69+
require.NoError(t, err)
70+
require.Equal(t, batch2.Size(), n2)
71+
72+
require.Equal(t, data1, data2)
73+
}
74+
75+
func TestEncodeRequestCmd_PoolReuse(t *testing.T) {
76+
// Encode cmd of an Empty request
77+
req1 := &tikvpb.BatchCommandsRequest_Request{
78+
Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{
79+
Empty: &tikvpb.BatchCommandsEmptyRequest{},
80+
},
81+
}
82+
err := encodeRequestCmd(req1)
83+
require.NoError(t, err)
84+
85+
// Simulate send/reuse cycle
86+
batchReq := &tikvpb.BatchCommandsRequest{Requests: []*tikvpb.BatchCommandsRequest_Request{req1}}
87+
reuseRequestData(batchReq)
88+
89+
// Encode cmd of a Get request
90+
req2 := &tikvpb.BatchCommandsRequest_Request{
91+
Cmd: &tikvpb.BatchCommandsRequest_Request_Get{
92+
Get: &kvrpcpb.GetRequest{
93+
Key: []byte("test-key"),
94+
},
95+
},
96+
}
97+
err = encodeRequestCmd(req2)
98+
require.NoError(t, err)
99+
}
100+
101+
func TestEncodeRequestCmd_AfterPoolReturn(t *testing.T) {
102+
req := &tikvpb.BatchCommandsRequest_Request{
103+
Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{
104+
Empty: &tikvpb.BatchCommandsEmptyRequest{},
105+
},
106+
}
107+
108+
// First encode - should succeed
109+
err := encodeRequestCmd(req)
110+
require.NoError(t, err)
111+
112+
// Simulate send: marshal and return to pool
113+
batch := &tikvpb.BatchCommandsRequest{Requests: []*tikvpb.BatchCommandsRequest_Request{req}}
114+
_, err = batch.Marshal()
115+
require.NoError(t, err)
116+
reuseRequestData(batch)
117+
118+
// Second encode on same request - should fail
119+
err = encodeRequestCmd(req)
120+
require.Error(t, err)
121+
require.Contains(t, err.Error(), "already been encoded and sent")
122+
}
123+
124+
func TestReuseRequestData_Basic(t *testing.T) {
125+
126+
// Normal request
127+
normalReq := &tikvpb.BatchCommandsRequest_Request{
128+
Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{
129+
Empty: &tikvpb.BatchCommandsEmptyRequest{},
130+
},
131+
}
132+
133+
// Encoded request
134+
encodedReq := &tikvpb.BatchCommandsRequest_Request{
135+
Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{
136+
Empty: &tikvpb.BatchCommandsEmptyRequest{},
137+
},
138+
}
139+
err := encodeRequestCmd(encodedReq)
140+
require.NoError(t, err)
141+
142+
// Batch with mixed requests
143+
batch := &tikvpb.BatchCommandsRequest{
144+
Requests: []*tikvpb.BatchCommandsRequest_Request{normalReq, encodedReq},
145+
}
146+
147+
// reuseRequestData should only affect the encoded request
148+
count := reuseRequestData(batch)
149+
require.Equal(t, 1, count)
150+
_, ok := normalReq.Cmd.(*encodedBatchCmd)
151+
require.False(t, ok, "Normal request should not be converted")
152+
_, ok = encodedReq.Cmd.(*encodedBatchCmd)
153+
require.True(t, ok, "Encoded request should remain encoded")
154+
_, err = encodedReq.Marshal()
155+
require.Error(t, err, "Marshaling released data should fail")
156+
}
157+
158+
func TestReuseRequestData_DoubleReturn(t *testing.T) {
159+
req := &tikvpb.BatchCommandsRequest_Request{
160+
Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{
161+
Empty: &tikvpb.BatchCommandsEmptyRequest{},
162+
},
163+
}
164+
165+
// Encode the request
166+
err := encodeRequestCmd(req)
167+
require.NoError(t, err)
168+
169+
// First reuse - should succeed and return 1
170+
batch := &tikvpb.BatchCommandsRequest{Requests: []*tikvpb.BatchCommandsRequest_Request{req}}
171+
count := reuseRequestData(batch)
172+
require.Equal(t, 1, count)
173+
174+
// Verify data is now nil
175+
encodedCmd := req.Cmd.(*encodedBatchCmd)
176+
require.Nil(t, encodedCmd.data)
177+
178+
// Second reuse - should return 0 (nil data is not returned to pool)
179+
count = reuseRequestData(batch)
180+
require.Equal(t, 0, count)
181+
}
182+
183+
func TestEncodedMsgDataPool_ConcurrentSafety(t *testing.T) {
184+
var wg sync.WaitGroup
185+
errors := make([]error, 50)
186+
successCount := int32(0)
187+
188+
for i := 0; i < 50; i++ {
189+
wg.Add(1)
190+
go func(idx int) {
191+
defer wg.Done()
192+
for j := 0; j < 100; j++ {
193+
req := &tikvpb.BatchCommandsRequest_Request{
194+
Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{
195+
Empty: &tikvpb.BatchCommandsEmptyRequest{},
196+
},
197+
}
198+
err := encodeRequestCmd(req)
199+
if err != nil {
200+
errors[idx] = err
201+
return
202+
}
203+
204+
atomic.AddInt32(&successCount, 1)
205+
batch := &tikvpb.BatchCommandsRequest{Requests: []*tikvpb.BatchCommandsRequest_Request{req}}
206+
_, err = batch.Marshal()
207+
if err != nil {
208+
errors[idx] = err
209+
return
210+
}
211+
reuseRequestData(batch)
212+
}
213+
}(i)
214+
}
215+
216+
wg.Wait()
217+
for _, err := range errors {
218+
require.NoError(t, err)
219+
}
220+
require.Equal(t, int32(5000), successCount)
221+
}

internal/client/client_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,11 +679,11 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
679679
server.Stop()
680680
}()
681681

682-
req := &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}}
683682
conn, err := client.getConnPool(addr, true)
684683
assert.Nil(t, err)
685684
// send some request, it should be success.
686685
for i := 0; i < 100; i++ {
686+
req := &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}}
687687
_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second*20, 0)
688688
require.NoError(t, err)
689689
}
@@ -694,6 +694,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
694694

695695
// send some request, it should be failed since server is down.
696696
for i := 0; i < 10; i++ {
697+
req := &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}}
697698
_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Millisecond*100, 0)
698699
require.Error(t, err)
699700
time.Sleep(time.Millisecond * time.Duration(rand.Intn(300)))
@@ -737,6 +738,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
737738

738739
// send some request, it should be success again.
739740
for i := 0; i < 100; i++ {
741+
req := &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}}
740742
_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second*20, 0)
741743
require.NoError(t, err)
742744
}
@@ -1049,6 +1051,8 @@ func TestFastFailWhenNoAvailableConn(t *testing.T) {
10491051
// mock all client a in recreate.
10501052
c.lockForRecreate()
10511053
}
1054+
1055+
req = &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}}
10521056
start := time.Now()
10531057
timeout := time.Second
10541058
_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, timeout, 0)

0 commit comments

Comments
 (0)