Skip to content

Commit d861b88

Browse files
committed
KafkaHelper
1 parent a2b44cc commit d861b88

File tree

7 files changed

+204
-258
lines changed

7 files changed

+204
-258
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
7+
8+
import static java.util.logging.Level.WARNING;
9+
10+
import io.opentelemetry.context.Context;
11+
import io.opentelemetry.context.propagation.TextMapPropagator;
12+
import io.opentelemetry.context.propagation.TextMapSetter;
13+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
14+
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
15+
import io.opentelemetry.instrumentation.api.internal.Timer;
16+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
17+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaHeadersSetter;
18+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest;
19+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
20+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
21+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList;
22+
import java.util.LinkedHashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.logging.Logger;
26+
import org.apache.kafka.clients.consumer.ConsumerRecord;
27+
import org.apache.kafka.clients.consumer.ConsumerRecords;
28+
import org.apache.kafka.clients.producer.ProducerRecord;
29+
import org.apache.kafka.clients.producer.RecordMetadata;
30+
import org.apache.kafka.common.TopicPartition;
31+
import org.apache.kafka.common.header.Headers;
32+
33+
public final class KafkaHelper {
34+
private static final Logger logger = Logger.getLogger(KafkaHelper.class.getName());
35+
36+
private static final TextMapSetter<Headers> SETTER = KafkaHeadersSetter.INSTANCE;
37+
38+
private final TextMapPropagator propagator;
39+
private final Instrumenter<KafkaProducerRequest, RecordMetadata> producerInstrumenter;
40+
private final Instrumenter<KafkaReceiveRequest, Void> consumerReceiveInstrumenter;
41+
private final Instrumenter<KafkaProcessRequest, Void> consumerProcessInstrumenter;
42+
private final boolean producerPropagationEnabled;
43+
44+
KafkaHelper(
45+
TextMapPropagator propagator,
46+
Instrumenter<KafkaProducerRequest, RecordMetadata> producerInstrumenter,
47+
Instrumenter<KafkaReceiveRequest, Void> consumerReceiveInstrumenter,
48+
Instrumenter<KafkaProcessRequest, Void> consumerProcessInstrumenter,
49+
boolean producerPropagationEnabled) {
50+
this.propagator = propagator;
51+
this.producerInstrumenter = producerInstrumenter;
52+
this.consumerReceiveInstrumenter = consumerReceiveInstrumenter;
53+
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
54+
this.producerPropagationEnabled = producerPropagationEnabled;
55+
}
56+
57+
/**
58+
* Build and inject span into record.
59+
*
60+
* @param record the producer record to inject span info.
61+
*/
62+
public <K, V> void buildAndInjectSpan(ProducerRecord<K, V> record, String clientId) {
63+
Context parentContext = Context.current();
64+
65+
KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
66+
if (!producerInstrumenter.shouldStart(parentContext, request)) {
67+
return;
68+
}
69+
70+
Context context = producerInstrumenter.start(parentContext, request);
71+
if (producerPropagationEnabled) {
72+
try {
73+
propagator.inject(context, record.headers(), SETTER);
74+
} catch (Throwable t) {
75+
// it can happen if headers are read only (when record is sent second time)
76+
logger.log(WARNING, "failed to inject span context. sending record second time?", t);
77+
}
78+
}
79+
producerInstrumenter.end(context, request, null, null);
80+
}
81+
82+
public <K, V> Context buildAndFinishSpan(
83+
ConsumerRecords<K, V> records, String consumerGroup, String clientId, Timer timer) {
84+
if (records.isEmpty()) {
85+
return null;
86+
}
87+
Context parentContext = Context.current();
88+
KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumerGroup, clientId);
89+
Context context = null;
90+
if (consumerReceiveInstrumenter.shouldStart(parentContext, request)) {
91+
context =
92+
InstrumenterUtil.startAndEnd(
93+
consumerReceiveInstrumenter,
94+
parentContext,
95+
request,
96+
null,
97+
null,
98+
timer.startTime(),
99+
timer.now());
100+
}
101+
102+
// we're returning the context of the receive span so that process spans can use it as
103+
// parent context even though the span has ended
104+
// this is the suggested behavior according to the spec batch receive scenario:
105+
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#batch-receiving
106+
return context;
107+
}
108+
109+
public <K, V> ConsumerRecords<K, V> addTracing(
110+
ConsumerRecords<K, V> consumerRecords, KafkaConsumerContext consumerContext) {
111+
if (consumerRecords.isEmpty()) {
112+
return consumerRecords;
113+
}
114+
115+
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
116+
for (TopicPartition partition : consumerRecords.partitions()) {
117+
List<ConsumerRecord<K, V>> list = consumerRecords.records(partition);
118+
if (list != null && !list.isEmpty()) {
119+
list = TracingList.wrap(list, consumerProcessInstrumenter, () -> true, consumerContext);
120+
}
121+
records.put(partition, list);
122+
}
123+
return new ConsumerRecords<>(records);
124+
}
125+
}

0 commit comments

Comments (0)