Skip to content

Commit 9c1ca62

Browse files
authored
Adds Data Streams Monitoring support for IBM MQ in JMS/Jakarta Messaging (#9768)
* Adds Data Streams Monitoring support for IBM MQ via existing JMS instrumentation This support is currently limited to only create DSM checkpoints for JMS when the underlying technology is IBM MQ, to reduce test surface and speed initial release of IBM MQ support. There is existing DSM support for most other technologies commonly used through JMS, so they would gain little from having support for other technologies enabled, but enabling them would require thorough testing to prevent double-reporting. This implementation does not add DSM context propagation in JMS, as in other languages customers have seen production breakages from unexpected additional fields added to IBM MQ. Optional DSM context propagation may be added in a future followup, but without it we are still able to track checkpoints in the DSM map. There is a slight modification to hold the raw queue name in addition to the formatted version (e.g. "queue:///DEV.QUEUE.1" rather than just the previous "Produced for Queue queue:///DEV.QUEUE.1") as that raw queue name is required when generating a DSM checkpoint. Other than that, changes are purely additive and shoulf only take effect when existing config `data_streams_enabled` is true. * Test null before isEmpty() * Changes requeted from review
1 parent b083d21 commit 9c1ca62

File tree

5 files changed

+88
-10
lines changed

5 files changed

+88
-10
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/jms/MessageConsumerState.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,19 @@ public final class MessageConsumerState {
99
private final SessionState sessionState;
1010
private final CharSequence brokerResourceName;
1111
private final String brokerServiceName;
12+
private final CharSequence consumerBaseResourceName;
1213
private final CharSequence consumerResourceName;
1314
private final boolean propagationDisabled;
1415

1516
public MessageConsumerState(
1617
SessionState sessionState,
1718
CharSequence brokerResourceName,
19+
CharSequence consumerBaseResourceName,
1820
CharSequence consumerResourceName,
1921
boolean propagationDisabled) {
2022
this.sessionState = sessionState;
2123
this.brokerResourceName = brokerResourceName;
24+
this.consumerBaseResourceName = consumerBaseResourceName;
2225
this.consumerResourceName = consumerResourceName;
2326
this.propagationDisabled = propagationDisabled;
2427

@@ -47,6 +50,10 @@ public String getBrokerServiceName() {
4750
return brokerServiceName;
4851
}
4952

53+
public CharSequence getConsumerBaseResourceName() {
54+
return consumerBaseResourceName;
55+
}
56+
5057
public CharSequence getConsumerResourceName() {
5158
return consumerResourceName;
5259
}

dd-java-agent/instrumentation/jms/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,22 @@ public static void logJMSException(JMSException ex) {
114114
}
115115
}
116116

117+
public static String messageTechnology(Message m) {
118+
if (null == m) {
119+
return "null";
120+
}
121+
122+
String messageClass = m.getClass().getName();
123+
124+
if (messageClass.startsWith("com.amazon.sqs")) {
125+
return "sqs";
126+
} else if (messageClass.startsWith("com.ibm")) {
127+
return "ibmmq";
128+
} else {
129+
return "unknown";
130+
}
131+
}
132+
117133
@Override
118134
protected String[] instrumentationNames() {
119135
return new String[] {"jms"};

dd-java-agent/instrumentation/jms/src/main/java/datadog/trace/instrumentation/jms/JMSMessageConsumerInstrumentation.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.hasInterface;
44
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
55
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
6+
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND;
7+
import static datadog.trace.api.datastreams.DataStreamsTags.create;
68
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
79
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
810
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
@@ -12,6 +14,7 @@
1214
import static datadog.trace.instrumentation.jms.JMSDecorator.JMS_CONSUME;
1315
import static datadog.trace.instrumentation.jms.JMSDecorator.JMS_DELIVER;
1416
import static datadog.trace.instrumentation.jms.JMSDecorator.TIME_IN_QUEUE_ENABLED;
17+
import static datadog.trace.instrumentation.jms.JMSDecorator.messageTechnology;
1518
import static datadog.trace.instrumentation.jms.MessageExtractAdapter.GETTER;
1619
import static java.util.concurrent.TimeUnit.MILLISECONDS;
1720
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
@@ -20,10 +23,14 @@
2023
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
2124

2225
import datadog.trace.agent.tooling.Instrumenter;
26+
import datadog.trace.api.Config;
27+
import datadog.trace.api.datastreams.DataStreamsContext;
28+
import datadog.trace.api.datastreams.DataStreamsTags;
2329
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
2430
import datadog.trace.bootstrap.InstrumentationContext;
2531
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
2632
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
33+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
2734
import datadog.trace.bootstrap.instrumentation.jms.MessageConsumerState;
2835
import datadog.trace.bootstrap.instrumentation.jms.SessionState;
2936
import javax.jms.Message;
@@ -143,6 +150,17 @@ public static void afterReceive(
143150

144151
CONSUMER_DECORATE.afterStart(span);
145152
CONSUMER_DECORATE.onConsume(span, message, consumerState.getConsumerResourceName());
153+
154+
if (Config.get().isDataStreamsEnabled()) {
155+
final String tech = messageTechnology(message);
156+
if ("ibmmq".equals(tech)) { // Initial release only supports DSM in JMS for IBM MQ
157+
DataStreamsTags tags =
158+
create(tech, INBOUND, consumerState.getConsumerBaseResourceName().toString());
159+
DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags);
160+
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, dsmContext);
161+
}
162+
}
163+
146164
CONSUMER_DECORATE.onError(span, throwable);
147165

148166
activateNext(span); // scope is left open until next message or it times out

dd-java-agent/instrumentation/jms/src/main/java/datadog/trace/instrumentation/jms/JMSMessageProducerInstrumentation.java

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,27 @@
44
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.hasInterface;
55
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
66
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
7+
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
8+
import static datadog.trace.api.datastreams.DataStreamsTags.create;
79
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
810
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
911
import static datadog.trace.instrumentation.jms.JMSDecorator.JMS_PRODUCE;
1012
import static datadog.trace.instrumentation.jms.JMSDecorator.PRODUCER_DECORATE;
1113
import static datadog.trace.instrumentation.jms.JMSDecorator.TIME_IN_QUEUE_ENABLED;
14+
import static datadog.trace.instrumentation.jms.JMSDecorator.messageTechnology;
1215
import static datadog.trace.instrumentation.jms.MessageInjectAdapter.SETTER;
1316
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
1417
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
1518

1619
import datadog.trace.agent.tooling.Instrumenter;
1720
import datadog.trace.api.Config;
21+
import datadog.trace.api.datastreams.DataStreamsContext;
22+
import datadog.trace.api.datastreams.DataStreamsTags;
1823
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
1924
import datadog.trace.bootstrap.InstrumentationContext;
2025
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2126
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
27+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
2228
import datadog.trace.bootstrap.instrumentation.jms.MessageProducerState;
2329
import javax.jms.Destination;
2430
import javax.jms.Message;
@@ -73,24 +79,39 @@ public static AgentScope beforeSend(
7379
.get(producer);
7480

7581
CharSequence resourceName;
76-
77-
if (null != producerState) {
78-
resourceName = producerState.getResourceName();
79-
} else {
80-
try {
81-
// fall-back when producer wasn't created via standard Session.createProducer API
82+
String destinationName;
83+
try {
84+
// fall-back when producer wasn't created via standard Session.createProducer API
85+
if (null != producerState) {
86+
resourceName = producerState.getResourceName();
87+
Destination destination = producer.getDestination();
88+
destinationName = PRODUCER_DECORATE.getDestinationName(destination);
89+
} else {
8290
Destination destination = producer.getDestination();
91+
destinationName = PRODUCER_DECORATE.getDestinationName(destination);
8392
boolean isQueue = PRODUCER_DECORATE.isQueue(destination);
84-
String destinationName = PRODUCER_DECORATE.getDestinationName(destination);
8593
resourceName = PRODUCER_DECORATE.toResourceName(destinationName, isQueue);
86-
} catch (Exception ignored) {
87-
resourceName = "Unknown Destination";
8894
}
95+
} catch (Exception ignored) {
96+
resourceName = "Unknown Destination";
97+
destinationName = "";
8998
}
9099

91100
final AgentSpan span = startSpan(JMS_PRODUCE);
92101
PRODUCER_DECORATE.afterStart(span);
93102
PRODUCER_DECORATE.onProduce(span, resourceName);
103+
104+
if (null != destinationName
105+
&& !destinationName.isEmpty()
106+
&& Config.get().isDataStreamsEnabled()) {
107+
final String tech = messageTechnology(message);
108+
if ("ibmmq".equals(tech)) { // Initial release only supports DSM in JMS for IBM MQ
109+
DataStreamsTags tags = create(tech, OUTBOUND, destinationName);
110+
DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags);
111+
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, dsmContext);
112+
}
113+
}
114+
94115
if (JMSDecorator.canInject(message)) {
95116
if (Config.get().isJmsPropagationEnabled()
96117
&& (null == producerState || !producerState.isPropagationDisabled())) {
@@ -138,6 +159,18 @@ public static AgentScope beforeSend(
138159
final AgentSpan span = startSpan(JMS_PRODUCE);
139160
PRODUCER_DECORATE.afterStart(span);
140161
PRODUCER_DECORATE.onProduce(span, resourceName);
162+
163+
if (null != destinationName
164+
&& !destinationName.isEmpty()
165+
&& Config.get().isDataStreamsEnabled()) {
166+
final String tech = messageTechnology(message);
167+
if ("ibmmq".equals(tech)) { // Initial release only supports DSM in JMS for IBM MQ
168+
DataStreamsTags tags = create(tech, OUTBOUND, destinationName);
169+
DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags);
170+
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, dsmContext);
171+
}
172+
}
173+
141174
if (JMSDecorator.canInject(message)) {
142175
if (Config.get().isJmsPropagationEnabled()
143176
&& !Config.get().isJmsPropagationDisabledForDestination(destinationName))

dd-java-agent/instrumentation/jms/src/main/java/datadog/trace/instrumentation/jms/SessionInstrumentation.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,11 @@ public static void bindConsumerState(
176176
consumerStateStore.put(
177177
consumer,
178178
new MessageConsumerState(
179-
sessionState, brokerResourceName, consumerResourceName, propagationDisabled));
179+
sessionState,
180+
brokerResourceName,
181+
destinationName,
182+
consumerResourceName,
183+
propagationDisabled));
180184
}
181185
}
182186
}

0 commit comments

Comments
 (0)