Skip to content

Commit 3fe741b

Browse files
committed
up
1 parent eb7ecf9 commit 3fe741b

File tree

2 files changed

+15
-34
lines changed

2 files changed

+15
-34
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaConsumerTelemetry.java

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,23 @@ public KafkaConsumerTelemetry(
4040
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
4141
}
4242

43-
// this getter is needed for the deprecated wrap() methods in KafkaTelemetry
44-
public Instrumenter<KafkaProcessRequest, Void> getConsumerProcessInstrumenter() {
45-
return consumerProcessInstrumenter;
43+
public <K, V> ConsumerRecords<K, V> addTracing(
44+
ConsumerRecords<K, V> consumerRecords, KafkaConsumerContext consumerContext) {
45+
if (consumerRecords.isEmpty()) {
46+
return consumerRecords;
47+
}
48+
49+
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
50+
for (TopicPartition partition : consumerRecords.partitions()) {
51+
List<ConsumerRecord<K, V>> list = consumerRecords.records(partition);
52+
if (list != null && !list.isEmpty()) {
53+
list = TracingList.wrap(list, consumerProcessInstrumenter, () -> true, consumerContext);
54+
}
55+
records.put(partition, list);
56+
}
57+
return new ConsumerRecords<>(records);
4658
}
4759

48-
// this overload is needed for the deprecated wrap() methods in KafkaTelemetry
4960
public <K, V> Context buildAndFinishSpan(
5061
ConsumerRecords<K, V> records, Consumer<K, V> consumer, Timer timer) {
5162
return buildAndFinishSpan(
@@ -78,21 +89,4 @@ public <K, V> Context buildAndFinishSpan(
7889
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#batch-receiving
7990
return context;
8091
}
81-
82-
public <K, V> ConsumerRecords<K, V> addTracing(
83-
ConsumerRecords<K, V> consumerRecords, KafkaConsumerContext consumerContext) {
84-
if (consumerRecords.isEmpty()) {
85-
return consumerRecords;
86-
}
87-
88-
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
89-
for (TopicPartition partition : consumerRecords.partitions()) {
90-
List<ConsumerRecord<K, V>> list = consumerRecords.records(partition);
91-
if (list != null && !list.isEmpty()) {
92-
list = TracingList.wrap(list, consumerProcessInstrumenter, () -> true, consumerContext);
93-
}
94-
records.put(partition, list);
95-
}
96-
return new ConsumerRecords<>(records);
97-
}
9892
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,6 @@ public KafkaProducerTelemetry(
4747
this.producerPropagationEnabled = producerPropagationEnabled;
4848
}
4949

50-
// these getters are needed for the deprecated wrap() methods in KafkaTelemetry
51-
public TextMapPropagator getPropagator() {
52-
return propagator;
53-
}
54-
55-
public Instrumenter<KafkaProducerRequest, RecordMetadata> getProducerInstrumenter() {
56-
return producerInstrumenter;
57-
}
58-
59-
public TextMapSetter<Headers> getSetter() {
60-
return SETTER;
61-
}
62-
6350
/**
6451
* Build and inject span into record.
6552
*

0 commit comments

Comments
 (0)