Skip to content

Commit edea3eb

Browse files
crossoverJienodece
andauthored
[Issue 1276] Fix multiple consumers using zeroQueueConsumer (#1278)
* fix #1276 * Update pulsar/consumer_zero_queue_test.go Co-authored-by: Zixuan Liu <[email protected]> * fix docker clean --------- Co-authored-by: Zixuan Liu <[email protected]>
1 parent 279e1d7 commit edea3eb

File tree

3 files changed

+93
-3
lines changed

3 files changed

+93
-3
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,12 @@ jobs:
5151
go-version: [ '1.22', '1.23' ]
5252
steps:
5353
- uses: actions/checkout@v3
54-
- name: clean docker cache
55-
run: docker rmi $(docker images -q) -f && df -h
54+
- name: Check for Docker images
55+
id: check_images
56+
run: echo "::set-output name=images::$(docker images -q | wc -l)"
57+
- name: Clean Docker cache if images exist
58+
if: ${{ steps.check_images.outputs.images > 0 }}
59+
run: docker rmi $(docker images -q) -f && df -h
5660
- uses: actions/setup-go@v3
5761
with:
5862
go-version: ${{ matrix.go-version }}

pulsar/consumer_partition.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1629,7 +1629,10 @@ func (pc *partitionConsumer) dispatcher() {
16291629
messages[0] = nil
16301630
messages = messages[1:]
16311631

1632-
pc.availablePermits.inc()
1632+
// for the zeroQueueConsumer, the permits controlled by itself
1633+
if pc.options.receiverQueueSize > 0 {
1634+
pc.availablePermits.inc()
1635+
}
16331636

16341637
if pc.options.autoReceiverQueueSize {
16351638
pc.incomingMessages.Dec()

pulsar/consumer_zero_queue_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,89 @@ func TestNormalZeroQueueConsumer(t *testing.T) {
9595
err = consumer.Unsubscribe()
9696
assert.Nil(t, err)
9797
}
98+
99+
func TestMultipleConsumer(t *testing.T) {
100+
client, err := NewClient(ClientOptions{
101+
URL: lookupURL,
102+
})
103+
if err != nil {
104+
log.Fatal(err)
105+
}
106+
defer client.Close()
107+
108+
topic := newTopicName()
109+
ctx := context.Background()
110+
111+
// create consumer1
112+
consumer1, err := client.Subscribe(ConsumerOptions{
113+
Topic: topic,
114+
SubscriptionName: "my-sub",
115+
Type: Shared,
116+
EnableZeroQueueConsumer: true,
117+
})
118+
assert.Nil(t, err)
119+
_, ok := consumer1.(*zeroQueueConsumer)
120+
assert.True(t, ok)
121+
defer consumer1.Close()
122+
123+
// create consumer2
124+
consumer2, err := client.Subscribe(ConsumerOptions{
125+
Topic: topic,
126+
SubscriptionName: "my-sub",
127+
Type: Shared,
128+
EnableZeroQueueConsumer: true,
129+
})
130+
assert.Nil(t, err)
131+
_, ok = consumer2.(*zeroQueueConsumer)
132+
assert.True(t, ok)
133+
defer consumer2.Close()
134+
135+
// create producer
136+
producer, err := client.CreateProducer(ProducerOptions{
137+
Topic: topic,
138+
DisableBatching: true,
139+
})
140+
assert.Nil(t, err)
141+
defer producer.Close()
142+
143+
sendNum := 10
144+
// send 10 messages
145+
for i := 0; i < sendNum; i++ {
146+
msg, err := producer.Send(ctx, &ProducerMessage{
147+
Payload: []byte(fmt.Sprintf("hello-%d", i)),
148+
Key: "pulsar",
149+
Properties: map[string]string{
150+
"key-1": "pulsar-1",
151+
},
152+
})
153+
assert.Nil(t, err)
154+
log.Printf("send message: %s", msg.String())
155+
}
156+
157+
// receive messages
158+
for i := 0; i < sendNum/2; i++ {
159+
msg, err := consumer1.Receive(context.Background())
160+
if err != nil {
161+
log.Fatal(err)
162+
}
163+
log.Printf("consumer1 receive message: %s %s", msg.ID().String(), msg.Payload())
164+
// ack message
165+
consumer1.Ack(msg)
166+
}
167+
168+
// receive messages
169+
for i := 0; i < sendNum/2; i++ {
170+
msg, err := consumer2.Receive(context.Background())
171+
if err != nil {
172+
log.Fatal(err)
173+
}
174+
log.Printf("consumer2 receive message: %s %s", msg.ID().String(), msg.Payload())
175+
// ack message
176+
consumer2.Ack(msg)
177+
}
178+
179+
}
180+
98181
func TestPartitionZeroQueueConsumer(t *testing.T) {
99182
client, err := NewClient(ClientOptions{
100183
URL: lookupURL,

0 commit comments

Comments
 (0)