Skip to content

Commit f42fdde

Browse files
authored
[Improve] pool sendRequest to improve producer perf (#1126)
### Motivation `sendRequest ` in producer is a frequently allocated struct, pool it can decrease the memory allocation. ### Modifications 1. Init a sync.Pool; 2. Get sendRequest from the pool when we need; 3. Reset sendRequest and put it back into the pool when it is done. --------- Co-authored-by: gunli <[email protected]>
1 parent 81495b5 commit f42fdde

File tree

1 file changed

+24
-3
lines changed

1 file changed

+24
-3
lines changed

pulsar/producer_partition.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,20 @@ var (
6060
errProducerClosed = newError(ProducerClosed, "producer already been closed")
6161
errMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
6262

63-
buffersPool sync.Pool
63+
buffersPool sync.Pool
64+
sendRequestPool *sync.Pool
6465
)
6566

6667
var errTopicNotFount = "TopicNotFound"
6768

69+
func init() {
70+
sendRequestPool = &sync.Pool{
71+
New: func() interface{} {
72+
return &sendRequest{}
73+
},
74+
}
75+
}
76+
6877
type partitionProducer struct {
6978
state uAtomic.Int32
7079
client *client
@@ -527,7 +536,9 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
527536
}
528537
// update chunk id
529538
sr.mm.ChunkId = proto.Int32(int32(chunkID))
530-
nsr := &sendRequest{
539+
nsr := sendRequestPool.Get().(*sendRequest)
540+
*nsr = sendRequest{
541+
pool: sendRequestPool,
531542
ctx: sr.ctx,
532543
msg: sr.msg,
533544
producer: sr.producer,
@@ -1150,7 +1161,9 @@ func (p *partitionProducer) internalSendAsync(
11501161
return
11511162
}
11521163

1153-
sr := &sendRequest{
1164+
sr := sendRequestPool.Get().(*sendRequest)
1165+
*sr = sendRequest{
1166+
pool: sendRequestPool,
11541167
ctx: ctx,
11551168
msg: msg,
11561169
producer: p,
@@ -1395,6 +1408,7 @@ func (p *partitionProducer) Close() {
13951408
}
13961409

13971410
type sendRequest struct {
1411+
pool *sync.Pool
13981412
ctx context.Context
13991413
msg *ProducerMessage
14001414
producer *partitionProducer
@@ -1477,6 +1491,13 @@ func (sr *sendRequest) done(msgID MessageID, err error) {
14771491
sr.transaction.endSendOrAckOp(err)
14781492
}
14791493
}
1494+
1495+
pool := sr.pool
1496+
if pool != nil {
1497+
// reset all the fields
1498+
*sr = sendRequest{}
1499+
pool.Put(sr)
1500+
}
14801501
}
14811502

14821503
func (p *partitionProducer) blockIfQueueFull() bool {

0 commit comments

Comments
 (0)