Skip to content

Commit 0dee113

Browse files
authored
fix: add missing metric tracking of pulsar_client_consumer_acks for AckIDList method (#1396)
1 parent 7f7fb43 commit 0dee113

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

pulsar/consumer_partition.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,7 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
760760
position := newPosition(msgID)
761761
if convertedMsgID.ack() {
762762
pendingAcks[position] = nil
763+
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-convertedMsgID.receivedTime.UnixNano()) / 1.0e9)
763764
} else if pc.options.enableBatchIndexAck {
764765
pendingAcks[position] = convertedMsgID.tracker.getAckBitSet()
765766
}
@@ -785,6 +786,8 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
785786
return toAckError(map[error][]MessageID{errors.New("consumer state is closed"): validMsgIDs})
786787
}
787788

789+
pc.metrics.AcksCounter.Add(float64(len(validMsgIDs)))
790+
788791
req := &ackListRequest{
789792
errCh: make(chan error),
790793
msgIDs: toMsgIDDataList(pendingAcks),

pulsar/consumer_test.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"time"
3434

3535
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
36+
"github.com/prometheus/client_golang/prometheus"
3637

3738
"github.com/stretchr/testify/require"
3839
"github.com/testcontainers/testcontainers-go"
@@ -4856,8 +4857,30 @@ func TestAckIDList(t *testing.T) {
48564857
}
48574858
}
48584859

4860+
func getAckCount(registry *prometheus.Registry) (int, error) {
4861+
metrics, err := registry.Gather()
4862+
if err != nil {
4863+
return 0, err
4864+
}
4865+
4866+
var ackCount float64
4867+
for _, metric := range metrics {
4868+
if metric.GetName() == "pulsar_client_consumer_acks" {
4869+
for _, m := range metric.GetMetric() {
4870+
ackCount += m.GetCounter().GetValue()
4871+
}
4872+
}
4873+
}
4874+
return int(ackCount), nil
4875+
}
4876+
48594877
func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
4860-
client, err := NewClient(ClientOptions{URL: lookupURL})
4878+
// Create a custom metrics registry
4879+
registry := prometheus.NewRegistry()
4880+
client, err := NewClient(ClientOptions{
4881+
URL: lookupURL,
4882+
MetricsRegisterer: registry,
4883+
})
48614884
assert.Nil(t, err)
48624885
defer client.Close()
48634886

@@ -4886,6 +4909,10 @@ func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
48864909
msgIDs[i] = msgs[ackedIndexes[i]].ID()
48874910
}
48884911
assert.Nil(t, consumer.AckIDList(msgIDs))
4912+
ackCnt, err := getAckCount(registry)
4913+
expectedAckCnt := len(msgIDs)
4914+
assert.NoError(t, err)
4915+
assert.Equal(t, expectedAckCnt, ackCnt)
48894916
consumer.Close()
48904917

48914918
consumer = createSharedConsumer(t, client, topic, enableBatchIndexAck)
@@ -4900,6 +4927,10 @@ func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
49004927
msgIDs = append(msgIDs, originalMsgIDs[i])
49014928
}
49024929
assert.Nil(t, consumer.AckIDList(msgIDs))
4930+
expectedAckCnt = expectedAckCnt + len(msgIDs)
4931+
ackCnt, err = getAckCount(registry)
4932+
assert.NoError(t, err)
4933+
assert.Equal(t, expectedAckCnt, ackCnt)
49034934
consumer.Close()
49044935

49054936
consumer = createSharedConsumer(t, client, topic, enableBatchIndexAck)
@@ -4920,6 +4951,10 @@ func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
49204951
} else {
49214952
assert.Fail(t, "AckIDList should return AckError")
49224953
}
4954+
4955+
ackCnt, err = getAckCount(registry)
4956+
assert.NoError(t, err)
4957+
assert.Equal(t, expectedAckCnt, ackCnt) // The Ack Counter shouldn't be increased.
49234958
}
49244959

49254960
func createSharedConsumer(t *testing.T, client Client, topic string, enableBatchIndexAck bool) Consumer {

0 commit comments

Comments
 (0)