Skip to content

Commit 8468317

Browse files
KAFKA-19467; Add a metric for controller thread idleness (#20422)
This change adds the metric ControllerEventManager::AvgIdleRatio which measures the amount of time the controller spends blocked waiting for events vs the amount of time spent processing events. A value of 1.0 means that the controller spent the entire interval blocked waiting for events. Reviewers: José Armando García Sancio <[email protected]>, Kevin Wu <[email protected]>, Alyssa Huang <[email protected]>, TengYao Chi <[email protected]>, Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 33cd114 commit 8468317

File tree

10 files changed

+168
-23
lines changed

10 files changed

+168
-23
lines changed

checkstyle/import-control.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,7 @@
497497
<allow pkg="org.apache.kafka.server.common.serialization" />
498498
<allow pkg="org.apache.kafka.server.config" />
499499
<allow pkg="org.apache.kafka.server.fault"/>
500+
<allow pkg="org.apache.kafka.server.metrics" />
500501
<allow pkg="org.apache.kafka.server.util" />
501502
<allow pkg="org.apache.kafka.test"/>
502503
<allow pkg="com.fasterxml.jackson" />

docs/upgrade.html

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
182182
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>.
183183
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/6oqMEw">KIP-1100</a>.
184184
</li>
185+
<li>
186+
A new metric <code>AvgIdleRatio</code> has been added to the <code>ControllerEventManager</code> group. This metric measures the average idle ratio of the controller event queue thread,
187+
providing visibility into how much time the controller spends waiting for events versus processing them. The metric value ranges from 0.0 (always busy) to 1.0 (always idle).
188+
</li>
185189
</ul>
186190

187191
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,14 @@ public QuorumController build() throws Exception {
406406

407407
KafkaEventQueue queue = null;
408408
try {
409-
queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
409+
queue = new KafkaEventQueue(
410+
time,
411+
logContext,
412+
threadNamePrefix,
413+
EventQueue.VoidEvent.INSTANCE,
414+
controllerMetrics::updateIdleTime
415+
);
416+
410417
return new QuorumController(
411418
nonFatalFaultHandler,
412419
fatalFaultHandler,

metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.kafka.common.utils.Time;
2121
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
22+
import org.apache.kafka.server.metrics.TimeRatio;
2223

2324
import com.yammer.metrics.core.Gauge;
2425
import com.yammer.metrics.core.Histogram;
@@ -48,6 +49,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
4849
"ControllerEventManager", "EventQueueTimeMs");
4950
private static final MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
5051
"ControllerEventManager", "EventQueueProcessingTimeMs");
52+
private static final MetricName AVERAGE_IDLE_RATIO = getMetricName(
53+
"ControllerEventManager", "AvgIdleRatio");
5154
private static final MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
5255
"KafkaController", "LastAppliedRecordOffset");
5356
private static final MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
@@ -64,6 +67,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
6467
"KafkaController", "EventQueueOperationsTimedOutCount");
6568
private static final MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName(
6669
"KafkaController", "NewActiveControllersCount");
70+
6771
private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME = "TimeSinceLastHeartbeatReceivedMs";
6872
private static final String BROKER_ID_TAG = "broker";
6973

@@ -75,6 +79,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
7579
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
7680
private final Consumer<Long> eventQueueTimeUpdater;
7781
private final Consumer<Long> eventQueueProcessingTimeUpdater;
82+
private final TimeRatio avgIdleTimeRatio;
7883

7984
private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
8085
private final AtomicLong operationsStarted = new AtomicLong(0);
@@ -109,6 +114,7 @@ public Integer value() {
109114
this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
110115
this.eventQueueProcessingTimeUpdater = newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
111116
this.sessionTimeoutMs = sessionTimeoutMs;
117+
this.avgIdleTimeRatio = new TimeRatio(1);
112118
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
113119
@Override
114120
public Long value() {
@@ -157,6 +163,20 @@ public Long value() {
157163
return newActiveControllers();
158164
}
159165
}));
166+
registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new Gauge<Double>() {
167+
@Override
168+
public Double value() {
169+
synchronized (avgIdleTimeRatio) {
170+
return avgIdleTimeRatio.measure();
171+
}
172+
}
173+
}));
174+
}
175+
176+
public void updateIdleTime(long idleDurationMs) {
177+
synchronized (avgIdleTimeRatio) {
178+
avgIdleTimeRatio.record((double) idleDurationMs, time.milliseconds());
179+
}
160180
}
161181

162182
public void addTimeSinceLastHeartbeatMetric(int brokerId) {
@@ -291,7 +311,8 @@ public void close() {
291311
TIMED_OUT_BROKER_HEARTBEAT_COUNT,
292312
EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
293313
EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
294-
NEW_ACTIVE_CONTROLLERS_COUNT
314+
NEW_ACTIVE_CONTROLLERS_COUNT,
315+
AVERAGE_IDLE_RATIO
295316
).forEach(r::removeMetric));
296317
removeTimeSinceLastHeartbeatMetrics();
297318
}

metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public void testMetricNames() {
4545
Set<String> expected = Set.of(
4646
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
4747
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
48+
"kafka.controller:type=ControllerEventManager,name=AvgIdleRatio",
4849
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
4950
"kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
5051
"kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
@@ -189,6 +190,35 @@ public void testTimeSinceLastHeartbeatReceivedMs() {
189190
}
190191
}
191192

193+
@Test
194+
public void testAvgIdleRatio() {
195+
final double delta = 0.001;
196+
MetricsRegistry registry = new MetricsRegistry();
197+
MockTime time = new MockTime();
198+
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
199+
Gauge<Double> avgIdleRatio = (Gauge<Double>) registry.allMetrics().get(metricName("ControllerEventManager", "AvgIdleRatio"));
200+
201+
// No idle time recorded yet; returns default ratio of 1.0
202+
assertEquals(1.0, avgIdleRatio.value(), delta);
203+
204+
// First recording is dropped to establish the interval start time
205+
// This is because TimeRatio needs an initial timestamp to measure intervals from
206+
metrics.updateIdleTime(10);
207+
time.sleep(40);
208+
metrics.updateIdleTime(20);
209+
// avgIdleRatio = (20ms idle) / (40ms interval) = 0.5
210+
assertEquals(0.5, avgIdleRatio.value(), delta);
211+
212+
time.sleep(20);
213+
metrics.updateIdleTime(1);
214+
// avgIdleRatio = (1ms idle) / (20ms interval) = 0.05
215+
assertEquals(0.05, avgIdleRatio.value(), delta);
216+
217+
} finally {
218+
registry.shutdown();
219+
}
220+
}
221+
192222
private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
193223
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);
194224

raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.kafka.raft.QuorumState;
3030
import org.apache.kafka.raft.ReplicaKey;
3131
import org.apache.kafka.server.common.OffsetAndEpoch;
32+
import org.apache.kafka.server.metrics.TimeRatio;
3233

3334
import java.util.List;
3435
import java.util.OptionalLong;

server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.RejectedExecutionException;
3434
import java.util.concurrent.locks.Condition;
3535
import java.util.concurrent.locks.ReentrantLock;
36+
import java.util.function.Consumer;
3637
import java.util.function.Function;
3738
import java.util.function.UnaryOperator;
3839

@@ -278,22 +279,22 @@ private void handleEvents() {
278279
remove(toRun);
279280
continue;
280281
}
281-
if (awaitNs == Long.MAX_VALUE) {
282-
try {
282+
283+
long startIdleMs = time.milliseconds();
284+
try {
285+
if (awaitNs == Long.MAX_VALUE) {
283286
cond.await();
284-
} catch (InterruptedException e) {
285-
log.warn("Interrupted while waiting for a new event. " +
286-
"Shutting down event queue");
287-
interrupted = true;
288-
}
289-
} else {
290-
try {
287+
} else {
291288
cond.awaitNanos(awaitNs);
292-
} catch (InterruptedException e) {
293-
log.warn("Interrupted while waiting for a deferred event. " +
294-
"Shutting down event queue");
295-
interrupted = true;
296289
}
290+
} catch (InterruptedException e) {
291+
log.warn(
292+
"Interrupted while waiting for a {} event. Shutting down event queue",
293+
(awaitNs == Long.MAX_VALUE) ? "new" : "deferred"
294+
);
295+
interrupted = true;
296+
} finally {
297+
idleTimeCallback.accept(Math.max(time.milliseconds() - startIdleMs, 0));
297298
}
298299
} finally {
299300
lock.unlock();
@@ -440,19 +441,35 @@ int size() {
440441
*/
441442
private boolean interrupted;
442443

444+
/**
445+
* Optional callback for queue idle time tracking.
446+
*/
447+
private final Consumer<Long> idleTimeCallback;
448+
449+
443450
public KafkaEventQueue(
444451
Time time,
445452
LogContext logContext,
446453
String threadNamePrefix
447454
) {
448-
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE);
455+
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, __ -> { });
449456
}
450457

451458
public KafkaEventQueue(
452459
Time time,
453460
LogContext logContext,
454461
String threadNamePrefix,
455462
Event cleanupEvent
463+
) {
464+
this(time, logContext, threadNamePrefix, cleanupEvent, __ -> { });
465+
}
466+
467+
public KafkaEventQueue(
468+
Time time,
469+
LogContext logContext,
470+
String threadNamePrefix,
471+
Event cleanupEvent,
472+
Consumer<Long> idleTimeCallback
456473
) {
457474
this.time = time;
458475
this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
@@ -463,6 +480,7 @@ public KafkaEventQueue(
463480
this.eventHandler, false);
464481
this.shuttingDown = false;
465482
this.interrupted = false;
483+
this.idleTimeCallback = Objects.requireNonNull(idleTimeCallback);
466484
this.eventHandlerThread.start();
467485
}
468486

raft/src/main/java/org/apache/kafka/raft/internals/TimeRatio.java renamed to server-common/src/main/java/org/apache/kafka/server/metrics/TimeRatio.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.kafka.raft.internals;
17+
package org.apache.kafka.server.metrics;
1818

1919
import org.apache.kafka.common.metrics.MeasurableStat;
2020
import org.apache.kafka.common.metrics.MetricConfig;
@@ -46,11 +46,26 @@ public TimeRatio(double defaultRatio) {
4646

4747
@Override
4848
public double measure(MetricConfig config, long currentTimestampMs) {
49+
return measure();
50+
}
51+
52+
@Override
53+
public void record(MetricConfig config, double value, long currentTimestampMs) {
54+
record(value, currentTimestampMs);
55+
}
56+
57+
/**
58+
* Measures the ratio of recorded duration to the interval duration
59+
* since the last measurement.
60+
*
61+
* @return The measured ratio value between 0.0 and 1.0
62+
*/
63+
public double measure() {
4964
if (lastRecordedTimestampMs < 0) {
5065
// Return the default value if no recordings have been captured.
5166
return defaultRatio;
5267
} else {
53-
// We measure the ratio over the
68+
// We measure the ratio over the interval
5469
double intervalDurationMs = Math.max(lastRecordedTimestampMs - intervalStartTimestampMs, 0);
5570
final double ratio;
5671
if (intervalDurationMs == 0) {
@@ -61,15 +76,20 @@ public double measure(MetricConfig config, long currentTimestampMs) {
6176
ratio = totalRecordedDurationMs / intervalDurationMs;
6277
}
6378

64-
// The next interval begins at the
79+
// The next interval begins at the last recorded timestamp
6580
intervalStartTimestampMs = lastRecordedTimestampMs;
6681
totalRecordedDurationMs = 0;
6782
return ratio;
6883
}
6984
}
7085

71-
@Override
72-
public void record(MetricConfig config, double value, long currentTimestampMs) {
86+
/**
87+
* Records a duration value at the specified timestamp.
88+
*
89+
* @param value The duration value to record
90+
* @param currentTimestampMs The current timestamp in milliseconds
91+
*/
92+
public void record(double value, long currentTimestampMs) {
7393
if (intervalStartTimestampMs < 0) {
7494
// Discard the initial value since the value occurred prior to the interval start
7595
intervalStartTimestampMs = currentTimestampMs;
@@ -78,5 +98,4 @@ public void record(MetricConfig config, double value, long currentTimestampMs) {
7898
lastRecordedTimestampMs = currentTimestampMs;
7999
}
80100
}
81-
82101
}

server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,4 +424,48 @@ public void testInterruptedWithDeferredEvents() throws Exception {
424424
assertEquals(InterruptedException.class, ieTrapper2.exception.get().getClass());
425425
}
426426
}
427+
428+
@Test
429+
public void testIdleTimeCallback() throws Exception {
430+
MockTime time = new MockTime();
431+
AtomicLong lastIdleTimeMs = new AtomicLong(0);
432+
433+
try (KafkaEventQueue queue = new KafkaEventQueue(
434+
time,
435+
logContext,
436+
"testIdleTimeCallback",
437+
EventQueue.VoidEvent.INSTANCE,
438+
lastIdleTimeMs::set)) {
439+
time.sleep(2);
440+
assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be 0ms");
441+
442+
// Test 1: Two events with a wait in between using FutureEvent
443+
CompletableFuture<String> event1 = new CompletableFuture<>();
444+
queue.append(new FutureEvent<>(event1, () -> {
445+
time.sleep(1);
446+
return "event1-processed";
447+
}));
448+
assertEquals("event1-processed", event1.get());
449+
450+
long waitTime5Ms = 5;
451+
time.sleep(waitTime5Ms);
452+
CompletableFuture<String> event2 = new CompletableFuture<>();
453+
queue.append(new FutureEvent<>(event2, () -> {
454+
time.sleep(1);
455+
return "event2-processed";
456+
}));
457+
assertEquals("event2-processed", event2.get());
458+
assertEquals(waitTime5Ms, lastIdleTimeMs.get(), "Idle time should be " + waitTime5Ms + "ms, was: " + lastIdleTimeMs.get());
459+
460+
// Test 2: Deferred event
461+
long waitTime2Ms = 2;
462+
CompletableFuture<Void> deferredEvent2 = new CompletableFuture<>();
463+
queue.scheduleDeferred("deferred2",
464+
__ -> OptionalLong.of(time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(waitTime2Ms)),
465+
() -> deferredEvent2.complete(null));
466+
time.sleep(waitTime2Ms);
467+
deferredEvent2.get();
468+
assertEquals(waitTime2Ms, lastIdleTimeMs.get(), "Idle time should be " + waitTime2Ms + "ms, was: " + lastIdleTimeMs.get());
469+
}
470+
}
427471
}

raft/src/test/java/org/apache/kafka/raft/internals/TimeRatioTest.java renamed to server-common/src/test/java/org/apache/kafka/server/metrics/TimeRatioTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.kafka.raft.internals;
17+
package org.apache.kafka.server.metrics;
1818

1919
import org.apache.kafka.common.metrics.MetricConfig;
2020
import org.apache.kafka.common.utils.MockTime;

0 commit comments

Comments
 (0)