Skip to content

Commit c980d2b

Browse files
Propagate native headers to NServiceBus headers (#2774)
* Add a simple way to enumerate all TransportHeaders * Add a test to validate custom headers are propagated to NServiceBus headers * Propagate custom native headers t NServiceBus headers * Add a to-do ti further discuss * Native message attributes can override/take precedence over NServiceBus headers * Allow overriding the message ID * Use FrozenSet and add a clarification comment * Bring back test annotation * Completed Task * bring back message ID override check * More test coverage * MessageTypeFullname should be forwarded * Remove MessageTypeFullName --------- Co-authored-by: Daniel Marbach <danielmarbach@users.noreply.github.com>
1 parent e4bcbcf commit c980d2b

12 files changed

+150
-18
lines changed

src/NServiceBus.Transport.SQS.AcceptanceTests/Batching/When_batching_multiple_outgoing_small_events.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public class MyMessageHandler : IHandleMessages<MyEvent>
159159
public Task Handle(MyEvent messageWithLargePayload, IMessageHandlerContext context)
160160
{
161161
testContext.MessageIdsReceived.Add(context.MessageId);
162-
return Task.FromResult(0);
162+
return Task.CompletedTask;
163163
}
164164

165165
readonly Context testContext;

src/NServiceBus.Transport.SQS.AcceptanceTests/Batching/When_batching_multiple_outgoing_small_messages.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public MyMessageHandler(Context testContext)
174174
public Task Handle(MyMessage messageWithLargePayload, IMessageHandlerContext context)
175175
{
176176
testContext.MessageIdsReceived.Add(context.MessageId);
177-
return Task.FromResult(0);
177+
return Task.CompletedTask;
178178
}
179179
}
180180
}

src/NServiceBus.Transport.SQS.AcceptanceTests/DelayedDelivery/SendOnly_Sending_when_receiver_not_properly_configured.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public Task Handle(DelayedMessage message, IMessageHandlerContext context)
117117
testContext.Payload = message.Payload;
118118
testContext.ReceivedAt = DateTime.UtcNow;
119119

120-
return Task.FromResult(0);
120+
return Task.CompletedTask;
121121
}
122122
}
123123
}

src/NServiceBus.Transport.SQS.AcceptanceTests/DelayedDelivery/SendOnly_Sending_when_sender_and_receiver_are_properly_configured.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public Task Handle(DelayedMessage message, IMessageHandlerContext context)
118118
testContext.Payload = message.Payload;
119119
testContext.ReceivedAt = DateTime.UtcNow;
120120

121-
return Task.FromResult(0);
121+
return Task.CompletedTask;
122122
}
123123
}
124124
}

src/NServiceBus.Transport.SQS.AcceptanceTests/DelayedDelivery/SendOnly_Sending_when_sender_not_properly_configured.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public Task Handle(DelayedMessage message, IMessageHandlerContext context)
8686
testContext.Payload = message.Payload;
8787
testContext.ReceivedAt = DateTime.UtcNow;
8888

89-
return Task.FromResult(0);
89+
return Task.CompletedTask;
9090
}
9191
}
9292
}

src/NServiceBus.Transport.SQS.AcceptanceTests/DelayedDelivery/SendOnly_when_configured_with_queue_delay_that_requires_multiple_cycles.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public Task Handle(DelayedMessage message, IMessageHandlerContext context)
8888
testContext.Payload = message.Payload;
8989
testContext.ReceivedAt = DateTime.UtcNow;
9090

91-
return Task.FromResult(0);
91+
return Task.CompletedTask;
9292
}
9393
}
9494
}

src/NServiceBus.Transport.SQS.AcceptanceTests/Routing/When_publishing_an_event_implementing_two_unrelated_interfaces.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public async Task Event_should_be_published_using_instance_type()
2626
{
2727
ctx.EventASubscribed = true;
2828
ctx.EventBSubscribed = true;
29-
return Task.FromResult(0);
29+
return Task.CompletedTask;
3030
}))
3131
.Done(c => c.GotEventA && c.GotEventB)
3232
.Run(TimeSpan.FromSeconds(20));
@@ -73,11 +73,11 @@ public Task Handle(IEventA @event, IMessageHandlerContext context)
7373
{
7474
if (@event.ContextId != testContext.Id)
7575
{
76-
return Task.FromResult(0);
76+
return Task.CompletedTask;
7777
}
7878
testContext.GotEventA = true;
7979

80-
return Task.FromResult(0);
80+
return Task.CompletedTask;
8181
}
8282

8383
Context testContext;
@@ -94,12 +94,12 @@ public Task Handle(IEventB @event, IMessageHandlerContext context)
9494
{
9595
if (@event.ContextId != testContext.Id)
9696
{
97-
return Task.FromResult(0);
97+
return Task.CompletedTask;
9898
}
9999

100100
testContext.GotEventB = true;
101101

102-
return Task.FromResult(0);
102+
return Task.CompletedTask;
103103
}
104104

105105
Context testContext;

src/NServiceBus.Transport.SQS.Tests/InputQueuePumpTests.cs

Lines changed: 112 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async Task SetupInitializedPump(OnMessage onMessage = null)
4545
{
4646
await pump.Initialize(
4747
new PushRuntimeSettings(1),
48-
onMessage ?? ((ctx, ct) => Task.FromResult(0)),
48+
onMessage ?? ((ctx, ct) => Task.CompletedTask),
4949
(ctx, ct) => Task.FromResult(ErrorHandleResult.Handled));
5050
}
5151

@@ -88,7 +88,7 @@ public async Task Poison_messages_are_moved_to_error_queue_and_deleted_without_p
8888
await SetupInitializedPump(onMessage: (ctx, ct) =>
8989
{
9090
processed = true;
91-
return Task.FromResult(0);
91+
return Task.CompletedTask;
9292
});
9393

9494
var message = new Message
@@ -133,7 +133,7 @@ public async Task Poison_messages_failed_to_be_deleted_are_deleted_on_next_recei
133133
await SetupInitializedPump(onMessage: (ctx, ct) =>
134134
{
135135
processed = true;
136-
return Task.FromResult(0);
136+
return Task.CompletedTask;
137137
});
138138

139139
var deleteRequest = mockSqsClient.DeleteMessageRequestResponse;
@@ -185,7 +185,7 @@ public async Task Expired_messages_are_deleted_without_processing()
185185
await SetupInitializedPump(onMessage: (ctx, ct) =>
186186
{
187187
processed = true;
188-
return Task.FromResult(0);
188+
return Task.CompletedTask;
189189
});
190190

191191
var json = JsonSerializer.Serialize(new TransportMessage
@@ -234,7 +234,7 @@ public async Task Processed_messages_are_deleted()
234234
await SetupInitializedPump(onMessage: (ctx, ct) =>
235235
{
236236
processed = true;
237-
return Task.FromResult(0);
237+
return Task.CompletedTask;
238238
});
239239

240240
var json = JsonSerializer.Serialize(new TransportMessage
@@ -356,6 +356,113 @@ public async Task Should_cancel_processing_when_visibility_expired_during_proces
356356
Assert.That(mockSqsClient.ChangeMessageVisibilityRequestsSent, Has.Count.EqualTo(1));
357357
}
358358

359+
[Test]
360+
public async Task Custom_native_headers_are_propagated_to_transport_message_headers()
361+
{
362+
var nativeMessageId = Guid.NewGuid().ToString();
363+
var messageId = Guid.NewGuid().ToString();
364+
var customHeaderKey = "custom-header-key";
365+
var customHeaderValue = new MessageAttributeValue { StringValue = "custom header value" };
366+
367+
var transportMessageHeaderValue = string.Empty;
368+
await SetupInitializedPump(onMessage: (ctx, ct) =>
369+
{
370+
transportMessageHeaderValue = ctx.Headers[customHeaderKey];
371+
return Task.CompletedTask;
372+
});
373+
374+
var message = new Message
375+
{
376+
ReceiptHandle = "something",
377+
MessageId = nativeMessageId,
378+
MessageAttributes = new Dictionary<string, MessageAttributeValue>
379+
{
380+
{Headers.MessageId, new MessageAttributeValue {StringValue = messageId}},
381+
{customHeaderKey, customHeaderValue }
382+
},
383+
Body = TransportMessage.EmptyMessage
384+
};
385+
386+
await pump.ProcessMessage(message, CancellationToken.None).ConfigureAwait(false);
387+
388+
Assert.Multiple(() =>
389+
{
390+
Assert.That(transportMessageHeaderValue, Is.Not.Null);
391+
Assert.That(transportMessageHeaderValue!, Is.EqualTo(customHeaderValue.StringValue));
392+
});
393+
}
394+
395+
[Test]
396+
public async Task Message_type_fullname_header_is_propagated_to_transport_message_headers()
397+
{
398+
var nativeMessageId = Guid.NewGuid().ToString();
399+
var messageId = Guid.NewGuid().ToString();
400+
var customHeaderValue = new MessageAttributeValue { StringValue = "SomeMessageTypeName" };
401+
402+
var transportMessageHeaderValue = string.Empty;
403+
await SetupInitializedPump(onMessage: (ctx, ct) =>
404+
{
405+
transportMessageHeaderValue = ctx.Headers[TransportHeaders.MessageTypeFullName];
406+
return Task.CompletedTask;
407+
});
408+
409+
var message = new Message
410+
{
411+
ReceiptHandle = "something",
412+
MessageId = nativeMessageId,
413+
MessageAttributes = new Dictionary<string, MessageAttributeValue>
414+
{
415+
{Headers.MessageId, new MessageAttributeValue {StringValue = messageId}},
416+
{TransportHeaders.MessageTypeFullName, customHeaderValue }
417+
},
418+
Body = TransportMessage.EmptyMessage
419+
};
420+
421+
await pump.ProcessMessage(message, CancellationToken.None).ConfigureAwait(false);
422+
423+
Assert.Multiple(() =>
424+
{
425+
Assert.That(transportMessageHeaderValue, Is.Not.Null);
426+
Assert.That(transportMessageHeaderValue!, Is.EqualTo(customHeaderValue.StringValue));
427+
});
428+
}
429+
430+
[Theory]
431+
[TestCase(TransportHeaders.Headers, "{}")]
432+
// [TestCase(TransportHeaders.MessageTypeFullName)] special case that is forwarded due to historic reason
433+
[TestCase(TransportHeaders.S3BodyKey)]
434+
[TestCase(TransportHeaders.DelaySeconds)]
435+
[TestCase(TransportHeaders.TimeToBeReceived)]
436+
public async Task Excluded_transport_headers_are_not_propagate_to_transport_message_headers(string headerKey, string headerValue = "custom-header-value")
437+
{
438+
var nativeMessageId = Guid.NewGuid().ToString();
439+
var messageId = Guid.NewGuid().ToString();
440+
var customHeaderValue = new MessageAttributeValue { StringValue = headerValue };
441+
442+
var transportHeaderPresent = true;
443+
await SetupInitializedPump(onMessage: (ctx, ct) =>
444+
{
445+
transportHeaderPresent = ctx.Headers.ContainsKey(headerKey);
446+
return Task.CompletedTask;
447+
});
448+
449+
var message = new Message
450+
{
451+
ReceiptHandle = "something",
452+
MessageId = nativeMessageId,
453+
MessageAttributes = new Dictionary<string, MessageAttributeValue>
454+
{
455+
{ Headers.MessageId, new MessageAttributeValue { StringValue = messageId } },
456+
{ headerKey, customHeaderValue }
457+
},
458+
Body = TransportMessage.EmptyMessage
459+
};
460+
461+
await pump.ProcessMessage(message, CancellationToken.None).ConfigureAwait(false);
462+
463+
Assert.That(transportHeaderPresent, Is.False, "Transport header should not be present");
464+
}
465+
359466
static Message CreateValidTransportMessage(string messageId,
360467
string nativeMessageId, string expectedReceiptHandle = "receipt-handle")
361468
{

src/NServiceBus.Transport.SQS.Tests/MockSqsClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public Task SetAttributesAsync(string queueUrl, Dictionary<string, string> attri
132132
{
133133
attributeRequestsQueue.Enqueue(attributes);
134134
}
135-
return Task.FromResult(0);
135+
return Task.CompletedTask;
136136
}
137137

138138
public void EnableGetAttributeReturnsWhatWasSet()

src/NServiceBus.Transport.SQS.Tests/SubscriptionManagerTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ public TestableSubscriptionManager(IAmazonSQS sqsClient, IAmazonSimpleNotificati
489489
protected override Task Delay(int millisecondsDelay, CancellationToken cancellationToken = default)
490490
{
491491
Delays.Add(millisecondsDelay);
492-
return Task.FromResult(0);
492+
return Task.CompletedTask;
493493
}
494494
}
495495

0 commit comments

Comments
 (0)