Skip to content

Commit 20a71f4

Browse files
committed
Pass basic props to RabbitMQActivitySource.BasicPublish to allow messageid tag to be included in trace span
1 parent a960e25 commit 20a71f4

File tree

4 files changed

+36
-16
lines changed

4 files changed

+36
-16
lines changed

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
6161
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
6262

6363
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
64-
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length)
64+
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length, basicProperties)
6565
: default;
6666

6767
ulong publishSequenceNumber = 0;
@@ -116,7 +116,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
116116
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
117117

118118
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
119-
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length)
119+
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length, basicProperties)
120120
: default;
121121

122122
ulong publishSequenceNumber = 0;

projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public static class RabbitMQActivitySource
6161
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
6262
};
6363

64-
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize,
64+
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize, IReadOnlyBasicProperties basicProperties,
6565
ActivityContext linkedContext = default)
6666
{
6767
if (!s_publisherSource.HasListeners())
@@ -78,7 +78,7 @@ public static class RabbitMQActivitySource
7878
ActivityKind.Producer, linkedContext);
7979
if (activity != null && activity.IsAllDataRequested)
8080
{
81-
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, bodySize, activity);
81+
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, basicProperties, bodySize, activity);
8282
}
8383

8484
return activity;

projects/Test/SequentialIntegration/TestActivitySource.cs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -289,9 +289,11 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn
289289
}
290290

291291
[Theory]
292-
[InlineData(true)]
293-
[InlineData(false)]
294-
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName)
292+
[InlineData(true, true)]
293+
[InlineData(false, true)]
294+
[InlineData(true, false)]
295+
[InlineData(false, false)]
296+
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool verifyMessageIdTag)
295297
{
296298
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
297299
var activities = new List<Activity>();
@@ -300,18 +302,20 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
300302
string queue = $"queue-{Guid.NewGuid()}";
301303
const string msg = "for basic.get";
302304

305+
var basicProps = new BasicProperties() { MessageId = Guid.NewGuid().ToString() };
306+
303307
try
304308
{
305309
await _channel.QueueDeclareAsync(queue, false, false, false, null);
306-
await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg));
310+
await _channel.BasicPublishAsync("", queue, true, basicProps, Encoding.UTF8.GetBytes(msg));
307311
QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);
308312
Assert.Equal(1u, ok.MessageCount);
309313
BasicGetResult res = await _channel.BasicGetAsync(queue, true);
310314
Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray()));
311315
ok = await _channel.QueueDeclarePassiveAsync(queue);
312316
Assert.Equal(0u, ok.MessageCount);
313317
await Task.Delay(500);
314-
AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false);
318+
AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false, verifyMessageIdTag);
315319
}
316320
finally
317321
{
@@ -400,7 +404,7 @@ private static ActivityListener StartActivityListener(List<Activity> activities)
400404
}
401405

402406
private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName,
403-
List<Activity> activityList, bool isDeliver = false)
407+
List<Activity> activityList, bool isDeliver = false, bool verifyMessageIdTag = false)
404408
{
405409
string childName = isDeliver ? "deliver" : "fetch";
406410
Activity[] activities = activityList.ToArray();
@@ -444,6 +448,12 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
444448
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize);
445449
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize);
446450
AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize);
451+
452+
if (verifyMessageIdTag)
453+
{
454+
AssertStringTagNotNullOrEmpty(sendActivity, RabbitMQActivitySource.MessageId);
455+
AssertStringTagNotNullOrEmpty(receiveActivity, RabbitMQActivitySource.MessageId);
456+
}
447457
}
448458
}
449459
}

projects/Test/SequentialIntegration/TestOpenTelemetry.cs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -301,9 +301,11 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo
301301
}
302302

303303
[Theory]
304-
[InlineData(true)]
305-
[InlineData(false)]
306-
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName)
304+
[InlineData(true, true)]
305+
[InlineData(false, true)]
306+
[InlineData(true, false)]
307+
[InlineData(false, false)]
308+
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool verifyMessageIdTag)
307309
{
308310
var exportedItems = new List<Activity>();
309311
using var tracer = Sdk.CreateTracerProviderBuilder()
@@ -318,10 +320,12 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
318320
string queue = $"queue-{Guid.NewGuid()}";
319321
const string msg = "for basic.get";
320322

323+
var basicProps = new BasicProperties() { MessageId = Guid.NewGuid().ToString() };
324+
321325
try
322326
{
323327
await _channel.QueueDeclareAsync(queue, false, false, false, null);
324-
await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg));
328+
await _channel.BasicPublishAsync("", queue, true, basicProps, Encoding.UTF8.GetBytes(msg));
325329
Baggage.ClearBaggage();
326330
Assert.Null(Baggage.GetBaggage("TestItem"));
327331
QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);
@@ -331,7 +335,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
331335
ok = await _channel.QueueDeclarePassiveAsync(queue);
332336
Assert.Equal(0u, ok.MessageCount);
333337
await Task.Delay(500);
334-
AssertActivityData(useRoutingKeyAsOperationName, queue, exportedItems, false);
338+
AssertActivityData(useRoutingKeyAsOperationName, queue, exportedItems, false, verifyMessageIdTag);
335339
}
336340
finally
337341
{
@@ -340,7 +344,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
340344
}
341345

342346
private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName,
343-
List<Activity> activityList, bool isDeliver = false, string baggageGuid = null)
347+
List<Activity> activityList, bool isDeliver = false, bool verifyMessageIdTag = false)
344348
{
345349
string childName = isDeliver ? "deliver" : "fetch";
346350
string childType = isDeliver ? "process" : "receive";
@@ -385,6 +389,12 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
385389
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessagingOperationName, childName);
386390
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationType, "send");
387391
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationName, "publish");
392+
393+
if (verifyMessageIdTag)
394+
{
395+
AssertStringTagNotNullOrEmpty(sendActivity, RabbitMQActivitySource.MessageId);
396+
AssertStringTagNotNullOrEmpty(receiveActivity, RabbitMQActivitySource.MessageId);
397+
}
388398
}
389399
}
390400
}

0 commit comments

Comments
 (0)