Skip to content

Commit b187791

Browse files
Copilottrask
andcommitted
Follow OpenTelemetryMetricsReporter pattern for property validation
Co-authored-by: trask <[email protected]>
1 parent ce0eafc commit b187791

File tree

2 files changed

+54
-31
lines changed

2 files changed

+54
-31
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -63,25 +63,37 @@ public void configure(Map<String, ?> configs) {
6363
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
6464
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);
6565

66-
// Try to get KafkaTelemetry from config
6766
Object telemetrySupplier = configs.get(KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
68-
if (telemetrySupplier instanceof Supplier) {
69-
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
70-
if (kafkaTelemetry instanceof KafkaTelemetry) {
71-
this.telemetry = (KafkaTelemetry) kafkaTelemetry;
72-
return;
73-
}
67+
if (telemetrySupplier == null) {
68+
// Fallback to GlobalOpenTelemetry if not configured
69+
this.telemetry =
70+
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
71+
.setMessagingReceiveInstrumentationEnabled(
72+
ConfigPropertiesUtil.getBoolean(
73+
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled",
74+
false))
75+
.setCapturedHeaders(
76+
ConfigPropertiesUtil.getList(
77+
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
78+
.build();
79+
return;
7480
}
7581

76-
// Fallback to GlobalOpenTelemetry if not configured
77-
this.telemetry =
78-
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
79-
.setMessagingReceiveInstrumentationEnabled(
80-
ConfigPropertiesUtil.getBoolean(
81-
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
82-
.setCapturedHeaders(
83-
ConfigPropertiesUtil.getList(
84-
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
85-
.build();
82+
if (!(telemetrySupplier instanceof Supplier)) {
83+
throw new IllegalStateException(
84+
"Configuration property "
85+
+ KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
86+
+ " is not instance of Supplier");
87+
}
88+
89+
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
90+
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
91+
throw new IllegalStateException(
92+
"Configuration property "
93+
+ KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
94+
+ " supplier does not return KafkaTelemetry instance");
95+
}
96+
97+
this.telemetry = (KafkaTelemetry) kafkaTelemetry;
8698
}
8799
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,33 @@ public void close() {}
4848
public void configure(Map<String, ?> configs) {
4949
clientId = Objects.toString(configs.get(ProducerConfig.CLIENT_ID_CONFIG), null);
5050

51-
// Try to get KafkaTelemetry from config
5251
Object telemetrySupplier = configs.get(KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
53-
if (telemetrySupplier instanceof Supplier) {
54-
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
55-
if (kafkaTelemetry instanceof KafkaTelemetry) {
56-
this.telemetry = (KafkaTelemetry) kafkaTelemetry;
57-
return;
58-
}
52+
if (telemetrySupplier == null) {
53+
// Fallback to GlobalOpenTelemetry if not configured
54+
this.telemetry =
55+
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
56+
.setCapturedHeaders(
57+
ConfigPropertiesUtil.getList(
58+
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
59+
.build();
60+
return;
5961
}
6062

61-
// Fallback to GlobalOpenTelemetry if not configured
62-
this.telemetry =
63-
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
64-
.setCapturedHeaders(
65-
ConfigPropertiesUtil.getList(
66-
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
67-
.build();
63+
if (!(telemetrySupplier instanceof Supplier)) {
64+
throw new IllegalStateException(
65+
"Configuration property "
66+
+ KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
67+
+ " is not instance of Supplier");
68+
}
69+
70+
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get();
71+
if (!(kafkaTelemetry instanceof KafkaTelemetry)) {
72+
throw new IllegalStateException(
73+
"Configuration property "
74+
+ KafkaTelemetry.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER
75+
+ " supplier does not return KafkaTelemetry instance");
76+
}
77+
78+
this.telemetry = (KafkaTelemetry) kafkaTelemetry;
6879
}
6980
}

0 commit comments

Comments
 (0)