Skip to content

Commit 9c0570a

Browse files
committed
fix ci & test
1 parent 9f393c0 commit 9c0570a

File tree

15 files changed

+149
-445
lines changed

15 files changed

+149
-445
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,21 @@ public abstract class KafkaClientBaseTest {
6969
protected static final AttributeKey<List<String>> MESSAGING_KAFKA_BOOTSTRAP_SERVERS =
7070
AttributeKey.stringArrayKey("messaging.kafka.bootstrap.servers");
7171

72+
protected static AttributeAssertion bootstrapServersAssertion() {
73+
return satisfies(
74+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
75+
listAssert -> {
76+
if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) {
77+
listAssert
78+
.isNotEmpty()
79+
.allSatisfy(
80+
server -> org.assertj.core.api.Assertions.assertThat(server).isNotEmpty());
81+
} else {
82+
listAssert.isNullOrEmpty();
83+
}
84+
});
85+
}
86+
7287
private KafkaContainer kafka;
7388
protected Producer<Integer, String> producer;
7489
protected Consumer<Integer, String> consumer;
@@ -179,6 +194,7 @@ protected static List<AttributeAssertion> sendAttributes(
179194
equalTo(MESSAGING_SYSTEM, "kafka"),
180195
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
181196
equalTo(MESSAGING_OPERATION, "publish"),
197+
bootstrapServersAssertion(),
182198
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")),
183199
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
184200
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative)));
@@ -188,17 +204,6 @@ protected static List<AttributeAssertion> sendAttributes(
188204
if (messageValue == null) {
189205
assertions.add(equalTo(MESSAGING_KAFKA_MESSAGE_TOMBSTONE, true));
190206
}
191-
if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) {
192-
assertions.add(
193-
satisfies(
194-
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
195-
listAssert ->
196-
listAssert
197-
.isNotEmpty()
198-
.allSatisfy(
199-
server ->
200-
org.assertj.core.api.Assertions.assertThat(server).isNotEmpty())));
201-
}
202207
if (testHeaders) {
203208
assertions.add(
204209
equalTo(
@@ -216,23 +221,13 @@ protected static List<AttributeAssertion> receiveAttributes(boolean testHeaders)
216221
equalTo(MESSAGING_SYSTEM, "kafka"),
217222
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
218223
equalTo(MESSAGING_OPERATION, "receive"),
224+
bootstrapServersAssertion(),
219225
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
220226
satisfies(MESSAGING_BATCH_MESSAGE_COUNT, AbstractLongAssert::isPositive)));
221227
// consumer group is not available in version 0.11
222228
if (Boolean.getBoolean("testLatestDeps")) {
223229
assertions.add(equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"));
224230
}
225-
if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) {
226-
assertions.add(
227-
satisfies(
228-
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
229-
listAssert ->
230-
listAssert
231-
.isNotEmpty()
232-
.allSatisfy(
233-
server ->
234-
org.assertj.core.api.Assertions.assertThat(server).isNotEmpty())));
235-
}
236231
if (testHeaders) {
237232
assertions.add(
238233
equalTo(
@@ -251,6 +246,7 @@ protected static List<AttributeAssertion> processAttributes(
251246
equalTo(MESSAGING_SYSTEM, "kafka"),
252247
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
253248
equalTo(MESSAGING_OPERATION, "process"),
249+
bootstrapServersAssertion(),
254250
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
255251
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
256252
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
@@ -261,17 +257,6 @@ protected static List<AttributeAssertion> processAttributes(
261257
if (Boolean.getBoolean("testLatestDeps")) {
262258
assertions.add(equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"));
263259
}
264-
if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) {
265-
assertions.add(
266-
satisfies(
267-
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
268-
listAssert ->
269-
listAssert
270-
.isNotEmpty()
271-
.allSatisfy(
272-
server ->
273-
org.assertj.core.api.Assertions.assertThat(server).isNotEmpty())));
274-
}
275260
if (messageKey != null) {
276261
assertions.add(equalTo(MESSAGING_KAFKA_MESSAGE_KEY, messageKey));
277262
}

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractWrapperTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ void testWrappers(boolean testHeaders) throws InterruptedException {
3737
.setCapturedHeaders(singletonList("test-message-header"))
3838
// TODO run tests both with and without experimental span attributes
3939
.setCaptureExperimentalSpanAttributes(true);
40+
System.setProperty(
41+
"otel.instrumentation.kafka.experimental-span-attributes", String.valueOf(true));
4042
configure(telemetryBuilder);
4143
KafkaTelemetry telemetry = telemetryBuilder.build();
4244

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ void assertTraces() {
3838
equalTo(MESSAGING_SYSTEM, "kafka"),
3939
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
4040
equalTo(MESSAGING_OPERATION, "publish"),
41+
bootstrapServersAssertion(),
4142
satisfies(
4243
MESSAGING_CLIENT_ID,
4344
stringAssert -> stringAssert.startsWith("producer"))),
@@ -52,6 +53,7 @@ void assertTraces() {
5253
equalTo(
5354
MESSAGING_MESSAGE_BODY_SIZE,
5455
greeting.getBytes(StandardCharsets.UTF_8).length),
56+
bootstrapServersAssertion(),
5557
satisfies(
5658
MESSAGING_DESTINATION_PARTITION_ID,
5759
AbstractStringAssert::isNotEmpty),

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ void assertTraces() {
6868
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
6969
equalTo(MESSAGING_OPERATION, "receive"),
7070
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
71+
bootstrapServersAssertion(),
7172
satisfies(
7273
MESSAGING_CLIENT_ID,
7374
stringAssert -> stringAssert.startsWith("consumer")),
@@ -81,6 +82,7 @@ void assertTraces() {
8182
equalTo(MESSAGING_SYSTEM, "kafka"),
8283
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
8384
equalTo(MESSAGING_OPERATION, "process"),
85+
bootstrapServersAssertion(),
8486
equalTo(
8587
MESSAGING_MESSAGE_BODY_SIZE,
8688
greeting.getBytes(StandardCharsets.UTF_8).length),

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,7 @@ protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
6767
equalTo(MESSAGING_SYSTEM, "kafka"),
6868
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
6969
equalTo(MESSAGING_OPERATION, "publish"),
70-
satisfies(
71-
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
72-
listAssert ->
73-
listAssert
74-
.isNotEmpty()
75-
.allSatisfy(
76-
server ->
77-
org.assertj.core.api.Assertions.assertThat(server)
78-
.isNotEmpty())),
70+
bootstrapServersAssertion(),
7971
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")),
8072
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
8173
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative)));
@@ -98,15 +90,7 @@ private static List<AttributeAssertion> processAttributes(String greeting, boole
9890
equalTo(MESSAGING_OPERATION, "process"),
9991
equalTo(
10092
MESSAGING_MESSAGE_BODY_SIZE, greeting.getBytes(StandardCharsets.UTF_8).length),
101-
satisfies(
102-
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
103-
listAssert ->
104-
listAssert
105-
.isNotEmpty()
106-
.allSatisfy(
107-
server ->
108-
org.assertj.core.api.Assertions.assertThat(server)
109-
.isNotEmpty())),
93+
bootstrapServersAssertion(),
11094
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
11195
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
11296
satisfies(

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,7 @@ protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
9090
equalTo(MESSAGING_SYSTEM, "kafka"),
9191
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
9292
equalTo(MESSAGING_OPERATION, "publish"),
93-
satisfies(
94-
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
95-
listAssert ->
96-
listAssert
97-
.isNotEmpty()
98-
.allSatisfy(
99-
server ->
100-
org.assertj.core.api.Assertions.assertThat(server)
101-
.isNotEmpty())),
93+
bootstrapServersAssertion(),
10294
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")),
10395
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
10496
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative)));
@@ -121,15 +113,7 @@ private static List<AttributeAssertion> processAttributes(String greeting, boole
121113
equalTo(MESSAGING_OPERATION, "process"),
122114
equalTo(
123115
MESSAGING_MESSAGE_BODY_SIZE, greeting.getBytes(StandardCharsets.UTF_8).length),
124-
satisfies(
125-
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
126-
listAssert ->
127-
listAssert
128-
.isNotEmpty()
129-
.allSatisfy(
130-
server ->
131-
org.assertj.core.api.Assertions.assertThat(server)
132-
.isNotEmpty())),
116+
bootstrapServersAssertion(),
133117
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
134118
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
135119
satisfies(
@@ -156,15 +140,7 @@ protected static List<AttributeAssertion> receiveAttributes(boolean testHeaders)
156140
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
157141
equalTo(MESSAGING_OPERATION, "receive"),
158142
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
159-
satisfies(
160-
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
161-
listAssert ->
162-
listAssert
163-
.isNotEmpty()
164-
.allSatisfy(
165-
server ->
166-
org.assertj.core.api.Assertions.assertThat(server)
167-
.isNotEmpty())),
143+
bootstrapServersAssertion(),
168144
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
169145
equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1)));
170146
if (testHeaders) {

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package io.opentelemetry.javaagent.instrumentation.kafkastreams;
77

8+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
89
import static java.util.Arrays.asList;
910
import static java.util.Collections.singleton;
1011

@@ -15,6 +16,7 @@
1516
import io.opentelemetry.context.propagation.TextMapGetter;
1617
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
1718
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
19+
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
1820
import java.nio.charset.StandardCharsets;
1921
import java.time.Duration;
2022
import java.util.Collection;
@@ -57,6 +59,21 @@ abstract class KafkaStreamsBaseTest {
5759
protected static final AttributeKey<List<String>> MESSAGING_KAFKA_BOOTSTRAP_SERVERS =
5860
AttributeKey.stringArrayKey("messaging.kafka.bootstrap.servers");
5961

62+
protected static AttributeAssertion bootstrapServersAssertion() {
63+
return satisfies(
64+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
65+
listAssert -> {
66+
if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) {
67+
listAssert
68+
.isNotEmpty()
69+
.allSatisfy(
70+
server -> org.assertj.core.api.Assertions.assertThat(server).isNotEmpty());
71+
} else {
72+
listAssert.isNullOrEmpty();
73+
}
74+
});
75+
}
76+
6077
protected static final String STREAM_PENDING = "test.pending";
6178
protected static final String STREAM_PROCESSED = "test.processed";
6279

0 commit comments

Comments
 (0)