Skip to content

Commit e63cc81

Browse files
committed
address feedback - part 5
1 parent 1e1ce61 commit e63cc81

File tree

5 files changed

+44
-18
lines changed

5 files changed

+44
-18
lines changed

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,13 @@ public QuorumController build() throws Exception {
406406

407407
KafkaEventQueue queue = null;
408408
try {
409-
queue = new KafkaEventQueue(time, logContext, threadNamePrefix, EventQueue.VoidEvent.INSTANCE, controllerMetrics::updateIdleTime);
409+
queue = new KafkaEventQueue(
410+
time,
411+
logContext,
412+
threadNamePrefix,
413+
EventQueue.VoidEvent.INSTANCE,
414+
Optional.of(controllerMetrics::updateIdleTime)
415+
);
410416

411417
return new QuorumController(
412418
nonFatalFaultHandler,

metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.kafka.controller.metrics;
1919

20-
import org.apache.kafka.common.metrics.MetricConfig;
2120
import org.apache.kafka.common.utils.Time;
2221
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
2322
import org.apache.kafka.server.metrics.TimeRatio;
@@ -71,7 +70,6 @@ public class QuorumControllerMetrics implements AutoCloseable {
7170

7271
private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME = "TimeSinceLastHeartbeatReceivedMs";
7372
private static final String BROKER_ID_TAG = "broker";
74-
private static final MetricConfig METRIC_CONFIG = new MetricConfig();
7573

7674
private final Optional<MetricsRegistry> registry;
7775
private final Time time;
@@ -168,13 +166,13 @@ public Long value() {
168166
registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new Gauge<Double>() {
169167
@Override
170168
public Double value() {
171-
return avgIdleTimeRatio.measure(METRIC_CONFIG, time.milliseconds());
169+
return avgIdleTimeRatio.measure();
172170
}
173171
}));
174172
}
175173

176174
public void updateIdleTime(long idleDurationMs) {
177-
avgIdleTimeRatio.record(METRIC_CONFIG, (double) idleDurationMs, time.milliseconds());
175+
avgIdleTimeRatio.record((double) idleDurationMs, time.milliseconds());
178176
}
179177

180178
public void addTimeSinceLastHeartbeatMetric(int brokerId) {

server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,10 @@ private void handleEvents() {
289289
}
290290
} catch (InterruptedException e) {
291291

292-
log.warn("Interrupted while waiting for a {} event. " +
293-
"Shutting down event queue", (awaitNs == Long.MAX_VALUE) ? "new" : "deferred");
292+
log.warn(
293+
"Interrupted while waiting for a {} event. Shutting down event queue",
294+
(awaitNs == Long.MAX_VALUE) ? "new" : "deferred"
295+
);
294296
interrupted = true;
295297
} finally {
296298
idleTimeCallback.accept(Math.max(time.milliseconds() - startIdleMs, 0));
@@ -451,7 +453,7 @@ public KafkaEventQueue(
451453
LogContext logContext,
452454
String threadNamePrefix
453455
) {
454-
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, null);
456+
this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, Optional.empty());
455457
}
456458

457459
public KafkaEventQueue(
@@ -460,15 +462,15 @@ public KafkaEventQueue(
460462
String threadNamePrefix,
461463
Event cleanupEvent
462464
) {
463-
this(time, logContext, threadNamePrefix, cleanupEvent, null);
465+
this(time, logContext, threadNamePrefix, cleanupEvent, Optional.empty());
464466
}
465467

466468
public KafkaEventQueue(
467469
Time time,
468470
LogContext logContext,
469471
String threadNamePrefix,
470472
Event cleanupEvent,
471-
Consumer<Long> idleTimeCallback
473+
Optional<Consumer<Long>> idleTimeCallback
472474
) {
473475
this.time = time;
474476
this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
@@ -479,7 +481,7 @@ public KafkaEventQueue(
479481
this.eventHandler, false);
480482
this.shuttingDown = false;
481483
this.interrupted = false;
482-
this.idleTimeCallback = idleTimeCallback != null ? idleTimeCallback : __ -> { };
484+
this.idleTimeCallback = idleTimeCallback.orElse(__ -> { });
483485
this.eventHandlerThread.start();
484486
}
485487

server-common/src/main/java/org/apache/kafka/server/metrics/TimeRatio.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,26 @@ public TimeRatio(double defaultRatio) {
4646

4747
@Override
4848
public double measure(MetricConfig config, long currentTimestampMs) {
49+
return measure();
50+
}
51+
52+
@Override
53+
public void record(MetricConfig config, double value, long currentTimestampMs) {
54+
record(value, currentTimestampMs);
55+
}
56+
57+
/**
58+
* Measures the ratio of recorded duration to the interval duration
59+
* since the last measurement.
60+
*
61+
* @return The measured ratio value between 0.0 and 1.0
62+
*/
63+
public double measure() {
4964
if (lastRecordedTimestampMs < 0) {
5065
// Return the default value if no recordings have been captured.
5166
return defaultRatio;
5267
} else {
53-
// We measure the ratio over the
68+
// We measure the ratio over the interval
5469
double intervalDurationMs = Math.max(lastRecordedTimestampMs - intervalStartTimestampMs, 0);
5570
final double ratio;
5671
if (intervalDurationMs == 0) {
@@ -61,15 +76,20 @@ public double measure(MetricConfig config, long currentTimestampMs) {
6176
ratio = totalRecordedDurationMs / intervalDurationMs;
6277
}
6378

64-
// The next interval begins at the
79+
// The next interval begins at the last recorded timestamp
6580
intervalStartTimestampMs = lastRecordedTimestampMs;
6681
totalRecordedDurationMs = 0;
6782
return ratio;
6883
}
6984
}
7085

71-
@Override
72-
public void record(MetricConfig config, double value, long currentTimestampMs) {
86+
/**
87+
* Records a duration value at the specified timestamp.
88+
*
89+
* @param value The duration value to record
90+
* @param currentTimestampMs The current timestamp in milliseconds
91+
*/
92+
public void record(double value, long currentTimestampMs) {
7393
if (intervalStartTimestampMs < 0) {
7494
// Discard the initial value since the value occurred prior to the interval start
7595
intervalStartTimestampMs = currentTimestampMs;
@@ -78,5 +98,4 @@ public void record(MetricConfig config, double value, long currentTimestampMs) {
7898
lastRecordedTimestampMs = currentTimestampMs;
7999
}
80100
}
81-
82101
}

server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.junit.jupiter.api.Timeout;
3131

3232
import java.util.List;
33+
import java.util.Optional;
3334
import java.util.OptionalLong;
3435
import java.util.concurrent.CompletableFuture;
3536
import java.util.concurrent.ExecutionException;
@@ -436,10 +437,10 @@ public void testIdleTimeCallback() throws Exception {
436437
logContext,
437438
"testIdleTimeCallback",
438439
EventQueue.VoidEvent.INSTANCE,
439-
idleMs -> {
440+
Optional.of(idleMs -> {
440441
lastIdleTimeMs.set(idleMs);
441442
idleCallCount.incrementAndGet();
442-
})) {
443+
}))) {
443444

444445
// Test 1: Two events with a wait in between using FutureEvent
445446
CompletableFuture<String> event1 = new CompletableFuture<>();

0 commit comments

Comments
 (0)