Skip to content

Commit fcfd92b

Browse files
committed
Add optional (experimental) process span on consuming a message. This make the spans correctly connected and hierarchical instead of separeted.
1 parent aa83bad commit fcfd92b

File tree

5 files changed

+105
-6
lines changed

5 files changed

+105
-6
lines changed

instrumentation/jms/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/JmsMessageConsumerInstrumentation.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,25 @@
88
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
99
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
1010
import static io.opentelemetry.javaagent.instrumentation.jms.JmsReceiveSpanUtil.createReceiveSpan;
11+
import static io.opentelemetry.javaagent.instrumentation.jms.v1_1.JmsSingletons.consumerProcessInstrumenter;
1112
import static io.opentelemetry.javaagent.instrumentation.jms.v1_1.JmsSingletons.consumerReceiveInstrumenter;
1213
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
1314
import static net.bytebuddy.matcher.ElementMatchers.named;
1415
import static net.bytebuddy.matcher.ElementMatchers.returns;
1516
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
1617

1718
import io.opentelemetry.context.Context;
19+
import io.opentelemetry.context.Scope;
1820
import io.opentelemetry.instrumentation.api.internal.Timer;
21+
import io.opentelemetry.instrumentation.api.util.VirtualField;
1922
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
2023
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2124
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
25+
import io.opentelemetry.javaagent.instrumentation.jms.JmsConfig;
26+
import io.opentelemetry.javaagent.instrumentation.jms.MessageState;
2227
import io.opentelemetry.javaagent.instrumentation.jms.MessageWithDestination;
2328
import javax.jms.Message;
29+
import javax.jms.MessageConsumer;
2430
import net.bytebuddy.asm.Advice;
2531
import net.bytebuddy.description.type.TypeDescription;
2632
import net.bytebuddy.matcher.ElementMatcher;
@@ -57,12 +63,25 @@ public void transform(TypeTransformer transformer) {
5763
public static class ConsumerAdvice {
5864

5965
@Advice.OnMethodEnter
60-
public static Timer onEnter() {
66+
public static Timer onEnter(@Advice.This MessageConsumer consumer) {
67+
68+
if (JmsConfig.EXPERIMENTAL_CONSUMER_PROCESS_TELEMETRY_ENABLED) {
69+
VirtualField<MessageConsumer, MessageState> storage =
70+
VirtualField.find(MessageConsumer.class, MessageState.class);
71+
MessageState messageState = storage.get(consumer);
72+
if (messageState != null) {
73+
messageState.processScope.close();
74+
consumerProcessInstrumenter().end(messageState.context, messageState.message, null, null);
75+
storage.set(consumer, null);
76+
}
77+
}
78+
6179
return Timer.start();
6280
}
6381

6482
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
6583
public static void stopSpan(
84+
@Advice.This MessageConsumer consumer,
6685
@Advice.Enter Timer timer,
6786
@Advice.Return Message message,
6887
@Advice.Thrown Throwable throwable) {
@@ -75,7 +94,17 @@ public static void stopSpan(
7594
MessageWithDestination request =
7695
MessageWithDestination.create(JavaxMessageAdapter.create(message), null);
7796

78-
createReceiveSpan(consumerReceiveInstrumenter(), request, timer, throwable);
97+
Context receiveContext =
98+
createReceiveSpan(consumerReceiveInstrumenter(), request, timer, throwable);
99+
if (JmsConfig.EXPERIMENTAL_CONSUMER_PROCESS_TELEMETRY_ENABLED && receiveContext != null) {
100+
if (consumerProcessInstrumenter().shouldStart(receiveContext, request)) {
101+
Context processContext = consumerProcessInstrumenter().start(receiveContext, request);
102+
Scope processScope = processContext.makeCurrent();
103+
VirtualField<MessageConsumer, MessageState> storage =
104+
VirtualField.find(MessageConsumer.class, MessageState.class);
105+
storage.set(consumer, new MessageState(processContext, processScope, request));
106+
}
107+
}
79108
}
80109
}
81110
}

instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsMessageConsumerInstrumentation.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,25 @@
88
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
99
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
1010
import static io.opentelemetry.javaagent.instrumentation.jms.JmsReceiveSpanUtil.createReceiveSpan;
11+
import static io.opentelemetry.javaagent.instrumentation.jms.v3_0.JmsSingletons.consumerProcessInstrumenter;
1112
import static io.opentelemetry.javaagent.instrumentation.jms.v3_0.JmsSingletons.consumerReceiveInstrumenter;
1213
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
1314
import static net.bytebuddy.matcher.ElementMatchers.named;
1415
import static net.bytebuddy.matcher.ElementMatchers.returns;
1516
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
1617

1718
import io.opentelemetry.context.Context;
19+
import io.opentelemetry.context.Scope;
1820
import io.opentelemetry.instrumentation.api.internal.Timer;
21+
import io.opentelemetry.instrumentation.api.util.VirtualField;
1922
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
2023
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2124
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
25+
import io.opentelemetry.javaagent.instrumentation.jms.JmsConfig;
26+
import io.opentelemetry.javaagent.instrumentation.jms.MessageState;
2227
import io.opentelemetry.javaagent.instrumentation.jms.MessageWithDestination;
2328
import jakarta.jms.Message;
29+
import jakarta.jms.MessageConsumer;
2430
import net.bytebuddy.asm.Advice;
2531
import net.bytebuddy.description.type.TypeDescription;
2632
import net.bytebuddy.matcher.ElementMatcher;
@@ -57,12 +63,24 @@ public void transform(TypeTransformer transformer) {
5763
public static class ConsumerAdvice {
5864

5965
@Advice.OnMethodEnter
60-
public static Timer onEnter() {
66+
public static Timer onEnter(@Advice.This MessageConsumer consumer) {
67+
if (JmsConfig.EXPERIMENTAL_CONSUMER_PROCESS_TELEMETRY_ENABLED) {
68+
VirtualField<MessageConsumer, MessageState> storage =
69+
VirtualField.find(MessageConsumer.class, MessageState.class);
70+
MessageState messageState = storage.get(consumer);
71+
if (messageState != null) {
72+
messageState.processScope.close();
73+
consumerProcessInstrumenter().end(messageState.context, messageState.message, null, null);
74+
storage.set(consumer, null);
75+
}
76+
}
77+
6178
return Timer.start();
6279
}
6380

6481
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
6582
public static void stopSpan(
83+
@Advice.This MessageConsumer consumer,
6684
@Advice.Enter Timer timer,
6785
@Advice.Return Message message,
6886
@Advice.Thrown Throwable throwable) {
@@ -75,7 +93,17 @@ public static void stopSpan(
7593
MessageWithDestination request =
7694
MessageWithDestination.create(JakartaMessageAdapter.create(message), null);
7795

78-
createReceiveSpan(consumerReceiveInstrumenter(), request, timer, throwable);
96+
Context receiveContext =
97+
createReceiveSpan(consumerReceiveInstrumenter(), request, timer, throwable);
98+
if (JmsConfig.EXPERIMENTAL_CONSUMER_PROCESS_TELEMETRY_ENABLED && receiveContext != null) {
99+
if (consumerProcessInstrumenter().shouldStart(receiveContext, request)) {
100+
Context processContext = consumerProcessInstrumenter().start(receiveContext, request);
101+
Scope processScope = processContext.makeCurrent();
102+
VirtualField<MessageConsumer, MessageState> storage =
103+
VirtualField.find(MessageConsumer.class, MessageState.class);
104+
storage.set(consumer, new MessageState(processContext, processScope, request));
105+
}
106+
}
79107
}
80108
}
81109
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.jms;
7+
8+
import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig;
9+
10+
public final class JmsConfig {
11+
12+
public static final boolean EXPERIMENTAL_CONSUMER_PROCESS_TELEMETRY_ENABLED =
13+
AgentInstrumentationConfig.get()
14+
.getBoolean(
15+
"otel.instrumentation.jms.experimental.consumer-process-telemetry.enabled", false);
16+
17+
private JmsConfig() {}
18+
}

instrumentation/jms/jms-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsReceiveSpanUtil.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ public final class JmsReceiveSpanUtil {
2121
private static final boolean receiveInstrumentationEnabled =
2222
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();
2323

24-
public static void createReceiveSpan(
24+
public static Context createReceiveSpan(
2525
Instrumenter<MessageWithDestination, Void> receiveInstrumenter,
2626
MessageWithDestination request,
2727
Timer timer,
2828
Throwable throwable) {
2929
Context parentContext = Context.current();
3030
// if receive instrumentation is not enabled we'll use the producer as parent
31-
// according to the stable convertions the production should only be linked not as parent
31+
// according to the stable conventions the production should only be linked not as parent
3232
if (!receiveInstrumentationEnabled && !emitStableMessagingSemconv()) {
3333
parentContext =
3434
propagators
@@ -47,7 +47,9 @@ public static void createReceiveSpan(
4747
timer.startTime(),
4848
timer.now());
4949
JmsReceiveContextHolder.set(receiveContext);
50+
return receiveContext;
5051
}
52+
return null;
5153
}
5254

5355
private JmsReceiveSpanUtil() {}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.jms;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
11+
public class MessageState {
12+
13+
public final Context context;
14+
public final Scope processScope;
15+
public final MessageWithDestination message;
16+
17+
public MessageState(Context context, Scope processScope, MessageWithDestination message) {
18+
this.context = context;
19+
this.processScope = processScope;
20+
this.message = message;
21+
}
22+
}

0 commit comments

Comments
 (0)