Skip to content

Commit 39a764e

Browse files
committed
simplify
1 parent 3fec0f6 commit 39a764e

File tree

3 files changed

+38
-70
lines changed

3 files changed

+38
-70
lines changed

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@
1010

1111
import com.google.common.collect.ImmutableMap;
1212
import io.opentelemetry.api.common.AttributeKey;
13+
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
14+
import io.opentelemetry.context.Context;
15+
import io.opentelemetry.context.propagation.TextMapGetter;
1316
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
1417
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
18+
import java.nio.charset.StandardCharsets;
1519
import java.time.Duration;
1620
import java.util.Collection;
1721
import java.util.Collections;
@@ -28,6 +32,7 @@
2832
import org.apache.kafka.clients.producer.KafkaProducer;
2933
import org.apache.kafka.clients.producer.Producer;
3034
import org.apache.kafka.common.TopicPartition;
35+
import org.apache.kafka.common.header.Headers;
3136
import org.apache.kafka.common.serialization.IntegerDeserializer;
3237
import org.apache.kafka.common.serialization.IntegerSerializer;
3338
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -151,4 +156,28 @@ static void awaitUntilConsumerIsReady() throws InterruptedException {
151156
}
152157
consumer.seekToBeginning(Collections.emptyList());
153158
}
159+
160+
static Context getContext(Headers headers) {
161+
String traceparent =
162+
new String(
163+
headers.headers("traceparent").iterator().next().value(), StandardCharsets.UTF_8);
164+
return W3CTraceContextPropagator.getInstance()
165+
.extract(
166+
Context.root(),
167+
"",
168+
new TextMapGetter<String>() {
169+
@Override
170+
public String get(String carrier, String key) {
171+
if ("traceparent".equals(key)) {
172+
return traceparent;
173+
}
174+
return null;
175+
}
176+
177+
@Override
178+
public Iterable<String> keys(String carrier) {
179+
return Collections.singleton("traceparent");
180+
}
181+
});
182+
}
154183
}

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java

Lines changed: 5 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,13 @@
1515
import io.opentelemetry.api.trace.Span;
1616
import io.opentelemetry.api.trace.SpanContext;
1717
import io.opentelemetry.api.trace.SpanKind;
18-
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
19-
import io.opentelemetry.context.Context;
20-
import io.opentelemetry.context.propagation.TextMapGetter;
2118
import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil;
2219
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
2320
import io.opentelemetry.sdk.trace.data.LinkData;
2421
import io.opentelemetry.sdk.trace.data.SpanData;
2522
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
26-
import java.nio.charset.StandardCharsets;
2723
import java.time.Duration;
2824
import java.util.ArrayList;
29-
import java.util.Collections;
3025
import java.util.List;
3126
import java.util.Locale;
3227
import java.util.Properties;
@@ -87,6 +82,9 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
8782
receivedHeaders = record.headers();
8883
}
8984
}
85+
assertThat(receivedHeaders).isNotEmpty();
86+
SpanContext receivedContext = Span.fromContext(getContext(receivedHeaders)).getSpanContext();
87+
9088
AtomicReference<SpanData> producerPendingRef = new AtomicReference<>();
9189
AtomicReference<SpanData> producerProcessedRef = new AtomicReference<>();
9290

@@ -194,6 +192,8 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
194192
span.hasName(STREAM_PROCESSED + " publish")
195193
.hasKind(SpanKind.PRODUCER)
196194
.hasParent(trace.getSpan(1))
195+
.hasTraceId(receivedContext.getTraceId())
196+
.hasSpanId(receivedContext.getSpanId())
197197
.hasAttributesSatisfyingExactly(
198198
equalTo(
199199
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
@@ -278,36 +278,5 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
278278
.hasLinks(LinkData.create(producerProcessedRef.get().getSpanContext()))
279279
.hasAttributesSatisfyingExactly(assertions);
280280
}));
281-
282-
assertThat(receivedHeaders.iterator().hasNext()).isTrue();
283-
String traceparent =
284-
new String(
285-
receivedHeaders.headers("traceparent").iterator().next().value(),
286-
StandardCharsets.UTF_8);
287-
Context context =
288-
W3CTraceContextPropagator.getInstance()
289-
.extract(
290-
Context.root(),
291-
"",
292-
new TextMapGetter<String>() {
293-
@Override
294-
public String get(String carrier, String key) {
295-
if ("traceparent".equals(key)) {
296-
return traceparent;
297-
}
298-
return null;
299-
}
300-
301-
@Override
302-
public Iterable<String> keys(String carrier) {
303-
return Collections.singleton("traceparent");
304-
}
305-
});
306-
SpanContext spanContext = Span.fromContext(context).getSpanContext();
307-
List<List<SpanData>> streamTrace = testing.waitForTraces(3);
308-
assertThat(streamTrace).hasSize(3);
309-
SpanData streamSendSpan = streamTrace.get(2).get(2);
310-
assertThat(spanContext.getTraceId()).isEqualTo(streamSendSpan.getTraceId());
311-
assertThat(spanContext.getSpanId()).isEqualTo(streamSendSpan.getSpanId());
312281
}
313282
}

instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,11 @@
1515
import io.opentelemetry.api.trace.Span;
1616
import io.opentelemetry.api.trace.SpanContext;
1717
import io.opentelemetry.api.trace.SpanKind;
18-
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
19-
import io.opentelemetry.context.Context;
20-
import io.opentelemetry.context.propagation.TextMapGetter;
2118
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
2219
import io.opentelemetry.sdk.trace.data.SpanData;
2320
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
24-
import java.nio.charset.StandardCharsets;
2521
import java.time.Duration;
2622
import java.util.ArrayList;
27-
import java.util.Collections;
2823
import java.util.List;
2924
import java.util.Locale;
3025
import java.util.Properties;
@@ -85,6 +80,8 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
8580
receivedHeaders = record.headers();
8681
}
8782
}
83+
assertThat(receivedHeaders).isNotEmpty();
84+
SpanContext receivedContext = Span.fromContext(getContext(receivedHeaders)).getSpanContext();
8885

8986
AtomicReference<SpanData> streamSendSpanRef = new AtomicReference<>();
9087
testing.waitAndAssertTraces(
@@ -157,6 +154,8 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
157154
span.hasName(STREAM_PROCESSED + " publish")
158155
.hasKind(SpanKind.PRODUCER)
159156
.hasParent(trace.getSpan(1))
157+
.hasTraceId(receivedContext.getTraceId())
158+
.hasSpanId(receivedContext.getSpanId())
160159
.hasAttributesSatisfyingExactly(
161160
equalTo(
162161
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
@@ -209,34 +208,5 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
209208
.hasParent(trace.getSpan(2))
210209
.hasAttributesSatisfyingExactly(assertions);
211210
}));
212-
213-
assertThat(receivedHeaders.iterator().hasNext()).isTrue();
214-
String traceparent =
215-
new String(
216-
receivedHeaders.headers("traceparent").iterator().next().value(),
217-
StandardCharsets.UTF_8);
218-
Context context =
219-
W3CTraceContextPropagator.getInstance()
220-
.extract(
221-
Context.root(),
222-
"",
223-
new TextMapGetter<String>() {
224-
@Override
225-
public String get(String carrier, String key) {
226-
if ("traceparent".equals(key)) {
227-
return traceparent;
228-
}
229-
return null;
230-
}
231-
232-
@Override
233-
public Iterable<String> keys(String carrier) {
234-
return Collections.singleton("traceparent");
235-
}
236-
});
237-
SpanContext spanContext = Span.fromContext(context).getSpanContext();
238-
SpanData streamSendSpan = streamSendSpanRef.get();
239-
assertThat(spanContext.getTraceId()).isEqualTo(streamSendSpan.getTraceId());
240-
assertThat(spanContext.getSpanId()).isEqualTo(streamSendSpan.getSpanId());
241211
}
242212
}

0 commit comments

Comments
 (0)