Skip to content

Commit 48d4cb9

Browse files
committed
Add test to cover case from PR apache#1310
1 parent 6411297 commit 48d4cb9

File tree

1 file changed

+63
-0
lines changed

1 file changed

+63
-0
lines changed

pulsar/consumer_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4877,3 +4877,66 @@ func receiveMessages(t *testing.T, consumer Consumer, numMessages int) []Message
48774877
assert.Equal(t, numMessages, len(msgs))
48784878
return msgs
48794879
}
4880+
4881+
func TestAckResponseNotBlocked(t *testing.T) {
4882+
client, err := NewClient(ClientOptions{
4883+
URL: lookupURL,
4884+
OperationTimeout: 5 * time.Second,
4885+
})
4886+
assert.Nil(t, err)
4887+
defer client.Close()
4888+
4889+
topic := fmt.Sprintf("test-ack-response-not-blocked-%v", time.Now().Nanosecond())
4890+
assert.Nil(t, createPartitionedTopic(topic, 10))
4891+
4892+
producer, err := client.CreateProducer(ProducerOptions{
4893+
Topic: topic,
4894+
})
4895+
assert.Nil(t, err)
4896+
4897+
ctx := context.Background()
4898+
numMessages := 1000
4899+
for i := 0; i < numMessages; i++ {
4900+
producer.SendAsync(ctx, &ProducerMessage{
4901+
Payload: []byte(fmt.Sprintf("value-%d", i)),
4902+
}, func(_ MessageID, _ *ProducerMessage, err error) {
4903+
if err != nil {
4904+
t.Fatal(err)
4905+
}
4906+
})
4907+
if i%100 == 99 {
4908+
assert.Nil(t, producer.Flush())
4909+
}
4910+
}
4911+
producer.Flush()
4912+
producer.Close()
4913+
4914+
// Set a small receiver queue size to trigger ack response blocking if the internal `queueCh`
4915+
// is a channel with the same size
4916+
consumer, err := client.Subscribe(ConsumerOptions{
4917+
Topic: topic,
4918+
SubscriptionName: "my-sub",
4919+
SubscriptionInitialPosition: SubscriptionPositionEarliest,
4920+
Type: KeyShared,
4921+
EnableBatchIndexAcknowledgment: true,
4922+
AckWithResponse: true,
4923+
ReceiverQueueSize: 5,
4924+
})
4925+
assert.Nil(t, err)
4926+
msgIDs := make([]MessageID, 0)
4927+
for i := 0; i < numMessages; i++ {
4928+
if msg, err := consumer.Receive(context.Background()); err != nil {
4929+
t.Fatal(err)
4930+
} else {
4931+
msgIDs = append(msgIDs, msg.ID())
4932+
if len(msgIDs) >= 10 {
4933+
if err := consumer.AckIDList(msgIDs); err != nil {
4934+
t.Fatal("Failed to acked messages: ", msgIDs, " ", err)
4935+
} else {
4936+
t.Log("Acked messages: ", msgIDs)
4937+
}
4938+
msgIDs = msgIDs[:0]
4939+
}
4940+
}
4941+
}
4942+
}

0 commit comments

Comments
 (0)