Skip to content

Commit 2ab779d

Browse files
author
Christian
committed
Using editId colum value as the uniquemessage id
1 parent 4c70bd7 commit 2ab779d

File tree

8 files changed

+79
-17
lines changed

8 files changed

+79
-17
lines changed

src/ServiceControl.Persistence.RavenDB/Editing/EditFailedMessageManager.cs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
namespace ServiceControl.Persistence.RavenDB
22
{
33
using System;
4+
using System.Linq;
45
using System.Threading.Tasks;
6+
using Raven.Client.Documents;
57
using Raven.Client.Documents.Session;
68
using ServiceControl.MessageFailures;
79
using ServiceControl.Persistence.Recoverability.Editing;
10+
using ServiceControl.Persistence.RavenDB.Indexes;
811

912
class EditFailedMessageManager : AbstractSessionManager, IEditFailedMessagesManager
1013
{
@@ -25,13 +28,13 @@ public async Task<FailedMessage> GetFailedMessage(string failedMessageId)
2528
return failedMessage;
2629
}
2730

28-
public async Task<string> GetCurrentEditingMessageId(string failedMessageId)
31+
public async Task<string> GetCurrentEditingRequestId(string failedMessageId)
2932
{
3033
var edit = await session.LoadAsync<FailedMessageEdit>(FailedMessageEdit.MakeDocumentId(failedMessageId));
3134
return edit?.EditId;
3235
}
3336

34-
public Task SetCurrentEditingMessageId(string editingMessageId)
37+
public Task SetCurrentEditingRequestId(string editingMessageId)
3538
{
3639
if (failedMessage == null)
3740
{
@@ -54,5 +57,19 @@ public Task SetFailedMessageAsResolved()
5457

5558
return Task.CompletedTask;
5659
}
60+
61+
public async Task<string> GetFailedMessageIdByEditId(string editId)
62+
{
63+
var edit = await session.Query<FailedMessageEdit, FailedMessageEditIndex>()
64+
.Where(x => x.EditId == editId)
65+
.FirstOrDefaultAsync();
66+
67+
if (edit?.FailedMessageId != null)
68+
{
69+
return FailedMessageIdGenerator.GetMessageIdFromDocumentId(edit.FailedMessageId);
70+
}
71+
72+
return null;
73+
}
5774
}
5875
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
namespace ServiceControl.Persistence.RavenDB.Indexes
2+
{
3+
using System.Linq;
4+
using Raven.Client.Documents.Indexes;
5+
using ServiceControl.Persistence.Recoverability.Editing;
6+
7+
class FailedMessageEditIndex : AbstractIndexCreationTask<FailedMessageEdit>
8+
{
9+
public FailedMessageEditIndex()
10+
{
11+
Map = edits =>
12+
from edit in edits
13+
select new
14+
{
15+
edit.EditId,
16+
edit.FailedMessageId
17+
};
18+
}
19+
20+
public class SortAndFilterOptions
21+
{
22+
public string EditId { get; set; }
23+
public string FailedMessageId { get; set; }
24+
}
25+
}
26+
}

src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public async Task Should_discard_edit_if_edited_message_not_unresolved(FailedMes
5858
var failedMessage = await ErrorMessageDataStore.ErrorBy(failedMessageId);
5959

6060
var editFailedMessagesManager = await ErrorMessageDataStore.CreateEditFailedMessageManager();
61-
var editOperation = await editFailedMessagesManager.GetCurrentEditingMessageId(failedMessageId);
61+
var editOperation = await editFailedMessagesManager.GetCurrentEditingRequestId(failedMessageId);
6262

6363
Assert.Multiple(() =>
6464
{
@@ -79,7 +79,7 @@ public async Task Should_discard_edit_when_different_edit_already_exists()
7979
using (var editFailedMessagesManager = await ErrorMessageDataStore.CreateEditFailedMessageManager())
8080
{
8181
_ = await editFailedMessagesManager.GetFailedMessage(failedMessageId);
82-
await editFailedMessagesManager.SetCurrentEditingMessageId(previousEdit);
82+
await editFailedMessagesManager.SetCurrentEditingRequestId(previousEdit);
8383
await editFailedMessagesManager.SaveChanges();
8484
}
8585

@@ -91,7 +91,7 @@ public async Task Should_discard_edit_when_different_edit_already_exists()
9191
using (var editFailedMessagesManagerAssert = await ErrorMessageDataStore.CreateEditFailedMessageManager())
9292
{
9393
var failedMessage = await editFailedMessagesManagerAssert.GetFailedMessage(failedMessageId);
94-
var editId = await editFailedMessagesManagerAssert.GetCurrentEditingMessageId(failedMessageId);
94+
var editId = await editFailedMessagesManagerAssert.GetCurrentEditingRequestId(failedMessageId);
9595

9696
Assert.Multiple(() =>
9797
{
@@ -130,7 +130,7 @@ public async Task Should_dispatch_edited_message_when_first_edit()
130130
var failedMessage2 = await x.GetFailedMessage(failedMessage.UniqueMessageId);
131131
Assert.That(failedMessage2, Is.Not.Null, "Edited failed message");
132132

133-
var editId = await x.GetCurrentEditingMessageId(failedMessage2.UniqueMessageId);
133+
var editId = await x.GetCurrentEditingRequestId(failedMessage2.UniqueMessageId);
134134

135135
Assert.Multiple(() =>
136136
{

src/ServiceControl.Persistence/IEditFailedMessagesManager.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
public interface IEditFailedMessagesManager : IDataSessionManager
77
{
88
Task<FailedMessage> GetFailedMessage(string failedMessageId);
9-
Task<string> GetCurrentEditingMessageId(string failedMessageId);
10-
Task SetCurrentEditingMessageId(string editingMessageId);
9+
Task<string> GetCurrentEditingRequestId(string failedMessageId);
10+
Task SetCurrentEditingRequestId(string editingMessageId);
1111
Task SetFailedMessageAsResolved();
12+
Task<string> GetFailedMessageIdByEditId(string editId);
1213
}
1314
}

src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public async Task<ActionResult<EditRetryResponse>> Edit(string failedMessageId,
3737

3838
//HINT: This validation is the first one because we want to minimize the chance of two users concurrently execute an edit-retry.
3939
var editManager = await store.CreateEditFailedMessageManager();
40-
var editId = await editManager.GetCurrentEditingMessageId(failedMessageId);
40+
var editId = await editManager.GetCurrentEditingRequestId(failedMessageId);
4141
if (editId != null)
4242
{
4343
logger.LogWarning("Cannot edit message {FailedMessageId} because it has already been edited", failedMessageId);

src/ServiceControl/Operations/ErrorIngestor.cs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,26 @@
1515
using Recoverability;
1616
using ServiceBus.Management.Infrastructure.Settings;
1717
using ServiceControl.Persistence.UnitOfWork;
18+
using ServiceControl.Persistence;
1819
using ServiceControl.Transports;
1920

2021
public class ErrorIngestor
2122
{
2223
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
23-
2424
public ErrorIngestor(Metrics metrics,
2525
IEnumerable<IEnrichImportedErrorMessages> errorEnrichers,
2626
IEnumerable<IFailedMessageEnricher> failedMessageEnrichers,
2727
IDomainEvents domainEvents,
2828
IIngestionUnitOfWorkFactory unitOfWorkFactory,
2929
Lazy<IMessageDispatcher> messageDispatcher,
3030
ITransportCustomization transportCustomization,
31+
IErrorMessageDataStore errorMessageDataStore,
3132
Settings settings,
3233
ILogger<ErrorIngestor> logger)
3334
{
3435
this.unitOfWorkFactory = unitOfWorkFactory;
3536
this.messageDispatcher = messageDispatcher;
37+
this.errorMessageDataStore = errorMessageDataStore;
3638
this.settings = settings;
3739
this.logger = logger;
3840
bulkInsertDurationMeter = metrics.GetMeter("Error ingestion - bulk insert duration", FrequencyInMilliseconds);
@@ -80,7 +82,8 @@ public async Task Ingest(List<MessageContext> contexts, CancellationToken cancel
8082
}
8183
foreach (var context in retriedMessages)
8284
{
83-
announcerTasks.Add(retryConfirmationProcessor.Announce(context));
85+
var failedMessageId = await GetFailedMessageId(context);
86+
announcerTasks.Add(retryConfirmationProcessor.Announce(failedMessageId));
8487
}
8588

8689
await Task.WhenAll(announcerTasks);
@@ -108,6 +111,18 @@ public async Task Ingest(List<MessageContext> contexts, CancellationToken cancel
108111
}
109112
}
110113

114+
async Task<string> GetFailedMessageId(MessageContext context)
115+
{
116+
var retryUniqueMessageId = context.Headers["ServiceControl.Retry.UniqueMessageId"];
117+
118+
//Check if this retry was recorded as an edit and retry in order to locate the original failedMessageId;
119+
using var editFailedMessagesManager = await errorMessageDataStore.CreateEditFailedMessageManager();
120+
var failedMessageId = await editFailedMessagesManager.GetFailedMessageIdByEditId(retryUniqueMessageId);
121+
122+
// If not found, this is a regular retry, so return retryUniqueMessageId
123+
return failedMessageId ?? retryUniqueMessageId;
124+
}
125+
111126
async Task<IReadOnlyList<MessageContext>> PersistFailedMessages(List<MessageContext> failedMessageContexts, List<MessageContext> retriedMessageContexts, CancellationToken cancellationToken)
112127
{
113128
var stopwatch = Stopwatch.StartNew();
@@ -193,6 +208,7 @@ public async Task VerifyCanReachForwardingAddress(CancellationToken cancellation
193208
readonly Settings settings;
194209
readonly ErrorProcessor errorProcessor;
195210
readonly Lazy<IMessageDispatcher> messageDispatcher;
211+
readonly IErrorMessageDataStore errorMessageDataStore;
196212
readonly RetryConfirmationProcessor retryConfirmationProcessor;
197213
readonly UnicastAddressTag logQueueAddress;
198214

src/ServiceControl/Operations/RetryConfirmationProcessor.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class RetryConfirmationProcessor
1212
public const string SuccessfulRetryHeader = "ServiceControl.Retry.Successful";
1313
const string RetryUniqueMessageIdHeader = "ServiceControl.Retry.UniqueMessageId";
1414

15+
1516
public RetryConfirmationProcessor(IDomainEvents domainEvents)
1617
{
1718
this.domainEvents = domainEvents;
@@ -26,11 +27,11 @@ public async Task Process(List<MessageContext> contexts, IIngestionUnitOfWork un
2627
}
2728
}
2829

29-
public Task Announce(MessageContext messageContext)
30+
public Task Announce(string failedMessageId)
3031
{
3132
return domainEvents.Raise(new MessageFailureResolvedByRetry
3233
{
33-
FailedMessageId = messageContext.Headers[RetryUniqueMessageIdHeader],
34+
FailedMessageId = failedMessageId
3435
});
3536
}
3637

src/ServiceControl/Recoverability/Editing/EditHandler.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public EditHandler(IErrorMessageDataStore store, IMessageRedirectsDataStore redi
2727

2828
public async Task Handle(EditAndSend message, IMessageHandlerContext context)
2929
{
30+
string editRequestIdentifier = context.MessageId;
3031
FailedMessage failedMessage;
3132
using (var session = await store.CreateEditFailedMessageManager())
3233
{
@@ -38,7 +39,7 @@ public async Task Handle(EditAndSend message, IMessageHandlerContext context)
3839
return;
3940
}
4041

41-
var editId = await session.GetCurrentEditingMessageId(message.FailedMessageId);
42+
var editId = await session.GetCurrentEditingRequestId(message.FailedMessageId);
4243
if (editId == null)
4344
{
4445
if (failedMessage.Status != FailedMessageStatus.Unresolved)
@@ -48,9 +49,9 @@ public async Task Handle(EditAndSend message, IMessageHandlerContext context)
4849
}
4950

5051
// create a retries document to prevent concurrent edits
51-
await session.SetCurrentEditingMessageId(context.MessageId);
52+
await session.SetCurrentEditingRequestId(editRequestIdentifier);
5253
}
53-
else if (editId != context.MessageId)
54+
else if (editId != editRequestIdentifier)
5455
{
5556
logger.LogWarning("Discarding edit & retry request because the failed message id {FailedMessageId} has already been edited by Message ID {EditedMessageId}", message.FailedMessageId, editId);
5657
return;
@@ -71,7 +72,7 @@ public async Task Handle(EditAndSend message, IMessageHandlerContext context)
7172
// mark the new message with a link to the original message id
7273
outgoingMessage.Headers.Add("ServiceControl.EditOf", message.FailedMessageId);
7374
outgoingMessage.Headers["ServiceControl.Retry.AcknowledgementQueue"] = errorQueueNameCache.ResolvedErrorAddress;
74-
outgoingMessage.Headers["ServiceControl.Retry.UniqueMessageId"] = Guid.NewGuid().ToString("D");
75+
outgoingMessage.Headers["ServiceControl.Retry.UniqueMessageId"] = editRequestIdentifier; //lets re-use the edit request identifier as the value for ServiceControl.Retry.UniqueMessageId so that when the notification from NSB about successful retries arrives, we can use this Id to query FailedMessageEdit by EditId
7576

7677
var address = ApplyRedirect(attempt.FailureDetails.AddressOfFailingEndpoint, redirects);
7778

0 commit comments

Comments
 (0)