Skip to content

Commit 9ba647a

Browse files
committed
Add instrumentation for MessageListener
1 parent 871ce37 commit 9ba647a

File tree

5 files changed

+405
-4
lines changed

5 files changed

+405
-4
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package dd.inst.jms1;
2+
3+
import static com.datadoghq.agent.integration.JmsUtil.toResourceName;
4+
import static dd.trace.ClassLoaderMatcher.classLoaderHasClasses;
5+
import static dd.trace.ExceptionHandlers.defaultExceptionHandler;
6+
import static net.bytebuddy.matcher.ElementMatchers.hasSuperType;
7+
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
8+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
9+
import static net.bytebuddy.matcher.ElementMatchers.named;
10+
import static net.bytebuddy.matcher.ElementMatchers.not;
11+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
12+
13+
import com.datadoghq.agent.integration.MessagePropertyTextMap;
14+
import com.datadoghq.trace.DDTags;
15+
import com.google.auto.service.AutoService;
16+
import dd.trace.Instrumenter;
17+
import io.opentracing.ActiveSpan;
18+
import io.opentracing.SpanContext;
19+
import io.opentracing.propagation.Format;
20+
import io.opentracing.tag.Tags;
21+
import io.opentracing.util.GlobalTracer;
22+
import java.util.Collections;
23+
import javax.jms.Message;
24+
import javax.jms.MessageListener;
25+
import net.bytebuddy.agent.builder.AgentBuilder;
26+
import net.bytebuddy.asm.Advice;
27+
28+
@AutoService(Instrumenter.class)
29+
public final class JMS1MessageListenerInstrumentation implements Instrumenter {
30+
31+
@Override
32+
public AgentBuilder instrument(final AgentBuilder agentBuilder) {
33+
return agentBuilder
34+
.type(
35+
not(isInterface()).and(hasSuperType(named("javax.jms.MessageListener"))),
36+
not(classLoaderHasClasses("javax.jms.JMSContext", "javax.jms.CompletionListener")))
37+
.transform(
38+
new AgentBuilder.Transformer.ForAdvice()
39+
.advice(
40+
named("onMessage")
41+
.and(takesArgument(0, named("javax.jms.Message")))
42+
.and(isPublic()),
43+
MessageListenerAdvice.class.getName())
44+
.withExceptionHandler(defaultExceptionHandler()))
45+
.asDecorator();
46+
}
47+
48+
public static class MessageListenerAdvice {
49+
50+
@Advice.OnMethodEnter(suppress = Throwable.class)
51+
public static ActiveSpan startSpan(
52+
@Advice.Argument(0) final Message message, @Advice.This final MessageListener listener) {
53+
54+
final SpanContext extractedContext =
55+
GlobalTracer.get().extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message));
56+
57+
final ActiveSpan span =
58+
GlobalTracer.get()
59+
.buildSpan("jms.onMessage")
60+
.asChildOf(extractedContext)
61+
.withTag(DDTags.SERVICE_NAME, "jms")
62+
.withTag(DDTags.RESOURCE_NAME, "Received from " + toResourceName(message, null))
63+
.withTag(Tags.COMPONENT.getKey(), "jms1")
64+
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
65+
.withTag("span.origin.type", listener.getClass().getName())
66+
.startActive();
67+
68+
return span;
69+
}
70+
71+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
72+
public static void stopSpan(
73+
@Advice.Enter final ActiveSpan span, @Advice.Thrown final Throwable throwable) {
74+
75+
if (span != null) {
76+
if (throwable != null) {
77+
Tags.ERROR.set(span, Boolean.TRUE);
78+
span.log(Collections.singletonMap("error.object", throwable));
79+
}
80+
span.deactivate();
81+
}
82+
}
83+
}
84+
}

dd-java-agent/integrations/jms-1/src/test/groovy/JMS1Test.groovy

Lines changed: 124 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import spock.lang.Unroll
1212
import javax.jms.Connection
1313
import javax.jms.Session
1414
import javax.jms.TextMessage
15+
import java.util.concurrent.atomic.AtomicReference
1516

1617
class JMS1Test extends Specification {
1718

@@ -46,7 +47,7 @@ class JMS1Test extends Specification {
4647
def producer = session.createProducer(destination)
4748
def consumer = session.createConsumer(destination)
4849
def message = session.createTextMessage("a message")
49-
writer.start()
50+
5051
producer.send(message)
5152

5253
TextMessage receivedMessage = consumer.receive()
@@ -157,4 +158,126 @@ class JMS1Test extends Specification {
157158
session.createTemporaryQueue() | "Temporary Queue"
158159
session.createTemporaryTopic() | "Temporary Topic"
159160
}
161+
162+
@Unroll
163+
def "sending to a MessageListener on #resourceName generates a span"() {
164+
setup:
165+
def messageRef = new AtomicReference<TextMessage>()
166+
def producer = session.createProducer(destination)
167+
def consumer = session.createConsumer(destination)
168+
consumer.setMessageListener { message ->
169+
Thread.sleep(5) // Slow things down a bit.
170+
messageRef.set(message)
171+
}
172+
173+
def message = session.createTextMessage("a message")
174+
producer.send(message)
175+
writer.waitForTraces(2)
176+
177+
expect:
178+
messageRef.get().text == "a message"
179+
writer.size() == 2
180+
181+
and: // producer trace
182+
def trace = writer.firstTrace()
183+
trace.size() == 3
184+
185+
and: // span 0
186+
def span0 = trace[0]
187+
188+
span0.context().operationName == "jms.produce"
189+
span0.serviceName == "jms"
190+
span0.resourceName == "Produced for $resourceName"
191+
span0.type == null
192+
!span0.context().getErrorFlag()
193+
span0.context().parentId == 0
194+
195+
196+
def tags0 = span0.context().tags
197+
tags0["span.kind"] == "producer"
198+
tags0["component"] == "jms1"
199+
200+
tags0["span.origin.type"] == ActiveMQMessageProducer.name
201+
202+
tags0["thread.name"] != null
203+
tags0["thread.id"] != null
204+
tags0.size() == 5
205+
206+
and: // span 1
207+
def span1 = trace[1]
208+
209+
span1.context().operationName == "jms.produce"
210+
span1.serviceName == "jms"
211+
span1.resourceName == "Produced for $resourceName"
212+
span1.type == null
213+
!span1.context().getErrorFlag()
214+
span1.context().parentId == span0.context().spanId
215+
216+
217+
def tags1 = span1.context().tags
218+
tags1["span.kind"] == "producer"
219+
tags1["component"] == "jms1"
220+
221+
tags1["span.origin.type"] == ActiveMQMessageProducer.name
222+
223+
tags1["thread.name"] != null
224+
tags1["thread.id"] != null
225+
tags1.size() == 5
226+
227+
and: // span 2
228+
def span2 = trace[2]
229+
230+
span2.context().operationName == "jms.produce"
231+
span2.serviceName == "jms"
232+
span2.resourceName == "Produced for $resourceName"
233+
span2.type == null
234+
!span2.context().getErrorFlag()
235+
span2.context().parentId == span1.context().spanId
236+
237+
238+
def tags2 = span2.context().tags
239+
tags2["span.kind"] == "producer"
240+
tags2["component"] == "jms1"
241+
242+
tags2["span.origin.type"] == ActiveMQMessageProducer.name
243+
244+
tags2["thread.name"] != null
245+
tags2["thread.id"] != null
246+
tags2.size() == 5
247+
248+
and: // consumer trace
249+
def consumerTrace = writer.get(1)
250+
consumerTrace.size() == 1
251+
252+
def consumerSpan = consumerTrace[0]
253+
254+
consumerSpan.context().operationName == "jms.onMessage"
255+
consumerSpan.serviceName == "jms"
256+
consumerSpan.resourceName == "Received from $resourceName"
257+
consumerSpan.type == null
258+
!consumerSpan.context().getErrorFlag()
259+
consumerSpan.context().parentId == span2.context().spanId
260+
261+
262+
def consumerTags = consumerSpan.context().tags
263+
consumerTags["span.kind"] == "consumer"
264+
consumerTags["component"] == "jms1"
265+
266+
consumerTags["span.origin.type"] != null
267+
268+
consumerTags["thread.name"] != null
269+
consumerTags["thread.id"] != null
270+
consumerTags.size() == 5
271+
272+
cleanup:
273+
producer.close()
274+
consumer.close()
275+
276+
where:
277+
destination | resourceName
278+
session.createQueue("someQueue") | "Queue someQueue"
279+
session.createTopic("someTopic") | "Topic someTopic"
280+
session.createTemporaryQueue() | "Temporary Queue"
281+
session.createTemporaryTopic() | "Temporary Topic"
282+
}
160283
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package dd.inst.jms2;
2+
3+
import static com.datadoghq.agent.integration.JmsUtil.toResourceName;
4+
import static dd.trace.ClassLoaderMatcher.classLoaderHasClasses;
5+
import static dd.trace.ExceptionHandlers.defaultExceptionHandler;
6+
import static net.bytebuddy.matcher.ElementMatchers.hasSuperType;
7+
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
8+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
9+
import static net.bytebuddy.matcher.ElementMatchers.named;
10+
import static net.bytebuddy.matcher.ElementMatchers.not;
11+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
12+
13+
import com.datadoghq.agent.integration.MessagePropertyTextMap;
14+
import com.datadoghq.trace.DDTags;
15+
import com.google.auto.service.AutoService;
16+
import dd.trace.Instrumenter;
17+
import io.opentracing.ActiveSpan;
18+
import io.opentracing.SpanContext;
19+
import io.opentracing.propagation.Format;
20+
import io.opentracing.tag.Tags;
21+
import io.opentracing.util.GlobalTracer;
22+
import java.util.Collections;
23+
import javax.jms.Message;
24+
import javax.jms.MessageListener;
25+
import net.bytebuddy.agent.builder.AgentBuilder;
26+
import net.bytebuddy.asm.Advice;
27+
28+
@AutoService(Instrumenter.class)
29+
public final class JMS2MessageListenerInstrumentation implements Instrumenter {
30+
31+
@Override
32+
public AgentBuilder instrument(final AgentBuilder agentBuilder) {
33+
return agentBuilder
34+
.type(
35+
not(isInterface()).and(hasSuperType(named("javax.jms.MessageListener"))),
36+
classLoaderHasClasses("javax.jms.JMSContext", "javax.jms.CompletionListener"))
37+
.transform(
38+
new AgentBuilder.Transformer.ForAdvice()
39+
.advice(
40+
named("onMessage")
41+
.and(takesArgument(0, named("javax.jms.Message")))
42+
.and(isPublic()),
43+
MessageListenerAdvice.class.getName())
44+
.withExceptionHandler(defaultExceptionHandler()))
45+
.asDecorator();
46+
}
47+
48+
public static class MessageListenerAdvice {
49+
50+
@Advice.OnMethodEnter(suppress = Throwable.class)
51+
public static ActiveSpan startSpan(
52+
@Advice.Argument(0) final Message message, @Advice.This final MessageListener listener) {
53+
54+
final SpanContext extractedContext =
55+
GlobalTracer.get().extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message));
56+
57+
final ActiveSpan span =
58+
GlobalTracer.get()
59+
.buildSpan("jms.onMessage")
60+
.asChildOf(extractedContext)
61+
.withTag(DDTags.SERVICE_NAME, "jms")
62+
.withTag(DDTags.RESOURCE_NAME, "Received from " + toResourceName(message, null))
63+
.withTag(Tags.COMPONENT.getKey(), "jms2")
64+
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
65+
.withTag("span.origin.type", listener.getClass().getName())
66+
.startActive();
67+
68+
return span;
69+
}
70+
71+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
72+
public static void stopSpan(
73+
@Advice.Enter final ActiveSpan span, @Advice.Thrown final Throwable throwable) {
74+
75+
if (span != null) {
76+
if (throwable != null) {
77+
Tags.ERROR.set(span, Boolean.TRUE);
78+
span.log(Collections.singletonMap("error.object", throwable));
79+
}
80+
span.deactivate();
81+
}
82+
}
83+
}
84+
}

0 commit comments

Comments
 (0)