Skip to content

Commit 489a41b

Browse files
committed
fix ci && add test
1 parent 1607ea3 commit 489a41b

File tree

6 files changed

+66
-0
lines changed

6 files changed

+66
-0
lines changed

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ abstract class KafkaStreamsBaseTest {
5252

5353
protected static final AttributeKey<String> MESSAGING_CLIENT_ID =
5454
AttributeKey.stringKey("messaging.client_id");
55+
56+
protected static final AttributeKey<String> MESSAGING_KAFKA_BOOTSTRAP_SERVERS =
57+
AttributeKey.stringKey("messaging.kafka.bootstrap.servers");
58+
5559
protected static final String STREAM_PENDING = "test.pending";
5660
protected static final String STREAM_PROCESSED = "test.processed";
5761

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.kafka.streams.KafkaStreams;
4444
import org.apache.kafka.streams.StreamsConfig;
4545
import org.apache.kafka.streams.kstream.KStream;
46+
import org.assertj.core.api.AbstractStringAssert;
4647
import org.junit.jupiter.api.DisplayName;
4748
import org.junit.jupiter.api.Test;
4849

@@ -113,6 +114,8 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
113114
equalTo(MESSAGING_SYSTEM, KAFKA),
114115
equalTo(MESSAGING_DESTINATION_NAME, STREAM_PENDING),
115116
equalTo(MESSAGING_OPERATION, "publish"),
117+
satisfies(
118+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
116119
satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("producer")),
117120
satisfies(
118121
MESSAGING_DESTINATION_PARTITION_ID,
@@ -131,6 +134,9 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
131134
equalTo(MESSAGING_SYSTEM, KAFKA),
132135
equalTo(MESSAGING_DESTINATION_NAME, STREAM_PENDING),
133136
equalTo(MESSAGING_OPERATION, "receive"),
137+
satisfies(
138+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
139+
AbstractStringAssert::isNotEmpty),
134140
satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("consumer")),
135141
equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1)));
136142
if (Boolean.getBoolean("testLatestDeps")) {
@@ -149,6 +155,9 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
149155
equalTo(MESSAGING_SYSTEM, KAFKA),
150156
equalTo(MESSAGING_DESTINATION_NAME, STREAM_PENDING),
151157
equalTo(MESSAGING_OPERATION, "process"),
158+
satisfies(
159+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
160+
AbstractStringAssert::isNotEmpty),
152161
satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("consumer")),
153162
satisfies(MESSAGING_MESSAGE_BODY_SIZE, k -> k.isInstanceOf(Long.class)),
154163
satisfies(
@@ -180,6 +189,8 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
180189
equalTo(MESSAGING_SYSTEM, KAFKA),
181190
equalTo(MESSAGING_DESTINATION_NAME, STREAM_PROCESSED),
182191
equalTo(MESSAGING_OPERATION, "publish"),
192+
satisfies(
193+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
183194
satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("producer")),
184195
satisfies(
185196
MESSAGING_DESTINATION_PARTITION_ID,
@@ -198,6 +209,9 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
198209
equalTo(MESSAGING_SYSTEM, KAFKA),
199210
equalTo(MESSAGING_DESTINATION_NAME, STREAM_PROCESSED),
200211
equalTo(MESSAGING_OPERATION, "receive"),
212+
satisfies(
213+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
214+
AbstractStringAssert::isNotEmpty),
201215
satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("consumer")),
202216
equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1)));
203217
if (Boolean.getBoolean("testLatestDeps")) {
@@ -216,6 +230,9 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
216230
equalTo(MESSAGING_SYSTEM, KAFKA),
217231
equalTo(MESSAGING_DESTINATION_NAME, STREAM_PROCESSED),
218232
equalTo(MESSAGING_OPERATION, "process"),
233+
satisfies(
234+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
235+
AbstractStringAssert::isNotEmpty),
219236
satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("consumer")),
220237
satisfies(
221238
MESSAGING_MESSAGE_BODY_SIZE, k -> k.isInstanceOf(Long.class)),

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.kafka.streams.KafkaStreams;
4141
import org.apache.kafka.streams.StreamsConfig;
4242
import org.apache.kafka.streams.kstream.KStream;
43+
import org.assertj.core.api.AbstractStringAssert;
4344
import org.junit.jupiter.api.DisplayName;
4445
import org.junit.jupiter.api.Test;
4546

@@ -103,6 +104,9 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
103104
equalTo(MESSAGING_DESTINATION_NAME, STREAM_PENDING),
104105
equalTo(MESSAGING_OPERATION, "publish"),
105106
equalTo(MESSAGING_CLIENT_ID, "producer-1"),
107+
satisfies(
108+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
109+
AbstractStringAssert::isNotEmpty),
106110
satisfies(
107111
MESSAGING_DESTINATION_PARTITION_ID,
108112
k -> k.isInstanceOf(String.class)),
@@ -116,6 +120,9 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
116120
equalTo(MESSAGING_SYSTEM, KAFKA),
117121
equalTo(MESSAGING_DESTINATION_NAME, STREAM_PENDING),
118122
equalTo(MESSAGING_OPERATION, "process"),
123+
satisfies(
124+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
125+
AbstractStringAssert::isNotEmpty),
119126
satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("consumer")),
120127
satisfies(
121128
MESSAGING_MESSAGE_BODY_SIZE, k -> k.isInstanceOf(Long.class)),
@@ -148,6 +155,8 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
148155
equalTo(MESSAGING_SYSTEM, KAFKA),
149156
equalTo(MESSAGING_DESTINATION_NAME, STREAM_PROCESSED),
150157
equalTo(MESSAGING_OPERATION, "publish"),
158+
satisfies(
159+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
151160
satisfies(MESSAGING_CLIENT_ID, k -> k.isInstanceOf(String.class)),
152161
satisfies(
153162
MESSAGING_DESTINATION_PARTITION_ID,
@@ -162,6 +171,9 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
162171
equalTo(MESSAGING_SYSTEM, KAFKA),
163172
equalTo(MESSAGING_DESTINATION_NAME, STREAM_PROCESSED),
164173
equalTo(MESSAGING_OPERATION, "process"),
174+
satisfies(
175+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
176+
AbstractStringAssert::isNotEmpty),
165177
satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("consumer")),
166178
satisfies(
167179
MESSAGING_MESSAGE_BODY_SIZE, k -> k.isInstanceOf(Long.class)),

instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ public abstract class AbstractReactorKafkaTest {
7171

7272
@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();
7373

74+
protected static final AttributeKey<String> MESSAGING_KAFKA_BOOTSTRAP_SERVERS =
75+
AttributeKey.stringKey("messaging.kafka.bootstrap.servers");
76+
7477
static KafkaContainer kafka;
7578
protected static KafkaSender<String, String> sender;
7679
protected static KafkaReceiver<String, String> receiver;
@@ -192,6 +195,7 @@ protected static List<AttributeAssertion> sendAttributes(ProducerRecord<String,
192195
equalTo(MESSAGING_SYSTEM, "kafka"),
193196
equalTo(MESSAGING_DESTINATION_NAME, record.topic()),
194197
equalTo(MESSAGING_OPERATION, "publish"),
198+
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
195199
satisfies(
196200
AttributeKey.stringKey("messaging.client_id"),
197201
stringAssert -> stringAssert.startsWith("producer")),
@@ -212,6 +216,7 @@ protected static List<AttributeAssertion> receiveAttributes(String topic) {
212216
equalTo(MESSAGING_SYSTEM, "kafka"),
213217
equalTo(MESSAGING_DESTINATION_NAME, topic),
214218
equalTo(MESSAGING_OPERATION, "receive"),
219+
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
215220
satisfies(
216221
AttributeKey.stringKey("messaging.client_id"),
217222
stringAssert -> stringAssert.startsWith("consumer")),
@@ -231,6 +236,7 @@ protected static List<AttributeAssertion> processAttributes(
231236
equalTo(MESSAGING_SYSTEM, "kafka"),
232237
equalTo(MESSAGING_DESTINATION_NAME, record.topic()),
233238
equalTo(MESSAGING_OPERATION, "process"),
239+
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
234240
satisfies(
235241
AttributeKey.stringKey("messaging.client_id"),
236242
stringAssert -> stringAssert.startsWith("consumer")),

instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ void shouldCreateSpansForSingleRecordProcess() {
6464
equalTo(MESSAGING_SYSTEM, "kafka"),
6565
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
6666
equalTo(MESSAGING_OPERATION, "publish"),
67+
satisfies(
68+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
69+
AbstractStringAssert::isNotEmpty),
6770
satisfies(
6871
MESSAGING_CLIENT_ID,
6972
stringAssert -> stringAssert.startsWith("producer")),
@@ -83,6 +86,9 @@ void shouldCreateSpansForSingleRecordProcess() {
8386
equalTo(MESSAGING_SYSTEM, "kafka"),
8487
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
8588
equalTo(MESSAGING_OPERATION, "process"),
89+
satisfies(
90+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
91+
AbstractStringAssert::isNotEmpty),
8692
satisfies(
8793
MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative),
8894
satisfies(
@@ -118,6 +124,7 @@ void shouldHandleFailureInKafkaListener() {
118124
equalTo(MESSAGING_SYSTEM, "kafka"),
119125
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
120126
equalTo(MESSAGING_OPERATION, "process"),
127+
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
121128
satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative),
122129
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
123130
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
@@ -138,6 +145,9 @@ void shouldHandleFailureInKafkaListener() {
138145
equalTo(MESSAGING_SYSTEM, "kafka"),
139146
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
140147
equalTo(MESSAGING_OPERATION, "publish"),
148+
satisfies(
149+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
150+
AbstractStringAssert::isNotEmpty),
141151
satisfies(
142152
MESSAGING_CLIENT_ID,
143153
stringAssert -> stringAssert.startsWith("producer")),
@@ -198,6 +208,9 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException {
198208
equalTo(MESSAGING_SYSTEM, "kafka"),
199209
equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"),
200210
equalTo(MESSAGING_OPERATION, "publish"),
211+
satisfies(
212+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
213+
AbstractStringAssert::isNotEmpty),
201214
satisfies(
202215
MESSAGING_CLIENT_ID,
203216
stringAssert -> stringAssert.startsWith("producer")),
@@ -216,6 +229,9 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException {
216229
equalTo(MESSAGING_SYSTEM, "kafka"),
217230
equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"),
218231
equalTo(MESSAGING_OPERATION, "publish"),
232+
satisfies(
233+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
234+
AbstractStringAssert::isNotEmpty),
219235
satisfies(
220236
MESSAGING_CLIENT_ID,
221237
stringAssert -> stringAssert.startsWith("producer")),
@@ -245,6 +261,9 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException {
245261
equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"),
246262
equalTo(MESSAGING_OPERATION, "process"),
247263
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"),
264+
satisfies(
265+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
266+
AbstractStringAssert::isNotEmpty),
248267
satisfies(
249268
MESSAGING_CLIENT_ID,
250269
stringAssert -> stringAssert.startsWith("consumer")),
@@ -273,6 +292,7 @@ void shouldHandleFailureInKafkaBatchListener() {
273292
equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"),
274293
equalTo(MESSAGING_OPERATION, "process"),
275294
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"),
295+
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
276296
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
277297
equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1));
278298

@@ -290,6 +310,9 @@ void shouldHandleFailureInKafkaBatchListener() {
290310
equalTo(MESSAGING_SYSTEM, "kafka"),
291311
equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"),
292312
equalTo(MESSAGING_OPERATION, "publish"),
313+
satisfies(
314+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
315+
AbstractStringAssert::isNotEmpty),
293316
satisfies(
294317
MESSAGING_CLIENT_ID,
295318
stringAssert -> stringAssert.startsWith("producer")),

instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ public abstract class AbstractSpringKafkaTest {
4141

4242
protected static final AttributeKey<String> MESSAGING_CLIENT_ID =
4343
AttributeKey.stringKey("messaging.client_id");
44+
45+
protected static final AttributeKey<String> MESSAGING_KAFKA_BOOTSTRAP_SERVERS =
46+
AttributeKey.stringKey("messaging.kafka.bootstrap.servers");
47+
4448
static KafkaContainer kafka;
4549

4650
ConfigurableApplicationContext applicationContext;

0 commit comments

Comments
 (0)