Skip to content

Commit f1cf49d

Browse files
committed
feat: enhance consumer metrics with ack counter and processing time tracking
1 parent 67c3d53 commit f1cf49d

File tree

4 files changed

+209
-13
lines changed

4 files changed

+209
-13
lines changed

pulsar/consumer_partition.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,8 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
746746
pendingAcks := make(map[position]*bitset.BitSet)
747747
validMsgIDs := make([]MessageID, 0, len(msgIDs))
748748

749+
pc.metrics.AcksCounter.Add(float64(len(msgIDs)))
750+
749751
// They might be complete after the whole for loop
750752
for _, msgID := range msgIDs {
751753
if msgID.PartitionIdx() != pc.partitionIdx {
@@ -760,6 +762,7 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
760762
position := newPosition(msgID)
761763
if convertedMsgID.ack() {
762764
pendingAcks[position] = nil
765+
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-convertedMsgID.receivedTime.UnixNano()) / 1.0e9)
763766
} else if pc.options.enableBatchIndexAck {
764767
pendingAcks[position] = convertedMsgID.tracker.getAckBitSet()
765768
}

pulsar/consumer_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5160,3 +5160,47 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
51605160
"The consumer uses a different connection when reconnecting")
51615161
}
51625162
}
5163+
5164+
func TestAckIDListWillIncreaseAckCounterMetrics(t *testing.T) {
5165+
topicName := newTopicName()
5166+
5167+
client, err := NewClient(ClientOptions{
5168+
URL: serviceURL,
5169+
})
5170+
assert.NoError(t, err)
5171+
defer client.Close()
5172+
5173+
p, err := client.CreateProducer(ProducerOptions{
5174+
Topic: topicName,
5175+
})
5176+
assert.NoError(t, err)
5177+
5178+
for i := 0; i < 10; i++ {
5179+
_, err = p.Send(context.Background(), &ProducerMessage{
5180+
Payload: []byte(fmt.Sprintf("msg-%d", i)),
5181+
})
5182+
assert.NoError(t, err)
5183+
}
5184+
5185+
c, err := client.Subscribe(ConsumerOptions{
5186+
Topic: topicName,
5187+
SubscriptionName: "my-sub",
5188+
SubscriptionInitialPosition: SubscriptionPositionEarliest,
5189+
})
5190+
5191+
assert.NoError(t, err)
5192+
5193+
msgIds := make([]MessageID, 10)
5194+
for i := 0; i < 10; i++ {
5195+
msg, err := c.Receive(context.Background())
5196+
assert.NoError(t, err)
5197+
msgIds[i] = msg.ID()
5198+
}
5199+
5200+
err = c.AckIDList(msgIds)
5201+
assert.NoError(t, err)
5202+
5203+
ackCnt, err := getMetricValue("pulsar_client_consumer_acks")
5204+
assert.NoError(t, err)
5205+
assert.Equal(t, float64(10), ackCnt)
5206+
}

pulsar/test_setup_test.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package pulsar
19+
20+
import (
21+
"bufio"
22+
"context"
23+
"fmt"
24+
"net"
25+
"net/http"
26+
"os"
27+
"strings"
28+
"sync"
29+
"testing"
30+
"time"
31+
32+
"github.com/prometheus/client_golang/prometheus/promhttp"
33+
)
34+
35+
// Package-level state for the Prometheus metrics endpoint shared by all tests.

// metricsServer is the currently running metrics HTTP server, or nil while no
// server is running.
var metricsServer *http.Server

// metricsPort is the local TCP port the metrics endpoint is served on.
var metricsPort = 8801

// metricsMutex is held by startMetricsServer and stopMetricsServer while they
// inspect or replace metricsServer.
var metricsMutex sync.Mutex
41+
42+
// startMetricsServer starts a global Prometheus metrics server
43+
func startMetricsServer() error {
44+
metricsMutex.Lock()
45+
defer metricsMutex.Unlock()
46+
47+
if metricsServer != nil {
48+
return nil // Already started
49+
}
50+
51+
// Check if port is available
52+
addr := fmt.Sprintf("localhost:%d", metricsPort)
53+
ln, err := net.Listen("tcp", addr)
54+
if err != nil {
55+
return fmt.Errorf("port %d is not available: %v", metricsPort, err)
56+
}
57+
ln.Close()
58+
59+
// Start metrics server
60+
http.Handle("/metrics", promhttp.Handler())
61+
metricsServer = &http.Server{
62+
Addr: fmt.Sprintf(":%d", metricsPort),
63+
}
64+
65+
go func() {
66+
if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
67+
panic(fmt.Sprintf("Failed to start metrics server: %v", err))
68+
}
69+
}()
70+
71+
// Wait a bit for server to start
72+
time.Sleep(100 * time.Millisecond)
73+
return nil
74+
}
75+
76+
// stopMetricsServer stops the global Prometheus metrics server
77+
func stopMetricsServer() error {
78+
metricsMutex.Lock()
79+
defer metricsMutex.Unlock()
80+
81+
if metricsServer == nil {
82+
return nil
83+
}
84+
85+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
86+
defer cancel()
87+
88+
err := metricsServer.Shutdown(ctx)
89+
metricsServer = nil
90+
return err
91+
}
92+
93+
// getMetricValue retrieves a metric value from the Prometheus metrics server
94+
func getMetricValue(metricName string) (float64, error) {
95+
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", metricsPort))
96+
if err != nil {
97+
return 0, fmt.Errorf("failed to fetch metrics: %v", err)
98+
}
99+
defer resp.Body.Close()
100+
101+
scanner := bufio.NewScanner(resp.Body)
102+
for scanner.Scan() {
103+
line := scanner.Text()
104+
// Skip comment lines that start with #
105+
if strings.HasPrefix(strings.TrimSpace(line), "#") {
106+
continue
107+
}
108+
if strings.Contains(line, metricName) {
109+
// Parse the metric line to extract the value
110+
// Format: metric_name{label="value",label2="value2"} 10
111+
parts := strings.Fields(line)
112+
if len(parts) >= 2 {
113+
var value float64
114+
_, err := fmt.Sscanf(parts[len(parts)-1], "%f", &value)
115+
if err != nil {
116+
return 0, fmt.Errorf("failed to parse metric value: %v", err)
117+
}
118+
return value, nil
119+
}
120+
}
121+
}
122+
123+
if err := scanner.Err(); err != nil {
124+
return 0, fmt.Errorf("failed to scan metrics response: %v", err)
125+
}
126+
127+
return 0, fmt.Errorf("metric %s not found in Prometheus metrics response", metricName)
128+
}
129+
130+
// TestMain runs before all tests and after all tests
131+
func TestMain(m *testing.M) {
132+
// Start metrics server before all tests
133+
if err := startMetricsServer(); err != nil {
134+
panic(fmt.Sprintf("Failed to start metrics server: %v", err))
135+
}
136+
137+
// Run all tests
138+
code := m.Run()
139+
140+
// Stop metrics server after all tests
141+
if err := stopMetricsServer(); err != nil {
142+
fmt.Printf("Failed to stop metrics server: %v\n", err)
143+
}
144+
145+
// Exit with test result code
146+
os.Exit(code)
147+
}

scripts/pulsar-test-service-start.sh

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,19 @@ if [[ -f /.dockerenv ]]; then
3333
cat /pulsar/conf/standalone.conf
3434
/pulsar/bin/pulsar-daemon start standalone --no-functions-worker --no-stream-storage
3535
else
36-
docker build -t ${IMAGE_NAME} .
37-
38-
docker kill pulsar-client-go-test || true
39-
docker run -d --rm --name pulsar-client-go-test \
40-
-p 8080:8080 \
41-
-p 6650:6650 \
42-
-p 8443:8443 \
43-
-p 6651:6651 \
44-
-e PULSAR_MEM=${PULSAR_MEM} \
45-
-e PULSAR_STANDALONE_USE_ZOOKEEPER=${PULSAR_STANDALONE_USE_ZOOKEEPER} \
46-
${IMAGE_NAME} \
47-
/pulsar/bin/pulsar standalone \
48-
--no-functions-worker --no-stream-storage
36+
# docker build -t ${IMAGE_NAME} .
37+
#
38+
# docker kill pulsar-client-go-test || true
39+
# docker run -d --rm --name pulsar-client-go-test \
40+
# -p 8080:8080 \
41+
# -p 6650:6650 \
42+
# -p 8443:8443 \
43+
# -p 6651:6651 \
44+
# -e PULSAR_MEM="${PULSAR_MEM}" \
45+
# -e PULSAR_STANDALONE_USE_ZOOKEEPER=${PULSAR_STANDALONE_USE_ZOOKEEPER} \
46+
# ${IMAGE_NAME} \
47+
# /pulsar/bin/pulsar standalone \
48+
# --no-functions-worker --no-stream-storage
4949

5050
PULSAR_ADMIN="docker exec -it pulsar-client-go-test /pulsar/bin/pulsar-admin"
5151
fi
@@ -55,6 +55,8 @@ until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
5555

5656
echo "-- Pulsar service is ready -- Configure permissions"
5757

58+
sleep 20
59+
5860
$PULSAR_ADMIN tenants update public -r anonymous
5961
$PULSAR_ADMIN namespaces grant-permission public/default \
6062
--actions produce,consume \

0 commit comments

Comments
 (0)