Skip to content

Commit 9115bdc

Browse files
committed
Address feedback
1 parent 9294f46 commit 9115bdc

File tree

8 files changed

+46
-145
lines changed

8 files changed

+46
-145
lines changed

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,7 @@ private Throwable handleEventException(
575575

576576
private long updateEventStartMetricsAndGetTime(OptionalLong eventCreatedTimeNs) {
577577
long now = time.nanoseconds();
578+
controllerMetrics.updateIdleEndTime();
578579
controllerMetrics.incrementOperationsStarted();
579580
if (eventCreatedTimeNs.isPresent()) {
580581
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs.getAsLong()));
@@ -598,7 +599,6 @@ class ControllerEvent implements EventQueue.Event {
598599

599600
@Override
600601
public void run() throws Exception {
601-
controllerMetrics.updateIdleEndTime();
602602
startProcessingTimeNs = OptionalLong.of(
603603
updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
604604
log.debug("Executing {}.", this);
@@ -650,7 +650,6 @@ CompletableFuture<T> future() {
650650

651651
@Override
652652
public void run() throws Exception {
653-
controllerMetrics.updateIdleEndTime();
654653
startProcessingTimeNs = OptionalLong.of(
655654
updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
656655
T value = handler.get();
@@ -766,7 +765,6 @@ CompletableFuture<T> future() {
766765

767766
@Override
768767
public void run() throws Exception {
769-
controllerMetrics.updateIdleEndTime();
770768
// Deferred events set the DOES_NOT_UPDATE_QUEUE_TIME flag to prevent incorrectly
771769
// including their deferral time in the event queue time.
772770
startProcessingTimeNs = OptionalLong.of(

metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.kafka.controller.metrics;
1919

20+
import org.apache.kafka.common.metrics.MetricConfig;
2021
import org.apache.kafka.common.utils.Time;
22+
import org.apache.kafka.raft.internals.TimeRatio;
2123
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
2224

2325
import com.yammer.metrics.core.Gauge;
@@ -70,6 +72,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
7072

7173
private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME = "TimeSinceLastHeartbeatReceivedMs";
7274
private static final String BROKER_ID_TAG = "broker";
75+
private static final MetricConfig METRIC_CONFIG = new MetricConfig();
7376

7477
private final Optional<MetricsRegistry> registry;
7578
private final Time time;
@@ -79,7 +82,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
7982
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
8083
private final Consumer<Long> eventQueueTimeUpdater;
8184
private final Consumer<Long> eventQueueProcessingTimeUpdater;
82-
public final YammerTimeRatio avgIdleTimeRatio;
85+
public final TimeRatio avgIdleTimeRatio;
8386

8487
private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
8588
private final AtomicLong operationsStarted = new AtomicLong(0);
@@ -115,7 +118,7 @@ public Integer value() {
115118
this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
116119
this.eventQueueProcessingTimeUpdater = newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
117120
this.sessionTimeoutMs = sessionTimeoutMs;
118-
this.avgIdleTimeRatio = new YammerTimeRatio(1);
121+
this.avgIdleTimeRatio = new TimeRatio(1);
119122
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
120123
@Override
121124
public Long value() {
@@ -164,7 +167,12 @@ public Long value() {
164167
return newActiveControllers();
165168
}
166169
}));
167-
registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, this.avgIdleTimeRatio));
170+
registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new Gauge<Double>() {
171+
@Override
172+
public Double value() {
173+
return avgIdleTimeRatio.measure(METRIC_CONFIG, time.milliseconds());
174+
}
175+
}));
168176
}
169177

170178
public void updateIdleStartTime() {
@@ -176,7 +184,7 @@ public void updateIdleStartTime() {
176184
public void updateIdleEndTime() {
177185
if (this.idleStartTime.isPresent()) {
178186
long idleDurationMs = Math.max(time.milliseconds() - idleStartTime.getAsLong(), 0);
179-
avgIdleTimeRatio.record((double) idleDurationMs, time.milliseconds());
187+
avgIdleTimeRatio.record(METRIC_CONFIG, (double) idleDurationMs, time.milliseconds());
180188
idleStartTime = OptionalLong.empty();
181189
}
182190
}

metadata/src/main/java/org/apache/kafka/controller/metrics/YammerTimeRatio.java

Lines changed: 0 additions & 56 deletions
This file was deleted.

metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public void testTimeSinceLastHeartbeatReceivedMs() {
190190
}
191191

192192
@Test
193-
public void testYammerTimeRatioIdleTimeTracking() {
193+
public void testAvgIdleRatio() {
194194
MetricsRegistry registry = new MetricsRegistry();
195195
MockTime time = new MockTime();
196196
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, 9000)) {

raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public KafkaRaftMetrics(Metrics metrics, String metricGrpPrefix) {
137137
"The ratio of time the Raft IO thread is idle as opposed to " +
138138
"doing work (e.g. handling requests or replicating from the leader)"
139139
),
140-
new KafkaTimeRatio(1.0)
140+
new TimeRatio(1.0)
141141
);
142142
}
143143

raft/src/main/java/org/apache/kafka/raft/internals/KafkaTimeRatio.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

raft/src/main/java/org/apache/kafka/raft/internals/TimeRatio.java

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,20 @@
1616
*/
1717
package org.apache.kafka.raft.internals;
1818

19+
import org.apache.kafka.common.metrics.MeasurableStat;
20+
import org.apache.kafka.common.metrics.MetricConfig;
21+
1922
/**
20-
* This metrics agnostic implementation maintain an approximate ratio of
21-
* the duration of a specific event over all time. For example, this can
22-
* be used to compute the ratio of time that a thread is busy or idle. The value
23-
* is approximate since the measurement and recording intervals may not be aligned.
23+
* Maintains an approximate ratio of the duration of a specific event
24+
* over all time. For example, this can be used to compute the ratio of
25+
* time that a thread is busy or idle. The value is approximate since the
26+
* measurement and recording intervals may not be aligned.
2427
*
2528
* Note that the duration of the event is assumed to be small relative to
2629
* the interval of measurement.
30+
*
2731
*/
28-
29-
public class TimeRatio {
32+
public class TimeRatio implements MeasurableStat {
3033
private long intervalStartTimestampMs = -1;
3134
private long lastRecordedTimestampMs = -1;
3235
private double totalRecordedDurationMs = 0;
@@ -37,17 +40,12 @@ public TimeRatio(double defaultRatio) {
3740
if (defaultRatio < 0.0 || defaultRatio > 1.0) {
3841
throw new IllegalArgumentException("Invalid ratio: value " + defaultRatio + " is not between 0 and 1.");
3942
}
43+
4044
this.defaultRatio = defaultRatio;
4145
}
4246

43-
/**
44-
* Measure the ratio of the total recorded duration over the interval duration.
45-
* If no recordings have been captured, it returns the default ratio.
46-
* After measuring, it resets the recorded duration and starts a new interval.
47-
*
48-
* @return The ratio of total recorded duration to the interval duration
49-
*/
50-
public double measure() {
47+
@Override
48+
public double measure(MetricConfig config, long currentTimestampMs) {
5149
if (lastRecordedTimestampMs < 0) {
5250
// Return the default value if no recordings have been captured.
5351
return defaultRatio;
@@ -70,15 +68,8 @@ public double measure() {
7068
}
7169
}
7270

73-
/**
74-
* Record the duration of an event at the current timestamp.
75-
* If this is the first record, it initializes the interval start timestamp.
76-
* Otherwise, it updates the total recorded duration and last recorded timestamp.
77-
*
78-
* @param value The duration of the event in milliseconds
79-
* @param currentTimestampMs The current time in milliseconds
80-
*/
81-
public void record(double value, long currentTimestampMs) {
71+
@Override
72+
public void record(MetricConfig config, double value, long currentTimestampMs) {
8273
if (intervalStartTimestampMs < 0) {
8374
// Discard the initial value since the value occurred prior to the interval start
8475
intervalStartTimestampMs = currentTimestampMs;
@@ -87,4 +78,5 @@ public void record(double value, long currentTimestampMs) {
8778
lastRecordedTimestampMs = currentTimestampMs;
8879
}
8980
}
90-
}
81+
82+
}

raft/src/test/java/org/apache/kafka/raft/internals/TimeRatioTest.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.raft.internals;
1818

19+
import org.apache.kafka.common.metrics.MetricConfig;
1920
import org.apache.kafka.common.utils.MockTime;
2021

2122
import org.junit.jupiter.api.Test;
@@ -26,40 +27,42 @@ class TimeRatioTest {
2627

2728
@Test
2829
public void testRatio() {
30+
MetricConfig config = new MetricConfig();
2931
MockTime time = new MockTime();
3032
TimeRatio ratio = new TimeRatio(1.0);
3133

32-
ratio.record(0.0, time.milliseconds());
34+
ratio.record(config, 0.0, time.milliseconds());
3335
time.sleep(10);
34-
ratio.record(10, time.milliseconds());
36+
ratio.record(config, 10, time.milliseconds());
3537
time.sleep(10);
36-
ratio.record(0, time.milliseconds());
37-
assertEquals(0.5, ratio.measure());
38+
ratio.record(config, 0, time.milliseconds());
39+
assertEquals(0.5, ratio.measure(config, time.milliseconds()));
3840

3941
time.sleep(10);
40-
ratio.record(10, time.milliseconds());
42+
ratio.record(config, 10, time.milliseconds());
4143
time.sleep(40);
42-
ratio.record(0, time.milliseconds());
43-
assertEquals(0.2, ratio.measure());
44+
ratio.record(config, 0, time.milliseconds());
45+
assertEquals(0.2, ratio.measure(config, time.milliseconds()));
4446
}
4547

4648
@Test
4749
public void testRatioMisalignedWindow() {
50+
MetricConfig config = new MetricConfig();
4851
MockTime time = new MockTime();
4952
TimeRatio ratio = new TimeRatio(1.0);
5053

51-
ratio.record(0.0, time.milliseconds());
54+
ratio.record(config, 0.0, time.milliseconds());
5255
time.sleep(10);
53-
ratio.record(10, time.milliseconds());
56+
ratio.record(config, 10, time.milliseconds());
5457
time.sleep(10);
5558

5659
// No recordings, so the last 10ms are not counted.
57-
assertEquals(1.0, ratio.measure());
60+
assertEquals(1.0, ratio.measure(config, time.milliseconds()));
5861

5962
// Now the measurement of 5ms arrives. We measure the time since the last
6063
// recording, so 5ms/10ms = 0.5.
61-
ratio.record(5, time.milliseconds());
62-
assertEquals(0.5, ratio.measure());
64+
ratio.record(config, 5, time.milliseconds());
65+
assertEquals(0.5, ratio.measure(config, time.milliseconds()));
6366
}
6467

6568
}

0 commit comments

Comments
 (0)