Commit 06f2693

improve: use chan *message instead of chan []*message as queueCh (#1283)
### Motivation

`queueCh` is currently a `chan []*message`, and messages are staged in a slice before being handed to the parent consumer. Buffering entire slices this way can result in excessive memory use. This PR sends messages individually instead, potentially reducing overall memory overhead.

### Modifications

- Use `chan *message` instead of `chan []*message`.
- Fix the affected tests.
1 parent 551da2d commit 06f2693
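
To illustrate the motivation, here is a minimal, self-contained sketch of why a slice-typed queue can over-buffer (the `message` struct below is a simplified stand-in, not the client's real type). A `chan []*message` of capacity N holds up to N whole batches, so the effective bound is N × batch size messages, and a staged slice keeps every element reachable until the entire batch is drained; a `chan *message` of the same capacity bounds buffered messages directly.

```go
package main

import "fmt"

// message is a simplified stand-in for the client's internal message type.
type message struct {
	payLoad []byte
}

func main() {
	const queueSize = 2

	// Before: each channel slot holds a whole batch, so the effective
	// buffer is queueSize × len(batch) messages, all pinned by the slice.
	batchQueue := make(chan []*message, queueSize)
	batch := []*message{
		{payLoad: make([]byte, 1<<20)},
		{payLoad: make([]byte, 1<<20)},
		{payLoad: make([]byte, 1<<20)},
	}
	batchQueue <- batch // one slot, three megabyte-sized messages

	// After: each slot holds exactly one message; capacity is a hard bound
	// on buffered messages, and each one becomes collectible as soon as
	// the dispatcher hands it off.
	msgQueue := make(chan *message, queueSize)
	msgQueue <- &message{payLoad: make([]byte, 1<<20)}

	fmt.Printf("batch slots used: %d, message slots used: %d\n",
		len(batchQueue), len(msgQueue))
}
```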

File tree

2 files changed (+67, -75 lines)

pulsar/consumer_partition.go

Lines changed: 44 additions & 57 deletions
@@ -154,7 +154,7 @@ type partitionConsumer struct {
 
 	// the size of the queue channel for buffering messages
 	maxQueueSize    int32
-	queueCh         chan []*message
+	queueCh         chan *message
 	startMessageID  atomicMessageID
 	lastDequeuedMsg *trackingMessageID
 
@@ -338,7 +338,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 		partitionIdx:         int32(options.partitionIdx),
 		eventsCh:             make(chan interface{}, 10),
 		maxQueueSize:         int32(options.receiverQueueSize),
-		queueCh:              make(chan []*message, options.receiverQueueSize),
+		queueCh:              make(chan *message, options.receiverQueueSize),
 		startMessageID:       atomicMessageID{msgID: options.startMessageID},
 		connectedCh:          make(chan struct{}),
 		messageCh:            messageCh,
@@ -1057,37 +1057,33 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 			return fmt.Errorf("discarding message on decryption error :%v", err)
 		case crypto.ConsumerCryptoFailureActionConsume:
 			pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
-			messages := []*message{
-				{
-					publishTime:         timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
-					eventTime:           timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
-					key:                 msgMeta.GetPartitionKey(),
-					producerName:        msgMeta.GetProducerName(),
-					properties:          internal.ConvertToStringMap(msgMeta.GetProperties()),
-					topic:               pc.topic,
-					msgID: newMessageID(
-						int64(pbMsgID.GetLedgerId()),
-						int64(pbMsgID.GetEntryId()),
-						pbMsgID.GetBatchIndex(),
-						pc.partitionIdx,
-						pbMsgID.GetBatchSize(),
-					),
-					payLoad:             headersAndPayload.ReadableSlice(),
-					schema:              pc.options.schema,
-					replicationClusters: msgMeta.GetReplicateTo(),
-					replicatedFrom:      msgMeta.GetReplicatedFrom(),
-					redeliveryCount:     response.GetRedeliveryCount(),
-					encryptionContext:   createEncryptionContext(msgMeta),
-					orderingKey:         string(msgMeta.OrderingKey),
-				},
-			}
-
 			if pc.options.autoReceiverQueueSize {
 				pc.incomingMessages.Inc()
 				pc.markScaleIfNeed()
 			}
 
-			pc.queueCh <- messages
+			pc.queueCh <- &message{
+				publishTime:         timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+				eventTime:           timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+				key:                 msgMeta.GetPartitionKey(),
+				producerName:        msgMeta.GetProducerName(),
+				properties:          internal.ConvertToStringMap(msgMeta.GetProperties()),
+				topic:               pc.topic,
+				msgID: newMessageID(
+					int64(pbMsgID.GetLedgerId()),
+					int64(pbMsgID.GetEntryId()),
+					pbMsgID.GetBatchIndex(),
+					pc.partitionIdx,
+					pbMsgID.GetBatchSize(),
+				),
+				payLoad:             headersAndPayload.ReadableSlice(),
+				schema:              pc.options.schema,
+				replicationClusters: msgMeta.GetReplicateTo(),
+				replicatedFrom:      msgMeta.GetReplicatedFrom(),
+				redeliveryCount:     response.GetRedeliveryCount(),
+				encryptionContext:   createEncryptionContext(msgMeta),
+				orderingKey:         string(msgMeta.OrderingKey),
+			}
 			return nil
 		}
 	}
@@ -1123,7 +1119,6 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		numMsgs = int(msgMeta.GetNumMessagesInBatch())
 	}
 
-	messages := make([]*message, 0)
 	var ackTracker *ackTracker
 	// are there multiple messages in this batch?
 	if numMsgs > 1 {
@@ -1144,7 +1139,6 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 	pc.metrics.PrefetchedMessages.Add(float64(numMsgs))
 
 	var (
-		bytesReceived   int
 		skippedMessages int32
 	)
 	for i := 0; i < numMsgs; i++ {
@@ -1264,22 +1258,19 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 			Message:  msg,
 		})
 
-		messages = append(messages, msg)
-		bytesReceived += msg.size()
-	}
+		if pc.options.autoReceiverQueueSize {
+			pc.client.memLimit.ForceReserveMemory(int64(msg.size()))
+			pc.incomingMessages.Add(int32(1))
+			pc.markScaleIfNeed()
+		}
 
-	if pc.options.autoReceiverQueueSize {
-		pc.client.memLimit.ForceReserveMemory(int64(bytesReceived))
-		pc.incomingMessages.Add(int32(len(messages)))
-		pc.markScaleIfNeed()
+		pc.queueCh <- msg
 	}
 
 	if skippedMessages > 0 {
 		pc.availablePermits.add(skippedMessages)
 	}
 
-	// send messages to the dispatcher
-	pc.queueCh <- messages
 	return nil
 }
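
With the send moved inside the decode loop, a full `queueCh` now blocks after each message instead of after a fully staged batch, and memory is reserved incrementally per message rather than as one `bytesReceived` total. A rough standalone sketch of that per-item shape (the `reserve` helper and byte-slice payloads are illustrative stand-ins, not client code):

```go
package main

import "fmt"

func main() {
	queue := make(chan []byte, 2) // small buffer so backpressure is visible
	done := make(chan struct{})

	go func() { // consumer side, standing in for the dispatcher
		for payload := range queue {
			fmt.Println("dispatched", len(payload), "bytes")
		}
		close(done)
	}()

	var reserved int64
	reserve := func(n int64) { reserved += n } // stand-in for memLimit accounting

	for i := 0; i < 5; i++ {
		payload := make([]byte, 1024) // stand-in for one decoded message
		reserve(int64(len(payload)))  // account per message, not per batch
		queue <- payload              // may block here, per message
	}
	close(queue)
	<-done
	fmt.Println("total reserved:", reserved)
}
```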

@@ -1435,20 +1426,19 @@ func (pc *partitionConsumer) dispatcher() {
 	defer func() {
 		pc.log.Debug("exiting dispatch loop")
 	}()
-	var messages []*message
+	var queueMsg *message
 	for {
-		var queueCh chan []*message
+		var queueCh chan *message
 		var messageCh chan ConsumerMessage
 		var nextMessage ConsumerMessage
 		var nextMessageSize int
 
-		// are there more messages to send?
-		if len(messages) > 0 {
+		if queueMsg != nil {
 			nextMessage = ConsumerMessage{
 				Consumer: pc.parentConsumer,
-				Message:  messages[0],
+				Message:  queueMsg,
 			}
-			nextMessageSize = messages[0].size()
+			nextMessageSize = queueMsg.size()
 
 			if pc.dlq.shouldSendToDlq(&nextMessage) {
 				// pass the message to the DLQ router
@@ -1460,7 +1450,7 @@ func (pc *partitionConsumer) dispatcher() {
 			}
 
 			pc.metrics.PrefetchedMessages.Dec()
-			pc.metrics.PrefetchedBytes.Sub(float64(len(messages[0].payLoad)))
+			pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))
 		} else {
 			queueCh = pc.queueCh
 		}
@@ -1475,7 +1465,7 @@ func (pc *partitionConsumer) dispatcher() {
 			}
 			pc.log.Debug("dispatcher received connection event")
 
-			messages = nil
+			queueMsg = nil
 
 			// reset available permits
 			pc.availablePermits.reset()
@@ -1493,19 +1483,16 @@ func (pc *partitionConsumer) dispatcher() {
 				pc.log.WithError(err).Error("unable to send initial permits to broker")
 			}
 
-		case msgs, ok := <-queueCh:
+		case msg, ok := <-queueCh:
 			if !ok {
 				return
 			}
-			// we only read messages here after the consumer has processed all messages
-			// in the previous batch
-			messages = msgs
+
+			queueMsg = msg
 
 		// if the messageCh is nil or the messageCh is full this will not be selected
 		case messageCh <- nextMessage:
-			// allow this message to be garbage collected
-			messages[0] = nil
-			messages = messages[1:]
+			queueMsg = nil
 
 			pc.availablePermits.inc()
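
The dispatcher's `select` keeps using a standard Go idiom: operations on a nil channel never proceed, so leaving `queueCh` nil while a message is pending disables the receive case, and leaving `messageCh` nil disables the send case. A compact standalone sketch of the idiom (not the dispatcher itself; `in`, `out`, and `pending` are illustrative names):

```go
package main

import "fmt"

func main() {
	in := make(chan int, 2)
	out := make(chan int, 2)
	in <- 1
	in <- 2
	close(in)

	var pending *int
	for {
		var recvCh chan int // left nil while a value is pending
		var sendCh chan int // left nil while there is nothing to send
		var next int
		if pending != nil {
			sendCh, next = out, *pending
		} else {
			recvCh = in
		}

		select {
		case v, ok := <-recvCh: // disabled (nil channel) when pending != nil
			if !ok {
				close(out)
				for v := range out {
					fmt.Println("dispatched", v)
				}
				return
			}
			pending = &v
		case sendCh <- next: // disabled (nil channel) when nothing is pending
			pending = nil
		}
	}
}
```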

@@ -1528,14 +1515,14 @@ func (pc *partitionConsumer) dispatcher() {
 				if m == nil {
 					break
 				} else if nextMessageInQueue == nil {
-					nextMessageInQueue = toTrackingMessageID(m[0].msgID)
+					nextMessageInQueue = toTrackingMessageID(m.msgID)
 				}
 				if pc.options.autoReceiverQueueSize {
-					pc.incomingMessages.Sub(int32(len(m)))
+					pc.incomingMessages.Sub(int32(1))
 				}
 			}
 
-			messages = nil
+			queueMsg = nil
 
 			clearQueueCb(nextMessageInQueue)
 		}

pulsar/consumer_partition_test.go

Lines changed: 23 additions & 18 deletions
@@ -30,7 +30,7 @@ import (
 func TestSingleMessageIDNoAckTracker(t *testing.T) {
 	eventsCh := make(chan interface{}, 1)
 	pc := partitionConsumer{
-		queueCh:              make(chan []*message, 1),
+		queueCh:              make(chan *message, 1),
 		eventsCh:             eventsCh,
 		compressionProviders: sync.Map{},
 		options:              &partitionConsumerOpts{},
@@ -47,13 +47,12 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
 	}
 
 	// ensure the tracker was set on the message id
-	messages := <-pc.queueCh
-	for _, m := range messages {
-		assert.Nil(t, m.ID().(*trackingMessageID).tracker)
-	}
+	message := <-pc.queueCh
+	id := message.ID().(*trackingMessageID)
+	assert.Nil(t, id.tracker)
 
 	// ack the message id
-	pc.AckID(messages[0].msgID.(*trackingMessageID))
+	pc.AckID(id)
 
 	select {
 	case <-eventsCh:
@@ -69,7 +68,7 @@ func newTestMetrics() *internal.LeveledMetrics {
 func TestBatchMessageIDNoAckTracker(t *testing.T) {
 	eventsCh := make(chan interface{}, 1)
 	pc := partitionConsumer{
-		queueCh:              make(chan []*message, 1),
+		queueCh:              make(chan *message, 1),
 		eventsCh:             eventsCh,
 		compressionProviders: sync.Map{},
 		options:              &partitionConsumerOpts{},
@@ -86,13 +85,12 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
 	}
 
 	// ensure the tracker was set on the message id
-	messages := <-pc.queueCh
-	for _, m := range messages {
-		assert.Nil(t, m.ID().(*trackingMessageID).tracker)
-	}
+	message := <-pc.queueCh
+	id := message.ID().(*trackingMessageID)
+	assert.Nil(t, id.tracker)
 
 	// ack the message id
-	err := pc.AckID(messages[0].msgID.(*trackingMessageID))
+	err := pc.AckID(id)
 	assert.Nil(t, err)
 
 	select {
@@ -105,7 +103,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
 func TestBatchMessageIDWithAckTracker(t *testing.T) {
 	eventsCh := make(chan interface{}, 1)
 	pc := partitionConsumer{
-		queueCh:              make(chan []*message, 1),
+		queueCh:              make(chan *message, 10),
 		eventsCh:             eventsCh,
 		compressionProviders: sync.Map{},
 		options:              &partitionConsumerOpts{},
@@ -122,14 +120,21 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
 	}
 
 	// ensure the tracker was set on the message id
-	messages := <-pc.queueCh
-	for _, m := range messages {
-		assert.NotNil(t, m.ID().(*trackingMessageID).tracker)
+	var messageIDs []*trackingMessageID
+	for i := 0; i < 10; i++ {
+		select {
+		case m := <-pc.queueCh:
+			id := m.ID().(*trackingMessageID)
+			assert.NotNil(t, id.tracker)
+			messageIDs = append(messageIDs, id)
+		default:
+			break
+		}
 	}
 
 	// ack all message ids except the last one
 	for i := 0; i < 9; i++ {
-		err := pc.AckID(messages[i].msgID.(*trackingMessageID))
+		err := pc.AckID(messageIDs[i])
 		assert.Nil(t, err)
 	}
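
The rewritten test drains `queueCh` without blocking by pairing `select` with a `default` case. One Go subtlety worth noting: a bare `break` inside a `select` exits only the `select`, so in the test above it is the bounded `for` loop that ends the drain; stopping the loop itself from inside the `select` takes a labeled break, as in this standalone sketch:

```go
package main

import "fmt"

func main() {
	queue := make(chan *int, 10)
	for i := 0; i < 7; i++ {
		v := i
		queue <- &v
	}

	var drained []*int
drain:
	for i := 0; i < 10; i++ {
		select {
		case m := <-queue:
			drained = append(drained, m)
		default:
			break drain // a bare break here would only exit the select
		}
	}
	fmt.Println("drained", len(drained), "messages") // drained 7 messages
}
```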

@@ -140,7 +145,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
 	}
 
 	// ack last message
-	err := pc.AckID(messages[9].msgID.(*trackingMessageID))
+	err := pc.AckID(messageIDs[9])
 	assert.Nil(t, err)
 
 	select {
