Skip to content

Commit ba6468a

Browse files
authored
fix: handle reconnection logic for zero queue consumer (#1404)
1 parent 7a9a33c commit ba6468a

File tree

3 files changed

+132
-4
lines changed

3 files changed

+132
-4
lines changed

pulsar/consumer_impl.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ func newPartitionConsumerOpts(topic, consumerName string, idx int, options Consu
473473
enableBatchIndexAck: options.EnableBatchIndexAcknowledgment,
474474
ackGroupingOptions: options.AckGroupingOptions,
475475
autoReceiverQueueSize: options.EnableAutoScaledReceiverQueueSize,
476+
enableZeroQueueConsumer: options.EnableZeroQueueConsumer,
476477
}
477478
}
478479

pulsar/consumer_partition.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,10 @@ type partitionConsumerOpts struct {
123123
expireTimeOfIncompleteChunk time.Duration
124124
autoAckIncompleteChunk bool
125125
// in failover mode, this callback will be called when consumer change
126-
consumerEventListener ConsumerEventListener
127-
enableBatchIndexAck bool
128-
ackGroupingOptions *AckGroupingOptions
126+
consumerEventListener ConsumerEventListener
127+
enableBatchIndexAck bool
128+
ackGroupingOptions *AckGroupingOptions
129+
enableZeroQueueConsumer bool
129130
}
130131

131132
type ConsumerEventListener interface {
@@ -1665,7 +1666,7 @@ func (pc *partitionConsumer) dispatcher() {
16651666

16661667
pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
16671668
// send initial permits
1668-
if err := pc.internalFlow(initialPermits); err != nil {
1669+
if err := pc.internalFlow(initialPermits); err != nil && !pc.options.enableZeroQueueConsumer {
16691670
pc.log.WithError(err).Error("unable to send initial permits to broker")
16701671
}
16711672

@@ -1922,6 +1923,10 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
19221923
// Successfully reconnected
19231924
pc.log.Info("Reconnected consumer to broker")
19241925
bo.Reset()
1926+
if pc.options.enableZeroQueueConsumer {
1927+
pc.log.Info("zeroQueueConsumer reconnect, reset availablePermits")
1928+
pc.availablePermits.inc()
1929+
}
19251930
return struct{}{}, nil
19261931
}
19271932
pc.log.WithError(err).Error("Failed to create consumer at reconnect")

pulsar/consumer_zero_queue_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ import (
2424
"testing"
2525
"time"
2626

27+
"github.com/docker/docker/api/types/container"
28+
"github.com/docker/go-connections/nat"
29+
"github.com/stretchr/testify/require"
30+
"github.com/testcontainers/testcontainers-go"
31+
"github.com/testcontainers/testcontainers-go/wait"
32+
2733
"github.com/apache/pulsar-client-go/pulsar/internal"
2834
"github.com/apache/pulsar-client-go/pulsaradmin"
2935
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
@@ -115,6 +121,122 @@ func TestNormalZeroQueueConsumer(t *testing.T) {
115121
err = consumer.Unsubscribe()
116122
assert.Nil(t, err)
117123
}
124+
func TestReconnectConsumer(t *testing.T) {
125+
126+
req := testcontainers.ContainerRequest{
127+
Name: "pulsar-test",
128+
Image: getPulsarTestImage(),
129+
ExposedPorts: []string{"6650/tcp", "8080/tcp"},
130+
WaitingFor: wait.ForExposedPort(),
131+
HostConfigModifier: func(config *container.HostConfig) {
132+
config.PortBindings = map[nat.Port][]nat.PortBinding{
133+
"6650/tcp": {{HostIP: "0.0.0.0", HostPort: "6659"}},
134+
"8080/tcp": {{HostIP: "0.0.0.0", HostPort: "8089"}},
135+
}
136+
},
137+
Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
138+
}
139+
c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
140+
ContainerRequest: req,
141+
Started: true,
142+
Reuse: true,
143+
})
144+
require.NoError(t, err, "Failed to start the pulsar container")
145+
endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
146+
require.NoError(t, err, "Failed to get the pulsar endpoint")
147+
148+
client, err := NewClient(ClientOptions{
149+
URL: endpoint,
150+
})
151+
assert.Nil(t, err)
152+
adminEndpoint, err := c.PortEndpoint(context.Background(), "8080", "http")
153+
assert.Nil(t, err)
154+
admin, err := pulsaradmin.NewClient(&config.Config{
155+
WebServiceURL: adminEndpoint,
156+
})
157+
assert.Nil(t, err)
158+
159+
assert.Nil(t, err)
160+
defer client.Close()
161+
162+
topic := newTopicName()
163+
ctx := context.Background()
164+
var consumer Consumer
165+
require.Eventually(t, func() bool {
166+
consumer, err = client.Subscribe(ConsumerOptions{
167+
Topic: topic,
168+
SubscriptionName: "my-sub",
169+
EnableZeroQueueConsumer: true,
170+
})
171+
return err == nil
172+
}, 30*time.Second, 1*time.Second)
173+
174+
assert.Nil(t, err)
175+
_, ok := consumer.(*zeroQueueConsumer)
176+
assert.True(t, ok)
177+
defer consumer.Close()
178+
179+
// create producer
180+
producer, err := client.CreateProducer(ProducerOptions{
181+
Topic: topic,
182+
DisableBatching: false,
183+
})
184+
assert.Nil(t, err)
185+
186+
// send 10 messages
187+
for i := 0; i < 10; i++ {
188+
msg, err := producer.Send(ctx, &ProducerMessage{
189+
Payload: []byte(fmt.Sprintf("hello-%d", i)),
190+
Key: "pulsar",
191+
Properties: map[string]string{
192+
"key-1": "pulsar-1",
193+
},
194+
})
195+
assert.Nil(t, err)
196+
log.Printf("send message: %s", msg.String())
197+
}
198+
199+
ch := make(chan struct{})
200+
201+
go func() {
202+
time.Sleep(3 * time.Second)
203+
log.Println("unloading topic")
204+
// Simulate a broker restart by stopping the pulsar container
205+
topicName, err := utils.GetTopicName(topic)
206+
assert.Nil(t, err)
207+
err = admin.Topics().Unload(*topicName)
208+
assert.Nil(t, err)
209+
log.Println("unloaded topic")
210+
ch <- struct{}{}
211+
}()
212+
213+
// receive 10 messages
214+
for i := 0; i < 10; i++ {
215+
if i == 3 {
216+
<-ch
217+
}
218+
msg, err := consumer.Receive(context.Background())
219+
if err != nil {
220+
log.Fatal(err)
221+
}
222+
223+
expectMsg := fmt.Sprintf("hello-%d", i)
224+
expectProperties := map[string]string{
225+
"key-1": "pulsar-1",
226+
}
227+
assert.Equal(t, []byte(expectMsg), msg.Payload())
228+
assert.Equal(t, "pulsar", msg.Key())
229+
assert.Equal(t, expectProperties, msg.Properties())
230+
// ack message
231+
consumer.Ack(msg)
232+
log.Printf("receive message: %s", msg.ID().String())
233+
}
234+
err = consumer.Unsubscribe()
235+
assert.Nil(t, err)
236+
consumer.Close()
237+
producer.Close()
238+
defer c.Terminate(ctx)
239+
}
118240

119241
func TestMultipleConsumer(t *testing.T) {
120242
client, err := NewClient(ClientOptions{

0 commit comments

Comments
 (0)