Skip to content

Commit 25a7304

Browse files
thetumbledhanmz
authored andcommitted
[fix][broker] PIP-399: Fix Metric Name for Delayed Queue (apache#23712)
1 parent d3b8ead commit 25a7304

File tree

2 files changed

+19
-6
lines changed

2 files changed

+19
-6
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st
360360
writeSubscriptionMetric(stream, "pulsar_subscription_filter_rescheduled_msg_count",
361361
subsStats.filterRescheduledMsgCount, cluster, namespace, topic, sub,
362362
splitTopicAndPartitionIndexLabel);
363-
writeSubscriptionMetric(stream, "pulsar_delayed_message_index_size_bytes",
363+
writeSubscriptionMetric(stream, "pulsar_subscription_delayed_message_index_size_bytes",
364364
subsStats.delayedMessageIndexSizeInBytes, cluster, namespace, topic, sub,
365365
splitTopicAndPartitionIndexLabel);
366366

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -424,10 +424,11 @@ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean ex
424424

425425
@Cleanup
426426
Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
427+
String subName = "test_sub";
427428
@Cleanup
428429
Consumer<String> consumer = client.newConsumer(Schema.STRING)
429430
.topic(topic)
430-
.subscriptionName("test_sub")
431+
.subscriptionName(subName)
431432
.subscriptionType(SubscriptionType.Shared)
432433
.messageListener((MessageListener<String>) (consumer1, msg) -> {
433434
try {
@@ -453,7 +454,13 @@ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean ex
453454

454455
Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
455456
Collection<Metric> metrics = metricsMap.get("pulsar_delayed_message_index_size_bytes");
456-
Assert.assertTrue(metrics.size() > 0);
457+
Collection<Metric> subMetrics = metricsMap.get("pulsar_subscription_delayed_message_index_size_bytes");
458+
assertFalse(metrics.isEmpty());
459+
if (exposeTopicLevelMetrics) {
460+
assertFalse(subMetrics.isEmpty());
461+
} else {
462+
assertTrue(subMetrics.isEmpty());
463+
}
457464

458465
int topicLevelNum = 0;
459466
int namespaceLevelNum = 0;
@@ -462,14 +469,20 @@ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean ex
462469
if (exposeTopicLevelMetrics && metric.tags.get("topic").equals(topic)) {
463470
Assert.assertTrue(metric.value > 0);
464471
topicLevelNum++;
465-
if ("test_sub".equals(metric.tags.get("subscription"))) {
466-
subscriptionLevelNum++;
467-
}
468472
} else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) {
469473
Assert.assertTrue(metric.value > 0);
470474
namespaceLevelNum++;
471475
}
472476
}
477+
if (exposeTopicLevelMetrics) {
478+
for (Metric metric : subMetrics) {
479+
if (metric.tags.get("topic").equals(topic) &&
480+
subName.equals(metric.tags.get("subscription"))) {
481+
Assert.assertTrue(metric.value > 0);
482+
subscriptionLevelNum++;
483+
}
484+
}
485+
}
473486

474487
if (exposeTopicLevelMetrics) {
475488
Assert.assertTrue(topicLevelNum > 0);

0 commit comments

Comments
 (0)