Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;

Expand Down Expand Up @@ -80,8 +79,9 @@ public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, b
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

// No need to call base, it's empty.
return BasicDeliverWrapper(deliverEventArgs);
return _receivedWrapper.InvokeAsync(this, deliverEventArgs);
}

///<summary>Fires the Shutdown event.</summary>
Expand All @@ -95,13 +95,5 @@ await _shutdownWrapper.InvokeAsync(this, reason)
.ConfigureAwait(false);
}
}

private async Task BasicDeliverWrapper(BasicDeliverEventArgs eventArgs)
{
using (Activity? activity = RabbitMQActivitySource.Deliver(eventArgs))
{
await _receivedWrapper.InvokeAsync(this, eventArgs).ConfigureAwait(false);
}
}
}
}
12 changes: 4 additions & 8 deletions projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
//---------------------------------------------------------------------------

using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Events
Expand Down Expand Up @@ -89,13 +88,10 @@ public override void HandleBasicConsumeOk(string consumerTag)
public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
BasicDeliverEventArgs eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
using (Activity? activity = RabbitMQActivitySource.SubscriberHasListeners ? RabbitMQActivitySource.Deliver(eventArgs) : default)
{
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)
.ConfigureAwait(false);
Received?.Invoke(this, eventArgs);
}
var eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)
.ConfigureAwait(false);
Received?.Invoke(this, eventArgs);
}

///<summary>Fires the Shutdown event.</summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;
Expand All @@ -24,23 +25,35 @@ protected override async Task ProcessChannelAsync()
{
try
{
Task task = work.WorkType switch
switch (work.WorkType)
{
WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver(
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory),

WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag!),

WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag!),

WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag!),

WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason!),

_ => Task.CompletedTask
};
await task.ConfigureAwait(false);
case WorkType.Deliver:
using (Activity? activity = RabbitMQActivitySource.Deliver(work.RoutingKey!, work.Exchange!,
work.DeliveryTag, work.BasicProperties!, work.Body.Size))
{
await work.AsyncConsumer.HandleBasicDeliver(
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
.ConfigureAwait(false);
}
break;
case WorkType.Cancel:
await work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag!)
.ConfigureAwait(false);
break;
case WorkType.CancelOk:
await work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag!)
.ConfigureAwait(false);
break;
case WorkType.ConsumeOk:
await work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag!)
.ConfigureAwait(false);
break;
case WorkType.Shutdown:
await work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason!)
.ConfigureAwait(false);
break;
}
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;
Expand Down Expand Up @@ -29,10 +30,14 @@ protected override async Task ProcessChannelAsync()
switch (work.WorkType)
{
case WorkType.Deliver:
await consumer.HandleBasicDeliverAsync(
consumerTag, work.DeliveryTag, work.Redelivered,
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
.ConfigureAwait(false);
using (Activity? activity = RabbitMQActivitySource.Deliver(work.RoutingKey!, work.Exchange!,
work.DeliveryTag, work.BasicProperties!, work.Body.Size))
{
await consumer.HandleBasicDeliverAsync(
consumerTag, work.DeliveryTag, work.Redelivered,
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
.ConfigureAwait(false);
}
break;
case WorkType.Cancel:
consumer.HandleBasicCancel(consumerTag);
Expand Down
14 changes: 6 additions & 8 deletions projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Net.Sockets;
using System.Reflection;
using System.Text;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
Expand Down Expand Up @@ -46,7 +45,6 @@ public static class RabbitMQActivitySource

public static bool UseRoutingKeyAsOperationName { get; set; } = true;
internal static bool PublisherHasListeners => s_publisherSource.HasListeners();
internal static bool SubscriberHasListeners => s_subscriberSource.HasListeners();

internal static readonly IEnumerable<KeyValuePair<string, object?>> CreationTags = new[]
{
Expand Down Expand Up @@ -120,7 +118,8 @@ public static class RabbitMQActivitySource
return activity;
}

internal static Activity? Deliver(BasicDeliverEventArgs deliverEventArgs)
internal static Activity? Deliver(string routingKey, string exchange, ulong deliveryTag,
IReadOnlyBasicProperties basicProperties, int bodySize)
{
if (!s_subscriberSource.HasListeners())
{
Expand All @@ -129,13 +128,12 @@ public static class RabbitMQActivitySource

// Extract the PropagationContext of the upstream parent from the message headers.
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{deliverEventArgs.RoutingKey} deliver" : "deliver",
ActivityKind.Consumer, ContextExtractor(deliverEventArgs.BasicProperties));
UseRoutingKeyAsOperationName ? $"{routingKey} deliver" : "deliver",
ActivityKind.Consumer, ContextExtractor(basicProperties));
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags("deliver", deliverEventArgs.RoutingKey, deliverEventArgs.Exchange,
deliverEventArgs.DeliveryTag, deliverEventArgs.BasicProperties, deliverEventArgs.Body.Length,
activity);
PopulateMessagingTags("deliver", routingKey, exchange,
deliveryTag, basicProperties, bodySize, activity);
}

return activity;
Expand Down