-
Notifications
You must be signed in to change notification settings - Fork 1k
Add configurable OpenTelemetry kafka-clients interceptors #14870
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 20 commits
2a012ae
34d68ec
3b68915
ce0eafc
b187791
85ec089
5f6294b
e5ca107
e791fc8
97cbf9d
4f3d30d
a31210c
84e69ad
5419f03
f4e568a
82248c4
f1e7dde
7703b1d
95d1723
3fdd463
f72ae47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
trask marked this conversation as resolved.
Show resolved
Hide resolved
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @copilot looks like this class is only used in kafka-client-2.6:library module? if so can we move it there? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.instrumentation.kafkaclients.v2_6; | ||
|
||
import java.io.Serializable; | ||
import java.util.Objects; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* Wrapper for KafkaTelemetry that can be injected into kafka configuration without breaking | ||
* serialization. | ||
* | ||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change | ||
* at any time. | ||
*/ | ||
final class KafkaTelemetrySupplier implements Supplier<Object>, Serializable { | ||
private static final long serialVersionUID = 1L; | ||
private final transient Object kafkaTelemetry; | ||
|
||
KafkaTelemetrySupplier(Object kafkaTelemetry) { | ||
Objects.requireNonNull(kafkaTelemetry); | ||
this.kafkaTelemetry = kafkaTelemetry; | ||
} | ||
|
||
@Override | ||
public Object get() { | ||
return kafkaTelemetry; | ||
} | ||
|
||
private Object writeReplace() { | ||
// serialize this object to null | ||
return null; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.instrumentation.kafkaclients.v2_6; | ||
|
||
import com.google.errorprone.annotations.CanIgnoreReturnValue; | ||
import io.opentelemetry.context.Context; | ||
import io.opentelemetry.instrumentation.api.internal.Timer; | ||
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext; | ||
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.function.Supplier; | ||
import javax.annotation.Nullable; | ||
import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
import org.apache.kafka.clients.consumer.ConsumerInterceptor; | ||
import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
import org.apache.kafka.clients.consumer.OffsetAndMetadata; | ||
import org.apache.kafka.common.TopicPartition; | ||
|
||
/** | ||
* A ConsumerInterceptor that adds OpenTelemetry instrumentation. Add this interceptor's class name | ||
* or class via ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Consumer's properties to | ||
* get it instantiated and used. See more details on ConsumerInterceptor usage in its Javadoc. | ||
* | ||
* <p>To configure the interceptor, use {@link KafkaTelemetry#consumerInterceptorConfigProperties} | ||
* to obtain the configuration properties and add them to your consumer configuration. | ||
* | ||
* @see KafkaTelemetry#consumerInterceptorConfigProperties() | ||
*/ | ||
public class OpenTelemetryConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @copilot mark TracingConsumerIntercepter and TracingProducerInterceptor as deprecated and recommend to use these new classes instead There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Marked both |
||
|
||
public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER = | ||
"opentelemetry.kafka-telemetry.supplier"; | ||
|
||
@Nullable private KafkaTelemetry telemetry; | ||
private String consumerGroup; | ||
private String clientId; | ||
|
||
@Override | ||
@CanIgnoreReturnValue | ||
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) { | ||
if (telemetry == null) { | ||
return records; | ||
} | ||
// timer should be started before fetching ConsumerRecords, but there is no callback for that | ||
Timer timer = Timer.start(); | ||
Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer); | ||
if (receiveContext == null) { | ||
receiveContext = Context.current(); | ||
} | ||
KafkaConsumerContext consumerContext = | ||
KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId); | ||
return telemetry.addTracing(records, consumerContext); | ||
} | ||
|
||
@Override | ||
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {} | ||
|
||
@Override | ||
public void close() {} | ||
|
||
@Override | ||
public void configure(Map<String, ?> configs) { | ||
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null); | ||
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null); | ||
|
||
Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER); | ||
if (telemetrySupplier == null) { | ||
return; | ||
} | ||
|
||
if (!(telemetrySupplier instanceof Supplier)) { | ||
throw new IllegalStateException( | ||
"Configuration property " | ||
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER | ||
+ " is not instance of Supplier"); | ||
} | ||
|
||
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get(); | ||
if (!(kafkaTelemetry instanceof KafkaTelemetry)) { | ||
throw new IllegalStateException( | ||
"Configuration property " | ||
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER | ||
+ " supplier does not return KafkaTelemetry instance"); | ||
} | ||
|
||
this.telemetry = (KafkaTelemetry) kafkaTelemetry; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.instrumentation.kafkaclients.v2_6; | ||
|
||
import com.google.errorprone.annotations.CanIgnoreReturnValue; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.function.Supplier; | ||
import javax.annotation.Nullable; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.clients.producer.ProducerInterceptor; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.clients.producer.RecordMetadata; | ||
|
||
/** | ||
* A ProducerInterceptor that adds OpenTelemetry instrumentation. Add this interceptor's class name | ||
* or class via ProducerConfig.INTERCEPTOR_CLASSES_CONFIG property to your Producer's properties to | ||
* get it instantiated and used. See more details on ProducerInterceptor usage in its Javadoc. | ||
* | ||
* <p>To configure the interceptor, use {@link KafkaTelemetry#producerInterceptorConfigProperties} | ||
* to obtain the configuration properties and add them to your producer configuration. | ||
* | ||
* @see KafkaTelemetry#producerInterceptorConfigProperties() | ||
*/ | ||
public class OpenTelemetryProducerInterceptor<K, V> implements ProducerInterceptor<K, V> { | ||
|
||
public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER = | ||
"opentelemetry.kafka-telemetry.supplier"; | ||
|
||
@Nullable private KafkaTelemetry telemetry; | ||
@Nullable private String clientId; | ||
|
||
@Override | ||
@CanIgnoreReturnValue | ||
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) { | ||
if (telemetry != null) { | ||
telemetry.buildAndInjectSpan(producerRecord, clientId); | ||
} | ||
return producerRecord; | ||
} | ||
|
||
@Override | ||
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {} | ||
|
||
@Override | ||
public void close() {} | ||
|
||
@Override | ||
public void configure(Map<String, ?> configs) { | ||
clientId = Objects.toString(configs.get(ProducerConfig.CLIENT_ID_CONFIG), null); | ||
|
||
Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER); | ||
if (telemetrySupplier == null) { | ||
return; | ||
} | ||
|
||
if (!(telemetrySupplier instanceof Supplier)) { | ||
throw new IllegalStateException( | ||
"Configuration property " | ||
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER | ||
+ " is not instance of Supplier"); | ||
} | ||
|
||
Object kafkaTelemetry = ((Supplier<?>) telemetrySupplier).get(); | ||
if (!(kafkaTelemetry instanceof KafkaTelemetry)) { | ||
throw new IllegalStateException( | ||
"Configuration property " | ||
+ CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER | ||
+ " supplier does not return KafkaTelemetry instance"); | ||
} | ||
|
||
this.telemetry = (KafkaTelemetry) kafkaTelemetry; | ||
} | ||
} |
trask marked this conversation as resolved.
Show resolved
Hide resolved
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@copilot update the doc, only need to mention the new approach, no need to mention the now deprecated approach (Alternative: Using interceptors with global OpenTelemetry)