Skip to content

Commit 3d9aa96

Browse files
committed
Reuse AckIDList tests
1 parent bef9922 commit 3d9aa96

File tree

2 files changed

+37
-62
lines changed

2 files changed

+37
-62
lines changed

pulsar/consumer_partition.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -746,8 +746,6 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
746746
pendingAcks := make(map[position]*bitset.BitSet)
747747
validMsgIDs := make([]MessageID, 0, len(msgIDs))
748748

749-
pc.metrics.AcksCounter.Add(float64(len(msgIDs)))
750-
751749
// They might be complete after the whole for loop
752750
for _, msgID := range msgIDs {
753751
if msgID.PartitionIdx() != pc.partitionIdx {
@@ -788,6 +786,8 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
788786
return toAckError(map[error][]MessageID{errors.New("consumer state is closed"): validMsgIDs})
789787
}
790788

789+
pc.metrics.AcksCounter.Add(float64(len(validMsgIDs)))
790+
791791
req := &ackListRequest{
792792
errCh: make(chan error),
793793
msgIDs: toMsgIDDataList(pendingAcks),

pulsar/consumer_test.go

Lines changed: 35 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -4857,8 +4857,30 @@ func TestAckIDList(t *testing.T) {
48574857
}
48584858
}
48594859

4860+
func getAckCount(registry *prometheus.Registry) (int, error) {
4861+
metrics, err := registry.Gather()
4862+
if err != nil {
4863+
return 0, err
4864+
}
4865+
4866+
var ackCount float64
4867+
for _, metric := range metrics {
4868+
if metric.GetName() == "pulsar_client_consumer_acks" {
4869+
for _, m := range metric.GetMetric() {
4870+
ackCount += m.GetCounter().GetValue()
4871+
}
4872+
}
4873+
}
4874+
return int(ackCount), nil
4875+
}
4876+
48604877
func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
4861-
client, err := NewClient(ClientOptions{URL: lookupURL})
4878+
// Create a custom metrics registry
4879+
registry := prometheus.NewRegistry()
4880+
client, err := NewClient(ClientOptions{
4881+
URL: lookupURL,
4882+
MetricsRegisterer: registry,
4883+
})
48624884
assert.Nil(t, err)
48634885
defer client.Close()
48644886

@@ -4887,6 +4909,10 @@ func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
48874909
msgIDs[i] = msgs[ackedIndexes[i]].ID()
48884910
}
48894911
assert.Nil(t, consumer.AckIDList(msgIDs))
4912+
ackCnt, err := getAckCount(registry)
4913+
expectedAckCnt := len(msgIDs)
4914+
assert.NoError(t, err)
4915+
assert.Equal(t, expectedAckCnt, ackCnt)
48904916
consumer.Close()
48914917

48924918
consumer = createSharedConsumer(t, client, topic, enableBatchIndexAck)
@@ -4901,6 +4927,10 @@ func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
49014927
msgIDs = append(msgIDs, originalMsgIDs[i])
49024928
}
49034929
assert.Nil(t, consumer.AckIDList(msgIDs))
4930+
expectedAckCnt = expectedAckCnt + len(msgIDs)
4931+
ackCnt, err = getAckCount(registry)
4932+
assert.NoError(t, err)
4933+
assert.Equal(t, expectedAckCnt, ackCnt)
49044934
consumer.Close()
49054935

49064936
consumer = createSharedConsumer(t, client, topic, enableBatchIndexAck)
@@ -4921,6 +4951,10 @@ func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
49214951
} else {
49224952
assert.Fail(t, "AckIDList should return AckError")
49234953
}
4954+
4955+
ackCnt, err = getAckCount(registry)
4956+
assert.NoError(t, err)
4957+
assert.Equal(t, expectedAckCnt, ackCnt)
49244958
}
49254959

49264960
func createSharedConsumer(t *testing.T, client Client, topic string, enableBatchIndexAck bool) Consumer {
@@ -5161,62 +5195,3 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
51615195
"The consumer uses a different connection when reconnecting")
51625196
}
51635197
}
5164-
5165-
func TestAckIDListWillIncreaseAckCounterMetrics(t *testing.T) {
5166-
topicName := newTopicName()
5167-
5168-
// Create a custom metrics registry
5169-
registry := prometheus.NewRegistry()
5170-
5171-
client, err := NewClient(ClientOptions{
5172-
URL: serviceURL,
5173-
MetricsRegisterer: registry,
5174-
})
5175-
assert.NoError(t, err)
5176-
defer client.Close()
5177-
5178-
p, err := client.CreateProducer(ProducerOptions{
5179-
Topic: topicName,
5180-
})
5181-
assert.NoError(t, err)
5182-
5183-
for i := 0; i < 10; i++ {
5184-
_, err = p.Send(context.Background(), &ProducerMessage{
5185-
Payload: []byte(fmt.Sprintf("msg-%d", i)),
5186-
})
5187-
assert.NoError(t, err)
5188-
}
5189-
5190-
c, err := client.Subscribe(ConsumerOptions{
5191-
Topic: topicName,
5192-
SubscriptionName: "my-sub",
5193-
SubscriptionInitialPosition: SubscriptionPositionEarliest,
5194-
})
5195-
5196-
assert.NoError(t, err)
5197-
5198-
msgIDs := make([]MessageID, 10)
5199-
for i := 0; i < 10; i++ {
5200-
msg, err := c.Receive(context.Background())
5201-
assert.NoError(t, err)
5202-
msgIDs[i] = msg.ID()
5203-
}
5204-
5205-
err = c.AckIDList(msgIDs)
5206-
assert.NoError(t, err)
5207-
5208-
// Get metrics directly from the registry
5209-
metrics, err := registry.Gather()
5210-
assert.NoError(t, err)
5211-
5212-
var ackCount float64
5213-
for _, metric := range metrics {
5214-
if metric.GetName() == "pulsar_client_consumer_acks" {
5215-
for _, m := range metric.GetMetric() {
5216-
ackCount += m.GetCounter().GetValue()
5217-
}
5218-
}
5219-
}
5220-
5221-
assert.Equal(t, float64(10), ackCount)
5222-
}

0 commit comments

Comments
 (0)