Skip to content

Commit 6f3587b

Browse files
committed
add test for kafka clients
1 parent 8ef16f8 commit 6f3587b

File tree

12 files changed

+606
-36
lines changed

12 files changed

+606
-36
lines changed

messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/DefaultMessagingProcessWrapperBuilder.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.opentelemetry.contrib.messaging.wrappers;
22

3+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
34
import io.opentelemetry.api.GlobalOpenTelemetry;
45
import io.opentelemetry.api.OpenTelemetry;
56
import io.opentelemetry.context.propagation.TextMapGetter;
@@ -9,23 +10,28 @@
910
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
1011
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
1112

13+
import javax.annotation.Nullable;
1214
import java.util.ArrayList;
1315
import java.util.Collection;
1416
import java.util.List;
1517

1618
public class DefaultMessagingProcessWrapperBuilder<REQUEST extends MessagingProcessRequest> {
1719

20+
@Nullable
1821
private OpenTelemetry openTelemetry;
1922

23+
@Nullable
2024
protected TextMapGetter<REQUEST> textMapGetter;
2125

22-
protected List<AttributesExtractor<REQUEST, Void>> attributesExtractors;
23-
26+
@CanIgnoreReturnValue
2427
public DefaultMessagingProcessWrapperBuilder<REQUEST> openTelemetry(OpenTelemetry openTelemetry) {
2528
this.openTelemetry = openTelemetry;
2629
return this;
2730
}
2831

32+
protected List<AttributesExtractor<REQUEST, Void>> attributesExtractors;
33+
34+
@CanIgnoreReturnValue
2935
public DefaultMessagingProcessWrapperBuilder<REQUEST> textMapGetter(TextMapGetter<REQUEST> textMapGetter) {
3036
this.textMapGetter = textMapGetter;
3137
return this;
@@ -35,22 +41,26 @@ public DefaultMessagingProcessWrapperBuilder<REQUEST> textMapGetter(TextMapGette
3541
* This method overrides the original items.
3642
* <p>See {@link DefaultMessagingProcessWrapperBuilder#addAttributesExtractor} if you just want to append one.</p>
3743
* */
44+
@CanIgnoreReturnValue
3845
public DefaultMessagingProcessWrapperBuilder<REQUEST> attributesExtractors(
3946
Collection<AttributesExtractor<REQUEST, Void>> attributesExtractors) {
4047
this.attributesExtractors = new ArrayList<>();
4148
this.attributesExtractors.addAll(attributesExtractors);
4249
return this;
4350
}
4451

52+
@CanIgnoreReturnValue
4553
public DefaultMessagingProcessWrapperBuilder<REQUEST> addAttributesExtractor(
4654
AttributesExtractor<REQUEST, Void> attributesExtractor) {
4755
this.attributesExtractors.add(attributesExtractor);
4856
return this;
4957
}
5058

5159
public MessagingProcessWrapper<REQUEST> build() {
52-
return new MessagingProcessWrapper<>(this.openTelemetry == null ? GlobalOpenTelemetry.get() : this.openTelemetry,
53-
this.textMapGetter, this.attributesExtractors);
60+
return new MessagingProcessWrapper<>(
61+
this.openTelemetry == null ? GlobalOpenTelemetry.get() : this.openTelemetry,
62+
this.textMapGetter == null ? NoopTextMapGetter.create() : this.textMapGetter,
63+
this.attributesExtractors);
5464
}
5565

5666
protected DefaultMessagingProcessWrapperBuilder() {

messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/MessagingProcessWrapper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,23 +77,23 @@ protected Span handleStart(REQUEST request) {
7777
return spanBuilder.setAllAttributes(builder.build()).startSpan();
7878
}
7979

80-
protected void handleEnd(Span span, REQUEST request, Throwable t) {
80+
protected void handleEnd(Span span, REQUEST request, @Nullable Throwable t) {
8181
AttributesBuilder builder = Attributes.builder();
8282
for (AttributesExtractor<REQUEST, Void> extractor : this.attributesExtractors) {
8383
extractor.onEnd(builder, Context.current(), request, null, t);
8484
}
8585
span.end();
8686
}
8787

88-
protected String getDefaultSpanName(String destination) {
88+
protected String getDefaultSpanName(@Nullable String destination) {
8989
if (destination == null) {
9090
destination = "unknown";
9191
}
9292
return OPERATION_NAME + " " + destination;
9393
}
9494

9595
protected MessagingProcessWrapper(OpenTelemetry openTelemetry,
96-
@Nullable TextMapGetter<REQUEST> textMapGetter,
96+
TextMapGetter<REQUEST> textMapGetter,
9797
List<AttributesExtractor<REQUEST, Void>> attributesExtractors) {
9898
this.textMapPropagator = openTelemetry.getPropagators().getTextMapPropagator();
9999
this.tracer = openTelemetry.getTracer(INSTRUMENTATION_SCOPE + "-" + INSTRUMENTATION_VERSION);
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.opentelemetry.contrib.messaging.wrappers;
2+
3+
import io.opentelemetry.context.propagation.TextMapGetter;
4+
import io.opentelemetry.contrib.messaging.wrappers.semconv.MessagingProcessRequest;
5+
import javax.annotation.Nullable;
6+
import java.util.Collections;
7+
8+
public class NoopTextMapGetter<REQUEST extends MessagingProcessRequest> implements TextMapGetter<REQUEST> {
9+
10+
public static <REQUEST extends MessagingProcessRequest> TextMapGetter<REQUEST> create() {
11+
return new NoopTextMapGetter<>();
12+
}
13+
14+
@Override
15+
public Iterable<String> keys(REQUEST request) {
16+
return Collections.emptyList();
17+
}
18+
19+
@Nullable
20+
@Override
21+
public String get(@Nullable REQUEST request, String s) {
22+
return null;
23+
}
24+
25+
private NoopTextMapGetter() {}
26+
}

messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/semconv/MessagingProcessRequest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.opentelemetry.contrib.messaging.wrappers.semconv;
22

3+
import javax.annotation.Nullable;
34
import java.util.List;
45

56
import static java.util.Collections.emptyList;
@@ -13,29 +14,39 @@ public interface MessagingProcessRequest {
1314

1415
String getSystem();
1516

17+
@Nullable
1618
String getDestination();
1719

20+
@Nullable
1821
String getDestinationTemplate();
1922

2023
boolean isTemporaryDestination();
2124

2225
boolean isAnonymousDestination();
2326

27+
@Nullable
2428
String getConversationId();
2529

30+
@Nullable
2631
Long getMessageBodySize();
2732

33+
@Nullable
2834
Long getMessageEnvelopeSize();
2935

36+
@Nullable
3037
String getMessageId();
3138

39+
@Nullable
3240
default String getClientId() {
3341
return null;
3442
}
43+
44+
@Nullable
3545
default Long getBatchMessageCount() {
3646
return null;
3747
}
3848

49+
@Nullable
3950
default String getDestinationPartitionId() {
4051
return null;
4152
}

messaging-wrappers/kafka-clients/build.gradle.kts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,22 @@ dependencies {
1616

1717
compileOnly("org.apache.kafka:kafka-clients:0.11.0.0")
1818

19+
testImplementation("org.apache.kafka:kafka-clients:0.11.0.0")
20+
testImplementation("io.opentelemetry.instrumentation:opentelemetry-kafka-clients-2.6")
21+
22+
testAnnotationProcessor("com.google.auto.service:auto-service")
23+
testCompileOnly("com.google.auto.service:auto-service-annotations")
24+
testImplementation("org.junit.jupiter:junit-jupiter-api")
25+
testImplementation("org.junit.jupiter:junit-jupiter-params")
26+
testImplementation("org.testcontainers:kafka")
27+
testImplementation("org.testcontainers:junit-jupiter")
28+
1929
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
20-
testImplementation("io.opentelemetry:opentelemetry-sdk-trace")
30+
testImplementation("io.opentelemetry:opentelemetry-sdk")
2131
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
22-
2332
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-incubator")
24-
testImplementation("uk.org.webcompere:system-stubs-jupiter:2.0.3")
33+
testImplementation("io.opentelemetry.semconv:opentelemetry-semconv")
34+
testImplementation("io.opentelemetry.semconv:opentelemetry-semconv-incubating")
35+
testImplementation("io.opentelemetry:opentelemetry-exporter-logging")
36+
testImplementation("io.opentelemetry.javaagent:opentelemetry-testing-common")
2537
}

messaging-wrappers/kafka-clients/src/main/java/io/opentelemetry/contrib/messaging/wrappers/kafka/semconv/KafkaProcessRequest.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import javax.annotation.Nullable;
77
import java.nio.charset.StandardCharsets;
8-
import java.util.Collections;
98
import java.util.List;
109
import java.util.stream.Collectors;
1110
import java.util.stream.StreamSupport;
@@ -14,15 +13,18 @@ public class KafkaProcessRequest implements MessagingProcessRequest {
1413

1514
private final ConsumerRecord<?, ?> consumerRecord;
1615

16+
@Nullable
1717
private final String clientId;
1818

19+
@Nullable
1920
private final String consumerGroup;
2021

2122
public static KafkaProcessRequest of(ConsumerRecord<?, ?> consumerRecord) {
2223
return of(consumerRecord, null, null);
2324
}
2425

25-
public static KafkaProcessRequest of(ConsumerRecord<?, ?> consumerRecord, String consumerGroup, String clientId) {
26+
public static KafkaProcessRequest of(ConsumerRecord<?, ?> consumerRecord,
27+
@Nullable String consumerGroup, @Nullable String clientId) {
2628
return new KafkaProcessRequest(consumerRecord, consumerGroup, clientId);
2729
}
2830

@@ -31,11 +33,9 @@ public String getSystem() {
3133
return "kafka";
3234
}
3335

36+
@Nullable
3437
@Override
3538
public String getDestination() {
36-
if (this.consumerRecord == null) {
37-
return null;
38-
}
3939
return this.consumerRecord.topic();
4040
}
4141

@@ -64,9 +64,6 @@ public String getConversationId() {
6464
@Nullable
6565
@Override
6666
public Long getMessageBodySize() {
67-
if (this.consumerRecord == null) {
68-
return null;
69-
}
7067
long size = this.consumerRecord.serializedValueSize();
7168
return size >= 0 ? size : null;
7269
}
@@ -97,9 +94,6 @@ public Long getBatchMessageCount() {
9794

9895
@Override
9996
public List<String> getMessageHeader(String name) {
100-
if (this.consumerRecord == null) {
101-
return Collections.emptyList();
102-
}
10397
return StreamSupport.stream(this.consumerRecord.headers().headers(name).spliterator(), false)
10498
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
10599
.collect(Collectors.toList());
@@ -114,7 +108,8 @@ public String getConsumerGroup() {
114108
return consumerRecord;
115109
}
116110

117-
private KafkaProcessRequest(ConsumerRecord<?, ?> consumerRecord, String consumerGroup, String clientId) {
111+
private KafkaProcessRequest(ConsumerRecord<?, ?> consumerRecord,
112+
@Nullable String consumerGroup, @Nullable String clientId) {
118113
this.consumerRecord = consumerRecord;
119114
this.consumerGroup = consumerGroup;
120115
this.clientId = clientId;

0 commit comments

Comments
 (0)