Skip to content

Commit 7d098cf

Browse files
authored
KAFKA-17876/ KAFKA-19150 Rename AssignmentsManager and RemoteStorageThreadPool metrics (#20265)
Rename org.apache.kafka.server:type=AssignmentsManager and org.apache.kafka.storage.internals.log.RemoteStorageThreadPool metrics for the consist, these metrics should be - `kafka.log.remote:type=...` - `kafka.server:type=...` Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 60ad638 commit 7d098cf

File tree

5 files changed

+63
-16
lines changed

5 files changed

+63
-16
lines changed

docs/upgrade.html

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,18 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
170170
</ul>
171171
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/3gn0Ew">KIP-1120</a>.
172172
</li>
173+
<li>
174+
The metrics <code>org.apache.kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments</code>,
175+
<code>org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize</code>, and
176+
<code>org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>
177+
have been deprecated and will be removed in Kafka 5.0.
178+
179+
As replacements, the following metrics have been introduced, which report the same information:
180+
<code>kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments</code>,
181+
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize</code>, and
182+
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>.
183+
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/6oqMEw">KIP-1100</a>.
184+
</li>
173185
</ul>
174186

175187
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>

server/src/main/java/org/apache/kafka/server/AssignmentsManager.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,15 @@ public final class AssignmentsManager {
7171
*/
7272
static final long MIN_NOISY_FAILURE_INTERVAL_NS = TimeUnit.MINUTES.toNanos(2);
7373

74+
@Deprecated(since = "4.2")
75+
static final MetricName DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
76+
KafkaYammerMetrics.getMetricName("org.apache.kafka.server", "AssignmentsManager", "QueuedReplicaToDirAssignments");
77+
7478
/**
7579
* The metric reflecting the number of pending assignments.
7680
*/
7781
static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
78-
metricName("QueuedReplicaToDirAssignments");
82+
KafkaYammerMetrics.getMetricName("kafka.server", "AssignmentsManager", "QueuedReplicaToDirAssignments");
7983

8084
/**
8185
* The event at which we send assignments, if appropriate.
@@ -142,10 +146,6 @@ public final class AssignmentsManager {
142146
*/
143147
private final KafkaEventQueue eventQueue;
144148

145-
static MetricName metricName(String name) {
146-
return KafkaYammerMetrics.getMetricName("org.apache.kafka.server", "AssignmentsManager", name);
147-
}
148-
149149
public AssignmentsManager(
150150
Time time,
151151
NodeToControllerChannelManager channelManager,
@@ -182,12 +182,18 @@ public AssignmentsManager(
182182
this.ready = new ConcurrentHashMap<>();
183183
this.inflight = Map.of();
184184
this.metricsRegistry = metricsRegistry;
185+
this.metricsRegistry.newGauge(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge<Integer>() {
186+
@Override
187+
public Integer value() {
188+
return numPending();
189+
}
190+
});
185191
this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge<Integer>() {
186-
@Override
187-
public Integer value() {
188-
return numPending();
189-
}
190-
});
192+
@Override
193+
public Integer value() {
194+
return numPending();
195+
}
196+
});
191197
this.previousGlobalFailures = 0;
192198
this.eventQueue = new KafkaEventQueue(time,
193199
new LogContext("[AssignmentsManager id=" + nodeId + "]"),
@@ -248,6 +254,7 @@ public void run() {
248254
log.error("Unexpected exception shutting down NodeToControllerChannelManager", e);
249255
}
250256
try {
257+
metricsRegistry.removeMetric(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
251258
metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
252259
} catch (Exception e) {
253260
log.error("Unexpected exception removing metrics.", e);

server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,13 @@ int queuedReplicaToDirAssignments() {
250250
return queuedReplicaToDirAssignments.value();
251251
}
252252

253+
@SuppressWarnings("unchecked") // do not warn about Gauge typecast.
254+
int deprecatedQueuedReplicaToDirAssignments() {
255+
Gauge<Integer> queuedReplicaToDirAssignments =
256+
(Gauge<Integer>) findMetric(AssignmentsManager.DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
257+
return queuedReplicaToDirAssignments.value();
258+
}
259+
253260
@Override
254261
public void close() throws Exception {
255262
try {
@@ -279,17 +286,20 @@ public void testStartAndShutdown() throws Exception {
279286
public void testSuccessfulAssignment() throws Exception {
280287
try (TestEnv testEnv = new TestEnv()) {
281288
assertEquals(0, testEnv.queuedReplicaToDirAssignments());
289+
assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments());
282290
testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1);
283291
TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
284292
assertEquals(1, testEnv.assignmentsManager.numPending());
285293
assertEquals(1, testEnv.queuedReplicaToDirAssignments());
294+
assertEquals(1, testEnv.deprecatedQueuedReplicaToDirAssignments());
286295
});
287296
assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
288297
assertEquals(1, testEnv.assignmentsManager.numInFlight());
289298
testEnv.successfullyCompleteCallbackOfRequestAssigningTopic1ToDir1();
290299
TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
291300
assertEquals(0, testEnv.assignmentsManager.numPending());
292301
assertEquals(0, testEnv.queuedReplicaToDirAssignments());
302+
assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments());
293303
assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
294304
});
295305
assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());

storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,16 @@ public class RemoteStorageMetrics {
9090
"kafka.server", "BrokerTopicMetrics", REMOTE_DELETE_LAG_SEGMENTS);
9191
public static final MetricName REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName(
9292
"kafka.log.remote", "RemoteLogManager", REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
93-
public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
93+
@Deprecated(since = "4.2")
94+
public static final MetricName DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
9495
"org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
95-
public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
96+
@Deprecated(since = "4.2")
97+
public static final MetricName DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
9698
"org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
99+
public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
100+
"kafka.log.remote", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
101+
public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
102+
"kafka.log.remote", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
97103
public static final MetricName REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC = getMetricName(
98104
"kafka.log.remote", "RemoteLogManager", REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS);
99105

@@ -115,6 +121,8 @@ public static Set<MetricName> allMetrics() {
115121
metrics.add(REMOTE_DELETE_LAG_BYTES_METRIC);
116122
metrics.add(REMOTE_DELETE_LAG_SEGMENTS_METRIC);
117123
metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
124+
metrics.add(DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
125+
metrics.add(DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
118126
metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
119127
metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
120128
metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC);

storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.storage.internals.log;
1818

1919
import org.apache.kafka.common.utils.ThreadUtils;
20+
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics;
2021
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
2122

2223
import org.slf4j.Logger;
@@ -32,8 +33,12 @@
3233

3334
public final class RemoteStorageThreadPool extends ThreadPoolExecutor {
3435
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteStorageThreadPool.class);
35-
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
36+
@Deprecated(since = "4.2")
37+
// This metrics group is used to register deprecated metrics. It will be removed in Kafka 5.0
38+
private final KafkaMetricsGroup deprecatedLogMetricsGroup = new KafkaMetricsGroup("org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool");
39+
private final KafkaMetricsGroup logRemoteMetricsGroup = new KafkaMetricsGroup("kafka.log.remote", "RemoteStorageThreadPool");
3640

41+
@SuppressWarnings("deprecation")
3742
public RemoteStorageThreadPool(String threadNamePattern,
3843
int numThreads,
3944
int maxPendingTasks) {
@@ -45,9 +50,13 @@ public RemoteStorageThreadPool(String threadNamePattern,
4550
ThreadUtils.createThreadFactory(threadNamePattern, false,
4651
(t, e) -> LOGGER.error("Uncaught exception in thread '{}':", t.getName(), e))
4752
);
48-
metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
53+
deprecatedLogMetricsGroup.newGauge(RemoteStorageMetrics.DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
4954
() -> getQueue().size());
50-
metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
55+
deprecatedLogMetricsGroup.newGauge(RemoteStorageMetrics.DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
56+
() -> 1 - (double) getActiveCount() / (double) getCorePoolSize());
57+
logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
58+
() -> getQueue().size());
59+
logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
5160
() -> 1 - (double) getActiveCount() / (double) getCorePoolSize());
5261
}
5362

@@ -59,6 +68,7 @@ protected void afterExecute(Runnable runnable, Throwable th) {
5968
}
6069

6170
public void removeMetrics() {
62-
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric);
71+
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(deprecatedLogMetricsGroup::removeMetric);
72+
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(logRemoteMetricsGroup::removeMetric);
6373
}
6474
}

0 commit comments

Comments
 (0)