Skip to content

Commit d92129b

Browse files
committed
address feedback - part 3
1 parent 715a151 commit d92129b

File tree

4 files changed

+36
-43
lines changed

4 files changed

+36
-43
lines changed

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,6 @@ private void handleEventEnd(String name, long startProcessingTimeNs) {
532532
MICROSECONDS.convert(deltaNs, NANOSECONDS));
533533
performanceMonitor.observeEvent(name, deltaNs);
534534
controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
535-
controllerMetrics.updateIdleStartTime();
536535
}
537536

538537
private Throwable handleEventException(
@@ -550,7 +549,6 @@ private Throwable handleEventException(
550549
} else {
551550
deltaUs = OptionalLong.empty();
552551
}
553-
controllerMetrics.updateIdleStartTime();
554552
EventHandlerExceptionInfo info = EventHandlerExceptionInfo.
555553
fromInternal(exception, this::latestController);
556554
int epoch = curClaimEpoch;
@@ -575,7 +573,6 @@ private Throwable handleEventException(
575573

576574
private long updateEventStartMetricsAndGetTime(OptionalLong eventCreatedTimeNs) {
577575
long now = time.nanoseconds();
578-
controllerMetrics.updateIdleEndTime();
579576
controllerMetrics.incrementOperationsStarted();
580577
if (eventCreatedTimeNs.isPresent()) {
581578
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs.getAsLong()));
@@ -834,7 +831,6 @@ public void run() throws Exception {
834831

835832
// Remember the latest offset and future if it is not already completed
836833
if (!future.isDone()) {
837-
controllerMetrics.updateIdleStartTime();
838834
deferredEventQueue.add(resultAndOffset.offset(), this);
839835
}
840836
}
@@ -846,7 +842,6 @@ public void handleException(Throwable exception) {
846842

847843
@Override
848844
public void complete(Throwable exception) {
849-
controllerMetrics.updateIdleEndTime();
850845
if (exception == null) {
851846
handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
852847
future.complete(resultAndOffset.response());
@@ -1493,6 +1488,7 @@ private QuorumController(
14931488
this.queue = queue;
14941489
this.time = time;
14951490
this.controllerMetrics = controllerMetrics;
1491+
queue.setIdleTimeCallback(controllerMetrics::updateIdleTime);
14961492
this.snapshotRegistry = new SnapshotRegistry(logContext);
14971493
this.deferredEventQueue = new DeferredEventQueue(logContext);
14981494
this.resourceExists = new ConfigResourceExistenceChecker();

metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ public class QuorumControllerMetrics implements AutoCloseable {
8989
private final AtomicLong newActiveControllers = new AtomicLong(0);
9090
private final Map<Integer, Long> brokerContactTimesMs = new ConcurrentHashMap<>();
9191
private final int sessionTimeoutMs;
92-
private final AtomicLong idleStartTime = new AtomicLong(-1);
9392

9493
private Consumer<Long> newHistogram(MetricName name, boolean biased) {
9594
if (registry.isPresent()) {
@@ -174,16 +173,8 @@ public Double value() {
174173
}));
175174
}
176175

177-
public void updateIdleStartTime() {
178-
idleStartTime.compareAndExchange(-1, time.milliseconds());
179-
}
180-
181-
public void updateIdleEndTime() {
182-
long startTime = idleStartTime.getAndSet(-1);
183-
if (startTime != -1) {
184-
long idleDurationMs = Math.max(time.milliseconds() - startTime, 0);
185-
avgIdleTimeRatio.record(METRIC_CONFIG, (double) idleDurationMs, time.milliseconds());
186-
}
176+
public void updateIdleTime(long idleDurationMs) {
177+
avgIdleTimeRatio.record(METRIC_CONFIG, (double) idleDurationMs, time.milliseconds());
187178
}
188179

189180
public void addTimeSinceLastHeartbeatMetric(int brokerId) {

metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -199,22 +199,13 @@ public void testAvgIdleRatio() {
199199
// No idle time recorded yet
200200
assertEquals(1.0, avgIdleRatio.value());
201201

202-
metrics.updateIdleStartTime();
203-
time.sleep(10); //initial record is ignored
204-
metrics.updateIdleEndTime();
205-
time.sleep(20); // wait 20ms non-idle
206-
metrics.updateIdleStartTime();
207-
time.sleep(20); // 20ms idle
208-
metrics.updateIdleEndTime();
202+
metrics.updateIdleTime(10);
203+
time.sleep(40);
204+
metrics.updateIdleTime(20);
209205
assertEquals(0.5, avgIdleRatio.value());
210206

211-
metrics.updateIdleStartTime();
212-
time.sleep(0);
213-
metrics.updateIdleEndTime();
214-
time.sleep(19);
215-
metrics.updateIdleStartTime();
216-
time.sleep(1);
217-
metrics.updateIdleEndTime();
207+
time.sleep(20);
208+
metrics.updateIdleTime(1);
218209
assertEquals(0.05, avgIdleRatio.value());
219210

220211
} finally {

server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@
3131
import java.util.OptionalLong;
3232
import java.util.TreeMap;
3333
import java.util.concurrent.RejectedExecutionException;
34+
import java.util.concurrent.TimeUnit;
3435
import java.util.concurrent.locks.Condition;
3536
import java.util.concurrent.locks.ReentrantLock;
37+
import java.util.function.Consumer;
3638
import java.util.function.Function;
3739
import java.util.function.UnaryOperator;
3840

@@ -278,21 +280,24 @@ private void handleEvents() {
278280
remove(toRun);
279281
continue;
280282
}
281-
if (awaitNs == Long.MAX_VALUE) {
282-
try {
283+
284+
long startIdleNs = time.nanoseconds();
285+
try {
286+
if (awaitNs == Long.MAX_VALUE) {
283287
cond.await();
284-
} catch (InterruptedException e) {
285-
log.warn("Interrupted while waiting for a new event. " +
286-
"Shutting down event queue");
287-
interrupted = true;
288-
}
289-
} else {
290-
try {
288+
} else {
291289
cond.awaitNanos(awaitNs);
292-
} catch (InterruptedException e) {
293-
log.warn("Interrupted while waiting for a deferred event. " +
294-
"Shutting down event queue");
295-
interrupted = true;
290+
}
291+
} catch (InterruptedException e) {
292+
293+
log.warn("Interrupted while waiting for a {} event. " +
294+
"Shutting down event queue", (awaitNs == Long.MAX_VALUE) ? "new" : "deferred");
295+
interrupted = true;
296+
} finally {
297+
if (idleTimeCallback != null) {
298+
long idleNs = Math.max(time.nanoseconds() - startIdleNs, 0);
299+
long idleMs = TimeUnit.NANOSECONDS.toMillis(idleNs);
300+
idleTimeCallback.accept(idleMs);
296301
}
297302
}
298303
} finally {
@@ -440,6 +445,12 @@ int size() {
440445
*/
441446
private boolean interrupted;
442447

448+
/**
449+
* Optional callback for queue idle time tracking.
450+
*/
451+
private Consumer<Long> idleTimeCallback;
452+
453+
443454
public KafkaEventQueue(
444455
Time time,
445456
LogContext logContext,
@@ -503,6 +514,10 @@ public void beginShutdown(String source) {
503514
}
504515
}
505516

517+
public void setIdleTimeCallback(Consumer<Long> callback) {
518+
this.idleTimeCallback = callback;
519+
}
520+
506521
@Override
507522
public int size() {
508523
return eventHandler.size();

0 commit comments

Comments
 (0)