Skip to content

Commit f088138

Browse files
afprtclrlailabougriaramonsmits
authored
Setting proper queue name on edit & retry (#4973) (#4977)
* Setting proper queue name on edit & retry * Update src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs * Update src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs * ⚜️ small code improvements --------- Co-authored-by: Laila Bougria <[email protected]> Co-authored-by: Ramon Smits <[email protected]>
1 parent 47f8449 commit f088138

File tree

6 files changed

+302
-203
lines changed

6 files changed

+302
-203
lines changed

src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,15 @@ sealed class EditMessageTests : PersistenceTestBase
2020
{
2121
EditHandler handler;
2222
readonly TestableUnicastDispatcher dispatcher = new();
23+
readonly ErrorQueueNameCache errorQueueNameCache = new()
24+
{
25+
ResolvedErrorAddress = "errorQueueName"
26+
};
2327

2428
public EditMessageTests() =>
2529
RegisterServices = services => services
2630
.AddSingleton<IMessageDispatcher>(dispatcher)
31+
.AddSingleton<ErrorQueueNameCache>(errorQueueNameCache)
2732
.AddTransient<EditHandler>();
2833

2934
[SetUp]
@@ -212,6 +217,20 @@ public async Task Should_assign_edited_message_new_message_id()
212217
Is.Not.EqualTo(messageFailure.ProcessingAttempts.Last().MessageId));
213218
}
214219

220+
[Test]
221+
public async Task Should_assign_correct_akcnowledgment_queue_address_when_editing_and_retyring()
222+
{
223+
var messageFailure = await CreateAndStoreFailedMessage();
224+
var message = CreateEditMessage(messageFailure.UniqueMessageId);
225+
226+
await handler.Handle(message, new TestableInvokeHandlerContext());
227+
228+
var sentMessage = dispatcher.DispatchedMessages.Single();
229+
Assert.That(
230+
sentMessage.Item1.Message.Headers["ServiceControl.Retry.AcknowledgementQueue"],
231+
Is.EqualTo(errorQueueNameCache.ResolvedErrorAddress));
232+
}
233+
215234
static EditAndSend CreateEditMessage(string failedMessageId, byte[] newBodyContent = null, Dictionary<string, string> newHeaders = null)
216235
{
217236
return new EditAndSend

src/ServiceControl.Persistence.Tests/RetryStateTests.cs

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
using ServiceControl.Persistence;
1818
using ServiceControl.Recoverability;
1919
using ServiceControl.Transports;
20+
using QueueAddress = NServiceBus.Transport.QueueAddress;
2021

2122
[NonParallelizable]
2223
class RetryStateTests : PersistenceTestBase
@@ -51,6 +52,29 @@ public async Task When_a_group_is_prepared_and_SC_is_started_the_group_is_marked
5152
Assert.That(status.Failed, Is.True);
5253
}
5354

55+
[Test]
56+
public async Task When_the_dequeuer_is_created_then_the_error_address_is_cached()
57+
{
58+
var domainEvents = new FakeDomainEvents();
59+
var errorQueueNameCache = new ErrorQueueNameCache();
60+
var transportInfrastructure = new TestTransportInfrastructure(new Dictionary<string, IMessageReceiver>
61+
{
62+
["TestEndpoint.staging"] = null
63+
})
64+
{
65+
TransportAddress = "TestAddress"
66+
};
67+
68+
var transportCustomization = new TestTransportCustomization { TransportInfrastructure = transportInfrastructure };
69+
70+
var testReturnToSenderDequeuer = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint",
71+
errorQueueNameCache, transportCustomization);
72+
73+
await testReturnToSenderDequeuer.StartAsync(new CancellationToken());
74+
75+
Assert.That(errorQueueNameCache.ResolvedErrorAddress, Is.EqualTo(transportInfrastructure.TransportAddress));
76+
}
77+
5478
[Test]
5579
public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarted_while_the_first_group_is_being_forwarded_then_the_count_still_matches()
5680
{
@@ -60,7 +84,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte
6084
await CreateAFailedMessageAndMarkAsPartOfRetryBatch(retryManager, "Test-group", true, 2001);
6185

6286
var sender = new TestSender();
63-
var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager, new Lazy<IMessageDispatcher>(() => sender));
87+
var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()), retryManager, new Lazy<IMessageDispatcher>(() => sender));
6488

6589
// Needs index RetryBatches_ByStatus_ReduceInitialBatchSize
6690
CompleteDatabaseOperation();
@@ -74,7 +98,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte
7498

7599
await documentManager.RebuildRetryOperationState();
76100

77-
processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager, new Lazy<IMessageDispatcher>(() => sender));
101+
processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()), retryManager, new Lazy<IMessageDispatcher>(() => sender));
78102

79103
await processor.ProcessBatches();
80104

@@ -92,7 +116,7 @@ public async Task When_a_group_is_forwarded_the_status_is_Completed()
92116

93117
var sender = new TestSender();
94118

95-
var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint");
119+
var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization());
96120
var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, new Lazy<IMessageDispatcher>(() => sender));
97121

98122
await processor.ProcessBatches(); // mark ready
@@ -122,7 +146,7 @@ public async Task When_there_is_one_poison_message_it_is_removed_from_batch_and_
122146
}
123147
};
124148

125-
var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint");
149+
var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization());
126150
var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, new Lazy<IMessageDispatcher>(() => sender));
127151

128152
bool c;
@@ -163,7 +187,7 @@ public async Task When_a_group_has_one_batch_out_of_two_forwarded_the_status_is_
163187

164188
var sender = new TestSender();
165189

166-
var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint"), retryManager, new Lazy<IMessageDispatcher>(() => sender));
190+
var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()), retryManager, new Lazy<IMessageDispatcher>(() => sender));
167191

168192
CompleteDatabaseOperation();
169193

@@ -281,8 +305,9 @@ class FakeApplicationLifetime : IHostApplicationLifetime
281305

282306
class TestReturnToSenderDequeuer : ReturnToSenderDequeuer
283307
{
284-
public TestReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore store, IDomainEvents domainEvents, string endpointName)
285-
: base(returnToSender, store, domainEvents, new TestTransportCustomization(), null, new Settings { InstanceName = endpointName })
308+
public TestReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore store, IDomainEvents domainEvents, string endpointName,
309+
ErrorQueueNameCache cache, ITransportCustomization transportCustomization)
310+
: base(returnToSender, store, domainEvents, transportCustomization, null, new Settings { InstanceName = endpointName }, cache)
286311
{
287312
}
288313

@@ -294,10 +319,17 @@ public override Task Run(string forwardingBatchId, Predicate<MessageContext> fil
294319

295320
public class TestTransportCustomization : ITransportCustomization
296321
{
322+
public TransportInfrastructure TransportInfrastructure { get; set; }
323+
297324
public void AddTransportForAudit(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException();
298325
public void AddTransportForMonitoring(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException();
299326
public void AddTransportForPrimary(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException();
300-
public Task<TransportInfrastructure> CreateTransportInfrastructure(string name, TransportSettings transportSettings, OnMessage onMessage = null, OnError onError = null, Func<string, Exception, Task> onCriticalError = null, NServiceBus.TransportTransactionMode preferredTransactionMode = NServiceBus.TransportTransactionMode.ReceiveOnly) => throw new NotImplementedException();
327+
328+
public Task<TransportInfrastructure> CreateTransportInfrastructure(string name,
329+
TransportSettings transportSettings, OnMessage onMessage = null, OnError onError = null,
330+
Func<string, Exception, Task> onCriticalError = null,
331+
NServiceBus.TransportTransactionMode preferredTransactionMode =
332+
NServiceBus.TransportTransactionMode.ReceiveOnly) => Task.FromResult(TransportInfrastructure);
301333
public void CustomizeAuditEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException();
302334
public void CustomizeMonitoringEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException();
303335
public void CustomizePrimaryEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException();
@@ -319,5 +351,16 @@ public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction
319351
return Task.CompletedTask;
320352
}
321353
}
354+
355+
public class TestTransportInfrastructure : TransportInfrastructure
356+
{
357+
public TestTransportInfrastructure(IReadOnlyDictionary<string, IMessageReceiver> receivers = null) => Receivers = receivers ?? new Dictionary<string, IMessageReceiver>();
358+
359+
public string TransportAddress { get; set; }
360+
361+
public override Task Shutdown(CancellationToken cancellationToken = new CancellationToken()) => throw new NotImplementedException();
362+
363+
public override string ToTransportAddress(QueueAddress address) => TransportAddress;
364+
}
322365
}
323366
}

src/ServiceControl/Recoverability/Editing/EditHandler.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414

1515
class EditHandler : IHandleMessages<EditAndSend>
1616
{
17-
public EditHandler(IErrorMessageDataStore store, IMessageRedirectsDataStore redirectsStore, IMessageDispatcher dispatcher)
17+
public EditHandler(IErrorMessageDataStore store, IMessageRedirectsDataStore redirectsStore, IMessageDispatcher dispatcher, ErrorQueueNameCache errorQueueNameCache)
1818
{
1919
this.store = store;
2020
this.redirectsStore = redirectsStore;
2121
this.dispatcher = dispatcher;
22+
this.errorQueueNameCache = errorQueueNameCache;
2223
corruptedReplyToHeaderStrategy = new CorruptedReplyToHeaderStrategy(RuntimeEnvironment.MachineName);
24+
2325
}
2426

2527
public async Task Handle(EditAndSend message, IMessageHandlerContext context)
@@ -67,7 +69,8 @@ public async Task Handle(EditAndSend message, IMessageHandlerContext context)
6769
var outgoingMessage = BuildMessage(message);
6870
// mark the new message with a link to the original message id
6971
outgoingMessage.Headers.Add("ServiceControl.EditOf", message.FailedMessageId);
70-
outgoingMessage.Headers.Remove("ServiceControl.Retry.AcknowledgementQueue");
72+
outgoingMessage.Headers["ServiceControl.Retry.AcknowledgementQueue"] = errorQueueNameCache.ResolvedErrorAddress;
73+
7174
var address = ApplyRedirect(attempt.FailureDetails.AddressOfFailingEndpoint, redirects);
7275

7376
if (outgoingMessage.Headers.TryGetValue("ServiceControl.RetryTo", out var retryTo))
@@ -116,6 +119,7 @@ Task DispatchEditedMessage(OutgoingMessage editedMessage, string address, IMessa
116119
readonly IErrorMessageDataStore store;
117120
readonly IMessageRedirectsDataStore redirectsStore;
118121
readonly IMessageDispatcher dispatcher;
122+
readonly ErrorQueueNameCache errorQueueNameCache;
119123
static readonly ILog log = LogManager.GetLogger<EditHandler>();
120124
}
121125
}

src/ServiceControl/Recoverability/RecoverabilityComponent.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public override void Configure(Settings settings, ITransportCustomization transp
4747

4848
//Return to sender - registered both as singleton and hosted service because it is a dependency of the RetryProcessor
4949
services.AddSingleton<ReturnToSender>();
50+
services.AddSingleton<ErrorQueueNameCache>();
5051
services.AddSingleton<ReturnToSenderDequeuer>();
5152
services.AddHostedService(provider => provider.GetRequiredService<ReturnToSenderDequeuer>());
5253

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
namespace ServiceControl.Recoverability;
2+
3+
using System;
4+
5+
class ErrorQueueNameCache
6+
{
7+
string resolvedErrorAddress;
8+
9+
public string ResolvedErrorAddress
10+
{
11+
get
12+
{
13+
if (string.IsNullOrEmpty(resolvedErrorAddress))
14+
{
15+
throw new InvalidOperationException($"{nameof(ResolvedErrorAddress)} is not set. Please set it before accessing.");
16+
}
17+
18+
return resolvedErrorAddress;
19+
}
20+
set => resolvedErrorAddress = value;
21+
}
22+
}

0 commit comments

Comments
 (0)