Skip to content

Commit 52faefa

Browse files
authored
Merge pull request #28756 from redpanda-data/manual-backport-28728-v25.2.x-612
[v25.2.x] cluster: fix topic label aggregation on restart
2 parents e09b1cc + 4de370b commit 52faefa

File tree

2 files changed

+122
-34
lines changed

2 files changed

+122
-34
lines changed

src/v/cluster/topic_metrics_watcher.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ topic_metrics_watcher::topic_metrics_watcher(
2323
tt.register_topic_delta_notification(
2424
[this](const auto&) { maybe_aggregate_topic_label(); });
2525
_topic_agg_limit.watch([this] { maybe_aggregate_topic_label(); });
26+
27+
maybe_aggregate_topic_label();
2628
}
2729

2830
ss::future<> topic_metrics_watcher::stop() { return _gate.close(); }

tests/rptest/tests/topic_label_aggregation_test.py

Lines changed: 120 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
# the Business Source License, use of this software will be governed
88
# by the Apache License, Version 2.0
99

10+
from typing import Any
1011
from ducktape.utils.util import wait_until
1112

1213
from rptest.clients.types import TopicSpec
@@ -19,34 +20,50 @@ class TopicLabelAggregationTest(RedpandaTest):
1920
TOPIC_LABEL = "topic"
2021
PARTITION_LABEL = "partition"
2122

22-
def __init__(self, *args, **kwargs):
23+
def __init__(self, *args: Any, **kwargs: Any):
2324
super().__init__(num_brokers=3, *args, **kwargs)
2425

26+
def _aggregated_metrics_have_topic_label(self) -> bool:
27+
for node in self.redpanda.nodes:
28+
for family in self.redpanda.metrics(
29+
node, metrics_endpoint=MetricsEndpoint.METRICS
30+
):
31+
for sample in family.samples:
32+
# If the partition label isn't aggregated then neither will the topic label.
33+
if (
34+
self.TOPIC_LABEL in sample.labels.keys()
35+
and self.PARTITION_LABEL not in sample.labels.keys()
36+
):
37+
return True
38+
39+
return False
40+
41+
def _unaggregated_metrics_have_topic_label(self) -> bool:
42+
for node in self.redpanda.nodes:
43+
for family in self.redpanda.metrics(
44+
node, metrics_endpoint=MetricsEndpoint.METRICS
45+
):
46+
for sample in family.samples:
47+
# Only consider series that would have their topic label aggregated
48+
# when topic label aggregation is enabled.
49+
if (
50+
self.TOPIC_LABEL in sample.labels.keys()
51+
and self.PARTITION_LABEL in sample.labels.keys()
52+
):
53+
return True
54+
55+
return False
56+
57+
def _create_topics(self, topics: list[TopicSpec]):
58+
for t in topics:
59+
self.client().create_topic(t)
60+
61+
def _delete_topics(self, topics: list[TopicSpec]):
62+
for t in topics:
63+
self.client().delete_topic(t.name)
64+
2565
@cluster(num_nodes=3)
2666
def test_topic_label_aggregation(self):
27-
def metrics_have_topic_label() -> bool:
28-
for node in self.redpanda.nodes:
29-
for family in self.redpanda.metrics(
30-
node, metrics_endpoint=MetricsEndpoint.METRICS
31-
):
32-
for sample in family.samples:
33-
# If the partition label isn't aggregated then neither will the topic label.
34-
if (
35-
self.TOPIC_LABEL in sample.labels.keys()
36-
and self.PARTITION_LABEL not in sample.labels.keys()
37-
):
38-
return True
39-
40-
return False
41-
42-
def create_topics(topics: list[TopicSpec]):
43-
for t in topics:
44-
self.client().create_topic(t)
45-
46-
def delete_topics(topics: list[TopicSpec]):
47-
for t in topics:
48-
self.client().delete_topic(t.name)
49-
5067
initial_topic_count = len(self.client().describe_topics())
5168
topics_to_create = [TopicSpec() for _ in range(50)]
5269
topic_aggregation_limit = len(topics_to_create) + initial_topic_count - 1
@@ -57,11 +74,13 @@ def delete_topics(topics: list[TopicSpec]):
5774
"topic_label_aggregation_limit": topic_aggregation_limit,
5875
}
5976
)
60-
assert metrics_have_topic_label(), "topic label shouldn't be aggregated yet"
77+
assert self._aggregated_metrics_have_topic_label(), (
78+
"topic label shouldn't be aggregated yet"
79+
)
6180

62-
create_topics(topics_to_create)
81+
self._create_topics(topics_to_create)
6382
wait_until(
64-
lambda: not metrics_have_topic_label(),
83+
lambda: not self._aggregated_metrics_have_topic_label(),
6584
timeout_sec=60,
6685
backoff_sec=10,
6786
err_msg="topic label wasn't aggregated",
@@ -77,32 +96,99 @@ def delete_topics(topics: list[TopicSpec]):
7796
topic_count_to_delete = int(current_topic_count - agg_limit_lower_bound) - 1
7897
assert topic_count_to_delete > 1
7998

80-
delete_topics(topics_to_create[:topic_count_to_delete])
81-
assert not metrics_have_topic_label()
99+
self._delete_topics(topics_to_create[:topic_count_to_delete])
100+
assert not self._aggregated_metrics_have_topic_label()
82101

83102
# Have topic count fall bellow the 95% limit.
84-
delete_topics(
103+
self._delete_topics(
85104
topics_to_create[topic_count_to_delete : (topic_count_to_delete + 2)]
86105
)
87106
wait_until(
88-
lambda: metrics_have_topic_label(),
107+
lambda: self._aggregated_metrics_have_topic_label(),
89108
timeout_sec=60,
90109
backoff_sec=10,
91110
err_msg="topic label wasn't un-aggregated",
92111
)
93112

94-
create_topics(topics_to_create[: (topic_count_to_delete + 2)])
113+
self._create_topics(topics_to_create[: (topic_count_to_delete + 2)])
95114
wait_until(
96-
lambda: not metrics_have_topic_label(),
115+
lambda: not self._aggregated_metrics_have_topic_label(),
97116
timeout_sec=60,
98117
backoff_sec=10,
99118
err_msg="topic label wasn't aggregated",
100119
)
101120

102121
self.redpanda.set_cluster_config({"topic_label_aggregation_limit": None})
103122
wait_until(
104-
lambda: metrics_have_topic_label(),
123+
lambda: self._aggregated_metrics_have_topic_label(),
105124
timeout_sec=60,
106125
backoff_sec=10,
107126
err_msg="topic label wasn't un-aggregated",
108127
)
128+
129+
@cluster(num_nodes=3)
130+
def test_topic_aggregate_on_toggle(self):
131+
"""
132+
Topic labels are not aggregated when metrics aggregation is turned off in general.
133+
This tests that they are aggregated when metrics aggregation is enabled. Assuming
134+
that the topic count meets the aggregation threshold.
135+
"""
136+
initial_topic_count = len(self.client().describe_topics())
137+
topics_to_create = [TopicSpec() for _ in range(50)]
138+
topic_aggregation_limit = len(topics_to_create) + initial_topic_count - 1
139+
140+
self.redpanda.set_cluster_config(
141+
{
142+
"aggregate_metrics": False,
143+
"topic_label_aggregation_limit": topic_aggregation_limit,
144+
}
145+
)
146+
147+
self._create_topics(topics_to_create)
148+
assert self._unaggregated_metrics_have_topic_label(), (
149+
"topic label shouldn't be aggregated yet"
150+
)
151+
152+
self.redpanda.set_cluster_config(
153+
{
154+
"aggregate_metrics": True,
155+
}
156+
)
157+
wait_until(
158+
lambda: not self._aggregated_metrics_have_topic_label(),
159+
timeout_sec=60,
160+
backoff_sec=10,
161+
err_msg="topic label wasn't aggregated",
162+
)
163+
164+
@cluster(num_nodes=3)
165+
def test_topic_aggregate_on_restart(self):
166+
"""
167+
Tests that topic labels remain aggregated when a node restarts.
168+
"""
169+
initial_topic_count = len(self.client().describe_topics())
170+
topics_to_create = [TopicSpec() for _ in range(50)]
171+
topic_aggregation_limit = len(topics_to_create) + initial_topic_count - 1
172+
173+
self.redpanda.set_cluster_config(
174+
{
175+
"aggregate_metrics": True,
176+
"topic_label_aggregation_limit": topic_aggregation_limit,
177+
}
178+
)
179+
180+
self._create_topics(topics_to_create)
181+
wait_until(
182+
lambda: not self._aggregated_metrics_have_topic_label(),
183+
timeout_sec=60,
184+
backoff_sec=10,
185+
err_msg="topic label wasn't aggregated",
186+
)
187+
188+
self.redpanda.rolling_restart_nodes(self.redpanda.nodes)
189+
wait_until(
190+
lambda: not self._aggregated_metrics_have_topic_label(),
191+
timeout_sec=60,
192+
backoff_sec=10,
193+
err_msg="topic label wasn't aggregated",
194+
)

0 commit comments

Comments
 (0)