Skip to content

Commit 0509e8d

Browse files
committed
fix: track reconnect count in zero queue consumer
1 parent bb8449d commit 0509e8d

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

pulsar/consumer_partition.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ type partitionConsumer struct {
171171
currentQueueSize uAtomic.Int32
172172
scaleReceiverQueueHint uAtomic.Bool
173173
incomingMessages uAtomic.Int32
174+
reconnectCount uAtomic.Int32
174175

175176
eventsCh chan interface{}
176177
connectedCh chan struct{}
@@ -1544,6 +1545,7 @@ func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext {
15441545
func (pc *partitionConsumer) ConnectionClosed(closeConsumer *pb.CommandCloseConsumer) {
15451546
// Trigger reconnection in the consumer goroutine
15461547
pc.log.Debug("connection closed and send to connectClosedCh")
1548+
pc.reconnectCount.Inc()
15471549
var assignedBrokerURL string
15481550
if closeConsumer != nil {
15491551
assignedBrokerURL = pc.client.selectServiceURL(

pulsar/consumer_zero_queue_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,13 @@ func TestReconnectedBrokerSendPermits(t *testing.T) {
318318
err = admin.Topics().Unload(*topicName)
319319
assert.Nil(t, err)
320320
log.Println("unloaded topic")
321-
time.Sleep(1 * time.Minute)
321+
zc, ok := consumer.(*zeroQueueConsumer)
322+
assert.True(t, ok)
323+
require.EventuallyWithT(t, func(c *assert.CollectT) {
324+
reconnectCount := zc.pc.reconnectCount.Load()
325+
require.Equal(c, reconnectCount, int32(1))
326+
}, 30*time.Second, 1*time.Second)
327+
//time.Sleep(1 * time.Minute)
322328

323329
// receive 10 messages
324330
for i := 0; i < 10; i++ {

0 commit comments

Comments
 (0)