Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ private void handleEventEnd(String name, long startProcessingTimeNs) {
MICROSECONDS.convert(deltaNs, NANOSECONDS));
performanceMonitor.observeEvent(name, deltaNs);
controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
controllerMetrics.updateIdleStartTime();
}

private Throwable handleEventException(
Expand All @@ -549,6 +550,7 @@ private Throwable handleEventException(
} else {
deltaUs = OptionalLong.empty();
}
controllerMetrics.updateIdleStartTime();
EventHandlerExceptionInfo info = EventHandlerExceptionInfo.
fromInternal(exception, this::latestController);
int epoch = curClaimEpoch;
Expand All @@ -573,6 +575,7 @@ private Throwable handleEventException(

private long updateEventStartMetricsAndGetTime(OptionalLong eventCreatedTimeNs) {
long now = time.nanoseconds();
controllerMetrics.updateIdleEndTime();
controllerMetrics.incrementOperationsStarted();
if (eventCreatedTimeNs.isPresent()) {
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs.getAsLong()));
Expand Down Expand Up @@ -831,6 +834,7 @@ public void run() throws Exception {

// Remember the latest offset and future if it is not already completed
if (!future.isDone()) {
controllerMetrics.updateIdleStartTime();
deferredEventQueue.add(resultAndOffset.offset(), this);
}
}
Expand All @@ -842,6 +846,7 @@ public void handleException(Throwable exception) {

@Override
public void complete(Throwable exception) {
controllerMetrics.updateIdleEndTime();
if (exception == null) {
handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
future.complete(resultAndOffset.response());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.kafka.controller.metrics;

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.internals.TimeRatio;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

import com.yammer.metrics.core.Gauge;
Expand Down Expand Up @@ -48,6 +50,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
"ControllerEventManager", "EventQueueTimeMs");
private static final MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private static final MetricName AVERAGE_IDLE_RATIO = getMetricName(
"ControllerEventManager", "AvgIdleRatio");
private static final MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private static final MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
Expand All @@ -64,8 +68,10 @@ public class QuorumControllerMetrics implements AutoCloseable {
"KafkaController", "EventQueueOperationsTimedOutCount");
private static final MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName(
"KafkaController", "NewActiveControllersCount");

private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME = "TimeSinceLastHeartbeatReceivedMs";
private static final String BROKER_ID_TAG = "broker";
private static final MetricConfig METRIC_CONFIG = new MetricConfig();

private final Optional<MetricsRegistry> registry;
private final Time time;
Expand All @@ -75,13 +81,15 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private final TimeRatio avgIdleTimeRatio;

private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
private final AtomicLong operationsStarted = new AtomicLong(0);
private final AtomicLong operationsTimedOut = new AtomicLong(0);
private final AtomicLong newActiveControllers = new AtomicLong(0);
private final Map<Integer, Long> brokerContactTimesMs = new ConcurrentHashMap<>();
private final int sessionTimeoutMs;
private final AtomicLong idleStartTime = new AtomicLong(-1);

private Consumer<Long> newHistogram(MetricName name, boolean biased) {
if (registry.isPresent()) {
Expand Down Expand Up @@ -109,6 +117,7 @@ public Integer value() {
this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTimeUpdater = newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.sessionTimeoutMs = sessionTimeoutMs;
this.avgIdleTimeRatio = new TimeRatio(1);
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
@Override
public Long value() {
Expand Down Expand Up @@ -157,6 +166,24 @@ public Long value() {
return newActiveControllers();
}
}));
registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new Gauge<Double>() {
@Override
public Double value() {
return avgIdleTimeRatio.measure(METRIC_CONFIG, time.milliseconds());
}
}));
}

public void updateIdleStartTime() {
idleStartTime.compareAndExchange(-1, time.milliseconds());
}

public void updateIdleEndTime() {
long startTime = idleStartTime.getAndSet(-1);
if (startTime != -1) {
long idleDurationMs = Math.max(time.milliseconds() - startTime, 0);
avgIdleTimeRatio.record(METRIC_CONFIG, (double) idleDurationMs, time.milliseconds());
}
}

public void addTimeSinceLastHeartbeatMetric(int brokerId) {
Expand Down Expand Up @@ -291,7 +318,8 @@ public void close() {
TIMED_OUT_BROKER_HEARTBEAT_COUNT,
EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
NEW_ACTIVE_CONTROLLERS_COUNT
NEW_ACTIVE_CONTROLLERS_COUNT,
AVERAGE_IDLE_RATIO
).forEach(r::removeMetric));
removeTimeSinceLastHeartbeatMetrics();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public void testMetricNames() {
Set<String> expected = Set.of(
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
"kafka.controller:type=ControllerEventManager,name=AvgIdleRatio",
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
Expand Down Expand Up @@ -188,6 +189,39 @@ public void testTimeSinceLastHeartbeatReceivedMs() {
}
}

@Test
public void testAvgIdleRatio() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
Gauge<Double> avgIdleRatio = (Gauge<Double>) registry.allMetrics().get(metricName("ControllerEventManager", "AvgIdleRatio"));

// No idle time recorded yet
assertEquals(1.0, avgIdleRatio.value());

metrics.updateIdleStartTime();
time.sleep(10); //initial record is ignored
metrics.updateIdleEndTime();
time.sleep(20); // wait 20ms non-idle
metrics.updateIdleStartTime();
time.sleep(20); // 20ms idle
metrics.updateIdleEndTime();
assertEquals(0.5, avgIdleRatio.value());

metrics.updateIdleStartTime();
time.sleep(0);
metrics.updateIdleEndTime();
time.sleep(19);
metrics.updateIdleStartTime();
time.sleep(1);
metrics.updateIdleEndTime();
assertEquals(0.05, avgIdleRatio.value());

} finally {
registry.shutdown();
}
}

private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);

Expand Down