Skip to content

Commit 5acff52

Browse files
committed
add kafka stream test
1 parent 6c2bd01 commit 5acff52

File tree

3 files changed

+462
-131
lines changed

3 files changed

+462
-131
lines changed

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import org.testcontainers.kafka.KafkaContainer;
4242
import org.testcontainers.utility.DockerImageName;
4343

44-
class KafkaStreamsBaseTest {
44+
abstract class KafkaStreamsBaseTest {
4545
private static final Logger logger =
4646
LoggerFactory.getLogger("io.opentelemetry.KafkaStreamsBaseTest");
4747

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java

Lines changed: 162 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import static io.opentelemetry.api.common.AttributeKey.stringKey;
1010
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
1111
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
12+
import static java.util.Arrays.asList;
1213
import static org.assertj.core.api.Assertions.assertThat;
1314
import static org.junit.jupiter.api.Assertions.assertEquals;
1415

@@ -19,14 +20,17 @@
1920
import io.opentelemetry.context.Context;
2021
import io.opentelemetry.context.propagation.TextMapGetter;
2122
import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil;
23+
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
2224
import io.opentelemetry.sdk.trace.data.LinkData;
2325
import io.opentelemetry.sdk.trace.data.SpanData;
2426
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
2527
import java.lang.reflect.InvocationTargetException;
2628
import java.lang.reflect.Method;
29+
import java.nio.charset.StandardCharsets;
2730
import java.time.Duration;
2831
import java.util.Collections;
2932
import java.util.List;
33+
import java.util.Locale;
3034
import java.util.Properties;
3135
import java.util.concurrent.ExecutionException;
3236
import java.util.concurrent.TimeoutException;
@@ -46,6 +50,38 @@
4650

4751
class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
4852

53+
@SuppressWarnings("ClassNewInstance")
54+
private static @NotNull Object createBuilder()
55+
throws InstantiationException, IllegalAccessException, ClassNotFoundException {
56+
Object builder;
57+
try {
58+
// Different class names for test and latestDepTest.
59+
builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance();
60+
} catch (ClassNotFoundException | NoClassDefFoundError e) {
61+
builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance();
62+
}
63+
return builder;
64+
}
65+
66+
@SuppressWarnings("unchecked")
67+
private static KStream<Integer, String> stream(Object builder)
68+
throws IllegalAccessException,
69+
InvocationTargetException,
70+
NoSuchMethodException,
71+
ClassNotFoundException {
72+
Method streamMethod;
73+
try {
74+
streamMethod =
75+
Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder")
76+
.getMethod("stream", String[].class);
77+
} catch (ClassNotFoundException e) {
78+
streamMethod =
79+
Class.forName("org.apache.kafka.streams.StreamsBuilder")
80+
.getMethod("stream", String.class);
81+
}
82+
return (KStream<Integer, String>) streamMethod.invoke(builder, STREAM_PENDING);
83+
}
84+
4985
@DisplayName("test kafka produce and consume with streams in-between")
5086
@Test
5187
void testKafkaProduceAndConsumeWithStreamsInBetween()
@@ -71,7 +107,7 @@ void testKafkaProduceAndConsumeWithStreamsInBetween()
71107
textLines.mapValues(
72108
textLine -> {
73109
Span.current().setAttribute("asdf", "testing");
74-
return textLine.toLowerCase();
110+
return textLine.toLowerCase(Locale.ROOT);
75111
});
76112

77113
KafkaStreams streams = null;
@@ -99,7 +135,7 @@ void testKafkaProduceAndConsumeWithStreamsInBetween()
99135
Span.current().setAttribute("testing", 123);
100136

101137
assertEquals(10, record.key());
102-
assertEquals(greeting.toLowerCase(), record.value());
138+
assertEquals(greeting.toLowerCase(Locale.ROOT), record.value());
103139

104140
if (receivedHeaders == null) {
105141
receivedHeaders = record.headers();
@@ -141,51 +177,65 @@ void testKafkaProduceAndConsumeWithStreamsInBetween()
141177
trace -> {
142178
trace.hasSpansSatisfyingExactly(
143179
// kafka-clients CONSUMER receive
144-
span ->
145-
span.hasName(STREAM_PENDING + " receive")
146-
.hasKind(SpanKind.CONSUMER)
147-
.hasNoParent()
148-
.hasAttributesSatisfyingExactly(
149-
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
150-
equalTo(
151-
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
152-
STREAM_PENDING),
153-
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "receive"),
154-
satisfies(
155-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
156-
k -> k.startsWith("consumer")),
157-
equalTo(MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)
158-
// todo
159-
),
180+
span -> {
181+
List<AttributeAssertion> assertions =
182+
asList(
183+
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
184+
equalTo(
185+
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
186+
STREAM_PENDING),
187+
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "receive"),
188+
satisfies(
189+
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
190+
k -> k.startsWith("consumer")),
191+
equalTo(MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1));
192+
if (Boolean.getBoolean("testLatestDeps")) {
193+
assertions.add(
194+
equalTo(
195+
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP,
196+
"test-application"));
197+
}
198+
span.hasName(STREAM_PENDING + " receive")
199+
.hasKind(SpanKind.CONSUMER)
200+
.hasNoParent()
201+
.hasAttributesSatisfyingExactly(assertions);
202+
},
160203
// kafka-stream CONSUMER
161-
span ->
162-
span.hasName(STREAM_PENDING + " process")
163-
.hasKind(SpanKind.CONSUMER)
164-
.hasParent(trace.getSpan(0))
165-
.hasLinks(LinkData.create(producerPendingRef.get().getSpanContext()))
166-
.hasAttributesSatisfyingExactly(
167-
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
168-
equalTo(
169-
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
170-
STREAM_PENDING),
171-
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"),
172-
satisfies(
173-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
174-
k -> k.endsWith("consumer")),
175-
satisfies(
176-
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
177-
k -> k.isInstanceOf(Long.class)),
178-
satisfies(
179-
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
180-
k -> k.isInstanceOf(String.class)),
181-
equalTo(MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0),
182-
equalTo(MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"),
183-
satisfies(
184-
longKey("kafka.record.queue_time_ms"),
185-
k -> k.isGreaterThanOrEqualTo(0)),
186-
equalTo(stringKey("asdf"), "testing")
187-
// todo
188-
),
204+
span -> {
205+
List<AttributeAssertion> assertions =
206+
asList(
207+
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
208+
equalTo(
209+
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
210+
STREAM_PENDING),
211+
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"),
212+
satisfies(
213+
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
214+
k -> k.endsWith("consumer")),
215+
satisfies(
216+
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
217+
k -> k.isInstanceOf(Long.class)),
218+
satisfies(
219+
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
220+
k -> k.isInstanceOf(String.class)),
221+
equalTo(MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0),
222+
equalTo(MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"),
223+
satisfies(
224+
longKey("kafka.record.queue_time_ms"),
225+
k -> k.isGreaterThanOrEqualTo(0)),
226+
equalTo(stringKey("asdf"), "testing"));
227+
if (Boolean.getBoolean("testLatestDeps")) {
228+
assertions.add(
229+
equalTo(
230+
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP,
231+
"test-application"));
232+
}
233+
span.hasName(STREAM_PENDING + " process")
234+
.hasKind(SpanKind.CONSUMER)
235+
.hasParent(trace.getSpan(0))
236+
.hasLinks(LinkData.create(producerPendingRef.get().getSpanContext()))
237+
.hasAttributesSatisfyingExactly(assertions);
238+
},
189239
// kafka-clients PRODUCER
190240
span ->
191241
span.hasName(STREAM_PROCESSED + " publish")
@@ -211,75 +261,89 @@ void testKafkaProduceAndConsumeWithStreamsInBetween()
211261
trace ->
212262
trace.hasSpansSatisfyingExactly(
213263
// kafka-clients CONSUMER receive
214-
span ->
215-
span.hasName(STREAM_PROCESSED + " receive")
216-
.hasKind(SpanKind.CONSUMER)
217-
.hasNoParent()
218-
.hasAttributesSatisfyingExactly(
219-
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
220-
equalTo(
221-
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
222-
STREAM_PROCESSED),
223-
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "receive"),
224-
satisfies(
225-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
226-
k -> k.startsWith("consumer")),
227-
equalTo(MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)
228-
// todo
229-
),
264+
span -> {
265+
List<AttributeAssertion> assertions =
266+
asList(
267+
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
268+
equalTo(
269+
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
270+
STREAM_PROCESSED),
271+
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "receive"),
272+
satisfies(
273+
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
274+
k -> k.startsWith("consumer")),
275+
equalTo(MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1));
276+
if (Boolean.getBoolean("testLatestDeps")) {
277+
assertions.add(
278+
equalTo(
279+
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP,
280+
"test-application"));
281+
}
282+
span.hasName(STREAM_PROCESSED + " receive")
283+
.hasKind(SpanKind.CONSUMER)
284+
.hasNoParent()
285+
.hasAttributesSatisfyingExactly(assertions);
286+
},
230287
// kafka-clients CONSUMER process
231-
span ->
232-
span.hasName(STREAM_PENDING + " process")
233-
.hasKind(SpanKind.CONSUMER)
234-
.hasParent(trace.getSpan(0))
235-
.hasLinks(LinkData.create(producerProcessedRef.get().getSpanContext()))
236-
.hasAttributesSatisfyingExactly(
237-
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
238-
equalTo(
239-
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
240-
STREAM_PROCESSED),
241-
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"),
242-
satisfies(
243-
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
244-
k -> k.startsWith("consumer")),
245-
satisfies(
246-
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
247-
k -> k.isInstanceOf(Long.class)),
248-
satisfies(
249-
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
250-
k -> k.isInstanceOf(String.class)),
251-
equalTo(
252-
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0),
253-
equalTo(
254-
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"),
255-
satisfies(
256-
longKey("kafka.record.queue_time_ms"),
257-
k -> k.isGreaterThanOrEqualTo(0)),
258-
equalTo(longKey("testing"), 123)
259-
// todo
260-
)));
288+
span -> {
289+
List<AttributeAssertion> assertions =
290+
asList(
291+
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
292+
equalTo(
293+
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
294+
STREAM_PROCESSED),
295+
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"),
296+
satisfies(
297+
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
298+
k -> k.startsWith("consumer")),
299+
satisfies(
300+
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
301+
k -> k.isInstanceOf(Long.class)),
302+
satisfies(
303+
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
304+
k -> k.isInstanceOf(String.class)),
305+
equalTo(MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0),
306+
equalTo(MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"),
307+
satisfies(
308+
longKey("kafka.record.queue_time_ms"),
309+
k -> k.isGreaterThanOrEqualTo(0)),
310+
equalTo(longKey("testing"), 123));
311+
if (Boolean.getBoolean("testLatestDeps")) {
312+
assertions.add(
313+
equalTo(
314+
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP,
315+
"test-application"));
316+
}
317+
span.hasName(STREAM_PENDING + " process")
318+
.hasKind(SpanKind.CONSUMER)
319+
.hasParent(trace.getSpan(0))
320+
.hasLinks(LinkData.create(producerProcessedRef.get().getSpanContext()))
321+
.hasAttributesSatisfyingExactly(assertions);
322+
}));
261323

262324
assertThat(receivedHeaders.iterator().hasNext()).isTrue();
263325
String traceparent =
264-
new String(receivedHeaders.headers("traceparent").iterator().next().value());
326+
new String(
327+
receivedHeaders.headers("traceparent").iterator().next().value(),
328+
StandardCharsets.UTF_8);
265329
Context context =
266330
W3CTraceContextPropagator.getInstance()
267331
.extract(
268332
Context.root(),
269333
"",
270334
new TextMapGetter<String>() {
271-
@Override
272-
public Iterable<String> keys(String carrier) {
273-
return Collections.singleton("traceparent");
274-
}
275-
276335
@Override
277336
public String get(String carrier, String key) {
278337
if ("traceparent".equals(key)) {
279338
return traceparent;
280339
}
281340
return null;
282341
}
342+
343+
@Override
344+
public Iterable<String> keys(String carrier) {
345+
return Collections.singleton("traceparent");
346+
}
283347
});
284348
SpanContext spanContext = Span.fromContext(context).getSpanContext();
285349
List<SpanData> streamTrace = testing.spans();
@@ -288,36 +352,4 @@ public String get(String carrier, String key) {
288352
assertThat(spanContext.getTraceId()).isEqualTo(streamSendSpan.getTraceId());
289353
assertThat(spanContext.getSpanId()).isEqualTo(streamSendSpan.getSpanId());
290354
}
291-
292-
@SuppressWarnings("ClassNewInstance")
293-
private static @NotNull Object createBuilder()
294-
throws InstantiationException, IllegalAccessException, ClassNotFoundException {
295-
Object builder;
296-
try {
297-
// Different class names for test and latestDepTest.
298-
builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance();
299-
} catch (ClassNotFoundException | NoClassDefFoundError e) {
300-
builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance();
301-
}
302-
return builder;
303-
}
304-
305-
@SuppressWarnings("unchecked")
306-
private static KStream<Integer, String> stream(Object builder)
307-
throws IllegalAccessException,
308-
InvocationTargetException,
309-
NoSuchMethodException,
310-
ClassNotFoundException {
311-
Method streamMethod;
312-
try {
313-
streamMethod =
314-
Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder")
315-
.getMethod("stream", String[].class);
316-
} catch (ClassNotFoundException e) {
317-
streamMethod =
318-
Class.forName("org.apache.kafka.streams.StreamsBuilder")
319-
.getMethod("stream", String.class);
320-
}
321-
return (KStream<Integer, String>) streamMethod.invoke(builder, STREAM_PENDING);
322-
}
323355
}

0 commit comments

Comments
 (0)