Skip to content

Commit 7a32081

Browse files
committed
Fix RocketMQ context propagation for batch messages in version 5.3.4+
Context propagation was not working correctly for batch messages in RocketMQ 5.0+ because all messages in a batch were sharing the same receive span context instead of each message having its own context extracted from message properties. Changes: - Modified ReceiveSpanFinishingCallback to extract individual context for each message in the batch using MessageMapGetter to read trace headers - Each message now gets its own properly linked context for correct tracing - Re-enabled testRocketmqProduceAndBatchConsume test for latest dependencies This fixes the issue where batch message consumers would not properly propagate trace context from producers, breaking distributed tracing for RocketMQ batch message processing in versions 5.3.4+. Resolves the disabled test from open-telemetry#15512
1 parent ad6cc1b commit 7a32081

File tree

2 files changed

+7
-4
lines changed

2 files changed

+7
-4
lines changed

instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/AbstractRocketMqClientTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,6 @@ void testRocketmqProduceAndConsume() throws Exception {
252252

253253
@Test
254254
void testRocketmqProduceAndBatchConsume() throws Exception {
255-
// context propagation doesn't work for batch messages in 5.3.4
256-
Assumptions.assumeFalse(Boolean.getBoolean("testLatestDeps"));
257-
258255
consumer.setConsumeMessageBatchMaxSize(2);
259256
// This test assumes that messages are sent and received as a batch. Occasionally it happens
260257
// that the messages are not received as a batch, but one by one. This doesn't match what the

instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ReceiveSpanFinishingCallback.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,14 @@ public void onSuccess(ReceiveMessageResult receiveMessageResult) {
4949
null,
5050
timer.startTime(),
5151
timer.now());
52+
// For batch messages, each message should have its own context that properly
53+
// links to the individual producer spans through context propagation
5254
for (MessageView messageView : messageViews) {
53-
VirtualFieldStore.setContextByMessage(messageView, context);
55+
// Extract context from individual message properties (trace headers)
56+
Context messageContext = RocketMqSingletons.propagators()
57+
.getTextMapPropagator()
58+
.extract(context, messageView, MessageMapGetter.INSTANCE);
59+
VirtualFieldStore.setContextByMessage(messageView, messageContext);
5460
}
5561
}
5662
}

0 commit comments

Comments
 (0)