Skip to content

Commit 5736d50

Browse files
authored
KAFKA-19684 Move Gauge#value to MetricValueProvider (#20543)
KIP-188 introduced MetricValueProvider, adding Measurable and Gauge as its sub interfaces. However, this left a legacy [issue](#3705 (comment)): move the value method from Gauge to the super interface, MetricValueProvider. This PR moves the value method from Gauge to MetricValueProvider and provides a default implementation in Measurable. This unifies the methods used by Gauge and Measurable to obtain monitoring values. Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent bc2f23b commit 5736d50

File tree

5 files changed

+61
-28
lines changed

5 files changed

+61
-28
lines changed

clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,4 @@
2020
* A gauge metric is an instantaneous reading of a particular value.
2121
*/
2222
@FunctionalInterface
23-
public interface Gauge<T> extends MetricValueProvider<T> {
24-
25-
/**
26-
* Returns the current value associated with this gauge.
27-
* @param config The configuration for this metric
28-
* @param now The POSIX time in milliseconds the measurement is being taken
29-
*/
30-
T value(MetricConfig config, long now);
31-
32-
}
23+
public interface Gauge<T> extends MetricValueProvider<T> { }

clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.kafka.common.MetricName;
2121
import org.apache.kafka.common.utils.Time;
2222

23+
import java.util.Objects;
24+
2325
public final class KafkaMetric implements Metric {
2426

2527
private final MetricName metricName;
@@ -41,9 +43,7 @@ public KafkaMetric(Object lock, MetricName metricName, MetricValueProvider<?> va
4143
MetricConfig config, Time time) {
4244
this.metricName = metricName;
4345
this.lock = lock;
44-
if (!(valueProvider instanceof Measurable) && !(valueProvider instanceof Gauge))
45-
throw new IllegalArgumentException("Unsupported metric value provider of class " + valueProvider.getClass());
46-
this.metricValueProvider = valueProvider;
46+
this.metricValueProvider = Objects.requireNonNull(valueProvider, "valueProvider must not be null");
4747
this.config = config;
4848
this.time = time;
4949
}
@@ -67,20 +67,15 @@ public MetricName metricName() {
6767
}
6868

6969
/**
70-
* Take the metric and return the value, which could be a {@link Measurable} or a {@link Gauge}
70+
* Take the metric and return the value via {@link MetricValueProvider#value(MetricConfig, long)}.
71+
*
7172
* @return Return the metric value
72-
* @throws IllegalStateException if the underlying metric is not a {@link Measurable} or a {@link Gauge}.
7373
*/
7474
@Override
7575
public Object metricValue() {
7676
long now = time.milliseconds();
7777
synchronized (this.lock) {
78-
if (isMeasurable())
79-
return ((Measurable) metricValueProvider).measure(config, now);
80-
else if (this.metricValueProvider instanceof Gauge)
81-
return ((Gauge<?>) metricValueProvider).value(config, now);
82-
else
83-
throw new IllegalStateException("Not a valid metric: " + this.metricValueProvider.getClass());
78+
return metricValueProvider.value(config, now);
8479
}
8580
}
8681

clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,26 @@
2222
public interface Measurable extends MetricValueProvider<Double> {
2323

2424
/**
25-
* Measure this quantity and return the result as a double
25+
* Measure this quantity and return the result as a double.
26+
*
2627
* @param config The configuration for this metric
2728
* @param now The POSIX time in milliseconds the measurement is being taken
2829
* @return The measured value
2930
*/
3031
double measure(MetricConfig config, long now);
3132

33+
/**
34+
* Measure this quantity and return the result as a double.
35+
*
36+
* This default implementation delegates to {@link #measure(MetricConfig, long)}.
37+
*
38+
* @param config The configuration for this metric
39+
* @param now The POSIX time in milliseconds the measurement is being taken
40+
* @return The measured value as a {@link Double}
41+
*/
42+
@Override
43+
default Double value(MetricConfig config, long now) {
44+
return measure(config, now);
45+
}
46+
3247
}

clients/src/main/java/org/apache/kafka/common/metrics/MetricValueProvider.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,17 @@
1919
/**
2020
* Super-interface for {@link Measurable} or {@link Gauge} that provides
2121
* metric values.
22-
* <p>
23-
* In the future for Java8 and above, {@link Gauge#value(MetricConfig, long)} will be
24-
* moved to this interface with a default implementation in {@link Measurable} that returns
25-
* {@link Measurable#measure(MetricConfig, long)}.
26-
* </p>
2722
*/
28-
public interface MetricValueProvider<T> { }
23+
@FunctionalInterface
24+
public interface MetricValueProvider<T> {
25+
26+
/**
27+
* Returns the current value associated with this metric.
28+
*
29+
* @param config The configuration for this metric
30+
* @param now The POSIX time in milliseconds the measurement is being taken
31+
* @return the current metric value
32+
*/
33+
T value(MetricConfig config, long now);
34+
35+
}

clients/src/test/java/org/apache/kafka/common/metrics/KafkaMetricTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,29 @@ public void testIsMeasurableWithGaugeProvider() {
4848
assertThrows(IllegalStateException.class, metric::measurable);
4949
}
5050

51+
@Test
52+
public void testMeasurableValueReturnsZeroWhenNotMeasurable() {
53+
MockTime time = new MockTime();
54+
MetricConfig config = new MetricConfig();
55+
Gauge<Integer> gauge = (c, now) -> 7;
56+
57+
KafkaMetric metric = new KafkaMetric(new Object(), METRIC_NAME, gauge, config, time);
58+
assertEquals(0.0d, metric.measurableValue(time.milliseconds()), 0.0d);
59+
}
60+
61+
@Test
62+
public void testKafkaMetricAcceptsNonMeasurableNonGaugeProvider() {
63+
MetricValueProvider<String> provider = (config, now) -> "metric value provider";
64+
KafkaMetric metric = new KafkaMetric(new Object(), METRIC_NAME, provider, new MetricConfig(), new MockTime());
65+
66+
Object value = metric.metricValue();
67+
assertEquals("metric value provider", value);
68+
}
69+
70+
@Test
71+
public void testConstructorWithNullProvider() {
72+
assertThrows(NullPointerException.class, () ->
73+
new KafkaMetric(new Object(), METRIC_NAME, null, new MetricConfig(), new MockTime())
74+
);
75+
}
5176
}

0 commit comments

Comments
 (0)