Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ public QuorumController build() throws Exception {

KafkaEventQueue queue = null;
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
queue = new KafkaEventQueue(time, logContext, threadNamePrefix, EventQueue.VoidEvent.INSTANCE, controllerMetrics::updateIdleTime);

return new QuorumController(
nonFatalFaultHandler,
fatalFaultHandler,
Expand Down Expand Up @@ -1488,7 +1489,6 @@ private QuorumController(
this.queue = queue;
this.time = time;
this.controllerMetrics = controllerMetrics;
queue.setIdleTimeCallback(controllerMetrics::updateIdleTime);
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.deferredEventQueue = new DeferredEventQueue(logContext);
this.resourceExists = new ConfigResourceExistenceChecker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
Expand Down Expand Up @@ -281,7 +280,7 @@ private void handleEvents() {
continue;
}

long startIdleNs = time.nanoseconds();
long startIdleMs = time.milliseconds();
try {
if (awaitNs == Long.MAX_VALUE) {
cond.await();
Expand All @@ -294,11 +293,7 @@ private void handleEvents() {
"Shutting down event queue", (awaitNs == Long.MAX_VALUE) ? "new" : "deferred");
interrupted = true;
} finally {
if (idleTimeCallback != null) {
long idleNs = Math.max(time.nanoseconds() - startIdleNs, 0);
long idleMs = TimeUnit.NANOSECONDS.toMillis(idleNs);
idleTimeCallback.accept(idleMs);
}
idleTimeCallback.accept(Math.max(time.milliseconds() - startIdleMs, 0));
}
} finally {
lock.unlock();
Expand Down Expand Up @@ -448,22 +443,32 @@ int size() {
/**
* Optional callback for queue idle time tracking.
*/
private Consumer<Long> idleTimeCallback;
private final Consumer<Long> idleTimeCallback;


public KafkaEventQueue(
Time time,
LogContext logContext,
String threadNamePrefix
) {
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE);
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, null);
}

public KafkaEventQueue(
Time time,
LogContext logContext,
String threadNamePrefix,
Event cleanupEvent
) {
this(time, logContext, threadNamePrefix, cleanupEvent, null);
}

public KafkaEventQueue(
Time time,
LogContext logContext,
String threadNamePrefix,
Event cleanupEvent,
Consumer<Long> idleTimeCallback
) {
this.time = time;
this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
Expand All @@ -474,6 +479,7 @@ public KafkaEventQueue(
this.eventHandler, false);
this.shuttingDown = false;
this.interrupted = false;
this.idleTimeCallback = idleTimeCallback != null ? idleTimeCallback : __ -> { };
this.eventHandlerThread.start();
}

Expand Down Expand Up @@ -514,10 +520,6 @@ public void beginShutdown(String source) {
}
}

public void setIdleTimeCallback(Consumer<Long> callback) {
this.idleTimeCallback = callback;
}

@Override
public int size() {
return eventHandler.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,4 +424,61 @@ public void testInterruptedWithDeferredEvents() throws Exception {
assertEquals(InterruptedException.class, ieTrapper2.exception.get().getClass());
}
}

@Test
public void testIdleTimeCallback() throws Exception {
MockTime time = new MockTime();
AtomicLong lastIdleTimeMs = new AtomicLong(0);
AtomicInteger idleCallCount = new AtomicInteger(0);

try (KafkaEventQueue queue = new KafkaEventQueue(
time,
logContext,
"testIdleTimeCallback",
EventQueue.VoidEvent.INSTANCE,
idleMs -> {
lastIdleTimeMs.set(idleMs);
idleCallCount.incrementAndGet();
})) {

// Test 1: Two events with a wait in between using FutureEvent
CompletableFuture<String> event1 = new CompletableFuture<>();
queue.append(new FutureEvent<>(event1, () -> {
time.sleep(1);
return "event1-processed";
}));
assertEquals("event1-processed", event1.get());

long waitTime5Ms = 5;
time.sleep(waitTime5Ms);

CompletableFuture<String> event2 = new CompletableFuture<>();
queue.append(new FutureEvent<>(event2, () -> {
time.sleep(1);
return "event2-processed";
}));
assertEquals("event2-processed", event2.get());

TestUtils.waitForCondition(
() -> idleCallCount.get() == 2,
"Idle callback should have been called twice"
);
assertEquals(waitTime5Ms, lastIdleTimeMs.get(), "Last idle time should be 5ms");

// Test 2: Deferred event
long waitTime2Ms = 2;
CompletableFuture<Void> deferredEvent2 = new CompletableFuture<>();
queue.scheduleDeferred("deferred2",
__ -> OptionalLong.of(time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(waitTime2Ms)),
() -> deferredEvent2.complete(null));
time.sleep(waitTime2Ms);
deferredEvent2.get();

TestUtils.waitForCondition(
() -> idleCallCount.get() == 3,
"Idle callback should have been called three times"
);
assertEquals(3, idleCallCount.get(), "Idle callback should have been called three times");
}
}
}