Skip to content

Commit 18994d1

Browse files
committed
Add AvgIdleRatio metrics to QuorumControllerMetrics
1 parent 9983331 commit 18994d1

File tree

8 files changed

+198
-33
lines changed

8 files changed

+198
-33
lines changed

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,7 @@ private void handleEventEnd(String name, long startProcessingTimeNs) {
532532
MICROSECONDS.convert(deltaNs, NANOSECONDS));
533533
performanceMonitor.observeEvent(name, deltaNs);
534534
controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
535+
controllerMetrics.updateIdleStartTime();
535536
}
536537

537538
private Throwable handleEventException(
@@ -549,6 +550,7 @@ private Throwable handleEventException(
549550
} else {
550551
deltaUs = OptionalLong.empty();
551552
}
553+
controllerMetrics.updateIdleStartTime();
552554
EventHandlerExceptionInfo info = EventHandlerExceptionInfo.
553555
fromInternal(exception, this::latestController);
554556
int epoch = curClaimEpoch;
@@ -596,6 +598,7 @@ class ControllerEvent implements EventQueue.Event {
596598

597599
@Override
598600
public void run() throws Exception {
601+
controllerMetrics.updateIdleEndTime();
599602
startProcessingTimeNs = OptionalLong.of(
600603
updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
601604
log.debug("Executing {}.", this);
@@ -647,6 +650,7 @@ CompletableFuture<T> future() {
647650

648651
@Override
649652
public void run() throws Exception {
653+
controllerMetrics.updateIdleEndTime();
650654
startProcessingTimeNs = OptionalLong.of(
651655
updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
652656
T value = handler.get();
@@ -762,6 +766,7 @@ CompletableFuture<T> future() {
762766

763767
@Override
764768
public void run() throws Exception {
769+
controllerMetrics.updateIdleEndTime();
765770
// Deferred events set the DOES_NOT_UPDATE_QUEUE_TIME flag to prevent incorrectly
766771
// including their deferral time in the event queue time.
767772
startProcessingTimeNs = OptionalLong.of(
@@ -831,6 +836,7 @@ public void run() throws Exception {
831836

832837
// Remember the latest offset and future if it is not already completed
833838
if (!future.isDone()) {
839+
controllerMetrics.updateIdleStartTime();
834840
deferredEventQueue.add(resultAndOffset.offset(), this);
835841
}
836842
}
@@ -842,6 +848,7 @@ public void handleException(Throwable exception) {
842848

843849
@Override
844850
public void complete(Throwable exception) {
851+
controllerMetrics.updateIdleEndTime();
845852
if (exception == null) {
846853
handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
847854
future.complete(resultAndOffset.response());

metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.List;
3030
import java.util.Map;
3131
import java.util.Optional;
32+
import java.util.OptionalLong;
3233
import java.util.concurrent.ConcurrentHashMap;
3334
import java.util.concurrent.atomic.AtomicLong;
3435
import java.util.function.Consumer;
@@ -48,6 +49,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
4849
"ControllerEventManager", "EventQueueTimeMs");
4950
private static final MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
5051
"ControllerEventManager", "EventQueueProcessingTimeMs");
52+
private static final MetricName AVERAGE_IDLE_RATIO = getMetricName(
53+
"ControllerEventManager", "AvgIdleRatio");
5154
private static final MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
5255
"KafkaController", "LastAppliedRecordOffset");
5356
private static final MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
@@ -64,6 +67,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
6467
"KafkaController", "EventQueueOperationsTimedOutCount");
6568
private static final MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName(
6669
"KafkaController", "NewActiveControllersCount");
70+
6771
private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME = "TimeSinceLastHeartbeatReceivedMs";
6872
private static final String BROKER_ID_TAG = "broker";
6973

@@ -75,13 +79,15 @@ public class QuorumControllerMetrics implements AutoCloseable {
7579
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
7680
private final Consumer<Long> eventQueueTimeUpdater;
7781
private final Consumer<Long> eventQueueProcessingTimeUpdater;
82+
public final YammerTimeRatio avgIdleTimeRatio;
7883

7984
private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
8085
private final AtomicLong operationsStarted = new AtomicLong(0);
8186
private final AtomicLong operationsTimedOut = new AtomicLong(0);
8287
private final AtomicLong newActiveControllers = new AtomicLong(0);
8388
private final Map<Integer, Long> brokerContactTimesMs = new ConcurrentHashMap<>();
8489
private final int sessionTimeoutMs;
90+
private volatile OptionalLong idleStartTime = OptionalLong.empty();
8591

8692
private Consumer<Long> newHistogram(MetricName name, boolean biased) {
8793
if (registry.isPresent()) {
@@ -109,6 +115,7 @@ public Integer value() {
109115
this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
110116
this.eventQueueProcessingTimeUpdater = newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
111117
this.sessionTimeoutMs = sessionTimeoutMs;
118+
this.avgIdleTimeRatio = new YammerTimeRatio(1);
112119
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
113120
@Override
114121
public Long value() {
@@ -157,8 +164,22 @@ public Long value() {
157164
return newActiveControllers();
158165
}
159166
}));
167+
registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, this.avgIdleTimeRatio));
168+
}
169+
170+
public void updateIdleStartTime() {
171+
if (idleStartTime.isEmpty()) {
172+
idleStartTime = OptionalLong.of(time.milliseconds());
173+
}
160174
}
161175

176+
public void updateIdleEndTime() {
177+
if (this.idleStartTime.isPresent()) {
178+
long idleDurationMs = Math.max(time.milliseconds() - idleStartTime.getAsLong(), 0);
179+
avgIdleTimeRatio.record((double) idleDurationMs, time.milliseconds());
180+
idleStartTime = OptionalLong.empty();
181+
}
182+
}
162183
public void addTimeSinceLastHeartbeatMetric(int brokerId) {
163184
brokerContactTimesMs.put(brokerId, time.milliseconds());
164185
registry.ifPresent(r -> r.newGauge(
@@ -291,7 +312,8 @@ public void close() {
291312
TIMED_OUT_BROKER_HEARTBEAT_COUNT,
292313
EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
293314
EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
294-
NEW_ACTIVE_CONTROLLERS_COUNT
315+
NEW_ACTIVE_CONTROLLERS_COUNT,
316+
AVERAGE_IDLE_RATIO
295317
).forEach(r::removeMetric));
296318
removeTimeSinceLastHeartbeatMetrics();
297319
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.controller.metrics;
19+
20+
import com.yammer.metrics.core.Gauge;
21+
import org.apache.kafka.raft.internals.TimeRatio;
22+
23+
/**
24+
* Yammer Metrics facade for TimeRatio.
25+
* This class provides a Gauge interface for Yammer metrics registry
26+
* while using the same shared TimeRatio implementation that
27+
* the Kafka Metrics KafkaTimeRatio uses.
28+
*/
29+
public class YammerTimeRatio extends Gauge<Double> {
30+
private final TimeRatio timeRatio;
31+
32+
public YammerTimeRatio(double defaultRatio) {
33+
this.timeRatio = new TimeRatio(defaultRatio);
34+
}
35+
36+
/**
37+
* Record an idle/wait duration.
38+
*
39+
* @param idleDurationMs The duration of the idle/wait period in milliseconds
40+
* @param currentTimeMs The current time in milliseconds
41+
*/
42+
public void record(double idleDurationMs, long currentTimeMs) {
43+
timeRatio.record(idleDurationMs, currentTimeMs);
44+
}
45+
46+
/**
47+
* Get the current idle ratio for Yammer Metrics.
48+
*
49+
* @return The ratio of idle time to total time (between 0.0 and 1.0)
50+
*/
51+
@Override
52+
public Double value() { return timeRatio.measure(); }
53+
}

metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public void testMetricNames() {
4646
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
4747
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
4848
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
49+
"kafka.controller:type=KafkaController,name=AvgIdleRatio",
4950
"kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
5051
"kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
5152
"kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
@@ -188,6 +189,39 @@ public void testTimeSinceLastHeartbeatReceivedMs() {
188189
}
189190
}
190191

192+
@Test
193+
public void testYammerTimeRatioIdleTimeTracking() {
194+
MetricsRegistry registry = new MetricsRegistry();
195+
MockTime time = new MockTime();
196+
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
197+
Gauge<Double> avgIdleRatio = (Gauge<Double>) registry.allMetrics().get(metricName("KafkaController", "AvgIdleRatio"));
198+
199+
// No idle time recorded yet
200+
assertEquals(1.0, avgIdleRatio.value());
201+
202+
metrics.updateIdleStartTime();
203+
time.sleep(10); //initial record is ignored
204+
metrics.updateIdleEndTime();
205+
time.sleep(20); // wait 20ms non-idle
206+
metrics.updateIdleStartTime();
207+
time.sleep(20); // 20ms idle
208+
metrics.updateIdleEndTime();
209+
assertEquals(0.5, avgIdleRatio.value());
210+
211+
metrics.updateIdleStartTime();
212+
time.sleep(0);
213+
metrics.updateIdleEndTime();
214+
time.sleep(19);
215+
metrics.updateIdleStartTime();
216+
time.sleep(1);
217+
metrics.updateIdleEndTime();
218+
assertEquals(0.05, avgIdleRatio.value());
219+
220+
} finally {
221+
registry.shutdown();
222+
}
223+
}
224+
191225
private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
192226
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);
193227

raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public KafkaRaftMetrics(Metrics metrics, String metricGrpPrefix) {
137137
"The ratio of time the Raft IO thread is idle as opposed to " +
138138
"doing work (e.g. handling requests or replicating from the leader)"
139139
),
140-
new TimeRatio(1.0)
140+
new KafkaTimeRatio(1.0)
141141
);
142142
}
143143

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.raft.internals;
19+
20+
import org.apache.kafka.common.metrics.MeasurableStat;
21+
import org.apache.kafka.common.metrics.MetricConfig;
22+
23+
/**
24+
* Kafka Metrics facade for TimeRatio.
25+
* This facade adapts the TimeRatio core implementation to work with
26+
* Kafka Metrics by implementing MeasurableStat interface.
27+
*/
28+
public class KafkaTimeRatio implements MeasurableStat {
29+
private final TimeRatio timeRatio;
30+
31+
public KafkaTimeRatio(double defaultRatio) {
32+
this.timeRatio = new TimeRatio(defaultRatio);
33+
}
34+
35+
@Override
36+
public double measure(MetricConfig config, long currentTimestampMs) {
37+
return timeRatio.measure();
38+
}
39+
40+
@Override
41+
public void record(MetricConfig config, double value, long currentTimestampMs) {
42+
timeRatio.record(value, currentTimestampMs);
43+
}
44+
}

raft/src/main/java/org/apache/kafka/raft/internals/TimeRatio.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,17 @@
1616
*/
1717
package org.apache.kafka.raft.internals;
1818

19-
import org.apache.kafka.common.metrics.MeasurableStat;
20-
import org.apache.kafka.common.metrics.MetricConfig;
21-
2219
/**
23-
* Maintains an approximate ratio of the duration of a specific event
24-
* over all time. For example, this can be used to compute the ratio of
25-
* time that a thread is busy or idle. The value is approximate since the
26-
* measurement and recording intervals may not be aligned.
20+
* This metrics-agnostic implementation maintains an approximate ratio of
21+
* the duration of a specific event over all time. For example, this can
22+
* be used to compute the ratio of time that a thread is busy or idle. The value
23+
* is approximate since the measurement and recording intervals may not be aligned.
2724
*
2825
* Note that the duration of the event is assumed to be small relative to
2926
* the interval of measurement.
30-
*
3127
*/
32-
public class TimeRatio implements MeasurableStat {
28+
29+
public class TimeRatio {
3330
private long intervalStartTimestampMs = -1;
3431
private long lastRecordedTimestampMs = -1;
3532
private double totalRecordedDurationMs = 0;
@@ -40,12 +37,17 @@ public TimeRatio(double defaultRatio) {
4037
if (defaultRatio < 0.0 || defaultRatio > 1.0) {
4138
throw new IllegalArgumentException("Invalid ratio: value " + defaultRatio + " is not between 0 and 1.");
4239
}
43-
4440
this.defaultRatio = defaultRatio;
4541
}
4642

47-
@Override
48-
public double measure(MetricConfig config, long currentTimestampMs) {
43+
/**
44+
* Measure the ratio of the total recorded duration over the interval duration.
45+
* If no recordings have been captured, it returns the default ratio.
46+
* After measuring, it resets the recorded duration and starts a new interval.
47+
*
48+
* @return The ratio of total recorded duration to the interval duration
49+
*/
50+
public double measure() {
4951
if (lastRecordedTimestampMs < 0) {
5052
// Return the default value if no recordings have been captured.
5153
return defaultRatio;
@@ -68,8 +70,15 @@ public double measure(MetricConfig config, long currentTimestampMs) {
6870
}
6971
}
7072

71-
@Override
72-
public void record(MetricConfig config, double value, long currentTimestampMs) {
73+
/**
74+
* Record the duration of an event at the current timestamp.
75+
* If this is the first record, it initializes the interval start timestamp.
76+
* Otherwise, it updates the total recorded duration and last recorded timestamp.
77+
*
78+
* @param value The duration of the event in milliseconds
79+
* @param currentTimestampMs The current time in milliseconds
80+
*/
81+
public void record(double value, long currentTimestampMs) {
7382
if (intervalStartTimestampMs < 0) {
7483
// Discard the initial value since the value occurred prior to the interval start
7584
intervalStartTimestampMs = currentTimestampMs;
@@ -78,5 +87,4 @@ public void record(MetricConfig config, double value, long currentTimestampMs) {
7887
lastRecordedTimestampMs = currentTimestampMs;
7988
}
8089
}
81-
82-
}
90+
}

0 commit comments

Comments
 (0)