Skip to content

Commit 41bc6f4

Browse files
crossoverJie, Copilot, and nodece
authored
fix: enhance zero queue consumer reconnection handling and message permit management (#1443)
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Zixuan Liu <[email protected]>
1 parent 6ab93d5 commit 41bc6f4

File tree

5 files changed

+184
-10
lines changed

5 files changed

+184
-10
lines changed

pulsar/consumer_partition.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,11 @@ type partitionConsumerOpts struct {
124124
expireTimeOfIncompleteChunk time.Duration
125125
autoAckIncompleteChunk bool
126126
// in failover mode, this callback is called when the consumer changes
127-
consumerEventListener ConsumerEventListener
128-
enableBatchIndexAck bool
129-
ackGroupingOptions *AckGroupingOptions
130-
enableZeroQueueConsumer bool
127+
consumerEventListener ConsumerEventListener
128+
enableBatchIndexAck bool
129+
ackGroupingOptions *AckGroupingOptions
130+
enableZeroQueueConsumer bool
131+
zeroQueueReconnectedPolicy func(*partitionConsumer)
131132
}
132133

133134
type ConsumerEventListener interface {
@@ -170,6 +171,7 @@ type partitionConsumer struct {
170171
currentQueueSize uAtomic.Int32
171172
scaleReceiverQueueHint uAtomic.Bool
172173
incomingMessages uAtomic.Int32
174+
reconnectCount uAtomic.Int32
173175

174176
eventsCh chan interface{}
175177
connectedCh chan struct{}
@@ -1393,6 +1395,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
13931395
orderingKey: string(smm.OrderingKey),
13941396
index: messageIndex,
13951397
brokerPublishTime: brokerPublishTime,
1398+
conn: pc._getConn(),
13961399
}
13971400
} else {
13981401
msg = &message{
@@ -1413,6 +1416,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
14131416
orderingKey: string(msgMeta.GetOrderingKey()),
14141417
index: messageIndex,
14151418
brokerPublishTime: brokerPublishTime,
1419+
conn: pc._getConn(),
14161420
}
14171421
}
14181422

@@ -1541,6 +1545,7 @@ func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext {
15411545
func (pc *partitionConsumer) ConnectionClosed(closeConsumer *pb.CommandCloseConsumer) {
15421546
// Trigger reconnection in the consumer goroutine
15431547
pc.log.Debug("connection closed and send to connectClosedCh")
1548+
pc.reconnectCount.Inc()
15441549
var assignedBrokerURL string
15451550
if closeConsumer != nil {
15461551
assignedBrokerURL = pc.client.selectServiceURL(
@@ -1925,9 +1930,8 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
19251930
// Successfully reconnected
19261931
pc.log.Info("Reconnected consumer to broker")
19271932
bo.Reset()
1928-
if pc.options.enableZeroQueueConsumer {
1929-
pc.log.Info("zeroQueueConsumer reconnect, reset availablePermits")
1930-
pc.availablePermits.inc()
1933+
if pc.options.enableZeroQueueConsumer && pc.options.zeroQueueReconnectedPolicy != nil {
1934+
pc.options.zeroQueueReconnectedPolicy(pc)
19311935
}
19321936
return struct{}{}, nil
19331937
}

pulsar/consumer_partition_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
3737
metrics: newTestMetrics(),
3838
decryptor: crypto.NewNoopDecryptor(),
3939
}
40+
pc._setConn(dummyConnection{})
4041
pc.availablePermits = &availablePermits{pc: &pc}
4142
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
4243
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
@@ -76,6 +77,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
7677
metrics: newTestMetrics(),
7778
decryptor: crypto.NewNoopDecryptor(),
7879
}
80+
pc._setConn(dummyConnection{})
7981
pc.availablePermits = &availablePermits{pc: &pc}
8082
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
8183
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
@@ -112,6 +114,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
112114
metrics: newTestMetrics(),
113115
decryptor: crypto.NewNoopDecryptor(),
114116
}
117+
pc._setConn(dummyConnection{})
115118
pc.availablePermits = &availablePermits{pc: &pc}
116119
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
117120
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)

pulsar/consumer_zero_queue.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"sync"
2424
"time"
2525

26+
uAtomic "go.uber.org/atomic"
27+
2628
"github.com/apache/pulsar-client-go/pulsar/internal"
2729
"github.com/apache/pulsar-client-go/pulsar/log"
2830
"github.com/pkg/errors"
@@ -36,6 +38,7 @@ type zeroQueueConsumer struct {
3638
pc *partitionConsumer
3739
consumerName string
3840
disableForceTopicCreation bool
41+
waitingOnReceive uAtomic.Bool
3942

4043
messageCh chan ConsumerMessage
4144

@@ -71,11 +74,17 @@ func newZeroConsumer(client *client, options ConsumerOptions, topic string,
7174
return nil, err
7275
}
7376
opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, tn.Partition, zc.options)
74-
conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
77+
opts.zeroQueueReconnectedPolicy = func(pc *partitionConsumer) {
78+
if zc.waitingOnReceive.Load() {
79+
pc.log.Info("zeroQueueConsumer reconnect, reset availablePermits")
80+
pc.availablePermits.inc()
81+
}
82+
}
83+
pc, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
7584
if err != nil {
7685
return nil, err
7786
}
78-
zc.pc = conn
87+
zc.pc = pc
7988

8089
return zc, nil
8190
}
@@ -119,17 +128,26 @@ func (z *zeroQueueConsumer) Receive(ctx context.Context) (Message, error) {
119128
}
120129
z.Lock()
121130
defer z.Unlock()
131+
z.waitingOnReceive.Store(true)
122132
z.pc.availablePermits.inc()
123133
for {
124134
select {
125135
case <-z.closeCh:
136+
z.waitingOnReceive.Store(false)
126137
return nil, newError(ConsumerClosed, "consumer closed")
127138
case cm, ok := <-z.messageCh:
128139
if !ok {
129140
return nil, newError(ConsumerClosed, "consumer closed")
130141
}
131-
return cm.Message, nil
142+
message, ok := cm.Message.(*message)
143+
if ok && message.getConn().ID() == z.pc._getConn().ID() {
144+
z.waitingOnReceive.Store(false)
145+
return cm.Message, nil
146+
} else {
147+
z.log.WithField("messageID", cm.Message.ID()).Warn("message from old connection discarded after reconnection")
148+
}
132149
case <-ctx.Done():
150+
z.waitingOnReceive.Store(false)
133151
return nil, ctx.Err()
134152
}
135153
}

pulsar/consumer_zero_queue_test.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,148 @@ func TestReconnectConsumer(t *testing.T) {
243243
defer c.Terminate(ctx)
244244
}
245245

246+
func TestReconnectedBrokerSendPermits(t *testing.T) {
247+
req := testcontainers.ContainerRequest{
248+
Name: "pulsar-test",
249+
Image: getPulsarTestImage(),
250+
ExposedPorts: []string{"6650/tcp", "8080/tcp"},
251+
WaitingFor: wait.ForExposedPort(),
252+
HostConfigModifier: func(config *container.HostConfig) {
253+
config.PortBindings = map[nat.Port][]nat.PortBinding{
254+
"6650/tcp": {{HostIP: "0.0.0.0", HostPort: "6659"}},
255+
"8080/tcp": {{HostIP: "0.0.0.0", HostPort: "8089"}},
256+
}
257+
},
258+
Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
259+
}
260+
c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
261+
ContainerRequest: req,
262+
Started: true,
263+
Reuse: true,
264+
})
265+
require.NoError(t, err, "Failed to start the pulsar container")
266+
endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
267+
require.NoError(t, err, "Failed to get the pulsar endpoint")
268+
269+
sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
270+
client, err := NewClient(ClientOptions{
271+
URL: endpoint,
272+
Logger: plog.NewLoggerWithSlog(sLogger),
273+
})
274+
assert.Nil(t, err)
275+
adminEndpoint, err := c.PortEndpoint(context.Background(), "8080", "http")
276+
assert.Nil(t, err)
277+
admin, err := pulsaradmin.NewClient(&config.Config{
278+
WebServiceURL: adminEndpoint,
279+
})
280+
assert.Nil(t, err)
281+
282+
topic := newTopicName()
283+
var consumer Consumer
284+
require.Eventually(t, func() bool {
285+
consumer, err = client.Subscribe(ConsumerOptions{
286+
Topic: topic,
287+
SubscriptionName: "my-sub",
288+
EnableZeroQueueConsumer: true,
289+
Type: Shared, // using Shared subscription type to support unack subscription stats
290+
})
291+
return err == nil
292+
}, 30*time.Second, 1*time.Second)
293+
ctx := context.Background()
294+
295+
// create producer
296+
producer, err := client.CreateProducer(ProducerOptions{
297+
Topic: topic,
298+
DisableBatching: false,
299+
})
300+
assert.Nil(t, err)
301+
302+
// send 10 messages
303+
for i := 0; i < 10; i++ {
304+
msg, err := producer.Send(ctx, &ProducerMessage{
305+
Payload: []byte(fmt.Sprintf("hello-%d", i)),
306+
Key: "pulsar",
307+
Properties: map[string]string{
308+
"key-1": "pulsar-1",
309+
},
310+
})
311+
assert.Nil(t, err)
312+
log.Printf("send message: %s", msg.String())
313+
}
314+
315+
log.Println("unloading topic")
316+
topicName, err := utils.GetTopicName(topic)
317+
assert.Nil(t, err)
318+
err = admin.Topics().Unload(*topicName)
319+
assert.Nil(t, err)
320+
log.Println("unloaded topic")
321+
zc, ok := consumer.(*zeroQueueConsumer)
322+
assert.True(t, ok)
323+
// wait for reconnect
324+
require.EventuallyWithT(t, func(c *assert.CollectT) {
325+
reconnectCount := zc.pc.reconnectCount.Load()
326+
require.Equal(c, reconnectCount, int32(1))
327+
}, 30*time.Second, 1*time.Second)
328+
329+
// receive 10 messages
330+
for i := 0; i < 10; i++ {
331+
msg, err := consumer.Receive(context.Background())
332+
if err != nil {
333+
assert.Nil(t, err)
334+
}
335+
336+
expectMsg := fmt.Sprintf("hello-%d", i)
337+
expectProperties := map[string]string{
338+
"key-1": "pulsar-1",
339+
}
340+
assert.Equal(t, []byte(expectMsg), msg.Payload())
341+
assert.Equal(t, "pulsar", msg.Key())
342+
assert.Equal(t, expectProperties, msg.Properties())
343+
// ack message
344+
err = consumer.Ack(msg)
345+
assert.Nil(t, err)
346+
log.Printf("receive message: %s", msg.ID().String())
347+
}
348+
// send one more message and we do not manually receive it
349+
_, err = producer.Send(ctx, &ProducerMessage{
350+
Payload: []byte(fmt.Sprintf("hello-%d", 10)),
351+
Key: "pulsar",
352+
Properties: map[string]string{
353+
"key-1": "pulsar-1",
354+
},
355+
})
356+
assert.Nil(t, err)
357+
// wait for the broker to send messages to the consumer and for the topic stats update to finish
358+
option := utils.GetStatsOptions{
359+
GetPreciseBacklog: true,
360+
}
361+
require.EventuallyWithT(t, func(c *assert.CollectT) {
362+
topicStats, err := admin.Topics().GetStatsWithOptionWithContext(ctx, *topicName, option)
363+
require.Nil(c, err)
364+
for _, subscriptionStats := range topicStats.Subscriptions {
365+
require.Equal(c, subscriptionStats.MsgBacklog, int64(1))
366+
require.Equal(c, subscriptionStats.Consumers[0].UnAckedMessages, 0)
367+
}
368+
}, 30*time.Second, 1*time.Second)
369+
370+
// receive the message and ack it
371+
msg, err := consumer.Receive(context.Background())
372+
assert.Nil(t, err)
373+
err = consumer.Ack(msg)
374+
assert.Nil(t, err)
375+
376+
// check topic stats
377+
require.EventuallyWithT(t, func(c *assert.CollectT) {
378+
topicStats, err := admin.Topics().GetStatsWithOptionWithContext(ctx, *topicName, option)
379+
require.Nil(c, err)
380+
for _, subscriptionStats := range topicStats.Subscriptions {
381+
require.Equal(c, subscriptionStats.MsgBacklog, int64(0))
382+
require.Equal(c, subscriptionStats.Consumers[0].UnAckedMessages, 0)
383+
}
384+
}, 30*time.Second, 1*time.Second)
385+
386+
}
387+
246388
func TestUnloadTopicBeforeConsume(t *testing.T) {
247389

248390
sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))

pulsar/impl_message.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"sync/atomic"
2727
"time"
2828

29+
"github.com/apache/pulsar-client-go/pulsar/internal"
30+
2931
"google.golang.org/protobuf/proto"
3032

3133
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -313,6 +315,7 @@ type message struct {
313315
encryptionContext *EncryptionContext
314316
index *uint64
315317
brokerPublishTime *time.Time
318+
conn internal.Connection
316319
}
317320

318321
func (msg *message) Topic() string {
@@ -394,6 +397,10 @@ func (msg *message) size() int {
394397
return len(msg.payLoad)
395398
}
396399

400+
func (msg *message) getConn() internal.Connection {
401+
return msg.conn
402+
}
403+
397404
func newAckTracker(size uint) *ackTracker {
398405
batchIDs := bitset.New(size)
399406
for i := uint(0); i < size; i++ {

0 commit comments

Comments
 (0)