Skip to content

Commit 67771b2

Browse files
committed
Moved MessageGroupId header add from FairQueuesFeature to InputQueuePump and removed additional references to DoNotAutomaticallyPropagateMessageGroupId from DelayedMessagesPump.
1 parent c51f801 commit 67771b2

File tree

6 files changed

+20
-29
lines changed

6 files changed

+20
-29
lines changed

src/NServiceBus.Transport.SQS.Tests/Receiving/DelayedMessagesPumpTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ public void Initialize_redrive_policy_used_throws()
8484
Assert.That(exception.Message, Is.EqualTo("Delayed delivery queue 'queue-delay.fifo' should not have Redrive Policy enabled."));
8585
}
8686

87-
async Task SetupInitializedPump(bool doNotAutomaticallyPropagateMessageGroupId = false)
87+
async Task SetupInitializedPump()
8888
{
89-
pump = new DelayedMessagesPump(FakeInputQueueQueueUrl, mockSqsClient, new QueueCache(mockSqsClient, q => QueueCache.GetSqsQueueName(q, "")), 15 * 60, doNotAutomaticallyPropagateMessageGroupId: doNotAutomaticallyPropagateMessageGroupId);
89+
pump = new DelayedMessagesPump(FakeInputQueueQueueUrl, mockSqsClient, new QueueCache(mockSqsClient, q => QueueCache.GetSqsQueueName(q, "")), 15 * 60);
9090

9191
mockSqsClient.GetAttributeNamesRequestsResponse = (queue, attributes) => new GetQueueAttributesResponse
9292
{

src/NServiceBus.Transport.SQS/Extensions/MessageExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public static DateTimeOffset GetAdjustedDateTimeFromServerSetAttributes(this Mes
1919

2020
public static string? ExtractMessageGroupId(this Message messsage)
2121
{
22-
var messageGroupId = messsage.Attributes.GetValueOrDefault("MessageGroupId");
22+
var messageGroupId = messsage.Attributes?.GetValueOrDefault("MessageGroupId");
2323

2424
return !string.IsNullOrEmpty(messageGroupId) ? messageGroupId : null;
2525
}

src/NServiceBus.Transport.SQS/Features/FairQueuesFeature.cs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,10 @@ protected override void Setup(FeatureConfigurationContext context)
1616
return;
1717
}
1818

19-
context.Pipeline.Register(new PersistIncomingMessageGroupIdToHeadersBehavior(), "SQS persist incoming MessageGroupId to headers behavior");
2019
context.Pipeline.Register(new ApplyMessageGroupIdFromHeadersToOutgoingMessageBehavior(), "SQS apply MessageGroupId from headers to outgoing message behavior");
2120
}
2221
}
2322

24-
class PersistIncomingMessageGroupIdToHeadersBehavior : IBehavior<IIncomingPhysicalMessageContext, IIncomingPhysicalMessageContext>
25-
{
26-
public Task Invoke(IIncomingPhysicalMessageContext context, Func<IIncomingPhysicalMessageContext, Task> next)
27-
{
28-
if (context.Extensions.TryGet<Amazon.SQS.Model.Message>(out var nativeMessage)
29-
&& nativeMessage.Attributes.TryGetValue("MessageGroupId", out var messageGroupId)
30-
&& !string.IsNullOrWhiteSpace(messageGroupId))
31-
{
32-
context.Message.Headers[TransportHeaders.MessageGroupId] = messageGroupId;
33-
}
34-
return next(context);
35-
}
36-
}
37-
3823
class ApplyMessageGroupIdFromHeadersToOutgoingMessageBehavior : IBehavior<IOutgoingPhysicalMessageContext, IOutgoingPhysicalMessageContext>
3924
{
4025
public Task Invoke(IOutgoingPhysicalMessageContext context, Func<IOutgoingPhysicalMessageContext, Task> next)

src/NServiceBus.Transport.SQS/Receiving/DelayedMessagesPump.cs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace NServiceBus.Transport.SQS;
1212
using Extensions;
1313
using Logging;
1414

15-
class DelayedMessagesPump(string receiveAddress, IAmazonSQS sqsClient, QueueCache queueCache, int queueDelayTimeSeconds, bool doNotAutomaticallyPropagateMessageGroupId = false)
15+
class DelayedMessagesPump(string receiveAddress, IAmazonSQS sqsClient, QueueCache queueCache, int queueDelayTimeSeconds)
1616
{
1717
public async Task Initialize(CancellationToken cancellationToken = default)
1818
{
@@ -225,14 +225,10 @@ IReadOnlyCollection<SqsReceivedDelayedMessage> PrepareMessages(ReceiveMessageRes
225225
// Copy over all the message attributes so we don't lose part of the message when moving to the delayed delivery queue
226226
preparedMessage.CopyMessageAttributes(receivedMessage.MessageAttributes);
227227

228-
// Preserve fair queue MessageGroupId if it was set
229-
if (!doNotAutomaticallyPropagateMessageGroupId)
228+
var existingMessageGroupId = receivedMessage.ExtractMessageGroupId();
229+
if (!string.IsNullOrEmpty(existingMessageGroupId))
230230
{
231-
var existingMessageGroupId = receivedMessage.ExtractMessageGroupId();
232-
if (existingMessageGroupId != null)
233-
{
234-
preparedMessage.MessageGroupId = existingMessageGroupId;
235-
}
231+
preparedMessage.MessageGroupId = existingMessageGroupId;
236232
}
237233

238234
preparedMessage.MessageAttributes.Remove(TransportHeaders.DelaySeconds);

src/NServiceBus.Transport.SQS/Receiving/InputQueuePump.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ class InputQueuePump(
2929
Action<string, Exception, CancellationToken> criticalErrorAction,
3030
int? configuredVisibilityTimeoutInSeconds,
3131
TimeSpan maxAutoMessageVisibilityRenewalDuration,
32-
bool setupInfrastructure = true)
32+
bool setupInfrastructure = true,
33+
bool doNotAutomaticallyPropagateMessageGroupId = false)
3334
: IMessageReceiver
3435
{
3536
public async Task Initialize(PushRuntimeSettings limitations, OnMessage onMessage, OnError onError, CancellationToken cancellationToken = default)
@@ -471,6 +472,15 @@ async Task<bool> InnerProcessMessage(Dictionary<string, string> headers, string
471472
transportTransaction.Set(nativeMessage);
472473
transportTransaction.Set("IncomingMessageId", headers[Headers.MessageId]);
473474

475+
if (doNotAutomaticallyPropagateMessageGroupId is false)
476+
{
477+
var messageGroupId = nativeMessage.ExtractMessageGroupId();
478+
if (!string.IsNullOrEmpty(messageGroupId))
479+
{
480+
headers[TransportHeaders.MessageGroupId] = messageGroupId;
481+
}
482+
}
483+
474484
try
475485
{
476486
var messageContext = new MessageContext(

src/NServiceBus.Transport.SQS/Receiving/MessagePump.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ public MessagePump(string receiverId,
2828
bool doNotAutomaticallyPropagateMessageGroupId)
2929
{
3030
this.disableDelayedDelivery = disableDelayedDelivery;
31-
inputQueuePump = new InputQueuePump(receiverId, receiveAddress, errorQueueAddress, purgeOnStartup, sqsClient, queueCache, s3Settings, subscriptionManager, criticalErrorAction, visibilityTimeoutInSeconds, maxAutoMessageVisibilityRenewalDuration, setupInfrastructure);
31+
inputQueuePump = new InputQueuePump(receiverId, receiveAddress, errorQueueAddress, purgeOnStartup, sqsClient, queueCache, s3Settings, subscriptionManager, criticalErrorAction, visibilityTimeoutInSeconds, maxAutoMessageVisibilityRenewalDuration, setupInfrastructure, doNotAutomaticallyPropagateMessageGroupId);
3232
if (!disableDelayedDelivery)
3333
{
3434
delayedMessagesPump =
35-
new DelayedMessagesPump(receiveAddress, sqsClient, queueCache, queueDelayTimeSeconds, doNotAutomaticallyPropagateMessageGroupId);
35+
new DelayedMessagesPump(receiveAddress, sqsClient, queueCache, queueDelayTimeSeconds);
3636
}
3737
}
3838

0 commit comments

Comments
 (0)