Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ private void handleEventEnd(String name, long startProcessingTimeNs) {
MICROSECONDS.convert(deltaNs, NANOSECONDS));
performanceMonitor.observeEvent(name, deltaNs);
controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
controllerMetrics.updateIdleStartTime();
}

private Throwable handleEventException(
Expand All @@ -549,6 +550,7 @@ private Throwable handleEventException(
} else {
deltaUs = OptionalLong.empty();
}
controllerMetrics.updateIdleStartTime();
EventHandlerExceptionInfo info = EventHandlerExceptionInfo.
fromInternal(exception, this::latestController);
int epoch = curClaimEpoch;
Expand Down Expand Up @@ -596,6 +598,7 @@ class ControllerEvent implements EventQueue.Event {

@Override
public void run() throws Exception {
controllerMetrics.updateIdleEndTime();
startProcessingTimeNs = OptionalLong.of(
updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
log.debug("Executing {}.", this);
Expand Down Expand Up @@ -647,6 +650,7 @@ CompletableFuture<T> future() {

@Override
public void run() throws Exception {
controllerMetrics.updateIdleEndTime();
startProcessingTimeNs = OptionalLong.of(
updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
T value = handler.get();
Expand Down Expand Up @@ -762,6 +766,7 @@ CompletableFuture<T> future() {

@Override
public void run() throws Exception {
controllerMetrics.updateIdleEndTime();
// Deferred events set the DOES_NOT_UPDATE_QUEUE_TIME flag to prevent incorrectly
// including their deferral time in the event queue time.
startProcessingTimeNs = OptionalLong.of(
Expand Down Expand Up @@ -831,6 +836,7 @@ public void run() throws Exception {

// Remember the latest offset and future if it is not already completed
if (!future.isDone()) {
controllerMetrics.updateIdleStartTime();
deferredEventQueue.add(resultAndOffset.offset(), this);
}
}
Expand All @@ -842,6 +848,7 @@ public void handleException(Throwable exception) {

@Override
public void complete(Throwable exception) {
controllerMetrics.updateIdleEndTime();
if (exception == null) {
handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
future.complete(resultAndOffset.response());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
Expand All @@ -48,6 +49,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
"ControllerEventManager", "EventQueueTimeMs");
private static final MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private static final MetricName AVERAGE_IDLE_RATIO = getMetricName(
"ControllerEventManager", "AvgIdleRatio");
private static final MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private static final MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
Expand All @@ -64,6 +67,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
"KafkaController", "EventQueueOperationsTimedOutCount");
private static final MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName(
"KafkaController", "NewActiveControllersCount");

private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME = "TimeSinceLastHeartbeatReceivedMs";
private static final String BROKER_ID_TAG = "broker";

Expand All @@ -75,13 +79,15 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
public final YammerTimeRatio avgIdleTimeRatio;

private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
private final AtomicLong operationsStarted = new AtomicLong(0);
private final AtomicLong operationsTimedOut = new AtomicLong(0);
private final AtomicLong newActiveControllers = new AtomicLong(0);
private final Map<Integer, Long> brokerContactTimesMs = new ConcurrentHashMap<>();
private final int sessionTimeoutMs;
private volatile OptionalLong idleStartTime = OptionalLong.empty();

private Consumer<Long> newHistogram(MetricName name, boolean biased) {
if (registry.isPresent()) {
Expand Down Expand Up @@ -109,6 +115,7 @@ public Integer value() {
this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTimeUpdater = newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.sessionTimeoutMs = sessionTimeoutMs;
this.avgIdleTimeRatio = new YammerTimeRatio(1);
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
@Override
public Long value() {
Expand Down Expand Up @@ -157,8 +164,22 @@ public Long value() {
return newActiveControllers();
}
}));
registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, this.avgIdleTimeRatio));
}

public void updateIdleStartTime() {
if (idleStartTime.isEmpty()) {
idleStartTime = OptionalLong.of(time.milliseconds());
}
}

public void updateIdleEndTime() {
if (this.idleStartTime.isPresent()) {
long idleDurationMs = Math.max(time.milliseconds() - idleStartTime.getAsLong(), 0);
avgIdleTimeRatio.record((double) idleDurationMs, time.milliseconds());
idleStartTime = OptionalLong.empty();
}
}
public void addTimeSinceLastHeartbeatMetric(int brokerId) {
brokerContactTimesMs.put(brokerId, time.milliseconds());
registry.ifPresent(r -> r.newGauge(
Expand Down Expand Up @@ -291,7 +312,8 @@ public void close() {
TIMED_OUT_BROKER_HEARTBEAT_COUNT,
EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
NEW_ACTIVE_CONTROLLERS_COUNT
NEW_ACTIVE_CONTROLLERS_COUNT,
AVERAGE_IDLE_RATIO
).forEach(r::removeMetric));
removeTimeSinceLastHeartbeatMetrics();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller.metrics;

import org.apache.kafka.raft.internals.TimeRatio;

import com.yammer.metrics.core.Gauge;

/**
* Yammer Metrics facade for TimeRatio.
* This class provides a Gauge interface for Yammer metrics registry
* while using the same shared TimeRatio implementation that
* the Kafka Metrics KafkaTimeRatio uses.
*/
public class YammerTimeRatio extends Gauge<Double> {
private final TimeRatio timeRatio;

public YammerTimeRatio(double defaultRatio) {
this.timeRatio = new TimeRatio(defaultRatio);
}

/**
* Record an idle/wait duration.
*
* @param idleDurationMs The duration of the idle/wait period in milliseconds
* @param currentTimeMs The current time in milliseconds
*/
public void record(double idleDurationMs, long currentTimeMs) {
timeRatio.record(idleDurationMs, currentTimeMs);
}

/**
* Get the current idle ratio for Yammer Metrics.
*
* @return The ratio of idle time to total time (between 0.0 and 1.0)
*/
@Override
public Double value() {
return timeRatio.measure();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public void testMetricNames() {
Set<String> expected = Set.of(
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
"kafka.controller:type=ControllerEventManager,name=AvgIdleRatio",
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
Expand Down Expand Up @@ -188,6 +189,39 @@ public void testTimeSinceLastHeartbeatReceivedMs() {
}
}

@Test
public void testYammerTimeRatioIdleTimeTracking() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
Gauge<Double> avgIdleRatio = (Gauge<Double>) registry.allMetrics().get(metricName("ControllerEventManager", "AvgIdleRatio"));

// No idle time recorded yet
assertEquals(1.0, avgIdleRatio.value());

metrics.updateIdleStartTime();
time.sleep(10); //initial record is ignored
metrics.updateIdleEndTime();
time.sleep(20); // wait 20ms non-idle
metrics.updateIdleStartTime();
time.sleep(20); // 20ms idle
metrics.updateIdleEndTime();
assertEquals(0.5, avgIdleRatio.value());

metrics.updateIdleStartTime();
time.sleep(0);
metrics.updateIdleEndTime();
time.sleep(19);
metrics.updateIdleStartTime();
time.sleep(1);
metrics.updateIdleEndTime();
assertEquals(0.05, avgIdleRatio.value());

} finally {
registry.shutdown();
}
}

private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public KafkaRaftMetrics(Metrics metrics, String metricGrpPrefix) {
"The ratio of time the Raft IO thread is idle as opposed to " +
"doing work (e.g. handling requests or replicating from the leader)"
),
new TimeRatio(1.0)
new KafkaTimeRatio(1.0)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.raft.internals;

import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;

/**
* Kafka Metrics facade for TimeRatio.
* This facade adapts the TimeRatio core implementation to work with
* Kafka Metrics by implementing MeasurableStat interface.
*/
public class KafkaTimeRatio implements MeasurableStat {
private final TimeRatio timeRatio;

public KafkaTimeRatio(double defaultRatio) {
this.timeRatio = new TimeRatio(defaultRatio);
}

@Override
public double measure(MetricConfig config, long currentTimestampMs) {
return timeRatio.measure();
}

@Override
public void record(MetricConfig config, double value, long currentTimestampMs) {
timeRatio.record(value, currentTimestampMs);
}
}
40 changes: 24 additions & 16 deletions raft/src/main/java/org/apache/kafka/raft/internals/TimeRatio.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,17 @@
*/
package org.apache.kafka.raft.internals;

import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;

/**
* Maintains an approximate ratio of the duration of a specific event
* over all time. For example, this can be used to compute the ratio of
* time that a thread is busy or idle. The value is approximate since the
* measurement and recording intervals may not be aligned.
* This metrics agnostic implementation maintain an approximate ratio of
* the duration of a specific event over all time. For example, this can
* be used to compute the ratio of time that a thread is busy or idle. The value
* is approximate since the measurement and recording intervals may not be aligned.
*
* Note that the duration of the event is assumed to be small relative to
* the interval of measurement.
*
*/
public class TimeRatio implements MeasurableStat {

public class TimeRatio {
private long intervalStartTimestampMs = -1;
private long lastRecordedTimestampMs = -1;
private double totalRecordedDurationMs = 0;
Expand All @@ -40,12 +37,17 @@ public TimeRatio(double defaultRatio) {
if (defaultRatio < 0.0 || defaultRatio > 1.0) {
throw new IllegalArgumentException("Invalid ratio: value " + defaultRatio + " is not between 0 and 1.");
}

this.defaultRatio = defaultRatio;
}

@Override
public double measure(MetricConfig config, long currentTimestampMs) {
/**
* Measure the ratio of the total recorded duration over the interval duration.
* If no recordings have been captured, it returns the default ratio.
* After measuring, it resets the recorded duration and starts a new interval.
*
* @return The ratio of total recorded duration to the interval duration
*/
public double measure() {
if (lastRecordedTimestampMs < 0) {
// Return the default value if no recordings have been captured.
return defaultRatio;
Expand All @@ -68,8 +70,15 @@ public double measure(MetricConfig config, long currentTimestampMs) {
}
}

@Override
public void record(MetricConfig config, double value, long currentTimestampMs) {
/**
* Record the duration of an event at the current timestamp.
* If this is the first record, it initializes the interval start timestamp.
* Otherwise, it updates the total recorded duration and last recorded timestamp.
*
* @param value The duration of the event in milliseconds
* @param currentTimestampMs The current time in milliseconds
*/
public void record(double value, long currentTimestampMs) {
if (intervalStartTimestampMs < 0) {
// Discard the initial value since the value occurred prior to the interval start
intervalStartTimestampMs = currentTimestampMs;
Expand All @@ -78,5 +87,4 @@ public void record(MetricConfig config, double value, long currentTimestampMs) {
lastRecordedTimestampMs = currentTimestampMs;
}
}

}
}
Loading