Commit 95232de
Revert #1283 to fix a possible deadlock when queueCh is full (#1311)
### Motivation

For the issue, please refer to the PR description: #1310. An analysis is available in #1310 (comment).

### Modifications

- Revert #1283
- Add the test from #1310 to cover the case where `queueCh` is full.
1 parent 9366a0e commit 95232de
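Reading the commit title together with the test added below, the problem appears to be that a `queueCh` of type `chan *message` can fill up mid-batch: the goroutine feeding it then blocks once per message, which in turn can stall delivery of ack responses when `AckWithResponse` is enabled. With `chan []*message`, a whole batch occupies a single slot, so the feeding goroutine blocks at most once per batch. Below is a minimal, hypothetical Go sketch of that channel-of-slices pattern; the names `queueCh`, `out`, and `dispatch` are illustrative stand-ins, not the client's API.

package main

import "fmt"

// dispatch drains batches that arrive as slices: a sketch of the pattern this
// revert restores, not the actual partitionConsumer dispatcher.
func dispatch(queueCh <-chan []string, out chan<- string) {
    var pending []string
    for {
        if len(pending) == 0 {
            batch, ok := <-queueCh
            if !ok {
                close(out)
                return
            }
            pending = batch
            continue
        }
        out <- pending[0]     // hand one message to the application channel
        pending = pending[1:] // drop the delivered message from the head
    }
}

func main() {
    queueCh := make(chan []string, 1) // stand-in for pc.queueCh with a tiny capacity
    out := make(chan string)          // stand-in for the consumer's messageCh

    go dispatch(queueCh, out)

    // A batch of three messages occupies a single slot, so this send completes
    // even though the batch is larger than the channel's free capacity.
    queueCh <- []string{"a", "b", "c"}
    close(queueCh)

    for m := range out {
        fmt.Println(m)
    }
}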

3 files changed: 131 additions, 64 deletions


pulsar/consumer_partition.go

Lines changed: 50 additions & 41 deletions
@@ -156,7 +156,7 @@ type partitionConsumer struct {
 
     // the size of the queue channel for buffering messages
     maxQueueSize int32
-    queueCh chan *message
+    queueCh chan []*message
     startMessageID atomicMessageID
     lastDequeuedMsg *trackingMessageID
 
@@ -354,7 +354,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
         partitionIdx: int32(options.partitionIdx),
         eventsCh: make(chan interface{}, 10),
         maxQueueSize: int32(options.receiverQueueSize),
-        queueCh: make(chan *message, options.receiverQueueSize),
+        queueCh: make(chan []*message, options.receiverQueueSize),
         startMessageID: atomicMessageID{msgID: options.startMessageID},
         connectedCh: make(chan struct{}),
         messageCh: messageCh,
@@ -1166,33 +1166,36 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
             return fmt.Errorf("discarding message on decryption error :%v", err)
         case crypto.ConsumerCryptoFailureActionConsume:
             pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
+            messages := []*message{
+                {
+                    publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+                    eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+                    key: msgMeta.GetPartitionKey(),
+                    producerName: msgMeta.GetProducerName(),
+                    properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
+                    topic: pc.topic,
+                    msgID: newMessageID(
+                        int64(pbMsgID.GetLedgerId()),
+                        int64(pbMsgID.GetEntryId()),
+                        pbMsgID.GetBatchIndex(),
+                        pc.partitionIdx,
+                        pbMsgID.GetBatchSize(),
+                    ),
+                    payLoad: headersAndPayload.ReadableSlice(),
+                    schema: pc.options.schema,
+                    replicationClusters: msgMeta.GetReplicateTo(),
+                    replicatedFrom: msgMeta.GetReplicatedFrom(),
+                    redeliveryCount: response.GetRedeliveryCount(),
+                    encryptionContext: createEncryptionContext(msgMeta),
+                    orderingKey: string(msgMeta.OrderingKey),
+                },
+            }
             if pc.options.autoReceiverQueueSize {
                 pc.incomingMessages.Inc()
                 pc.markScaleIfNeed()
             }
 
-            pc.queueCh <- &message{
-                publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
-                eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
-                key: msgMeta.GetPartitionKey(),
-                producerName: msgMeta.GetProducerName(),
-                properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
-                topic: pc.topic,
-                msgID: newMessageID(
-                    int64(pbMsgID.GetLedgerId()),
-                    int64(pbMsgID.GetEntryId()),
-                    pbMsgID.GetBatchIndex(),
-                    pc.partitionIdx,
-                    pbMsgID.GetBatchSize(),
-                ),
-                payLoad: headersAndPayload.ReadableSlice(),
-                schema: pc.options.schema,
-                replicationClusters: msgMeta.GetReplicateTo(),
-                replicatedFrom: msgMeta.GetReplicatedFrom(),
-                redeliveryCount: response.GetRedeliveryCount(),
-                encryptionContext: createEncryptionContext(msgMeta),
-                orderingKey: string(msgMeta.OrderingKey),
-            }
+            pc.queueCh <- messages
             return nil
         }
     }
@@ -1228,6 +1231,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
         numMsgs = int(msgMeta.GetNumMessagesInBatch())
     }
 
+    messages := make([]*message, 0)
     var ackTracker *ackTracker
     // are there multiple messages in this batch?
     if numMsgs > 1 {
@@ -1248,6 +1252,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
     pc.metrics.PrefetchedMessages.Add(float64(numMsgs))
 
     var (
+        bytesReceived   int
         skippedMessages int32
     )
     for i := 0; i < numMsgs; i++ {
@@ -1366,20 +1371,21 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
             Consumer: pc.parentConsumer,
             Message: msg,
         })
-
+        messages = append(messages, msg)
+        bytesReceived += msg.size()
         if pc.options.autoReceiverQueueSize {
-            pc.client.memLimit.ForceReserveMemory(int64(msg.size()))
+            pc.client.memLimit.ForceReserveMemory(int64(bytesReceived))
             pc.incomingMessages.Add(int32(1))
             pc.markScaleIfNeed()
         }
-
-        pc.queueCh <- msg
     }
 
     if skippedMessages > 0 {
         pc.availablePermits.add(skippedMessages)
     }
 
+    // send messages to the dispatcher
+    pc.queueCh <- messages
     return nil
 }
 
@@ -1535,19 +1541,20 @@ func (pc *partitionConsumer) dispatcher() {
     defer func() {
         pc.log.Debug("exiting dispatch loop")
     }()
-    var queueMsg *message
+    var messages []*message
     for {
-        var queueCh chan *message
+        var queueCh chan []*message
         var messageCh chan ConsumerMessage
         var nextMessage ConsumerMessage
        var nextMessageSize int
 
-        if queueMsg != nil {
+        // are there more messages to send?
+        if len(messages) > 0 {
             nextMessage = ConsumerMessage{
                 Consumer: pc.parentConsumer,
-                Message: queueMsg,
+                Message: messages[0],
             }
-            nextMessageSize = queueMsg.size()
+            nextMessageSize = messages[0].size()
 
             if !pc.isSeeking.Load() {
                 if pc.dlq.shouldSendToDlq(&nextMessage) {
@@ -1559,7 +1566,7 @@ func (pc *partitionConsumer) dispatcher() {
                     messageCh = pc.messageCh
                 }
                 pc.metrics.PrefetchedMessages.Dec()
-                pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))
+                pc.metrics.PrefetchedBytes.Sub(float64(len(messages[0].payLoad)))
             } else {
                 pc.log.Debug("skip dispatching messages when seeking")
             }
@@ -1577,7 +1584,7 @@ func (pc *partitionConsumer) dispatcher() {
             }
             pc.log.Debug("dispatcher received connection event")
 
-            queueMsg = nil
+            messages = nil
 
             // reset available permits
             pc.availablePermits.reset()
@@ -1602,16 +1609,18 @@ func (pc *partitionConsumer) dispatcher() {
             pc.log.Debug("received dispatcherSeekingControlCh, set isSeek to true")
             pc.isSeeking.Store(true)
 
-        case msg, ok := <-queueCh:
+        case msgs, ok := <-queueCh:
             if !ok {
                 return
             }
 
-            queueMsg = msg
+            messages = msgs
 
         // if the messageCh is nil or the messageCh is full this will not be selected
         case messageCh <- nextMessage:
-            queueMsg = nil
+            // allow this message to be garbage collected
+            messages[0] = nil
+            messages = messages[1:]
 
             pc.availablePermits.inc()
 
@@ -1634,14 +1643,14 @@ func (pc *partitionConsumer) dispatcher() {
                 if m == nil {
                     break
                 } else if nextMessageInQueue == nil {
-                    nextMessageInQueue = toTrackingMessageID(m.msgID)
+                    nextMessageInQueue = toTrackingMessageID(m[0].msgID)
                 }
                 if pc.options.autoReceiverQueueSize {
-                    pc.incomingMessages.Sub(int32(1))
+                    pc.incomingMessages.Sub(int32(len(m)))
                 }
             }
 
-            queueMsg = nil
+            messages = nil
 
             clearQueueCb(nextMessageInQueue)
         }

pulsar/consumer_partition_test.go

Lines changed: 18 additions & 23 deletions
@@ -30,7 +30,7 @@ import (
 func TestSingleMessageIDNoAckTracker(t *testing.T) {
     eventsCh := make(chan interface{}, 1)
     pc := partitionConsumer{
-        queueCh: make(chan *message, 1),
+        queueCh: make(chan []*message, 1),
         eventsCh: eventsCh,
         compressionProviders: sync.Map{},
         options: &partitionConsumerOpts{},
@@ -47,12 +47,13 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
     }
 
     // ensure the tracker was set on the message id
-    message := <-pc.queueCh
-    id := message.ID().(*trackingMessageID)
-    assert.Nil(t, id.tracker)
+    messages := <-pc.queueCh
+    for _, m := range messages {
+        assert.Nil(t, m.ID().(*trackingMessageID).tracker)
+    }
 
     // ack the message id
-    pc.AckID(id)
+    pc.AckID(messages[0].msgID.(*trackingMessageID))
 
     select {
     case <-eventsCh:
@@ -68,7 +69,7 @@ func newTestMetrics() *internal.LeveledMetrics {
 func TestBatchMessageIDNoAckTracker(t *testing.T) {
     eventsCh := make(chan interface{}, 1)
     pc := partitionConsumer{
-        queueCh: make(chan *message, 1),
+        queueCh: make(chan []*message, 1),
         eventsCh: eventsCh,
         compressionProviders: sync.Map{},
         options: &partitionConsumerOpts{},
@@ -85,12 +86,13 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
     }
 
     // ensure the tracker was set on the message id
-    message := <-pc.queueCh
-    id := message.ID().(*trackingMessageID)
-    assert.Nil(t, id.tracker)
+    messages := <-pc.queueCh
+    for _, m := range messages {
+        assert.Nil(t, m.ID().(*trackingMessageID).tracker)
+    }
 
     // ack the message id
-    err := pc.AckID(id)
+    err := pc.AckID(messages[0].msgID.(*trackingMessageID))
     assert.Nil(t, err)
 
     select {
@@ -103,7 +105,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
 func TestBatchMessageIDWithAckTracker(t *testing.T) {
     eventsCh := make(chan interface{}, 1)
     pc := partitionConsumer{
-        queueCh: make(chan *message, 10),
+        queueCh: make(chan []*message, 10),
         eventsCh: eventsCh,
         compressionProviders: sync.Map{},
         options: &partitionConsumerOpts{},
@@ -120,21 +122,14 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
     }
 
     // ensure the tracker was set on the message id
-    var messageIDs []*trackingMessageID
-    for i := 0; i < 10; i++ {
-        select {
-        case m := <-pc.queueCh:
-            id := m.ID().(*trackingMessageID)
-            assert.NotNil(t, id.tracker)
-            messageIDs = append(messageIDs, id)
-        default:
-            break
-        }
+    messages := <-pc.queueCh
+    for _, m := range messages {
+        assert.NotNil(t, m.ID().(*trackingMessageID).tracker)
     }
 
     // ack all message ids except the last one
     for i := 0; i < 9; i++ {
-        err := pc.AckID(messageIDs[i])
+        err := pc.AckID(messages[i].msgID.(*trackingMessageID))
         assert.Nil(t, err)
     }
 
@@ -145,7 +140,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
     }
 
     // ack last message
-    err := pc.AckID(messageIDs[9])
+    err := pc.AckID(messages[9].msgID.(*trackingMessageID))
     assert.Nil(t, err)
 
     select {

pulsar/consumer_test.go

Lines changed: 63 additions & 0 deletions
@@ -4877,3 +4877,66 @@ func receiveMessages(t *testing.T, consumer Consumer, numMessages int) []Message
     assert.Equal(t, numMessages, len(msgs))
     return msgs
 }
+
+func TestAckResponseNotBlocked(t *testing.T) {
+    client, err := NewClient(ClientOptions{
+        URL: lookupURL,
+        OperationTimeout: 5 * time.Second,
+    })
+    assert.Nil(t, err)
+    defer client.Close()
+
+    topic := fmt.Sprintf("test-ack-response-not-blocked-%v", time.Now().Nanosecond())
+    assert.Nil(t, createPartitionedTopic(topic, 10))
+
+    producer, err := client.CreateProducer(ProducerOptions{
+        Topic: topic,
+    })
+    assert.Nil(t, err)
+
+    ctx := context.Background()
+    numMessages := 1000
+    for i := 0; i < numMessages; i++ {
+        producer.SendAsync(ctx, &ProducerMessage{
+            Payload: []byte(fmt.Sprintf("value-%d", i)),
+        }, func(_ MessageID, _ *ProducerMessage, err error) {
+            if err != nil {
+                t.Fatal(err)
+            }
+        })
+        if i%100 == 99 {
+            assert.Nil(t, producer.Flush())
+        }
+    }
+    producer.Flush()
+    producer.Close()
+
+    // Set a small receiver queue size to trigger ack response blocking if the internal `queueCh`
+    // is a channel with the same size
+    consumer, err := client.Subscribe(ConsumerOptions{
+        Topic: topic,
+        SubscriptionName: "my-sub",
+        SubscriptionInitialPosition: SubscriptionPositionEarliest,
+        Type: KeyShared,
+        EnableBatchIndexAcknowledgment: true,
+        AckWithResponse: true,
+        ReceiverQueueSize: 5,
+    })
+    assert.Nil(t, err)
+    msgIDs := make([]MessageID, 0)
+    for i := 0; i < numMessages; i++ {
+        if msg, err := consumer.Receive(context.Background()); err != nil {
+            t.Fatal(err)
+        } else {
+            msgIDs = append(msgIDs, msg.ID())
+            if len(msgIDs) >= 10 {
+                if err := consumer.AckIDList(msgIDs); err != nil {
+                    t.Fatal("Failed to acked messages: ", msgIDs, " ", err)
+                } else {
+                    t.Log("Acked messages: ", msgIDs)
+                }
+                msgIDs = msgIDs[:0]
+            }
+        }
+    }
+}
