Skip to content

Commit c9a4c87

Browse files
committed
address feedback - part 5
1 parent 1e1ce61 commit c9a4c87

File tree

5 files changed

+50
-36
lines changed

5 files changed

+50
-36
lines changed

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,13 @@ public QuorumController build() throws Exception {
406406

407407
KafkaEventQueue queue = null;
408408
try {
409-
queue = new KafkaEventQueue(time, logContext, threadNamePrefix, EventQueue.VoidEvent.INSTANCE, controllerMetrics::updateIdleTime);
409+
queue = new KafkaEventQueue(
410+
time,
411+
logContext,
412+
threadNamePrefix,
413+
EventQueue.VoidEvent.INSTANCE,
414+
Optional.of(controllerMetrics::updateIdleTime)
415+
);
410416

411417
return new QuorumController(
412418
nonFatalFaultHandler,

metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.kafka.controller.metrics;
1919

20-
import org.apache.kafka.common.metrics.MetricConfig;
2120
import org.apache.kafka.common.utils.Time;
2221
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
2322
import org.apache.kafka.server.metrics.TimeRatio;
@@ -71,7 +70,6 @@ public class QuorumControllerMetrics implements AutoCloseable {
7170

7271
private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME = "TimeSinceLastHeartbeatReceivedMs";
7372
private static final String BROKER_ID_TAG = "broker";
74-
private static final MetricConfig METRIC_CONFIG = new MetricConfig();
7573

7674
private final Optional<MetricsRegistry> registry;
7775
private final Time time;
@@ -168,13 +166,13 @@ public Long value() {
168166
registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new Gauge<Double>() {
169167
@Override
170168
public Double value() {
171-
return avgIdleTimeRatio.measure(METRIC_CONFIG, time.milliseconds());
169+
return avgIdleTimeRatio.measure();
172170
}
173171
}));
174172
}
175173

176174
public void updateIdleTime(long idleDurationMs) {
177-
avgIdleTimeRatio.record(METRIC_CONFIG, (double) idleDurationMs, time.milliseconds());
175+
avgIdleTimeRatio.record((double) idleDurationMs, time.milliseconds());
178176
}
179177

180178
public void addTimeSinceLastHeartbeatMetric(int brokerId) {

server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -288,9 +288,10 @@ private void handleEvents() {
288288
cond.awaitNanos(awaitNs);
289289
}
290290
} catch (InterruptedException e) {
291-
292-
log.warn("Interrupted while waiting for a {} event. " +
293-
"Shutting down event queue", (awaitNs == Long.MAX_VALUE) ? "new" : "deferred");
291+
log.warn(
292+
"Interrupted while waiting for a {} event. Shutting down event queue",
293+
(awaitNs == Long.MAX_VALUE) ? "new" : "deferred"
294+
);
294295
interrupted = true;
295296
} finally {
296297
idleTimeCallback.accept(Math.max(time.milliseconds() - startIdleMs, 0));
@@ -451,7 +452,7 @@ public KafkaEventQueue(
451452
LogContext logContext,
452453
String threadNamePrefix
453454
) {
454-
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, null);
455+
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, Optional.empty());
455456
}
456457

457458
public KafkaEventQueue(
@@ -460,15 +461,15 @@ public KafkaEventQueue(
460461
String threadNamePrefix,
461462
Event cleanupEvent
462463
) {
463-
this(time, logContext, threadNamePrefix, cleanupEvent, null);
464+
this(time, logContext, threadNamePrefix, cleanupEvent, Optional.empty());
464465
}
465466

466467
public KafkaEventQueue(
467468
Time time,
468469
LogContext logContext,
469470
String threadNamePrefix,
470471
Event cleanupEvent,
471-
Consumer<Long> idleTimeCallback
472+
Optional<Consumer<Long>> idleTimeCallback
472473
) {
473474
this.time = time;
474475
this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
@@ -479,7 +480,7 @@ public KafkaEventQueue(
479480
this.eventHandler, false);
480481
this.shuttingDown = false;
481482
this.interrupted = false;
482-
this.idleTimeCallback = idleTimeCallback != null ? idleTimeCallback : __ -> { };
483+
this.idleTimeCallback = idleTimeCallback.orElse(__ -> { });
483484
this.eventHandlerThread.start();
484485
}
485486

server-common/src/main/java/org/apache/kafka/server/metrics/TimeRatio.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,26 @@ public TimeRatio(double defaultRatio) {
4646

4747
@Override
4848
public double measure(MetricConfig config, long currentTimestampMs) {
49+
return measure();
50+
}
51+
52+
@Override
53+
public void record(MetricConfig config, double value, long currentTimestampMs) {
54+
record(value, currentTimestampMs);
55+
}
56+
57+
/**
58+
* Measures the ratio of recorded duration to the interval duration
59+
* since the last measurement.
60+
*
61+
* @return The measured ratio value between 0.0 and 1.0
62+
*/
63+
public double measure() {
4964
if (lastRecordedTimestampMs < 0) {
5065
// Return the default value if no recordings have been captured.
5166
return defaultRatio;
5267
} else {
53-
// We measure the ratio over the
68+
// We measure the ratio over the interval
5469
double intervalDurationMs = Math.max(lastRecordedTimestampMs - intervalStartTimestampMs, 0);
5570
final double ratio;
5671
if (intervalDurationMs == 0) {
@@ -61,15 +76,20 @@ public double measure(MetricConfig config, long currentTimestampMs) {
6176
ratio = totalRecordedDurationMs / intervalDurationMs;
6277
}
6378

64-
// The next interval begins at the
79+
// The next interval begins at the last recorded timestamp
6580
intervalStartTimestampMs = lastRecordedTimestampMs;
6681
totalRecordedDurationMs = 0;
6782
return ratio;
6883
}
6984
}
7085

71-
@Override
72-
public void record(MetricConfig config, double value, long currentTimestampMs) {
86+
/**
87+
* Records a duration value at the specified timestamp.
88+
*
89+
* @param value The duration value to record
90+
* @param currentTimestampMs The current timestamp in milliseconds
91+
*/
92+
public void record(double value, long currentTimestampMs) {
7393
if (intervalStartTimestampMs < 0) {
7494
// Discard the initial value since the value occurred prior to the interval start
7595
intervalStartTimestampMs = currentTimestampMs;
@@ -78,5 +98,4 @@ public void record(MetricConfig config, double value, long currentTimestampMs) {
7898
lastRecordedTimestampMs = currentTimestampMs;
7999
}
80100
}
81-
82101
}

server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.junit.jupiter.api.Timeout;
3131

3232
import java.util.List;
33+
import java.util.Optional;
3334
import java.util.OptionalLong;
3435
import java.util.concurrent.CompletableFuture;
3536
import java.util.concurrent.ExecutionException;
@@ -429,18 +430,16 @@ public void testInterruptedWithDeferredEvents() throws Exception {
429430
public void testIdleTimeCallback() throws Exception {
430431
MockTime time = new MockTime();
431432
AtomicLong lastIdleTimeMs = new AtomicLong(0);
432-
AtomicInteger idleCallCount = new AtomicInteger(0);
433433

434434
try (KafkaEventQueue queue = new KafkaEventQueue(
435435
time,
436436
logContext,
437437
"testIdleTimeCallback",
438438
EventQueue.VoidEvent.INSTANCE,
439-
idleMs -> {
440-
lastIdleTimeMs.set(idleMs);
441-
idleCallCount.incrementAndGet();
442-
})) {
443-
439+
Optional.of(lastIdleTimeMs::set))) {
440+
time.sleep(2);
441+
assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be 0ms");
442+
444443
// Test 1: Two events with a wait in between using FutureEvent
445444
CompletableFuture<String> event1 = new CompletableFuture<>();
446445
queue.append(new FutureEvent<>(event1, () -> {
@@ -451,19 +450,14 @@ public void testIdleTimeCallback() throws Exception {
451450

452451
long waitTime5Ms = 5;
453452
time.sleep(waitTime5Ms);
454-
455453
CompletableFuture<String> event2 = new CompletableFuture<>();
456454
queue.append(new FutureEvent<>(event2, () -> {
457455
time.sleep(1);
458456
return "event2-processed";
459457
}));
460458
assertEquals("event2-processed", event2.get());
461-
462-
TestUtils.waitForCondition(
463-
() -> idleCallCount.get() == 2,
464-
"Idle callback should have been called twice"
465-
);
466-
assertEquals(waitTime5Ms, lastIdleTimeMs.get(), "Last idle time should be 5ms");
459+
assertTrue(lastIdleTimeMs.get() >= waitTime5Ms,
460+
"Idle time should be at least " + waitTime5Ms + "ms, was: " + lastIdleTimeMs.get());
467461

468462
// Test 2: Deferred event
469463
long waitTime2Ms = 2;
@@ -473,12 +467,8 @@ public void testIdleTimeCallback() throws Exception {
473467
() -> deferredEvent2.complete(null));
474468
time.sleep(waitTime2Ms);
475469
deferredEvent2.get();
476-
477-
TestUtils.waitForCondition(
478-
() -> idleCallCount.get() == 3,
479-
"Idle callback should have been called three times"
480-
);
481-
assertEquals(3, idleCallCount.get(), "Idle callback should have been called three times");
470+
assertTrue(lastIdleTimeMs.get() >= waitTime2Ms,
471+
"Idle time should be at least " + waitTime2Ms + "ms, was: " + lastIdleTimeMs.get());
482472
}
483473
}
484474
}

0 commit comments

Comments
 (0)