Skip to content

Commit d72dddf

Browse files
committed
Adds Data Streams Monitoring support for IBM MQ via existing JMS instrumentation
This support is currently limited to only create DSM checkpoints for JMS when the underlying technology is IBM MQ, to reduce test surface and speed initial release of IBM MQ support. There is existing DSM support for most other technologies commonly used through JMS, so they would gain little from having support for other technologies enabled, but enabling them would require thorough testing to prevent double-reporting. This implementation does not add DSM context propagation in JMS, as in other languages customers have seen production breakages from unexpected additional fields added to IBM MQ. Optional DSM context propagation may be added in a future followup, but without it we are still able to track checkpoints in the DSM map. There is a slight modification to hold the raw queue name in addition to the formatted version (e.g. "queue:///DEV.QUEUE.1" rather than just the previous "Produced for Queue queue:///DEV.QUEUE.1") as that raw queue name is required when generating a DSM checkpoint. Other than that, changes are purely additive and shoulf only take effect when existing config `data_streams_enabled` is true.
1 parent 425276d commit d72dddf

File tree

5 files changed

+80
-12
lines changed

5 files changed

+80
-12
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/jms/MessageConsumerState.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,19 @@ public final class MessageConsumerState {
99
private final SessionState sessionState;
1010
private final CharSequence brokerResourceName;
1111
private final String brokerServiceName;
12+
private final CharSequence consumerBaseResourceName;
1213
private final CharSequence consumerResourceName;
1314
private final boolean propagationDisabled;
1415

1516
public MessageConsumerState(
1617
SessionState sessionState,
1718
CharSequence brokerResourceName,
19+
CharSequence consumerBaseResourceName,
1820
CharSequence consumerResourceName,
1921
boolean propagationDisabled) {
2022
this.sessionState = sessionState;
2123
this.brokerResourceName = brokerResourceName;
24+
this.consumerBaseResourceName = consumerBaseResourceName;
2225
this.consumerResourceName = consumerResourceName;
2326
this.propagationDisabled = propagationDisabled;
2427

@@ -47,6 +50,10 @@ public String getBrokerServiceName() {
4750
return brokerServiceName;
4851
}
4952

53+
public CharSequence getConsumerBaseResourceName() {
54+
return consumerBaseResourceName;
55+
}
56+
5057
public CharSequence getConsumerResourceName() {
5158
return consumerResourceName;
5259
}

dd-java-agent/instrumentation/jms/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,18 @@ public static void logJMSException(JMSException ex) {
114114
}
115115
}
116116

117+
public static String messageTechnology(Message m) {
118+
String messageClass = m.getClass().getName();
119+
120+
if (messageClass.startsWith("com.amazon.sqs")) {
121+
return "sqs";
122+
} else if (messageClass.startsWith("com.ibm")) {
123+
return "ibmmq";
124+
} else {
125+
return "unknown";
126+
}
127+
}
128+
117129
@Override
118130
protected String[] instrumentationNames() {
119131
return new String[] {"jms"};

dd-java-agent/instrumentation/jms/src/main/java/datadog/trace/instrumentation/jms/JMSMessageConsumerInstrumentation.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.hasInterface;
44
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
55
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
6+
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND;
7+
import static datadog.trace.api.datastreams.DataStreamsTags.create;
68
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
79
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
810
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
@@ -12,6 +14,7 @@
1214
import static datadog.trace.instrumentation.jms.JMSDecorator.JMS_CONSUME;
1315
import static datadog.trace.instrumentation.jms.JMSDecorator.JMS_DELIVER;
1416
import static datadog.trace.instrumentation.jms.JMSDecorator.TIME_IN_QUEUE_ENABLED;
17+
import static datadog.trace.instrumentation.jms.JMSDecorator.messageTechnology;
1518
import static datadog.trace.instrumentation.jms.MessageExtractAdapter.GETTER;
1619
import static java.util.concurrent.TimeUnit.MILLISECONDS;
1720
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
@@ -20,10 +23,14 @@
2023
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
2124

2225
import datadog.trace.agent.tooling.Instrumenter;
26+
import datadog.trace.api.Config;
27+
import datadog.trace.api.datastreams.DataStreamsContext;
28+
import datadog.trace.api.datastreams.DataStreamsTags;
2329
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
2430
import datadog.trace.bootstrap.InstrumentationContext;
2531
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
2632
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
33+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
2734
import datadog.trace.bootstrap.instrumentation.jms.MessageConsumerState;
2835
import datadog.trace.bootstrap.instrumentation.jms.SessionState;
2936
import javax.jms.Message;
@@ -143,6 +150,17 @@ public static void afterReceive(
143150

144151
CONSUMER_DECORATE.afterStart(span);
145152
CONSUMER_DECORATE.onConsume(span, message, consumerState.getConsumerResourceName());
153+
154+
if (Config.get().isDataStreamsEnabled()) {
155+
final String tech = messageTechnology(message);
156+
if (tech == "ibmmq") { // Initial release only supports DSM in JMS for IBM MQ
157+
DataStreamsTags tags =
158+
create(tech, INBOUND, consumerState.getConsumerBaseResourceName().toString());
159+
DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags);
160+
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, dsmContext);
161+
}
162+
}
163+
146164
CONSUMER_DECORATE.onError(span, throwable);
147165

148166
activateNext(span); // scope is left open until next message or it times out

dd-java-agent/instrumentation/jms/src/main/java/datadog/trace/instrumentation/jms/JMSMessageProducerInstrumentation.java

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,27 @@
44
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.hasInterface;
55
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
66
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
7+
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
8+
import static datadog.trace.api.datastreams.DataStreamsTags.create;
79
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
810
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
911
import static datadog.trace.instrumentation.jms.JMSDecorator.JMS_PRODUCE;
1012
import static datadog.trace.instrumentation.jms.JMSDecorator.PRODUCER_DECORATE;
1113
import static datadog.trace.instrumentation.jms.JMSDecorator.TIME_IN_QUEUE_ENABLED;
14+
import static datadog.trace.instrumentation.jms.JMSDecorator.messageTechnology;
1215
import static datadog.trace.instrumentation.jms.MessageInjectAdapter.SETTER;
1316
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
1417
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
1518

1619
import datadog.trace.agent.tooling.Instrumenter;
1720
import datadog.trace.api.Config;
21+
import datadog.trace.api.datastreams.DataStreamsContext;
22+
import datadog.trace.api.datastreams.DataStreamsTags;
1823
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
1924
import datadog.trace.bootstrap.InstrumentationContext;
2025
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2126
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
27+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
2228
import datadog.trace.bootstrap.instrumentation.jms.MessageProducerState;
2329
import javax.jms.Destination;
2430
import javax.jms.Message;
@@ -73,24 +79,35 @@ public static AgentScope beforeSend(
7379
.get(producer);
7480

7581
CharSequence resourceName;
76-
77-
if (null != producerState) {
78-
resourceName = producerState.getResourceName();
79-
} else {
80-
try {
81-
// fall-back when producer wasn't created via standard Session.createProducer API
82-
Destination destination = producer.getDestination();
83-
boolean isQueue = PRODUCER_DECORATE.isQueue(destination);
84-
String destinationName = PRODUCER_DECORATE.getDestinationName(destination);
82+
String destinationName;
83+
try {
84+
// fall-back when producer wasn't created via standard Session.createProducer API
85+
Destination destination = producer.getDestination();
86+
boolean isQueue = PRODUCER_DECORATE.isQueue(destination);
87+
destinationName = PRODUCER_DECORATE.getDestinationName(destination);
88+
if (null != producerState) {
89+
resourceName = producerState.getResourceName();
90+
} else {
8591
resourceName = PRODUCER_DECORATE.toResourceName(destinationName, isQueue);
86-
} catch (Exception ignored) {
87-
resourceName = "Unknown Destination";
8892
}
93+
} catch (Exception ignored) {
94+
resourceName = "Unknown Destination";
95+
destinationName = "";
8996
}
9097

9198
final AgentSpan span = startSpan(JMS_PRODUCE);
9299
PRODUCER_DECORATE.afterStart(span);
93100
PRODUCER_DECORATE.onProduce(span, resourceName);
101+
102+
if (!destinationName.isEmpty() && Config.get().isDataStreamsEnabled()) {
103+
final String tech = messageTechnology(message);
104+
if (tech == "ibmmq") { // Initial release only supports DSM in JMS for IBM MQ
105+
DataStreamsTags tags = create(tech, OUTBOUND, destinationName);
106+
DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags);
107+
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, dsmContext);
108+
}
109+
}
110+
94111
if (JMSDecorator.canInject(message)) {
95112
if (Config.get().isJmsPropagationEnabled()
96113
&& (null == producerState || !producerState.isPropagationDisabled())) {
@@ -138,6 +155,16 @@ public static AgentScope beforeSend(
138155
final AgentSpan span = startSpan(JMS_PRODUCE);
139156
PRODUCER_DECORATE.afterStart(span);
140157
PRODUCER_DECORATE.onProduce(span, resourceName);
158+
159+
if (!destinationName.isEmpty() && Config.get().isDataStreamsEnabled()) {
160+
final String tech = messageTechnology(message);
161+
if (tech == "ibmmq") { // Initial release only supports DSM in JMS for IBM MQ
162+
DataStreamsTags tags = create(tech, OUTBOUND, destinationName);
163+
DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags);
164+
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, dsmContext);
165+
}
166+
}
167+
141168
if (JMSDecorator.canInject(message)) {
142169
if (Config.get().isJmsPropagationEnabled()
143170
&& !Config.get().isJmsPropagationDisabledForDestination(destinationName))

dd-java-agent/instrumentation/jms/src/main/java/datadog/trace/instrumentation/jms/SessionInstrumentation.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,11 @@ public static void bindConsumerState(
176176
consumerStateStore.put(
177177
consumer,
178178
new MessageConsumerState(
179-
sessionState, brokerResourceName, consumerResourceName, propagationDisabled));
179+
sessionState,
180+
brokerResourceName,
181+
destinationName,
182+
consumerResourceName,
183+
propagationDisabled));
180184
}
181185
}
182186
}

0 commit comments

Comments
 (0)