Skip to content

Commit 4f58c8d

Browse files
committed
revert
1 parent 3fe741b commit 4f58c8d

File tree

4 files changed

+401
-0
lines changed

4 files changed

+401
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
7+
8+
import static java.util.Collections.singletonList;
9+
import static org.assertj.core.api.Assertions.assertThat;
10+
11+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaClientBaseTest;
12+
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
13+
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
14+
import java.nio.charset.StandardCharsets;
15+
import java.time.Duration;
16+
import org.apache.kafka.clients.consumer.Consumer;
17+
import org.apache.kafka.clients.consumer.ConsumerRecord;
18+
import org.apache.kafka.clients.consumer.ConsumerRecords;
19+
import org.apache.kafka.clients.producer.Producer;
20+
import org.apache.kafka.clients.producer.ProducerRecord;
21+
import org.junit.jupiter.api.extension.RegisterExtension;
22+
import org.junit.jupiter.params.ParameterizedTest;
23+
import org.junit.jupiter.params.provider.ValueSource;
24+
25+
abstract class AbstractWrapperTest extends KafkaClientBaseTest {
26+
27+
@RegisterExtension
28+
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
29+
30+
static final String greeting = "Hello Kafka!";
31+
32+
@ParameterizedTest
33+
@ValueSource(booleans = {true, false})
34+
void testWrappers(boolean testHeaders) throws InterruptedException {
35+
KafkaTelemetryBuilder telemetryBuilder =
36+
KafkaTelemetry.builder(testing.getOpenTelemetry())
37+
.setCapturedHeaders(singletonList("Test-Message-Header"))
38+
// TODO run tests both with and without experimental span attributes
39+
.setCaptureExperimentalSpanAttributes(true);
40+
configure(telemetryBuilder);
41+
KafkaTelemetry telemetry = telemetryBuilder.build();
42+
43+
Producer<Integer, String> wrappedProducer = telemetry.wrap(producer);
44+
45+
testing.runWithSpan(
46+
"parent",
47+
() -> {
48+
ProducerRecord<Integer, String> producerRecord =
49+
new ProducerRecord<>(SHARED_TOPIC, greeting);
50+
if (testHeaders) {
51+
producerRecord
52+
.headers()
53+
.add("Test-Message-Header", "test".getBytes(StandardCharsets.UTF_8));
54+
}
55+
wrappedProducer.send(
56+
producerRecord,
57+
(meta, ex) -> {
58+
if (ex == null) {
59+
testing.runWithSpan("producer callback", () -> {});
60+
} else {
61+
testing.runWithSpan("producer exception: " + ex, () -> {});
62+
}
63+
});
64+
});
65+
66+
awaitUntilConsumerIsReady();
67+
Consumer<Integer, String> wrappedConsumer = telemetry.wrap(consumer);
68+
ConsumerRecords<?, ?> records = wrappedConsumer.poll(Duration.ofSeconds(10));
69+
assertThat(records.count()).isEqualTo(1);
70+
for (ConsumerRecord<?, ?> record : records) {
71+
assertThat(record.value()).isEqualTo(greeting);
72+
assertThat(record.key()).isNull();
73+
testing.runWithSpan("process child", () -> {});
74+
}
75+
76+
assertTraces(testHeaders);
77+
}
78+
79+
abstract void configure(KafkaTelemetryBuilder builder);
80+
81+
abstract void assertTraces(boolean testHeaders);
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
7+
8+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
9+
10+
import java.lang.reflect.InvocationHandler;
11+
import java.lang.reflect.Method;
12+
import java.lang.reflect.Proxy;
13+
import org.apache.kafka.clients.consumer.Consumer;
14+
import org.apache.kafka.clients.producer.Producer;
15+
import org.junit.jupiter.api.Test;
16+
17+
class ExceptionHandlingTest extends KafkaClientBaseTest {
18+
19+
@Test
20+
void testConsumerPropagatesException() {
21+
Consumer<?, ?> throwingConsumer =
22+
(Consumer<?, ?>)
23+
Proxy.newProxyInstance(
24+
getClass().getClassLoader(),
25+
new Class<?>[] {Consumer.class},
26+
new InvocationHandler() {
27+
@Override
28+
public Object invoke(Object proxy, Method method, Object[] args) {
29+
throw new IllegalStateException("Test exception");
30+
}
31+
});
32+
Consumer<?, ?> wrappedConsumer =
33+
KafkaTelemetry.create(testing.getOpenTelemetry()).wrap(throwingConsumer);
34+
assertThatThrownBy(() -> wrappedConsumer.poll(null))
35+
.isInstanceOf(IllegalStateException.class)
36+
.hasMessage("Test exception");
37+
}
38+
39+
@Test
40+
void testProducerPropagatesException() {
41+
Producer<?, ?> throwingProducer =
42+
(Producer<?, ?>)
43+
Proxy.newProxyInstance(
44+
getClass().getClassLoader(),
45+
new Class<?>[] {Producer.class},
46+
new InvocationHandler() {
47+
@Override
48+
public Object invoke(Object proxy, Method method, Object[] args) {
49+
throw new IllegalStateException("Test exception");
50+
}
51+
});
52+
Producer<?, ?> wrappedProducer =
53+
KafkaTelemetry.create(testing.getOpenTelemetry()).wrap(throwingProducer);
54+
assertThatThrownBy(() -> wrappedProducer.send(null))
55+
.isInstanceOf(IllegalStateException.class)
56+
.hasMessage("Test exception");
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
7+
8+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
9+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
10+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
11+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID;
12+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP;
13+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET;
14+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
15+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
16+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
19+
import io.opentelemetry.api.common.AttributeKey;
20+
import io.opentelemetry.api.trace.SpanKind;
21+
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
22+
import java.nio.charset.StandardCharsets;
23+
import java.util.ArrayList;
24+
import java.util.Arrays;
25+
import java.util.Collections;
26+
import java.util.List;
27+
import org.assertj.core.api.AbstractLongAssert;
28+
import org.assertj.core.api.AbstractStringAssert;
29+
30+
class WrapperSuppressReceiveSpansTest extends AbstractWrapperTest {
31+
32+
@Override
33+
void configure(KafkaTelemetryBuilder builder) {
34+
builder.setMessagingReceiveInstrumentationEnabled(false);
35+
}
36+
37+
@Override
38+
void assertTraces(boolean testHeaders) {
39+
testing.waitAndAssertTraces(
40+
trace ->
41+
trace.hasSpansSatisfyingExactly(
42+
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
43+
span ->
44+
span.hasName(SHARED_TOPIC + " publish")
45+
.hasKind(SpanKind.PRODUCER)
46+
.hasParent(trace.getSpan(0))
47+
.hasAttributesSatisfyingExactly(sendAttributes(testHeaders)),
48+
span ->
49+
span.hasName("producer callback")
50+
.hasKind(SpanKind.INTERNAL)
51+
.hasParent(trace.getSpan(0)),
52+
span ->
53+
span.hasName(SHARED_TOPIC + " process")
54+
.hasKind(SpanKind.CONSUMER)
55+
.hasParent(trace.getSpan(1))
56+
.hasLinksSatisfying(links -> assertThat(links).isEmpty())
57+
.hasAttributesSatisfyingExactly(processAttributes(greeting, testHeaders)),
58+
span ->
59+
span.hasName("process child")
60+
.hasKind(SpanKind.INTERNAL)
61+
.hasParent(trace.getSpan(3))));
62+
}
63+
64+
@SuppressWarnings("deprecation") // using deprecated semconv
65+
protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
66+
List<AttributeAssertion> assertions =
67+
new ArrayList<>(
68+
Arrays.asList(
69+
equalTo(MESSAGING_SYSTEM, "kafka"),
70+
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
71+
equalTo(MESSAGING_OPERATION, "publish"),
72+
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")),
73+
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
74+
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative)));
75+
if (testHeaders) {
76+
assertions.add(
77+
equalTo(
78+
AttributeKey.stringArrayKey("messaging.header.Test_Message_Header"),
79+
Collections.singletonList("test")));
80+
}
81+
return assertions;
82+
}
83+
84+
@SuppressWarnings("deprecation") // using deprecated semconv
85+
private static List<AttributeAssertion> processAttributes(String greeting, boolean testHeaders) {
86+
List<AttributeAssertion> assertions =
87+
new ArrayList<>(
88+
Arrays.asList(
89+
equalTo(MESSAGING_SYSTEM, "kafka"),
90+
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
91+
equalTo(MESSAGING_OPERATION, "process"),
92+
equalTo(
93+
MESSAGING_MESSAGE_BODY_SIZE, greeting.getBytes(StandardCharsets.UTF_8).length),
94+
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
95+
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
96+
satisfies(
97+
AttributeKey.longKey("kafka.record.queue_time_ms"),
98+
AbstractLongAssert::isNotNegative),
99+
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
100+
satisfies(
101+
MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer"))));
102+
if (testHeaders) {
103+
assertions.add(
104+
equalTo(
105+
AttributeKey.stringArrayKey("messaging.header.Test_Message_Header"),
106+
Collections.singletonList("test")));
107+
}
108+
return assertions;
109+
}
110+
}

0 commit comments

Comments
 (0)