Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@
<allow pkg="org.apache.kafka.server.common.serialization" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault"/>
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.test"/>
<allow pkg="com.fasterxml.jackson" />
Expand Down
4 changes: 4 additions & 0 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>.
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/6oqMEw">KIP-1100</a>.
</li>
<li>
A new metric <code>AvgIdleRatio</code> has been added to the <code>ControllerEventManager</code> group. This metric measures the average idle ratio of the controller event queue thread,
providing visibility into how much time the controller spends waiting for events versus processing them. The metric value ranges from 0.0 (always busy) to 1.0 (always idle).
</li>
</ul>

<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,14 @@ public QuorumController build() throws Exception {

KafkaEventQueue queue = null;
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
queue = new KafkaEventQueue(
time,
logContext,
threadNamePrefix,
EventQueue.VoidEvent.INSTANCE,
controllerMetrics::updateIdleTime
);

return new QuorumController(
nonFatalFaultHandler,
fatalFaultHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.metrics.TimeRatio;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
Expand Down Expand Up @@ -48,6 +49,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
"ControllerEventManager", "EventQueueTimeMs");
private static final MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private static final MetricName AVERAGE_IDLE_RATIO = getMetricName(
"ControllerEventManager", "AvgIdleRatio");
private static final MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private static final MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
Expand All @@ -64,6 +67,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
"KafkaController", "EventQueueOperationsTimedOutCount");
private static final MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName(
"KafkaController", "NewActiveControllersCount");

private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME = "TimeSinceLastHeartbeatReceivedMs";
private static final String BROKER_ID_TAG = "broker";

Expand All @@ -75,6 +79,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private final TimeRatio avgIdleTimeRatio;

private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
private final AtomicLong operationsStarted = new AtomicLong(0);
Expand Down Expand Up @@ -109,6 +114,7 @@ public Integer value() {
this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTimeUpdater = newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.sessionTimeoutMs = sessionTimeoutMs;
this.avgIdleTimeRatio = new TimeRatio(1);
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
@Override
public Long value() {
Expand Down Expand Up @@ -157,6 +163,20 @@ public Long value() {
return newActiveControllers();
}
}));
registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new Gauge<Double>() {
@Override
public Double value() {
synchronized (avgIdleTimeRatio) {
return avgIdleTimeRatio.measure();
}
}
}));
}

/**
 * Records an idle period into the time ratio that backs the AvgIdleRatio gauge.
 *
 * @param idleDurationMs length of the idle period, in milliseconds
 */
public void updateIdleTime(long idleDurationMs) {
    // Guard the ratio with its own monitor, the same one the gauge takes when reading it.
    synchronized (avgIdleTimeRatio) {
        final double idleMs = idleDurationMs;
        final long nowMs = time.milliseconds();
        avgIdleTimeRatio.record(idleMs, nowMs);
    }
}

public void addTimeSinceLastHeartbeatMetric(int brokerId) {
Expand Down Expand Up @@ -291,7 +311,8 @@ public void close() {
TIMED_OUT_BROKER_HEARTBEAT_COUNT,
EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
NEW_ACTIVE_CONTROLLERS_COUNT
NEW_ACTIVE_CONTROLLERS_COUNT,
AVERAGE_IDLE_RATIO
).forEach(r::removeMetric));
removeTimeSinceLastHeartbeatMetrics();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public void testMetricNames() {
Set<String> expected = Set.of(
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
"kafka.controller:type=ControllerEventManager,name=AvgIdleRatio",
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
Expand Down Expand Up @@ -189,6 +190,35 @@ public void testTimeSinceLastHeartbeatReceivedMs() {
}
}

@Test
public void testAvgIdleRatio() {
    final double tolerance = 0.001;
    MetricsRegistry metricsRegistry = new MetricsRegistry();
    MockTime clock = new MockTime();
    try (QuorumControllerMetrics controllerMetrics =
            new QuorumControllerMetrics(Optional.of(metricsRegistry), clock, 9000)) {
        Gauge<Double> idleRatioGauge = (Gauge<Double>) metricsRegistry
            .allMetrics()
            .get(metricName("ControllerEventManager", "AvgIdleRatio"));

        // Nothing has been recorded so far, so the gauge reports the default ratio of 1.0.
        assertEquals(1.0, idleRatioGauge.value(), tolerance);

        // The first update only marks where the measurement interval starts;
        // TimeRatio discards its value because it predates the interval.
        controllerMetrics.updateIdleTime(10);

        // 20ms idle within a 40ms interval -> 0.5
        clock.sleep(40);
        controllerMetrics.updateIdleTime(20);
        assertEquals(0.5, idleRatioGauge.value(), tolerance);

        // 1ms idle within a 20ms interval -> 0.05
        clock.sleep(20);
        controllerMetrics.updateIdleTime(1);
        assertEquals(0.05, idleRatioGauge.value(), tolerance);
    } finally {
        metricsRegistry.shutdown();
    }
}

private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.raft.QuorumState;
import org.apache.kafka.raft.ReplicaKey;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.metrics.TimeRatio;

import java.util.List;
import java.util.OptionalLong;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;

Expand Down Expand Up @@ -278,22 +279,22 @@ private void handleEvents() {
remove(toRun);
continue;
}
if (awaitNs == Long.MAX_VALUE) {
try {

long startIdleMs = time.milliseconds();
try {
if (awaitNs == Long.MAX_VALUE) {
cond.await();
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for a new event. " +
"Shutting down event queue");
interrupted = true;
}
} else {
try {
} else {
cond.awaitNanos(awaitNs);
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for a deferred event. " +
"Shutting down event queue");
interrupted = true;
}
} catch (InterruptedException e) {
log.warn(
"Interrupted while waiting for a {} event. Shutting down event queue",
(awaitNs == Long.MAX_VALUE) ? "new" : "deferred"
);
interrupted = true;
} finally {
idleTimeCallback.accept(Math.max(time.milliseconds() - startIdleMs, 0));
}
} finally {
lock.unlock();
Expand Down Expand Up @@ -440,19 +441,35 @@ int size() {
*/
private boolean interrupted;

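/**
 * Callback invoked with the time, in milliseconds, that the event handler thread
 * spent waiting for an event. Never null; constructors that do not take a callback
 * install a no-op.
 */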
private final Consumer<Long> idleTimeCallback;


public KafkaEventQueue(
Time time,
LogContext logContext,
String threadNamePrefix
) {
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE);
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, __ -> { });
}

/**
 * Creates an event queue that does not report idle time.
 *
 * @param time              the clock used by the queue
 * @param logContext        passed through to the five-argument constructor
 * @param threadNamePrefix  passed through to the five-argument constructor
 * @param cleanupEvent      the cleanup event; must not be null
 */
public KafkaEventQueue(
    Time time,
    LogContext logContext,
    String threadNamePrefix,
    Event cleanupEvent
) {
    // Delegate to the full constructor with a no-op idle time callback.
    this(time, logContext, threadNamePrefix, cleanupEvent, ignoredIdleMs -> { });
}

public KafkaEventQueue(
Time time,
LogContext logContext,
String threadNamePrefix,
Event cleanupEvent,
Consumer<Long> idleTimeCallback
) {
this.time = time;
this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
Expand All @@ -463,6 +480,7 @@ public KafkaEventQueue(
this.eventHandler, false);
this.shuttingDown = false;
this.interrupted = false;
this.idleTimeCallback = Objects.requireNonNull(idleTimeCallback);
this.eventHandlerThread.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.internals;
package org.apache.kafka.server.metrics;

import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
Expand Down Expand Up @@ -46,11 +46,26 @@ public TimeRatio(double defaultRatio) {

@Override
public double measure(MetricConfig config, long currentTimestampMs) {
return measure();
}

@Override
public void record(MetricConfig config, double value, long currentTimestampMs) {
record(value, currentTimestampMs);
}

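/**
 * Measures the ratio of the total recorded duration to the length of the
 * current interval, then starts a new interval at the last recorded timestamp.
 * Returns the default ratio if no recordings have been captured yet.
 *
 * @return The measured ratio; nominally between 0.0 and 1.0, although the
 *         quotient itself is not clamped
 */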
public double measure() {
if (lastRecordedTimestampMs < 0) {
// Return the default value if no recordings have been captured.
return defaultRatio;
} else {
// We measure the ratio over the
// We measure the ratio over the interval
double intervalDurationMs = Math.max(lastRecordedTimestampMs - intervalStartTimestampMs, 0);
final double ratio;
if (intervalDurationMs == 0) {
Expand All @@ -61,15 +76,20 @@ public double measure(MetricConfig config, long currentTimestampMs) {
ratio = totalRecordedDurationMs / intervalDurationMs;
}

// The next interval begins at the
// The next interval begins at the last recorded timestamp
intervalStartTimestampMs = lastRecordedTimestampMs;
totalRecordedDurationMs = 0;
return ratio;
}
}

@Override
public void record(MetricConfig config, double value, long currentTimestampMs) {
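/**
 * Records a duration value at the specified timestamp. The first value ever
 * recorded is discarded and only marks the start of the measurement interval,
 * since that duration occurred before the interval began.
 *
 * @param value The duration value to record
 * @param currentTimestampMs The current timestamp in milliseconds
 */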
public void record(double value, long currentTimestampMs) {
if (intervalStartTimestampMs < 0) {
// Discard the initial value since the value occurred prior to the interval start
intervalStartTimestampMs = currentTimestampMs;
Expand All @@ -78,5 +98,4 @@ public void record(MetricConfig config, double value, long currentTimestampMs) {
lastRecordedTimestampMs = currentTimestampMs;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -424,4 +424,48 @@ public void testInterruptedWithDeferredEvents() throws Exception {
assertEquals(InterruptedException.class, ieTrapper2.exception.get().getClass());
}
}

/**
 * Verifies that the queue reports, through the idle time callback, how long the
 * event handler thread waited before an event, for both appended and deferred events.
 */
@Test
public void testIdleTimeCallback() throws Exception {
MockTime time = new MockTime();
// Holds the most recent value passed to the idle time callback.
AtomicLong lastIdleTimeMs = new AtomicLong(0);

try (KafkaEventQueue queue = new KafkaEventQueue(
time,
logContext,
"testIdleTimeCallback",
EventQueue.VoidEvent.INSTANCE,
lastIdleTimeMs::set)) {
time.sleep(2);
// No event has been appended yet, so the callback is not expected to have run.
assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be 0ms");

// Test 1: Two events with a wait in between using FutureEvent
CompletableFuture<String> event1 = new CompletableFuture<>();
queue.append(new FutureEvent<>(event1, () -> {
time.sleep(1);
return "event1-processed";
}));
assertEquals("event1-processed", event1.get());

// NOTE(review): event1.get() may return before the handler thread starts its next
// wait and samples the idle start time. If the mock clock is advanced first, the
// reported idle time could differ from waitTime5Ms — confirm this cannot flake.
long waitTime5Ms = 5;
time.sleep(waitTime5Ms);
CompletableFuture<String> event2 = new CompletableFuture<>();
queue.append(new FutureEvent<>(event2, () -> {
time.sleep(1);
return "event2-processed";
}));
assertEquals("event2-processed", event2.get());
assertEquals(waitTime5Ms, lastIdleTimeMs.get(), "Idle time should be " + waitTime5Ms + "ms, was: " + lastIdleTimeMs.get());

// Test 2: Deferred event
long waitTime2Ms = 2;
CompletableFuture<Void> deferredEvent2 = new CompletableFuture<>();
// Schedule the event waitTime2Ms after the current mock time.
queue.scheduleDeferred("deferred2",
__ -> OptionalLong.of(time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(waitTime2Ms)),
() -> deferredEvent2.complete(null));
// Advance the mock clock to the deadline, then block until the deferred event has run.
time.sleep(waitTime2Ms);
deferredEvent2.get();
assertEquals(waitTime2Ms, lastIdleTimeMs.get(), "Idle time should be " + waitTime2Ms + "ms, was: " + lastIdleTimeMs.get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.internals;
package org.apache.kafka.server.metrics;

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.MockTime;
Expand Down
Loading