Skip to content

Commit b793b36

Browse files
committed
Setting proper queue name on edit & retry
1 parent bd2d5e6 commit b793b36

File tree

5 files changed

+36
-4
lines changed

5 files changed

+36
-4
lines changed

src/ServiceControl.Persistence.Tests/RetryStateTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ class FakeApplicationLifetime : IHostApplicationLifetime
282282
class TestReturnToSenderDequeuer : ReturnToSenderDequeuer
283283
{
284284
public TestReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore store, IDomainEvents domainEvents, string endpointName)
285-
: base(returnToSender, store, domainEvents, new TestTransportCustomization(), null, new Settings { InstanceName = endpointName })
285+
: base(returnToSender, store, domainEvents, new TestTransportCustomization(), null, new Settings { InstanceName = endpointName }, new ErrorQueueNameCache())
286286
{
287287
}
288288

src/ServiceControl/Recoverability/Editing/EditHandler.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414

1515
class EditHandler : IHandleMessages<EditAndSend>
1616
{
17-
public EditHandler(IErrorMessageDataStore store, IMessageRedirectsDataStore redirectsStore, IMessageDispatcher dispatcher)
17+
public EditHandler(IErrorMessageDataStore store, IMessageRedirectsDataStore redirectsStore, IMessageDispatcher dispatcher, ErrorQueueNameCache errorQueueNameCache)
1818
{
1919
this.store = store;
2020
this.redirectsStore = redirectsStore;
2121
this.dispatcher = dispatcher;
22+
this.errorQueueNameCache = errorQueueNameCache;
2223
corruptedReplyToHeaderStrategy = new CorruptedReplyToHeaderStrategy(RuntimeEnvironment.MachineName);
24+
2325
}
2426

2527
public async Task Handle(EditAndSend message, IMessageHandlerContext context)
@@ -67,7 +69,8 @@ public async Task Handle(EditAndSend message, IMessageHandlerContext context)
6769
var outgoingMessage = BuildMessage(message);
6870
// mark the new message with a link to the original message id
6971
outgoingMessage.Headers.Add("ServiceControl.EditOf", message.FailedMessageId);
70-
outgoingMessage.Headers.Remove("ServiceControl.Retry.AcknowledgementQueue");
72+
outgoingMessage.Headers["ServiceControl.Retry.AcknowledgementQueue"] = errorQueueNameCache.ResolvedQueueName;
73+
7174
var address = ApplyRedirect(attempt.FailureDetails.AddressOfFailingEndpoint, redirects);
7275

7376
if (outgoingMessage.Headers.TryGetValue("ServiceControl.RetryTo", out var retryTo))
@@ -116,6 +119,7 @@ Task DispatchEditedMessage(OutgoingMessage editedMessage, string address, IMessa
116119
readonly IErrorMessageDataStore store;
117120
readonly IMessageRedirectsDataStore redirectsStore;
118121
readonly IMessageDispatcher dispatcher;
122+
readonly ErrorQueueNameCache errorQueueNameCache;
119123
static readonly ILog log = LogManager.GetLogger<EditHandler>();
120124
}
121125
}

src/ServiceControl/Recoverability/RecoverabilityComponent.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public override void Configure(Settings settings, ITransportCustomization transp
4747

4848
//Return to sender - registered both as singleton and hosted service because it is a dependency of the RetryProcessor
4949
services.AddSingleton<ReturnToSender>();
50+
services.AddSingleton<ErrorQueueNameCache>();
5051
services.AddSingleton<ReturnToSenderDequeuer>();
5152
services.AddHostedService(provider => provider.GetRequiredService<ReturnToSenderDequeuer>());
5253

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace ServiceControl.Recoverability
2+
{
3+
using System;
4+
5+
public class ErrorQueueNameCache
6+
{
7+
string resolvedQueueName;
8+
9+
public string ResolvedQueueName
10+
{
11+
get
12+
{
13+
if (string.IsNullOrEmpty(resolvedQueueName))
14+
{
15+
throw new InvalidOperationException($"{ResolvedQueueName} is not set. Please set it before accessing.");
16+
}
17+
18+
return resolvedQueueName;
19+
}
20+
set => resolvedQueueName = value;
21+
}
22+
}
23+
}

src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ namespace ServiceControl.Recoverability
1414

1515
class ReturnToSenderDequeuer : IHostedService
1616
{
17-
public ReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore dataStore, IDomainEvents domainEvents, ITransportCustomization transportCustomization, TransportSettings transportSettings, Settings settings)
17+
public ReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore dataStore, IDomainEvents domainEvents,
18+
ITransportCustomization transportCustomization, TransportSettings transportSettings, Settings settings, ErrorQueueNameCache errorQueueNameCache)
1819
{
1920
InputAddress = transportCustomization.ToTransportQualifiedQueueName(settings.StagingQueue);
2021
this.returnToSender = returnToSender;
2122
errorQueue = settings.ErrorQueue;
2223
this.transportCustomization = transportCustomization;
2324
this.transportSettings = transportSettings;
25+
this.errorQueueNameCache = errorQueueNameCache;
2426

2527
faultManager = new CaptureIfMessageSendingFails(dataStore, domainEvents, IncrementCounterOrProlongTimer);
2628
timer = new Timer(state => StopInternal().GetAwaiter().GetResult());
@@ -35,6 +37,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
3537
messageDispatcher = transportInfrastructure.Dispatcher;
3638

3739
errorQueueTransportAddress = transportInfrastructure.ToTransportAddress(new QueueAddress(errorQueue));
40+
errorQueueNameCache.ResolvedQueueName = errorQueueTransportAddress;
3841
}
3942

4043
public async Task StopAsync(CancellationToken cancellationToken)
@@ -185,6 +188,7 @@ async Task StopInternal()
185188
string errorQueueTransportAddress;
186189
readonly ITransportCustomization transportCustomization;
187190
readonly TransportSettings transportSettings;
191+
readonly ErrorQueueNameCache errorQueueNameCache;
188192
TransportInfrastructure transportInfrastructure;
189193
IMessageDispatcher messageDispatcher;
190194
IMessageReceiver messageReceiver;

0 commit comments

Comments
 (0)