Skip to content

Commit 2bc5492

Browse files
committed
Test deprecated classes until they're removed
1 parent f72ae47 commit 2bc5492

File tree

3 files changed

+294
-0
lines changed

3 files changed

+294
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
10+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaClientBaseTest;
11+
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
12+
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
13+
import java.nio.charset.StandardCharsets;
14+
import java.time.Duration;
15+
import java.util.Map;
16+
import org.apache.kafka.clients.consumer.ConsumerConfig;
17+
import org.apache.kafka.clients.consumer.ConsumerRecord;
18+
import org.apache.kafka.clients.consumer.ConsumerRecords;
19+
import org.apache.kafka.clients.producer.ProducerConfig;
20+
import org.apache.kafka.clients.producer.ProducerRecord;
21+
import org.junit.jupiter.api.Test;
22+
import org.junit.jupiter.api.extension.RegisterExtension;
23+
24+
abstract class AbstractDeprecatedInterceptorsTest extends KafkaClientBaseTest {
25+
26+
@RegisterExtension
27+
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
28+
29+
static final String greeting = "Hello Kafka!";
30+
31+
private static final KafkaTelemetry kafkaTelemetry =
32+
KafkaTelemetry.create(testing.getOpenTelemetry());
33+
34+
@Override
35+
public Map<String, Object> producerProps() {
36+
Map<String, Object> props = super.producerProps();
37+
props.put(
38+
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
39+
props.putAll(kafkaTelemetry.producerInterceptorConfigProperties());
40+
return props;
41+
}
42+
43+
@Override
44+
public Map<String, Object> consumerProps() {
45+
Map<String, Object> props = super.consumerProps();
46+
props.put(
47+
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
48+
props.putAll(kafkaTelemetry.consumerInterceptorConfigProperties());
49+
return props;
50+
}
51+
52+
@Test
53+
void testInterceptors() throws InterruptedException {
54+
testing.runWithSpan(
55+
"parent",
56+
() -> {
57+
ProducerRecord<Integer, String> producerRecord =
58+
new ProducerRecord<>(SHARED_TOPIC, greeting);
59+
producerRecord
60+
.headers()
61+
// add header to test capturing header value as span attribute
62+
.add("Test-Message-Header", "test".getBytes(StandardCharsets.UTF_8))
63+
// adding baggage header in w3c baggage format
64+
.add(
65+
"baggage",
66+
"test-baggage-key-1=test-baggage-value-1".getBytes(StandardCharsets.UTF_8))
67+
.add(
68+
"baggage",
69+
"test-baggage-key-2=test-baggage-value-2".getBytes(StandardCharsets.UTF_8));
70+
producer.send(
71+
producerRecord,
72+
(meta, ex) -> {
73+
if (ex == null) {
74+
testing.runWithSpan("producer callback", () -> {});
75+
} else {
76+
testing.runWithSpan("producer exception: " + ex, () -> {});
77+
}
78+
});
79+
});
80+
81+
awaitUntilConsumerIsReady();
82+
// check that the message was received
83+
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5));
84+
assertThat(records.count()).isEqualTo(1);
85+
for (ConsumerRecord<?, ?> record : records) {
86+
assertThat(record.value()).isEqualTo(greeting);
87+
assertThat(record.key()).isNull();
88+
testing.runWithSpan("process child", () -> {});
89+
}
90+
91+
assertTraces();
92+
}
93+
94+
abstract void assertTraces();
95+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
7+
8+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
9+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
10+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
11+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID;
12+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP;
13+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET;
14+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
15+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
16+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
17+
18+
import io.opentelemetry.api.common.AttributeKey;
19+
import io.opentelemetry.api.trace.SpanKind;
20+
import java.nio.charset.StandardCharsets;
21+
import org.assertj.core.api.AbstractLongAssert;
22+
import org.assertj.core.api.AbstractStringAssert;
23+
24+
class DeprecatedInterceptorsSuppressReceiveSpansTest extends AbstractDeprecatedInterceptorsTest {
25+
26+
@SuppressWarnings("deprecation") // using deprecated semconv
27+
@Override
28+
void assertTraces() {
29+
testing.waitAndAssertTraces(
30+
trace ->
31+
trace.hasSpansSatisfyingExactly(
32+
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
33+
span ->
34+
span.hasName(SHARED_TOPIC + " publish")
35+
.hasKind(SpanKind.PRODUCER)
36+
.hasParent(trace.getSpan(0))
37+
.hasAttributesSatisfyingExactly(
38+
equalTo(MESSAGING_SYSTEM, "kafka"),
39+
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
40+
equalTo(MESSAGING_OPERATION, "publish"),
41+
satisfies(
42+
MESSAGING_CLIENT_ID,
43+
stringAssert -> stringAssert.startsWith("producer"))),
44+
span ->
45+
span.hasName(SHARED_TOPIC + " process")
46+
.hasKind(SpanKind.CONSUMER)
47+
.hasParent(trace.getSpan(1))
48+
.hasAttributesSatisfyingExactly(
49+
equalTo(MESSAGING_SYSTEM, "kafka"),
50+
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
51+
equalTo(MESSAGING_OPERATION, "process"),
52+
equalTo(
53+
MESSAGING_MESSAGE_BODY_SIZE,
54+
greeting.getBytes(StandardCharsets.UTF_8).length),
55+
satisfies(
56+
MESSAGING_DESTINATION_PARTITION_ID,
57+
AbstractStringAssert::isNotEmpty),
58+
satisfies(
59+
MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
60+
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
61+
satisfies(
62+
MESSAGING_CLIENT_ID,
63+
stringAssert -> stringAssert.startsWith("consumer")),
64+
equalTo(
65+
AttributeKey.stringKey("test-baggage-key-1"),
66+
"test-baggage-value-1"),
67+
equalTo(
68+
AttributeKey.stringKey("test-baggage-key-2"),
69+
"test-baggage-value-2")),
70+
span ->
71+
span.hasName("process child")
72+
.hasKind(SpanKind.INTERNAL)
73+
.hasParent(trace.getSpan(2))),
74+
// ideally we'd want producer callback to be part of the main trace, we just aren't able to
75+
// instrument that
76+
trace ->
77+
trace.hasSpansSatisfyingExactly(
78+
span ->
79+
span.hasName("producer callback").hasKind(SpanKind.INTERNAL).hasNoParent()));
80+
}
81+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
7+
8+
import static io.opentelemetry.api.common.AttributeKey.stringArrayKey;
9+
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName;
10+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
11+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
12+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT;
13+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
14+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID;
15+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP;
16+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET;
17+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
18+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
19+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
20+
import static java.util.Collections.singletonList;
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
import io.opentelemetry.api.trace.SpanContext;
24+
import io.opentelemetry.api.trace.SpanKind;
25+
import io.opentelemetry.sdk.trace.data.LinkData;
26+
import java.nio.charset.StandardCharsets;
27+
import java.util.concurrent.atomic.AtomicReference;
28+
import org.assertj.core.api.AbstractLongAssert;
29+
import org.assertj.core.api.AbstractStringAssert;
30+
31+
class DeprecatedInterceptorsTest extends AbstractDeprecatedInterceptorsTest {
32+
33+
@SuppressWarnings("deprecation") // using deprecated semconv
34+
@Override
35+
void assertTraces() {
36+
AtomicReference<SpanContext> producerSpanContext = new AtomicReference<>();
37+
testing.waitAndAssertSortedTraces(
38+
orderByRootSpanName("parent", SHARED_TOPIC + " receive", "producer callback"),
39+
trace -> {
40+
trace.hasSpansSatisfyingExactly(
41+
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
42+
span ->
43+
span.hasName(SHARED_TOPIC + " publish")
44+
.hasKind(SpanKind.PRODUCER)
45+
.hasParent(trace.getSpan(0))
46+
.hasAttributesSatisfyingExactly(
47+
equalTo(
48+
stringArrayKey("messaging.header.Test_Message_Header"),
49+
singletonList("test")),
50+
equalTo(MESSAGING_SYSTEM, "kafka"),
51+
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
52+
equalTo(MESSAGING_OPERATION, "publish"),
53+
satisfies(
54+
MESSAGING_CLIENT_ID,
55+
stringAssert -> stringAssert.startsWith("producer"))));
56+
SpanContext spanContext = trace.getSpan(1).getSpanContext();
57+
producerSpanContext.set(
58+
SpanContext.createFromRemoteParent(
59+
spanContext.getTraceId(),
60+
spanContext.getSpanId(),
61+
spanContext.getTraceFlags(),
62+
spanContext.getTraceState()));
63+
},
64+
trace ->
65+
trace.hasSpansSatisfyingExactly(
66+
span ->
67+
span.hasName(SHARED_TOPIC + " receive")
68+
.hasKind(SpanKind.CONSUMER)
69+
.hasNoParent()
70+
.hasLinksSatisfying(links -> assertThat(links).isEmpty())
71+
.hasAttributesSatisfyingExactly(
72+
equalTo(
73+
stringArrayKey("messaging.header.Test_Message_Header"),
74+
singletonList("test")),
75+
equalTo(MESSAGING_SYSTEM, "kafka"),
76+
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
77+
equalTo(MESSAGING_OPERATION, "receive"),
78+
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
79+
satisfies(
80+
MESSAGING_CLIENT_ID,
81+
stringAssert -> stringAssert.startsWith("consumer")),
82+
equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1)),
83+
span ->
84+
span.hasName(SHARED_TOPIC + " process")
85+
.hasKind(SpanKind.CONSUMER)
86+
.hasParent(trace.getSpan(0))
87+
.hasLinks(LinkData.create(producerSpanContext.get()))
88+
.hasAttributesSatisfyingExactly(
89+
equalTo(
90+
stringArrayKey("messaging.header.Test_Message_Header"),
91+
singletonList("test")),
92+
equalTo(MESSAGING_SYSTEM, "kafka"),
93+
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
94+
equalTo(MESSAGING_OPERATION, "process"),
95+
equalTo(
96+
MESSAGING_MESSAGE_BODY_SIZE,
97+
greeting.getBytes(StandardCharsets.UTF_8).length),
98+
satisfies(
99+
MESSAGING_DESTINATION_PARTITION_ID,
100+
AbstractStringAssert::isNotEmpty),
101+
satisfies(
102+
MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
103+
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
104+
satisfies(
105+
MESSAGING_CLIENT_ID,
106+
stringAssert -> stringAssert.startsWith("consumer"))),
107+
span ->
108+
span.hasName("process child")
109+
.hasKind(SpanKind.INTERNAL)
110+
.hasParent(trace.getSpan(1))),
111+
// ideally we'd want producer callback to be part of the main trace, we just aren't able to
112+
// instrument that
113+
trace ->
114+
trace.hasSpansSatisfyingExactly(
115+
span ->
116+
span.hasName("producer callback").hasKind(SpanKind.INTERNAL).hasNoParent()));
117+
}
118+
}

0 commit comments

Comments
 (0)