Skip to content

Commit d3cf417

Browse files
committed
Move Deliver OTEL activity to consumer dispatchers
Fixes #1621
1 parent 0c8492a commit d3cf417

File tree

5 files changed

+50
-46
lines changed

5 files changed

+50
-46
lines changed

projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System;
2-
using System.Diagnostics;
32
using System.Threading.Tasks;
43
using RabbitMQ.Client.Impl;
54

@@ -80,8 +79,9 @@ public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, b
8079
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
8180
{
8281
var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
82+
8383
// No need to call base, it's empty.
84-
return BasicDeliverWrapper(deliverEventArgs);
84+
return _receivedWrapper.InvokeAsync(this, deliverEventArgs);
8585
}
8686

8787
///<summary>Fires the Shutdown event.</summary>
@@ -95,13 +95,5 @@ await _shutdownWrapper.InvokeAsync(this, reason)
9595
.ConfigureAwait(false);
9696
}
9797
}
98-
99-
private async Task BasicDeliverWrapper(BasicDeliverEventArgs eventArgs)
100-
{
101-
using (Activity? activity = RabbitMQActivitySource.Deliver(eventArgs))
102-
{
103-
await _receivedWrapper.InvokeAsync(this, eventArgs).ConfigureAwait(false);
104-
}
105-
}
10698
}
10799
}

projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Diagnostics;
3433
using System.Threading.Tasks;
3534

3635
namespace RabbitMQ.Client.Events
@@ -89,13 +88,10 @@ public override void HandleBasicConsumeOk(string consumerTag)
8988
public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
9089
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
9190
{
92-
BasicDeliverEventArgs eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
93-
using (Activity? activity = RabbitMQActivitySource.SubscriberHasListeners ? RabbitMQActivitySource.Deliver(eventArgs) : default)
94-
{
95-
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)
96-
.ConfigureAwait(false);
97-
Received?.Invoke(this, eventArgs);
98-
}
91+
var eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
92+
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)
93+
.ConfigureAwait(false);
94+
Received?.Invoke(this, eventArgs);
9995
}
10096

10197
///<summary>Fires the Shutdown event.</summary>

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Diagnostics;
23
using System.Threading.Tasks;
34
using RabbitMQ.Client.Events;
45
using RabbitMQ.Client.Impl;
@@ -24,23 +25,35 @@ protected override async Task ProcessChannelAsync()
2425
{
2526
try
2627
{
27-
Task task = work.WorkType switch
28+
switch (work.WorkType)
2829
{
29-
WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver(
30-
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
31-
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory),
32-
33-
WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag!),
34-
35-
WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag!),
36-
37-
WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag!),
38-
39-
WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason!),
40-
41-
_ => Task.CompletedTask
42-
};
43-
await task.ConfigureAwait(false);
30+
case WorkType.Deliver:
31+
using (Activity? activity = RabbitMQActivitySource.Deliver(work.RoutingKey!, work.Exchange!,
32+
work.DeliveryTag, work.BasicProperties!, work.Body.Size))
33+
{
34+
await work.AsyncConsumer.HandleBasicDeliver(
35+
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
36+
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
37+
.ConfigureAwait(false);
38+
}
39+
break;
40+
case WorkType.Cancel:
41+
await work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag!)
42+
.ConfigureAwait(false);
43+
break;
44+
case WorkType.CancelOk:
45+
await work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag!)
46+
.ConfigureAwait(false);
47+
break;
48+
case WorkType.ConsumeOk:
49+
await work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag!)
50+
.ConfigureAwait(false);
51+
break;
52+
case WorkType.Shutdown:
53+
await work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason!)
54+
.ConfigureAwait(false);
55+
break;
56+
}
4457
}
4558
catch (Exception e)
4659
{

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Diagnostics;
23
using System.Threading.Tasks;
34
using RabbitMQ.Client.Events;
45
using RabbitMQ.Client.Impl;
@@ -29,10 +30,14 @@ protected override async Task ProcessChannelAsync()
2930
switch (work.WorkType)
3031
{
3132
case WorkType.Deliver:
32-
await consumer.HandleBasicDeliverAsync(
33-
consumerTag, work.DeliveryTag, work.Redelivered,
34-
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
35-
.ConfigureAwait(false);
33+
using (Activity? activity = RabbitMQActivitySource.Deliver(work.RoutingKey!, work.Exchange!,
34+
work.DeliveryTag, work.BasicProperties!, work.Body.Size))
35+
{
36+
await consumer.HandleBasicDeliverAsync(
37+
consumerTag, work.DeliveryTag, work.Redelivered,
38+
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
39+
.ConfigureAwait(false);
40+
}
3641
break;
3742
case WorkType.Cancel:
3843
consumer.HandleBasicCancel(consumerTag);

projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
using System.Net.Sockets;
66
using System.Reflection;
77
using System.Text;
8-
using RabbitMQ.Client.Events;
98
using RabbitMQ.Client.Impl;
109

1110
namespace RabbitMQ.Client
@@ -46,7 +45,6 @@ public static class RabbitMQActivitySource
4645

4746
public static bool UseRoutingKeyAsOperationName { get; set; } = true;
4847
internal static bool PublisherHasListeners => s_publisherSource.HasListeners();
49-
internal static bool SubscriberHasListeners => s_subscriberSource.HasListeners();
5048

5149
internal static readonly IEnumerable<KeyValuePair<string, object?>> CreationTags = new[]
5250
{
@@ -120,7 +118,8 @@ public static class RabbitMQActivitySource
120118
return activity;
121119
}
122120

123-
internal static Activity? Deliver(BasicDeliverEventArgs deliverEventArgs)
121+
internal static Activity? Deliver(string routingKey, string exchange, ulong deliveryTag,
122+
IReadOnlyBasicProperties basicProperties, int bodySize)
124123
{
125124
if (!s_subscriberSource.HasListeners())
126125
{
@@ -129,13 +128,12 @@ public static class RabbitMQActivitySource
129128

130129
// Extract the PropagationContext of the upstream parent from the message headers.
131130
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
132-
UseRoutingKeyAsOperationName ? $"{deliverEventArgs.RoutingKey} deliver" : "deliver",
133-
ActivityKind.Consumer, ContextExtractor(deliverEventArgs.BasicProperties));
131+
UseRoutingKeyAsOperationName ? $"{routingKey} deliver" : "deliver",
132+
ActivityKind.Consumer, ContextExtractor(basicProperties));
134133
if (activity != null && activity.IsAllDataRequested)
135134
{
136-
PopulateMessagingTags("deliver", deliverEventArgs.RoutingKey, deliverEventArgs.Exchange,
137-
deliverEventArgs.DeliveryTag, deliverEventArgs.BasicProperties, deliverEventArgs.Body.Length,
138-
activity);
135+
PopulateMessagingTags("deliver", routingKey, exchange,
136+
deliveryTag, basicProperties, bodySize, activity);
139137
}
140138

141139
return activity;

0 commit comments

Comments
 (0)