Skip to content

Commit 2bb1075

Browse files
committed
address feedback - part 4
1 parent d92129b commit 2bb1075

File tree

3 files changed

+68
-15
lines changed

3 files changed

+68
-15
lines changed

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,8 @@ public QuorumController build() throws Exception {
406406

407407
KafkaEventQueue queue = null;
408408
try {
409-
queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
409+
queue = new KafkaEventQueue(time, logContext, threadNamePrefix, EventQueue.VoidEvent.INSTANCE, controllerMetrics::updateIdleTime);
410+
410411
return new QuorumController(
411412
nonFatalFaultHandler,
412413
fatalFaultHandler,
@@ -1488,7 +1489,6 @@ private QuorumController(
14881489
this.queue = queue;
14891490
this.time = time;
14901491
this.controllerMetrics = controllerMetrics;
1491-
queue.setIdleTimeCallback(controllerMetrics::updateIdleTime);
14921492
this.snapshotRegistry = new SnapshotRegistry(logContext);
14931493
this.deferredEventQueue = new DeferredEventQueue(logContext);
14941494
this.resourceExists = new ConfigResourceExistenceChecker();

server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.OptionalLong;
3232
import java.util.TreeMap;
3333
import java.util.concurrent.RejectedExecutionException;
34-
import java.util.concurrent.TimeUnit;
3534
import java.util.concurrent.locks.Condition;
3635
import java.util.concurrent.locks.ReentrantLock;
3736
import java.util.function.Consumer;
@@ -281,7 +280,7 @@ private void handleEvents() {
281280
continue;
282281
}
283282

284-
long startIdleNs = time.nanoseconds();
283+
long startIdleMs = time.milliseconds();
285284
try {
286285
if (awaitNs == Long.MAX_VALUE) {
287286
cond.await();
@@ -294,11 +293,7 @@ private void handleEvents() {
294293
"Shutting down event queue", (awaitNs == Long.MAX_VALUE) ? "new" : "deferred");
295294
interrupted = true;
296295
} finally {
297-
if (idleTimeCallback != null) {
298-
long idleNs = Math.max(time.nanoseconds() - startIdleNs, 0);
299-
long idleMs = TimeUnit.NANOSECONDS.toMillis(idleNs);
300-
idleTimeCallback.accept(idleMs);
301-
}
296+
idleTimeCallback.accept(Math.max(time.milliseconds() - startIdleMs, 0));
302297
}
303298
} finally {
304299
lock.unlock();
@@ -448,22 +443,32 @@ int size() {
448443
/**
449444
* Optional callback for queue idle time tracking.
450445
*/
451-
private Consumer<Long> idleTimeCallback;
446+
private final Consumer<Long> idleTimeCallback;
452447

453448

454449
public KafkaEventQueue(
455450
Time time,
456451
LogContext logContext,
457452
String threadNamePrefix
458453
) {
459-
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE);
454+
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, null);
460455
}
461456

462457
public KafkaEventQueue(
463458
Time time,
464459
LogContext logContext,
465460
String threadNamePrefix,
466461
Event cleanupEvent
462+
) {
463+
this(time, logContext, threadNamePrefix, cleanupEvent, null);
464+
}
465+
466+
public KafkaEventQueue(
467+
Time time,
468+
LogContext logContext,
469+
String threadNamePrefix,
470+
Event cleanupEvent,
471+
Consumer<Long> idleTimeCallback
467472
) {
468473
this.time = time;
469474
this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
@@ -474,6 +479,7 @@ public KafkaEventQueue(
474479
this.eventHandler, false);
475480
this.shuttingDown = false;
476481
this.interrupted = false;
482+
this.idleTimeCallback = idleTimeCallback != null ? idleTimeCallback : __ -> { };
477483
this.eventHandlerThread.start();
478484
}
479485

@@ -514,10 +520,6 @@ public void beginShutdown(String source) {
514520
}
515521
}
516522

517-
public void setIdleTimeCallback(Consumer<Long> callback) {
518-
this.idleTimeCallback = callback;
519-
}
520-
521523
@Override
522524
public int size() {
523525
return eventHandler.size();

server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,4 +424,55 @@ public void testInterruptedWithDeferredEvents() throws Exception {
424424
assertEquals(InterruptedException.class, ieTrapper2.exception.get().getClass());
425425
}
426426
}
427+
428+
@Test
429+
public void testIdleTimeCallback() throws Exception {
430+
MockTime time = new MockTime();
431+
AtomicLong lastIdleTimeMs = new AtomicLong(0);
432+
AtomicInteger idleCallCount = new AtomicInteger(0);
433+
434+
try (KafkaEventQueue queue = new KafkaEventQueue(
435+
time,
436+
logContext,
437+
"testIdleTimeCallback",
438+
EventQueue.VoidEvent.INSTANCE,
439+
idleMs -> {
440+
lastIdleTimeMs.set(idleMs);
441+
idleCallCount.incrementAndGet();
442+
})) {
443+
444+
// Test 1: Two events with a wait in between using FutureEvent
445+
CompletableFuture<String> event1 = new CompletableFuture<>();
446+
queue.append(new FutureEvent<>(event1, () -> {
447+
time.sleep(1);
448+
return "event1-processed";
449+
}));
450+
assertEquals("event1-processed", event1.get());
451+
452+
long waitTime5Ms = 5;
453+
time.sleep(waitTime5Ms);
454+
455+
CompletableFuture<String> event2 = new CompletableFuture<>();
456+
queue.append(new FutureEvent<>(event2, () -> {
457+
time.sleep(1);
458+
return "event2-processed";
459+
}));
460+
assertEquals("event2-processed", event2.get());
461+
462+
assertEquals(2, idleCallCount.get(), "Idle callback should have been called twice");
463+
assertEquals(waitTime5Ms, lastIdleTimeMs.get(), "Last idle time should be 5ms");
464+
465+
// Test 2: Deferred event
466+
long waitTime2Ms = 2;
467+
CompletableFuture<Void> deferredEvent2 = new CompletableFuture<>();
468+
queue.scheduleDeferred("deferred2",
469+
__ -> OptionalLong.of(time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(waitTime2Ms)),
470+
() -> deferredEvent2.complete(null));
471+
time.sleep(waitTime2Ms);
472+
deferredEvent2.get();
473+
474+
assertEquals(waitTime2Ms, lastIdleTimeMs.get(), "Last idle time should be 2ms");
475+
assertEquals(3, idleCallCount.get(), "Idle callback should have been called three times");
476+
}
477+
}
427478
}

0 commit comments

Comments
 (0)