 import io.opentelemetry.context.propagation.TextMapPropagator;
 import io.opentelemetry.context.propagation.TextMapSetter;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
+import io.opentelemetry.instrumentation.api.internal.Timer;
+import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext;
+import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest;
+import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaUtil;
 import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
 import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
+import io.opentelemetry.instrumentation.kafka.internal.TracingList;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Proxy;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.function.BiFunction;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.MetricsReporter;
 
@@ -47,16 +56,19 @@ public final class KafkaTelemetry {
 
   private final OpenTelemetry openTelemetry;
   private final Instrumenter<KafkaProducerRequest, RecordMetadata> producerInstrumenter;
+  private final Instrumenter<KafkaReceiveRequest, Void> consumerReceiveInstrumenter;
   private final Instrumenter<KafkaProcessRequest, Void> consumerProcessInstrumenter;
   private final boolean producerPropagationEnabled;
 
   KafkaTelemetry(
       OpenTelemetry openTelemetry,
       Instrumenter<KafkaProducerRequest, RecordMetadata> producerInstrumenter,
+      Instrumenter<KafkaReceiveRequest, Void> consumerReceiveInstrumenter,
       Instrumenter<KafkaProcessRequest, Void> consumerProcessInstrumenter,
       boolean producerPropagationEnabled) {
     this.openTelemetry = openTelemetry;
     this.producerInstrumenter = producerInstrumenter;
+    this.consumerReceiveInstrumenter = consumerReceiveInstrumenter;
     this.consumerProcessInstrumenter = consumerProcessInstrumenter;
     this.producerPropagationEnabled = producerPropagationEnabled;
   }
@@ -115,6 +127,7 @@ public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
             new Class<?>[] {Consumer.class},
             (proxy, method, args) -> {
               Object result;
+              Timer timer = "poll".equals(method.getName()) ? Timer.start() : null;
               try {
                 result = method.invoke(consumer, args);
               } catch (InvocationTargetException exception) {
@@ -123,12 +136,36 @@ public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
               // ConsumerRecords<K, V> poll(long timeout)
               // ConsumerRecords<K, V> poll(Duration duration)
               if ("poll".equals(method.getName()) && result instanceof ConsumerRecords) {
-                buildAndFinishSpan((ConsumerRecords) result, consumer);
+                ConsumerRecords<K, V> consumerRecords = (ConsumerRecords<K, V>) result;
+                Context receiveContext = buildAndFinishSpan(consumerRecords, consumer, timer);
+                if (receiveContext == null) {
+                  receiveContext = Context.current();
+                }
+                KafkaConsumerContext consumerContext =
+                    KafkaConsumerContextUtil.create(receiveContext, consumer);
+                result = addTracing(consumerRecords, consumerContext);
               }
               return result;
             });
   }
 
+  <K, V> ConsumerRecords<K, V> addTracing(
+      ConsumerRecords<K, V> consumerRecords, KafkaConsumerContext consumerContext) {
+    if (consumerRecords.isEmpty()) {
+      return consumerRecords;
+    }
+
+    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
+    for (TopicPartition partition : consumerRecords.partitions()) {
+      List<ConsumerRecord<K, V>> list = consumerRecords.records(partition);
+      if (list != null && !list.isEmpty()) {
+        list = TracingList.wrap(list, consumerProcessInstrumenter, () -> true, consumerContext);
+      }
+      records.put(partition, list);
+    }
+    return new ConsumerRecords<>(records);
+  }
+
   /**
    * Produces a set of kafka client config properties (consumer or producer) to register a {@link
    * MetricsReporter} that records metrics to an {@code openTelemetry} instance. Add these resulting
@@ -221,23 +258,37 @@ <K, V> Future<RecordMetadata> buildAndInjectSpan(
     }
   }
 
-  private <K, V> void buildAndFinishSpan(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
-    buildAndFinishSpan(
-        records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer));
+  private <K, V> Context buildAndFinishSpan(
+      ConsumerRecords<K, V> records, Consumer<K, V> consumer, Timer timer) {
+    return buildAndFinishSpan(
+        records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer), timer);
   }
 
-  <K, V> void buildAndFinishSpan(
-      ConsumerRecords<K, V> records, String consumerGroup, String clientId) {
+  <K, V> Context buildAndFinishSpan(
+      ConsumerRecords<K, V> records, String consumerGroup, String clientId, Timer timer) {
+    if (records.isEmpty()) {
+      return null;
+    }
     Context parentContext = Context.current();
-    for (ConsumerRecord<K, V> record : records) {
-      KafkaProcessRequest request = KafkaProcessRequest.create(record, consumerGroup, clientId);
-      if (!consumerProcessInstrumenter.shouldStart(parentContext, request)) {
-        continue;
-      }
-
-      Context context = consumerProcessInstrumenter.start(parentContext, request);
-      consumerProcessInstrumenter.end(context, request, null, null);
+    KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumerGroup, clientId);
+    Context context = null;
+    if (consumerReceiveInstrumenter.shouldStart(parentContext, request)) {
+      context =
+          InstrumenterUtil.startAndEnd(
+              consumerReceiveInstrumenter,
+              parentContext,
+              request,
+              null,
+              null,
+              timer.startTime(),
+              timer.now());
     }
+
+    // we're returning the context of the receive span so that process spans can use it as
+    // parent context even though the span has ended
+    // this is the suggested behavior according to the spec batch receive scenario:
+    // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#batch-receiving
+    return context;
   }
 
   private class ProducerCallback implements Callback {
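For context, a minimal usage sketch of the wrapped consumer after this change. The OpenTelemetry instance, the underlying KafkaConsumer, the topic name, and the `process` handler below are illustrative assumptions, not part of this diff; imports are omitted for brevity.

```java
// Sketch only: assumes `openTelemetry` (an OpenTelemetry instance) and `consumer`
// (a plain KafkaConsumer<String, String>) are already configured elsewhere.
KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
Consumer<String, String> tracedConsumer = telemetry.wrap(consumer);
tracedConsumer.subscribe(Collections.singletonList("my-topic")); // hypothetical topic

// poll() now emits a single "receive" span for the batch, timed with the Timer added above.
// The returned ConsumerRecords are rewrapped via addTracing()/TracingList, so iterating them
// emits one "process" span per record, parented to that receive span's context.
ConsumerRecords<String, String> records = tracedConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
  process(record); // hypothetical application handler
}
```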