Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@
<allow pkg="org.apache.kafka.server.common.serialization" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault"/>
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.test"/>
<allow pkg="com.fasterxml.jackson" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ public QuorumController build() throws Exception {

KafkaEventQueue queue = null;
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
queue = new KafkaEventQueue(time, logContext, threadNamePrefix, EventQueue.VoidEvent.INSTANCE, controllerMetrics::updateIdleTime);

return new QuorumController(
nonFatalFaultHandler,
fatalFaultHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.kafka.controller.metrics;

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.metrics.TimeRatio;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
Expand Down Expand Up @@ -48,6 +50,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
"ControllerEventManager", "EventQueueTimeMs");
private static final MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private static final MetricName AVERAGE_IDLE_RATIO = getMetricName(
"ControllerEventManager", "AvgIdleRatio");
private static final MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private static final MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
Expand All @@ -64,8 +68,10 @@ public class QuorumControllerMetrics implements AutoCloseable {
"KafkaController", "EventQueueOperationsTimedOutCount");
private static final MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName(
"KafkaController", "NewActiveControllersCount");

private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME = "TimeSinceLastHeartbeatReceivedMs";
private static final String BROKER_ID_TAG = "broker";
private static final MetricConfig METRIC_CONFIG = new MetricConfig();

private final Optional<MetricsRegistry> registry;
private final Time time;
Expand All @@ -75,6 +81,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private final TimeRatio avgIdleTimeRatio;

private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
private final AtomicLong operationsStarted = new AtomicLong(0);
Expand Down Expand Up @@ -109,6 +116,7 @@ public Integer value() {
this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTimeUpdater = newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.sessionTimeoutMs = sessionTimeoutMs;
this.avgIdleTimeRatio = new TimeRatio(1);
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
@Override
public Long value() {
Expand Down Expand Up @@ -157,6 +165,16 @@ public Long value() {
return newActiveControllers();
}
}));
registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new Gauge<Double>() {
@Override
public Double value() {
return avgIdleTimeRatio.measure(METRIC_CONFIG, time.milliseconds());
}
}));
}

public void updateIdleTime(long idleDurationMs) {
avgIdleTimeRatio.record(METRIC_CONFIG, (double) idleDurationMs, time.milliseconds());
}

public void addTimeSinceLastHeartbeatMetric(int brokerId) {
Expand Down Expand Up @@ -291,7 +309,8 @@ public void close() {
TIMED_OUT_BROKER_HEARTBEAT_COUNT,
EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
NEW_ACTIVE_CONTROLLERS_COUNT
NEW_ACTIVE_CONTROLLERS_COUNT,
AVERAGE_IDLE_RATIO
).forEach(r::removeMetric));
removeTimeSinceLastHeartbeatMetrics();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public void testMetricNames() {
Set<String> expected = Set.of(
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
"kafka.controller:type=ControllerEventManager,name=AvgIdleRatio",
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
Expand Down Expand Up @@ -188,6 +189,30 @@ public void testTimeSinceLastHeartbeatReceivedMs() {
}
}

@Test
public void testAvgIdleRatio() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
Gauge<Double> avgIdleRatio = (Gauge<Double>) registry.allMetrics().get(metricName("ControllerEventManager", "AvgIdleRatio"));

// No idle time recorded yet
assertEquals(1.0, avgIdleRatio.value());

metrics.updateIdleTime(10);
time.sleep(40);
metrics.updateIdleTime(20);
assertEquals(0.5, avgIdleRatio.value());

time.sleep(20);
metrics.updateIdleTime(1);
assertEquals(0.05, avgIdleRatio.value());

} finally {
registry.shutdown();
}
}

private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.raft.QuorumState;
import org.apache.kafka.raft.ReplicaKey;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.metrics.TimeRatio;

import java.util.List;
import java.util.OptionalLong;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;

Expand Down Expand Up @@ -278,22 +279,21 @@ private void handleEvents() {
remove(toRun);
continue;
}
if (awaitNs == Long.MAX_VALUE) {
try {

long startIdleMs = time.milliseconds();
try {
if (awaitNs == Long.MAX_VALUE) {
cond.await();
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for a new event. " +
"Shutting down event queue");
interrupted = true;
}
} else {
try {
} else {
cond.awaitNanos(awaitNs);
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for a deferred event. " +
"Shutting down event queue");
interrupted = true;
}
} catch (InterruptedException e) {

log.warn("Interrupted while waiting for a {} event. " +
"Shutting down event queue", (awaitNs == Long.MAX_VALUE) ? "new" : "deferred");
interrupted = true;
} finally {
idleTimeCallback.accept(Math.max(time.milliseconds() - startIdleMs, 0));
}
} finally {
lock.unlock();
Expand Down Expand Up @@ -440,19 +440,35 @@ int size() {
*/
private boolean interrupted;

/**
* Optional callback for queue idle time tracking.
*/
private final Consumer<Long> idleTimeCallback;


public KafkaEventQueue(
Time time,
LogContext logContext,
String threadNamePrefix
) {
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE);
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, null);
}

public KafkaEventQueue(
Time time,
LogContext logContext,
String threadNamePrefix,
Event cleanupEvent
) {
this(time, logContext, threadNamePrefix, cleanupEvent, null);
}

public KafkaEventQueue(
Time time,
LogContext logContext,
String threadNamePrefix,
Event cleanupEvent,
Consumer<Long> idleTimeCallback
) {
this.time = time;
this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
Expand All @@ -463,6 +479,7 @@ public KafkaEventQueue(
this.eventHandler, false);
this.shuttingDown = false;
this.interrupted = false;
this.idleTimeCallback = idleTimeCallback != null ? idleTimeCallback : __ -> { };
this.eventHandlerThread.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.internals;
package org.apache.kafka.server.metrics;

import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,4 +424,61 @@ public void testInterruptedWithDeferredEvents() throws Exception {
assertEquals(InterruptedException.class, ieTrapper2.exception.get().getClass());
}
}

@Test
public void testIdleTimeCallback() throws Exception {
MockTime time = new MockTime();
AtomicLong lastIdleTimeMs = new AtomicLong(0);
AtomicInteger idleCallCount = new AtomicInteger(0);

try (KafkaEventQueue queue = new KafkaEventQueue(
time,
logContext,
"testIdleTimeCallback",
EventQueue.VoidEvent.INSTANCE,
idleMs -> {
lastIdleTimeMs.set(idleMs);
idleCallCount.incrementAndGet();
})) {

// Test 1: Two events with a wait in between using FutureEvent
CompletableFuture<String> event1 = new CompletableFuture<>();
queue.append(new FutureEvent<>(event1, () -> {
time.sleep(1);
return "event1-processed";
}));
assertEquals("event1-processed", event1.get());

long waitTime5Ms = 5;
time.sleep(waitTime5Ms);

CompletableFuture<String> event2 = new CompletableFuture<>();
queue.append(new FutureEvent<>(event2, () -> {
time.sleep(1);
return "event2-processed";
}));
assertEquals("event2-processed", event2.get());

TestUtils.waitForCondition(
() -> idleCallCount.get() == 2,
"Idle callback should have been called twice"
);
assertEquals(waitTime5Ms, lastIdleTimeMs.get(), "Last idle time should be 5ms");

// Test 2: Deferred event
long waitTime2Ms = 2;
CompletableFuture<Void> deferredEvent2 = new CompletableFuture<>();
queue.scheduleDeferred("deferred2",
__ -> OptionalLong.of(time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(waitTime2Ms)),
() -> deferredEvent2.complete(null));
time.sleep(waitTime2Ms);
deferredEvent2.get();

TestUtils.waitForCondition(
() -> idleCallCount.get() == 3,
"Idle callback should have been called three times"
);
assertEquals(3, idleCallCount.get(), "Idle callback should have been called three times");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.internals;
package org.apache.kafka.server.metrics;

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.MockTime;
Expand Down