Skip to content

Commit 1fd2e9e

Browse files
committed
fix the messaging wrappers
1 parent c84ab3d commit 1fd2e9e

File tree

2 files changed

+13
-10
lines changed

2 files changed

+13
-10
lines changed

messaging-wrappers/api/src/main/java/io/opentelemetry/contrib/messaging/wrappers/MessagingProcessWrapper.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,30 +46,32 @@ public static <REQUEST> MessagingProcessWrapperBuilder<REQUEST> defaultBuilder()
4646
public <E extends Throwable> void doProcess(REQUEST request, ThrowingRunnable<E> runnable)
4747
throws E {
4848
Span span = handleStart(request);
49+
Scope scope = span.makeCurrent();
4950

50-
try (Scope scope = span.makeCurrent()) {
51+
try {
5152
runnable.run();
5253
} catch (Throwable t) {
53-
handleEnd(span, request, t);
54+
handleEnd(span, scope, request, t);
5455
throw t;
5556
}
5657

57-
handleEnd(span, request, null);
58+
handleEnd(span, scope, request, null);
5859
}
5960

6061
public <R, E extends Throwable> R doProcess(REQUEST request, ThrowingSupplier<R, E> supplier)
6162
throws E {
6263
Span span = handleStart(request);
64+
Scope scope = span.makeCurrent();
6365

6466
R result = null;
65-
try (Scope scope = span.makeCurrent()) {
67+
try {
6668
result = supplier.get();
6769
} catch (Throwable t) {
68-
handleEnd(span, request, t);
70+
handleEnd(span, scope, request, t);
6971
throw t;
7072
}
7173

72-
handleEnd(span, request, null);
74+
handleEnd(span, scope, request, null);
7375
return result;
7476
}
7577

@@ -86,11 +88,14 @@ protected Span handleStart(REQUEST request) {
8688
return spanBuilder.setAllAttributes(builder.build()).startSpan();
8789
}
8890

89-
protected void handleEnd(Span span, REQUEST request, @Nullable Throwable t) {
91+
protected void handleEnd(Span span, Scope scope, REQUEST request, @Nullable Throwable t) {
9092
AttributesBuilder builder = Attributes.builder();
9193
for (AttributesExtractor<REQUEST, Void> extractor : this.attributesExtractors) {
9294
extractor.onEnd(builder, Context.current(), request, null, t);
9395
}
96+
span.setAllAttributes(builder.build());
97+
98+
scope.close();
9499
span.end();
95100
}
96101

messaging-wrappers/kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
1212
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID;
1313
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP;
14-
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY;
1514
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET;
1615
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
1716
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
@@ -152,8 +151,7 @@ public void assertTraces() {
152151
AttributeKey.stringKey("messaging.client_id"), "test-consumer-1"),
153152
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractAssert::isNotNull),
154153
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
155-
equalTo(MESSAGING_OPERATION, "process"),
156-
equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "42")),
154+
equalTo(MESSAGING_OPERATION, "process")),
157155
span ->
158156
span.hasName("process child")
159157
.hasKind(SpanKind.INTERNAL)

0 commit comments

Comments
 (0)