Skip to content

Commit 5d49f15

Browse files
authored
[improve] Add Testcase to test using keyShared subscription and delayed messages at the same time (#1361)
Master Issue: apache/pulsar#23968 Related pr: #1339 ### Motivation Refered to pr [comment](#1339 (comment)), there is no test cases about using keyShared subscription mode to consume delayed messages, so that maybe we need to add one. ### Modifications Add `pulsar/consumer_test/TestConsumerKeySharedWithDelayedMessages` test case.
1 parent e2f46c6 commit 5d49f15

File tree

2 files changed

+87
-5
lines changed

2 files changed

+87
-5
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ bin/golangci-lint:
5050
# use golangCi-lint docker to avoid local golang env issues
5151
# https://golangci-lint.run/welcome/install/
5252
lint-docker:
53-
docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.51.2 golangci-lint run -v
53+
docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.61.0 golangci-lint run -v
5454

5555
container:
5656
docker build -t ${IMAGE_NAME} \

pulsar/consumer_test.go

Lines changed: 86 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -336,11 +336,93 @@ func TestConsumerKeyShared(t *testing.T) {
336336
assert.NotEqual(t, 0, receivedConsumer1)
337337
assert.NotEqual(t, 0, receivedConsumer2)
338338

339-
t.Logf("TestConsumerKeyShared received messages consumer1: %d consumser2: %d\n",
339+
t.Logf("TestConsumerKeyShared received messages consumer1: %d consumer2: %d\n",
340340
receivedConsumer1, receivedConsumer2)
341341
assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
342342
}
343343

344+
// TestConsumerKeySharedWithDelayedMessages
345+
// test using delayed messages and key-shared sub mode at the same time
346+
func TestConsumerKeySharedWithDelayedMessages(t *testing.T) {
347+
client, err := NewClient(ClientOptions{
348+
URL: lookupURL,
349+
})
350+
assert.Nil(t, err)
351+
defer client.Close()
352+
topic := newTopicName()
353+
354+
consumer1, err := client.Subscribe(ConsumerOptions{
355+
Topic: topic,
356+
SubscriptionName: "sub-1",
357+
Type: KeyShared,
358+
})
359+
assert.Nil(t, err)
360+
defer consumer1.Close()
361+
consumer2, err := client.Subscribe(ConsumerOptions{
362+
Topic: topic,
363+
SubscriptionName: "sub-1",
364+
Type: KeyShared,
365+
})
366+
assert.Nil(t, err)
367+
defer consumer2.Close()
368+
369+
producer, err := client.CreateProducer(ProducerOptions{
370+
Topic: topic,
371+
})
372+
assert.Nil(t, err)
373+
defer producer.Close()
374+
ctx := context.Background()
375+
startTime := time.Now()
376+
delayTime := 3 * time.Second
377+
for i := 0; i < 100; i++ {
378+
_, err := producer.Send(ctx, &ProducerMessage{
379+
Key: fmt.Sprintf("key-shared-%d", i%3),
380+
Payload: []byte(fmt.Sprintf("value-%d", i)),
381+
DeliverAfter: delayTime,
382+
})
383+
assert.Nil(t, err)
384+
}
385+
386+
receivedConsumer1 := 0
387+
receivedConsumer2 := 0
388+
timeoutTimer := time.After(2 * delayTime)
389+
for (receivedConsumer1 + receivedConsumer2) < 100 {
390+
select {
391+
case <-timeoutTimer:
392+
break
393+
default:
394+
}
395+
396+
select {
397+
case cm, ok := <-consumer1.Chan():
398+
if !ok {
399+
break
400+
}
401+
receivedConsumer1++
402+
_ = consumer1.Ack(cm.Message)
403+
assert.GreaterOrEqual(t, time.Since(startTime), delayTime,
404+
"TestConsumerKeySharedWithDelayedMessages should delay messages later than defined deliverAfter time",
405+
)
406+
case cm, ok := <-consumer2.Chan():
407+
if !ok {
408+
break
409+
}
410+
receivedConsumer2++
411+
_ = consumer2.Ack(cm.Message)
412+
assert.GreaterOrEqual(t, time.Since(startTime), delayTime,
413+
"TestConsumerKeySharedWithDelayedMessages should delay messages later than defined deliverAfter time",
414+
)
415+
}
416+
}
417+
418+
assert.NotEqual(t, 0, receivedConsumer1)
419+
assert.NotEqual(t, 0, receivedConsumer2)
420+
assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
421+
t.Logf("TestConsumerKeySharedWithDelayedMessages received messages consumer1: %d consumer2: %d, timecost: %d\n",
422+
receivedConsumer1, receivedConsumer2, time.Since(startTime).Milliseconds(),
423+
)
424+
}
425+
344426
func TestPartitionTopicsConsumerPubSub(t *testing.T) {
345427
client, err := NewClient(ClientOptions{
346428
URL: lookupURL,
@@ -2704,11 +2786,11 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
27042786
assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1)
27052787
assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2)
27062788

2707-
t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages consumer1: %d consumser2: %d\n",
2789+
t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages consumer1: %d consumer2: %d\n",
27082790
receivedConsumer1, receivedConsumer2)
27092791
assert.Equal(t, 300, receivedConsumer1+receivedConsumer2)
27102792

2711-
t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages keys consumer1: %v consumser2: %v\n",
2793+
t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages keys consumer1: %v consumer2: %v\n",
27122794
consumer1Keys, consumer2Keys)
27132795
}
27142796

@@ -2887,7 +2969,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) {
28872969
assert.NotEqual(t, 0, receivedConsumer2)
28882970

28892971
t.Logf(
2890-
"TestConsumerKeySharedWithOrderingKey received messages consumer1: %d consumser2: %d\n",
2972+
"TestConsumerKeySharedWithOrderingKey received messages consumer1: %d consumer2: %d\n",
28912973
receivedConsumer1, receivedConsumer2,
28922974
)
28932975
assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)

0 commit comments

Comments
 (0)