Skip to content

Commit e9d4989

Browse files
committed
Do not require message id header for error ingestion
1 parent 821f4d9 commit e9d4989

File tree

6 files changed

+44
-71
lines changed

6 files changed

+44
-71
lines changed

src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
using System;
44
using System.Collections.Generic;
5+
using System.Linq;
56
using System.Threading.Tasks;
67
using AcceptanceTesting;
78
using AcceptanceTesting.EndpointTemplates;
@@ -72,7 +73,7 @@ public async Task Should_include_headers_required_by_ServicePulse()
7273
[Test]
7374
public async Task TimeSent_should_not_be_casted()
7475
{
75-
var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z");
76+
var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z").ToUniversalTime();
7677

7778
var context = await Define<TestContext>(c =>
7879
{
@@ -91,27 +92,23 @@ public async Task TimeSent_should_not_be_casted()
9192

9293
async Task<bool> TryGetFailureFromApi(TestContext context)
9394
{
94-
context.Failure = await this.TryGet<FailedMessageView>($"/api/errors/last/{context.UniqueMessageId}");
95+
var allFailures = await this.TryGetMany<FailedMessageView>("/api/errors/");
96+
97+
context.Failure = allFailures.Items.SingleOrDefault(f => f.QueueAddress == context.EndpointNameOfReceivingEndpoint);
98+
9599
return context.Failure != null;
96100
}
97101

98102
class TestContext : ScenarioContext
99103
{
100-
public string MessageId { get; } = Guid.NewGuid().ToString();
101-
102-
public string EndpointNameOfReceivingEndpoint => "MyEndpoint";
103-
104-
public string UniqueMessageId => DeterministicGuid.MakeId(MessageId, EndpointNameOfReceivingEndpoint).ToString();
104+
// Endpoint name is made unique since we are using it to find the failure once ingestion is complete
105+
public string EndpointNameOfReceivingEndpoint => $"MyEndpoint-{NUnit.Framework.TestContext.CurrentContext.Test.ID}";
105106

106107
public Dictionary<string, string> Headers { get; } = [];
107108

108109
public FailedMessageView Failure { get; set; }
109110

110-
public void AddMinimalRequiredHeaders()
111-
{
112-
Headers["NServiceBus.FailedQ"] = EndpointNameOfReceivingEndpoint;
113-
Headers[NServiceBus.Headers.MessageId] = MessageId;
114-
}
111+
public void AddMinimalRequiredHeaders() => Headers["NServiceBus.FailedQ"] = EndpointNameOfReceivingEndpoint;
115112
}
116113

117114
class FailingEndpoint : EndpointConfigurationBuilder
@@ -122,7 +119,9 @@ class SendFailedMessage : DispatchRawMessages<TestContext>
122119
{
123120
protected override TransportOperations CreateMessage(TestContext context)
124121
{
125-
var outgoingMessage = new OutgoingMessage(context.MessageId, context.Headers, Array.Empty<byte>());
122+
// we can't control the native message id so any guid will do here, we need to find the failed messsage using
123+
// the endpoint name instead
124+
var outgoingMessage = new OutgoingMessage(Guid.NewGuid().ToString(), context.Headers, Array.Empty<byte>());
126125

127126
return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error")));
128127
}

src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public Task RecordFailedProcessingAttempt(
3232
FailedMessage.ProcessingAttempt processingAttempt,
3333
List<FailedMessage.FailureGroup> groups)
3434
{
35-
var uniqueMessageId = context.Headers.UniqueId();
35+
var uniqueMessageId = GetUniqueMessageId(context);
3636
var contentType = GetContentType(context.Headers, "text/xml");
3737
var bodySize = context.Body.Length;
3838

@@ -61,7 +61,7 @@ public Task RecordFailedProcessingAttempt(
6161
var storeMessageCmd = CreateFailedMessagesPatchCommand(uniqueMessageId, processingAttempt, groups);
6262
parentUnitOfWork.AddCommand(storeMessageCmd);
6363

64-
AddStoreBodyCommands(context, contentType);
64+
AddStoreBodyCommands(uniqueMessageId, context, contentType);
6565

6666
return Task.CompletedTask;
6767
}
@@ -71,10 +71,7 @@ public Task RecordSuccessfulRetry(string retriedMessageUniqueId)
7171
var failedMessageDocumentId = FailedMessageIdGenerator.MakeDocumentId(retriedMessageUniqueId);
7272
var failedMessageRetryDocumentId = FailedMessageRetry.MakeDocumentId(retriedMessageUniqueId);
7373

74-
var patchRequest = new PatchRequest
75-
{
76-
Script = $@"this.{nameof(FailedMessage.Status)} = {(int)FailedMessageStatus.Resolved};"
77-
};
74+
var patchRequest = new PatchRequest { Script = $@"this.{nameof(FailedMessage.Status)} = {(int)FailedMessageStatus.Resolved};" };
7875

7976
expirationManager.EnableExpiration(patchRequest);
8077

@@ -84,6 +81,21 @@ public Task RecordSuccessfulRetry(string retriedMessageUniqueId)
8481
return Task.CompletedTask;
8582
}
8683

84+
static string GetUniqueMessageId(MessageContext context)
85+
{
86+
if (context.Headers.TryGetValue("ServiceControl.Retry.UniqueMessageId", out var existingUniqueMessageId))
87+
{
88+
return existingUniqueMessageId;
89+
}
90+
91+
if (!context.Headers.TryGetValue(Headers.MessageId, out var messageId))
92+
{
93+
messageId = context.NativeMessageId;
94+
}
95+
96+
return DeterministicGuid.MakeId(messageId, context.Headers.ProcessingEndpointName()).ToString();
97+
}
98+
8799
ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMessage.ProcessingAttempt processingAttempt,
88100
List<FailedMessage.FailureGroup> groups)
89101
{
@@ -119,9 +131,9 @@ ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMess
119131
",
120132
Values = new Dictionary<string, object>
121133
{
122-
{"status", (int)FailedMessageStatus.Unresolved},
123-
{"failureGroups", groups},
124-
{"attempt", processingAttempt}
134+
{ "status", (int)FailedMessageStatus.Unresolved },
135+
{ "failureGroups", groups },
136+
{ "attempt", processingAttempt }
125137
},
126138
},
127139
patchIfMissing: new PatchRequest
@@ -137,18 +149,17 @@ ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMess
137149
",
138150
Values = new Dictionary<string, object>
139151
{
140-
{"status", (int)FailedMessageStatus.Unresolved},
141-
{"failureGroups", groups},
142-
{"attempt", processingAttempt},
143-
{"uniqueMessageId", uniqueMessageId}
152+
{ "status", (int)FailedMessageStatus.Unresolved },
153+
{ "failureGroups", groups },
154+
{ "attempt", processingAttempt },
155+
{ "uniqueMessageId", uniqueMessageId }
144156
}
145157
});
146158
}
147159

148-
void AddStoreBodyCommands(MessageContext context, string contentType)
160+
void AddStoreBodyCommands(string uniqueMessageId, MessageContext context, string contentType)
149161
{
150-
var uniqueId = context.Headers.UniqueId();
151-
var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId);
162+
var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueMessageId);
152163

153164
var stream = new ReadOnlyStream(context.Body);
154165
var putAttachmentCmd = new PutAttachmentCommandData(documentId, "body", stream, contentType, changeVector: null);
@@ -160,9 +171,9 @@ static string GetContentType(IReadOnlyDictionary<string, string> headers, string
160171
=> headers.GetValueOrDefault(Headers.ContentType, defaultContentType);
161172

162173
static int MaxProcessingAttempts = 10;
174+
163175
// large object heap starts above 85000 bytes and not above 85 KB!
164176
internal const int LargeObjectHeapThreshold = 85_000;
165177
static readonly Encoding utf8 = new UTF8Encoding(true, true);
166-
167178
}
168179
}

src/ServiceControl.Persistence/Infrastructure/TransportMessageExtensions.cs

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,27 +36,6 @@ public static string ProcessingEndpointName(this IReadOnlyDictionary<string, str
3636
throw new Exception($"No processing endpoint could be determined for message ({headers.MessageId()})");
3737
}
3838

39-
public static string UniqueId(this IReadOnlyDictionary<string, string> headers)
40-
{
41-
return headers.TryGetValue("ServiceControl.Retry.UniqueMessageId", out var existingUniqueMessageId)
42-
? existingUniqueMessageId
43-
: DeterministicGuid.MakeId(headers.MessageId(), headers.ProcessingEndpointName()).ToString();
44-
}
45-
46-
public static string ProcessingId(this IReadOnlyDictionary<string, string> headers)
47-
{
48-
var messageId = headers.MessageId();
49-
var processingEndpointName = headers.ProcessingEndpointName();
50-
var processingStarted = headers.ProcessingStarted();
51-
52-
if (messageId == default || processingEndpointName == default || processingStarted == default)
53-
{
54-
return Guid.NewGuid().ToString();
55-
}
56-
57-
return DeterministicGuid.MakeId(messageId, processingEndpointName, processingStarted).ToString();
58-
}
59-
6039
// NOTE: Duplicated from TransportMessage
6140
public static string MessageId(this IReadOnlyDictionary<string, string> headers)
6241
{
@@ -102,16 +81,12 @@ public static bool IsBinary(this IReadOnlyDictionary<string, string> headers)
10281

10382
return true;
10483
}
84+
10585
static string ReplyToAddress(this IReadOnlyDictionary<string, string> headers)
10686
{
10787
return headers.TryGetValue(Headers.ReplyToAddress, out var destination) ? destination : null;
10888
}
10989

110-
static string ProcessingStarted(this IReadOnlyDictionary<string, string> headers)
111-
{
112-
return headers.TryGetValue(Headers.ProcessingStarted, out var processingStarted) ? processingStarted : null;
113-
}
114-
11590
static string ExtractQueue(string address)
11691
{
11792
var atIndex = address?.IndexOf("@", StringComparison.InvariantCulture);

src/ServiceControl.Persistence/ProcessedMessage.cs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22
{
33
using System;
44
using System.Collections.Generic;
5-
using NServiceBus;
6-
using ServiceControl.Persistence;
7-
using ServiceControl.Persistence.Infrastructure;
85

96
public class ProcessedMessage
107
{
@@ -14,16 +11,6 @@ public ProcessedMessage()
1411
Headers = [];
1512
}
1613

17-
public ProcessedMessage(Dictionary<string, string> headers, Dictionary<string, object> metadata)
18-
{
19-
UniqueMessageId = headers.UniqueId();
20-
MessageMetadata = metadata;
21-
Headers = headers;
22-
23-
ProcessedAt = Headers.TryGetValue(NServiceBus.Headers.ProcessingEnded, out var processedAt) ?
24-
DateTimeOffsetHelper.ToDateTimeOffset(processedAt).UtcDateTime : DateTime.UtcNow; // best guess
25-
}
26-
2714
public string Id { get; set; }
2815

2916
public string UniqueMessageId { get; set; }

src/ServiceControl/Operations/ErrorProcessor.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ async Task ProcessMessage(MessageContext context, IIngestionUnitOfWork unitOfWor
116116
var failureDetails = failedMessageFactory.ParseFailureDetails(context.Headers);
117117

118118
var processingAttempt = failedMessageFactory.CreateProcessingAttempt(
119+
messageId,
119120
context.Headers,
120121
new Dictionary<string, object>(metadata),
121122
failureDetails);

src/ServiceControl/Operations/FailedMessageFactory.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,14 @@ static ExceptionDetails GetException(IReadOnlyDictionary<string, string> headers
5959
return exceptionDetails;
6060
}
6161

62-
public FailedMessage.ProcessingAttempt CreateProcessingAttempt(Dictionary<string, string> headers, Dictionary<string, object> metadata, FailureDetails failureDetails)
62+
public FailedMessage.ProcessingAttempt CreateProcessingAttempt(string messageId, Dictionary<string, string> headers, Dictionary<string, object> metadata, FailureDetails failureDetails)
6363
{
6464
return new FailedMessage.ProcessingAttempt
6565
{
6666
AttemptedAt = failureDetails.TimeOfFailure,
6767
FailureDetails = failureDetails,
6868
MessageMetadata = metadata,
69-
MessageId = headers[Headers.MessageId],
69+
MessageId = messageId,
7070
Headers = headers
7171
};
7272
}

0 commit comments

Comments
 (0)