Skip to content

Commit 41c767c

Browse files
committed
split test
1 parent 61813b5 commit 41c767c

File tree

7 files changed

+360
-67
lines changed

7 files changed

+360
-67
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
10+
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
11+
import io.opentelemetry.instrumentation.api.internal.Timer;
12+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
13+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest;
14+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
15+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList;
16+
import java.util.LinkedHashMap;
17+
import java.util.List;
18+
import java.util.Map;
19+
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
import org.apache.kafka.clients.consumer.ConsumerRecords;
21+
import org.apache.kafka.common.TopicPartition;
22+
23+
/**
24+
* Helper for consumer-side instrumentation.
25+
*
26+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
27+
* at any time.
28+
*/
29+
public class KafkaConsumerTelemetry {
30+
31+
private final Instrumenter<KafkaReceiveRequest, Void> consumerReceiveInstrumenter;
32+
private final Instrumenter<KafkaProcessRequest, Void> consumerProcessInstrumenter;
33+
34+
public KafkaConsumerTelemetry(
35+
Instrumenter<KafkaReceiveRequest, Void> consumerReceiveInstrumenter,
36+
Instrumenter<KafkaProcessRequest, Void> consumerProcessInstrumenter) {
37+
this.consumerReceiveInstrumenter = consumerReceiveInstrumenter;
38+
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
39+
}
40+
41+
public <K, V> Context buildAndFinishSpan(
42+
ConsumerRecords<K, V> records, String consumerGroup, String clientId, Timer timer) {
43+
if (records.isEmpty()) {
44+
return null;
45+
}
46+
Context parentContext = Context.current();
47+
KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumerGroup, clientId);
48+
Context context = null;
49+
if (consumerReceiveInstrumenter.shouldStart(parentContext, request)) {
50+
context =
51+
InstrumenterUtil.startAndEnd(
52+
consumerReceiveInstrumenter,
53+
parentContext,
54+
request,
55+
null,
56+
null,
57+
timer.startTime(),
58+
timer.now());
59+
}
60+
61+
// we're returning the context of the receive span so that process spans can use it as
62+
// parent context even though the span has ended
63+
// this is the suggested behavior according to the spec batch receive scenario:
64+
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#batch-receiving
65+
return context;
66+
}
67+
68+
public <K, V> ConsumerRecords<K, V> addTracing(
69+
ConsumerRecords<K, V> consumerRecords, KafkaConsumerContext consumerContext) {
70+
if (consumerRecords.isEmpty()) {
71+
return consumerRecords;
72+
}
73+
74+
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
75+
for (TopicPartition partition : consumerRecords.partitions()) {
76+
List<ConsumerRecord<K, V>> list = consumerRecords.records(partition);
77+
if (list != null && !list.isEmpty()) {
78+
list = TracingList.wrap(list, consumerProcessInstrumenter, () -> true, consumerContext);
79+
}
80+
records.put(partition, list);
81+
}
82+
return new ConsumerRecords<>(records);
83+
}
84+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal;
7+
8+
import java.io.Serializable;
9+
import java.util.Objects;
10+
import java.util.function.Supplier;
11+
12+
/**
13+
* Wrapper for KafkaConsumerTelemetry that can be injected into kafka configuration without breaking
14+
* serialization.
15+
*
16+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
17+
* at any time.
18+
*/
19+
public class KafkaConsumerTelemetrySupplier
20+
implements Supplier<KafkaConsumerTelemetry>, Serializable {
21+
22+
private final KafkaConsumerTelemetry consumerTelemetry;
23+
24+
public KafkaConsumerTelemetrySupplier(KafkaConsumerTelemetry consumerTelemetry) {
25+
this.consumerTelemetry = Objects.requireNonNull(consumerTelemetry);
26+
}
27+
28+
@Override
29+
public KafkaConsumerTelemetry get() {
30+
return consumerTelemetry;
31+
}
32+
33+
/**
34+
* Replaces this object with null in the serialization stream. This ensures the supplier does not
35+
* get serialized.
36+
*/
37+
private Object writeReplace() {
38+
return null;
39+
}
40+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal;
7+
8+
import static java.util.logging.Level.WARNING;
9+
10+
import io.opentelemetry.context.Context;
11+
import io.opentelemetry.context.propagation.TextMapPropagator;
12+
import io.opentelemetry.context.propagation.TextMapSetter;
13+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
14+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaHeadersSetter;
15+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
16+
import java.util.logging.Logger;
17+
import org.apache.kafka.clients.producer.ProducerRecord;
18+
import org.apache.kafka.clients.producer.RecordMetadata;
19+
import org.apache.kafka.common.header.Headers;
20+
21+
/**
22+
* Helper for producer-side instrumentation.
23+
*
24+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
25+
* at any time.
26+
*/
27+
public class KafkaProducerTelemetry {
28+
private static final Logger logger = Logger.getLogger(KafkaProducerTelemetry.class.getName());
29+
30+
private static final TextMapSetter<Headers> SETTER = KafkaHeadersSetter.INSTANCE;
31+
32+
private final TextMapPropagator propagator;
33+
private final Instrumenter<KafkaProducerRequest, RecordMetadata> producerInstrumenter;
34+
private final boolean producerPropagationEnabled;
35+
36+
public KafkaProducerTelemetry(
37+
TextMapPropagator propagator,
38+
Instrumenter<KafkaProducerRequest, RecordMetadata> producerInstrumenter,
39+
boolean producerPropagationEnabled) {
40+
this.propagator = propagator;
41+
this.producerInstrumenter = producerInstrumenter;
42+
this.producerPropagationEnabled = producerPropagationEnabled;
43+
}
44+
45+
/**
46+
* Build and inject span into record.
47+
*
48+
* @param record the producer record to inject span info.
49+
*/
50+
public <K, V> void buildAndInjectSpan(ProducerRecord<K, V> record, String clientId) {
51+
Context parentContext = Context.current();
52+
53+
KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
54+
if (!producerInstrumenter.shouldStart(parentContext, request)) {
55+
return;
56+
}
57+
58+
Context context = producerInstrumenter.start(parentContext, request);
59+
if (producerPropagationEnabled) {
60+
try {
61+
propagator.inject(context, record.headers(), SETTER);
62+
} catch (Throwable t) {
63+
// it can happen if headers are read only (when record is sent second time)
64+
logger.log(WARNING, "failed to inject span context. sending record second time?", t);
65+
}
66+
}
67+
producerInstrumenter.end(context, request, null, null);
68+
}
69+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal;
7+
8+
import java.io.Serializable;
9+
import java.util.Objects;
10+
import java.util.function.Supplier;
11+
12+
/**
13+
* Wrapper for KafkaProducerTelemetry that can be injected into kafka configuration without breaking
14+
* serialization.
15+
*
16+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
17+
* at any time.
18+
*/
19+
public class KafkaProducerTelemetrySupplier
20+
implements Supplier<KafkaProducerTelemetry>, Serializable {
21+
22+
private final KafkaProducerTelemetry producerTelemetry;
23+
24+
public KafkaProducerTelemetrySupplier(KafkaProducerTelemetry producerTelemetry) {
25+
this.producerTelemetry = Objects.requireNonNull(producerTelemetry);
26+
}
27+
28+
@Override
29+
public KafkaProducerTelemetry get() {
30+
return producerTelemetry;
31+
}
32+
33+
/**
34+
* Replaces this object with null in the serialization stream. This ensures the supplier does not
35+
* get serialized.
36+
*/
37+
private Object writeReplace() {
38+
return null;
39+
}
40+
}
Lines changed: 7 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,12 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal;
77

88
import static org.assertj.core.api.Assertions.assertThat;
99
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1010

11-
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaConsumerTelemetrySupplier;
12-
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaProducerTelemetrySupplier;
13-
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryConsumerInterceptor;
14-
import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryProducerInterceptor;
11+
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
1512
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
1613
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
1714
import java.io.ByteArrayInputStream;
@@ -26,29 +23,16 @@
2623
import java.util.function.Supplier;
2724
import org.apache.kafka.clients.consumer.ConsumerConfig;
2825
import org.apache.kafka.clients.consumer.KafkaConsumer;
29-
import org.apache.kafka.clients.producer.KafkaProducer;
30-
import org.apache.kafka.clients.producer.ProducerConfig;
3126
import org.apache.kafka.common.serialization.StringDeserializer;
32-
import org.apache.kafka.common.serialization.StringSerializer;
3327
import org.junit.jupiter.api.Assumptions;
3428
import org.junit.jupiter.api.Test;
3529
import org.junit.jupiter.api.extension.RegisterExtension;
3630

37-
class KafkaTelemetryInterceptorTest {
31+
class OpenTelemetryConsumerInterceptorTest {
3832

3933
@RegisterExtension
4034
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
4135

42-
private static Map<String, Object> producerConfig() {
43-
Map<String, Object> config = new HashMap<>();
44-
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
45-
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
46-
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
47-
config.putAll(
48-
KafkaTelemetry.create(testing.getOpenTelemetry()).producerInterceptorConfigProperties());
49-
return config;
50-
}
51-
5236
private static Map<String, Object> consumerConfig() {
5337
Map<String, Object> config = new HashMap<>();
5438
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -61,37 +45,7 @@ private static Map<String, Object> consumerConfig() {
6145
}
6246

6347
@Test
64-
void badProducerConfig() {
65-
Assumptions.assumeFalse(Boolean.getBoolean("testLatestDeps"));
66-
67-
// Bad config - wrong type for supplier
68-
assertThatThrownBy(
69-
() -> {
70-
Map<String, Object> producerConfig = producerConfig();
71-
producerConfig.put(
72-
OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER, "foo");
73-
new KafkaProducer<>(producerConfig).close();
74-
})
75-
.hasRootCauseInstanceOf(IllegalStateException.class)
76-
.hasRootCauseMessage(
77-
"Configuration property opentelemetry.kafka-producer-telemetry.supplier is not instance of KafkaProducerTelemetrySupplier");
78-
79-
// Bad config - supplier returns wrong type
80-
assertThatThrownBy(
81-
() -> {
82-
Map<String, Object> producerConfig = producerConfig();
83-
producerConfig.put(
84-
OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER,
85-
(Supplier<?>) () -> "not a KafkaProducerTelemetry");
86-
new KafkaProducer<>(producerConfig).close();
87-
})
88-
.hasRootCauseInstanceOf(IllegalStateException.class)
89-
.hasRootCauseMessage(
90-
"Configuration property opentelemetry.kafka-producer-telemetry.supplier is not instance of KafkaProducerTelemetrySupplier");
91-
}
92-
93-
@Test
94-
void badConsumerConfig() {
48+
void badConfig() {
9549
Assumptions.assumeFalse(Boolean.getBoolean("testLatestDeps"));
9650

9751
// Bad config - wrong type for supplier
@@ -122,27 +76,18 @@ void badConsumerConfig() {
12276

12377
@Test
12478
void serializableConfig() throws IOException, ClassNotFoundException {
125-
testSerialize(producerConfig());
12679
testSerialize(consumerConfig());
12780
}
12881

12982
@SuppressWarnings("unchecked")
13083
private static void testSerialize(Map<String, Object> map)
13184
throws IOException, ClassNotFoundException {
132-
// Check that producer config has the supplier
133-
Object producerSupplier =
134-
map.get(OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER);
85+
// Check that consumer config has the supplier
13586
Object consumerSupplier =
13687
map.get(OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER);
13788

138-
Supplier<?> supplier = null;
139-
if (producerSupplier instanceof KafkaProducerTelemetrySupplier) {
140-
supplier = (KafkaProducerTelemetrySupplier) producerSupplier;
141-
} else if (consumerSupplier instanceof KafkaConsumerTelemetrySupplier) {
142-
supplier = (KafkaConsumerTelemetrySupplier) consumerSupplier;
143-
}
144-
145-
assertThat(supplier).isNotNull();
89+
assertThat(consumerSupplier).isInstanceOf(KafkaConsumerTelemetrySupplier.class);
90+
KafkaConsumerTelemetrySupplier supplier = (KafkaConsumerTelemetrySupplier) consumerSupplier;
14691
assertThat(supplier.get()).isNotNull();
14792

14893
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
@@ -169,8 +114,6 @@ protected Class<?> resolveClass(ObjectStreamClass desc)
169114
try (ObjectInputStream inputStream =
170115
new CustomObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) {
171116
Map<String, Object> result = (Map<String, Object>) inputStream.readObject();
172-
assertThat(result.get(OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER))
173-
.isNull();
174117
assertThat(result.get(OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER))
175118
.isNull();
176119
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryMetricsReporterTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.Map;
2525
import org.apache.kafka.clients.consumer.KafkaConsumer;
2626
import org.apache.kafka.clients.producer.KafkaProducer;
27-
import org.junit.jupiter.api.Assumptions;
2827
import org.junit.jupiter.api.Test;
2928
import org.junit.jupiter.api.extension.RegisterExtension;
3029

@@ -45,8 +44,6 @@ protected InstrumentationExtension testing() {
4544

4645
@Test
4746
void badConfig() {
48-
Assumptions.assumeFalse(Boolean.getBoolean("testLatestDeps"));
49-
5047
// Bad producer config
5148
assertThatThrownBy(
5249
() -> {

0 commit comments

Comments
 (0)