Skip to content

Commit b207b25

Browse files
committed
simplify
1 parent 30bc6dd commit b207b25

File tree

5 files changed

+34
-33
lines changed

5 files changed

+34
-33
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetryMetricsReporter;
2727
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier;
2828
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList;
29+
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaTelemetrySupplier;
2930
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryConsumerInterceptor;
3031
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryProducerInterceptor;
3132
import java.lang.reflect.InvocationTargetException;
Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal;
77

8+
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
89
import java.io.Serializable;
910
import java.util.Objects;
1011
import java.util.function.Supplier;
@@ -16,17 +17,17 @@
1617
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
1718
* at any time.
1819
*/
19-
final class KafkaTelemetrySupplier implements Supplier<Object>, Serializable {
20+
public final class KafkaTelemetrySupplier implements Supplier<KafkaTelemetry>, Serializable {
2021
private static final long serialVersionUID = 1L;
21-
private final transient Object kafkaTelemetry;
22+
private final transient KafkaTelemetry kafkaTelemetry;
2223

23-
KafkaTelemetrySupplier(Object kafkaTelemetry) {
24+
public KafkaTelemetrySupplier(KafkaTelemetry kafkaTelemetry) {
2425
Objects.requireNonNull(kafkaTelemetry);
2526
this.kafkaTelemetry = kafkaTelemetry;
2627
}
2728

2829
@Override
29-
public Object get() {
30+
public KafkaTelemetry get() {
3031
return kafkaTelemetry;
3132
}
3233

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryConsumerInterceptor.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
1414
import java.util.Map;
1515
import java.util.Objects;
16-
import java.util.function.Supplier;
1716
import javax.annotation.Nullable;
1817
import org.apache.kafka.clients.consumer.ConsumerConfig;
1918
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
@@ -69,21 +68,21 @@ public void configure(Map<String, ?> configs) {
6968
return;
7069
}
7170

72-
if (!(telemetrySupplier instanceof Supplier)) {
73-
throw new IllegalStateException(
74-
"Configuration property "
75-
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
76-
+ " is not instance of Supplier");
77-
}
71+
KafkaTelemetrySupplier supplier =
72+
getProperty(configs, CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, KafkaTelemetrySupplier.class);
73+
this.telemetry = supplier.get();
74+
}
7875

79-
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
80-
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
76+
@SuppressWarnings("unchecked")
77+
private static <T> T getProperty(Map<String, ?> configs, String key, Class<T> requiredType) {
78+
Object value = configs.get(key);
79+
if (value == null) {
80+
throw new IllegalStateException("Missing required configuration property: " + key);
81+
}
82+
if (!requiredType.isInstance(value)) {
8183
throw new IllegalStateException(
82-
"Configuration property "
83-
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
84-
+ " supplier does not return KafkaTelemetry instance");
84+
"Configuration property " + key + " is not instance of " + requiredType.getSimpleName());
8585
}
86-
87-
this.telemetry = (KafkaTelemetry) kafkaTelemetry;
86+
return (T) value;
8887
}
8988
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptor.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
1010
import java.util.Map;
1111
import java.util.Objects;
12-
import java.util.function.Supplier;
1312
import javax.annotation.Nullable;
1413
import org.apache.kafka.clients.producer.ProducerConfig;
1514
import org.apache.kafka.clients.producer.ProducerInterceptor;
@@ -54,21 +53,21 @@ public void configure(Map<String, ?> configs) {
5453
return;
5554
}
5655

57-
if (!(telemetrySupplier instanceof Supplier)) {
58-
throw new IllegalStateException(
59-
"Configuration property "
60-
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
61-
+ " is not instance of Supplier");
62-
}
56+
KafkaTelemetrySupplier supplier =
57+
getProperty(configs, CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, KafkaTelemetrySupplier.class);
58+
this.telemetry = supplier.get();
59+
}
6360

64-
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
65-
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
61+
@SuppressWarnings("unchecked")
62+
private static <T> T getProperty(Map<String, ?> configs, String key, Class<T> requiredType) {
63+
Object value = configs.get(key);
64+
if (value == null) {
65+
throw new IllegalStateException("Missing required configuration property: " + key);
66+
}
67+
if (!requiredType.isInstance(value)) {
6668
throw new IllegalStateException(
67-
"Configuration property "
68-
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
69-
+ " supplier does not return KafkaTelemetry instance");
69+
"Configuration property " + key + " is not instance of " + requiredType.getSimpleName());
7070
}
71-
72-
this.telemetry = (KafkaTelemetry) kafkaTelemetry;
71+
return (T) value;
7372
}
7473
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryInterceptorTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import static org.assertj.core.api.Assertions.assertThat;
99
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1010

11+
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaTelemetrySupplier;
1112
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryConsumerInterceptor;
1213
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryProducerInterceptor;
1314
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;

0 commit comments

Comments
 (0)