Skip to content

Commit be396b1

Browse files
committed
Implement EF Core Unit of Work for ingestion
1 parent f863b9e commit be396b1

File tree

4 files changed

+321
-0
lines changed

4 files changed

+321
-0
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
namespace ServiceControl.Persistence.Sql.Core.Implementation.UnitOfWork;
2+
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using DbContexts;
6+
using ServiceControl.Persistence.UnitOfWork;
7+
8+
class IngestionUnitOfWork : IngestionUnitOfWorkBase
9+
{
10+
public IngestionUnitOfWork(ServiceControlDbContextBase dbContext)
11+
{
12+
DbContext = dbContext;
13+
Monitoring = new MonitoringIngestionUnitOfWork(this);
14+
Recoverability = new RecoverabilityIngestionUnitOfWork(this);
15+
}
16+
17+
internal ServiceControlDbContextBase DbContext { get; }
18+
19+
// EF Core automatically batches all pending operations
20+
// The upsert operations execute SQL directly, but EF Core tracked changes (Add/Remove/Update) are batched
21+
public override Task Complete(CancellationToken cancellationToken) =>
22+
DbContext.SaveChangesAsync(cancellationToken);
23+
24+
protected override void Dispose(bool disposing)
25+
{
26+
if (disposing)
27+
{
28+
DbContext?.Dispose();
29+
}
30+
base.Dispose(disposing);
31+
}
32+
}
33+
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
namespace ServiceControl.Persistence.Sql.Core.Implementation.UnitOfWork;
2+
3+
using System;
4+
using System.Threading.Tasks;
5+
using DbContexts;
6+
using Microsoft.Extensions.DependencyInjection;
7+
using ServiceControl.Persistence;
8+
using ServiceControl.Persistence.UnitOfWork;
9+
10+
class IngestionUnitOfWorkFactory(IServiceProvider serviceProvider, MinimumRequiredStorageState storageState) : IIngestionUnitOfWorkFactory
11+
{
12+
public ValueTask<IIngestionUnitOfWork> StartNew()
13+
{
14+
var scope = serviceProvider.CreateScope();
15+
var dbContext = scope.ServiceProvider.GetRequiredService<ServiceControlDbContextBase>();
16+
var unitOfWork = new IngestionUnitOfWork(dbContext);
17+
return ValueTask.FromResult<IIngestionUnitOfWork>(unitOfWork);
18+
}
19+
20+
public bool CanIngestMore() => storageState.CanIngestMore;
21+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
namespace ServiceControl.Persistence.Sql.Core.Implementation.UnitOfWork;
2+
3+
using System.Threading.Tasks;
4+
using Entities;
5+
using ServiceControl.Persistence;
6+
using ServiceControl.Persistence.UnitOfWork;
7+
8+
class MonitoringIngestionUnitOfWork(IngestionUnitOfWork parent) : IMonitoringIngestionUnitOfWork
9+
{
10+
public async Task RecordKnownEndpoint(KnownEndpoint knownEndpoint)
11+
{
12+
var entity = new KnownEndpointEntity
13+
{
14+
Id = knownEndpoint.EndpointDetails.GetDeterministicId(),
15+
EndpointName = knownEndpoint.EndpointDetails.Name,
16+
HostId = knownEndpoint.EndpointDetails.HostId,
17+
Host = knownEndpoint.EndpointDetails.Host,
18+
HostDisplayName = knownEndpoint.HostDisplayName,
19+
Monitored = knownEndpoint.Monitored
20+
};
21+
22+
// Use EF's change tracking for upsert
23+
var existing = await parent.DbContext.KnownEndpoints.FindAsync(entity.Id);
24+
if (existing == null)
25+
{
26+
parent.DbContext.KnownEndpoints.Add(entity);
27+
}
28+
else
29+
{
30+
parent.DbContext.Entry(existing).CurrentValues.SetValues(entity);
31+
}
32+
}
33+
}
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
namespace ServiceControl.Persistence.Sql.Core.Implementation.UnitOfWork;
2+
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Linq;
6+
using System.Text.Json;
7+
using System.Threading.Tasks;
8+
using Entities;
9+
using Infrastructure;
10+
using Microsoft.EntityFrameworkCore;
11+
using NServiceBus;
12+
using NServiceBus.Transport;
13+
using ServiceControl.MessageFailures;
14+
using ServiceControl.Operations;
15+
using ServiceControl.Persistence.Infrastructure;
16+
using ServiceControl.Persistence.UnitOfWork;
17+
18+
class RecoverabilityIngestionUnitOfWork(IngestionUnitOfWork parent) : IRecoverabilityIngestionUnitOfWork
19+
{
20+
const int MaxProcessingAttempts = 10;
21+
22+
public async Task RecordFailedProcessingAttempt(MessageContext context, FailedMessage.ProcessingAttempt processingAttempt, List<FailedMessage.FailureGroup> groups)
23+
{
24+
var uniqueMessageId = context.Headers.UniqueId();
25+
var contentType = GetContentType(context.Headers, "text/plain");
26+
var bodySize = context.Body.Length;
27+
28+
// Add metadata to the processing attempt
29+
processingAttempt.MessageMetadata.Add("ContentType", contentType);
30+
processingAttempt.MessageMetadata.Add("ContentLength", bodySize);
31+
processingAttempt.MessageMetadata.Add("BodyUrl", $"/messages/{uniqueMessageId}/body");
32+
33+
// Store endpoint details in metadata for efficient retrieval
34+
var sendingEndpoint = ExtractSendingEndpoint(context.Headers);
35+
var receivingEndpoint = ExtractReceivingEndpoint(context.Headers);
36+
37+
if (sendingEndpoint != null)
38+
{
39+
processingAttempt.MessageMetadata.Add("SendingEndpoint", sendingEndpoint);
40+
}
41+
42+
if (receivingEndpoint != null)
43+
{
44+
processingAttempt.MessageMetadata.Add("ReceivingEndpoint", receivingEndpoint);
45+
}
46+
47+
// Extract denormalized fields from headers for efficient querying
48+
var messageType = context.Headers.TryGetValue(Headers.EnclosedMessageTypes, out var mt) ? mt?.Split(',').FirstOrDefault()?.Trim() : null;
49+
var timeSent = context.Headers.TryGetValue(Headers.TimeSent, out var ts) && DateTimeOffset.TryParse(ts, out var parsedTime) ? parsedTime.UtcDateTime : (DateTime?)null;
50+
var queueAddress = context.Headers.TryGetValue("NServiceBus.FailedQ", out var qa) ? qa : null;
51+
var conversationId = context.Headers.TryGetValue(Headers.ConversationId, out var cid) ? cid : null;
52+
53+
// Extract performance metrics from metadata for efficient sorting
54+
var criticalTime = processingAttempt.MessageMetadata.TryGetValue("CriticalTime", out var ct) && ct is TimeSpan ctSpan ? (TimeSpan?)ctSpan : null;
55+
var processingTime = processingAttempt.MessageMetadata.TryGetValue("ProcessingTime", out var pt) && pt is TimeSpan ptSpan ? (TimeSpan?)ptSpan : null;
56+
var deliveryTime = processingAttempt.MessageMetadata.TryGetValue("DeliveryTime", out var dt) && dt is TimeSpan dtSpan ? (TimeSpan?)dtSpan : null;
57+
58+
// Load existing message to merge attempts list
59+
var existingMessage = await parent.DbContext.FailedMessages
60+
.AsNoTracking()
61+
.FirstOrDefaultAsync(fm => fm.UniqueMessageId == uniqueMessageId);
62+
63+
List<FailedMessage.ProcessingAttempt> attempts;
64+
if (existingMessage != null)
65+
{
66+
// Merge with existing attempts
67+
attempts = JsonSerializer.Deserialize<List<FailedMessage.ProcessingAttempt>>(existingMessage.ProcessingAttemptsJson) ?? [];
68+
69+
// De-duplicate attempts by AttemptedAt value
70+
var duplicateIndex = attempts.FindIndex(a => a.AttemptedAt == processingAttempt.AttemptedAt);
71+
if (duplicateIndex < 0)
72+
{
73+
attempts.Add(processingAttempt);
74+
}
75+
76+
// Trim to the latest MaxProcessingAttempts
77+
attempts = [.. attempts
78+
.OrderBy(a => a.AttemptedAt)
79+
.TakeLast(MaxProcessingAttempts)];
80+
}
81+
else
82+
{
83+
// First attempt for this message
84+
attempts = [processingAttempt];
85+
}
86+
87+
// Build the complete entity with all fields
88+
var failedMessageEntity = new FailedMessageEntity
89+
{
90+
Id = SequentialGuidGenerator.NewSequentialGuid(),
91+
UniqueMessageId = uniqueMessageId,
92+
Status = FailedMessageStatus.Unresolved,
93+
ProcessingAttemptsJson = JsonSerializer.Serialize(attempts),
94+
FailureGroupsJson = JsonSerializer.Serialize(groups),
95+
PrimaryFailureGroupId = groups.Count > 0 ? groups[0].Id : null,
96+
MessageId = processingAttempt.MessageId,
97+
MessageType = messageType,
98+
TimeSent = timeSent,
99+
SendingEndpointName = sendingEndpoint?.Name,
100+
ReceivingEndpointName = receivingEndpoint?.Name,
101+
ExceptionType = processingAttempt.FailureDetails?.Exception?.ExceptionType,
102+
ExceptionMessage = processingAttempt.FailureDetails?.Exception?.Message,
103+
QueueAddress = queueAddress,
104+
NumberOfProcessingAttempts = attempts.Count,
105+
LastProcessedAt = processingAttempt.AttemptedAt,
106+
ConversationId = conversationId,
107+
CriticalTime = criticalTime,
108+
ProcessingTime = processingTime,
109+
DeliveryTime = deliveryTime
110+
};
111+
112+
// Use EF's change tracking for upsert
113+
if (existingMessage != null)
114+
{
115+
parent.DbContext.FailedMessages.Update(failedMessageEntity);
116+
}
117+
else
118+
{
119+
parent.DbContext.FailedMessages.Add(failedMessageEntity);
120+
}
121+
122+
// Store the message body (avoid allocation if body already exists)
123+
await StoreMessageBody(uniqueMessageId, context.Body, contentType, bodySize);
124+
}
125+
126+
public async Task RecordSuccessfulRetry(string retriedMessageUniqueId)
127+
{
128+
// Find the failed message by unique ID
129+
var failedMessage = await parent.DbContext.FailedMessages
130+
.FirstOrDefaultAsync(fm => fm.UniqueMessageId == retriedMessageUniqueId);
131+
132+
if (failedMessage != null)
133+
{
134+
// Update its status to Resolved - EF Core tracks this change
135+
failedMessage.Status = FailedMessageStatus.Resolved;
136+
}
137+
138+
// Remove any retry tracking document - query by UniqueMessageId instead since we no longer have the composite pattern
139+
var failedMsg = await parent.DbContext.FailedMessages
140+
.AsNoTracking()
141+
.FirstOrDefaultAsync(fm => fm.UniqueMessageId == retriedMessageUniqueId);
142+
143+
if (failedMsg != null)
144+
{
145+
var retryDocument = await parent.DbContext.FailedMessageRetries
146+
.FirstOrDefaultAsync(r => r.FailedMessageId == failedMsg.Id.ToString());
147+
148+
if (retryDocument != null)
149+
{
150+
// EF Core tracks this removal
151+
parent.DbContext.FailedMessageRetries.Remove(retryDocument);
152+
}
153+
}
154+
}
155+
156+
async Task StoreMessageBody(string uniqueMessageId, ReadOnlyMemory<byte> body, string contentType, int bodySize)
157+
{
158+
// Parse the uniqueMessageId to Guid for querying
159+
var bodyId = Guid.Parse(uniqueMessageId);
160+
161+
// Check if body already exists (bodies are immutable)
162+
var exists = await parent.DbContext.MessageBodies
163+
.AsNoTracking()
164+
.AnyAsync(mb => mb.Id == bodyId);
165+
166+
if (!exists)
167+
{
168+
// Only allocate the array if we need to store it
169+
var bodyEntity = new MessageBodyEntity
170+
{
171+
Id = bodyId,
172+
Body = body.ToArray(), // Allocation happens here, but only when needed
173+
ContentType = contentType,
174+
BodySize = bodySize,
175+
Etag = Guid.NewGuid().ToString() // Generate a simple etag
176+
};
177+
178+
// Add new message body
179+
parent.DbContext.MessageBodies.Add(bodyEntity);
180+
}
181+
// If body already exists, we don't update it (it's immutable) - no allocation!
182+
}
183+
184+
static string GetContentType(IReadOnlyDictionary<string, string> headers, string defaultContentType)
185+
=> headers.TryGetValue(Headers.ContentType, out var contentType) ? contentType : defaultContentType;
186+
187+
static EndpointDetails? ExtractSendingEndpoint(IReadOnlyDictionary<string, string> headers)
188+
{
189+
var endpoint = new EndpointDetails();
190+
191+
if (headers.TryGetValue("NServiceBus.OriginatingEndpoint", out var name))
192+
{
193+
endpoint.Name = name;
194+
}
195+
196+
if (headers.TryGetValue("NServiceBus.OriginatingMachine", out var host))
197+
{
198+
endpoint.Host = host;
199+
}
200+
201+
if (headers.TryGetValue("NServiceBus.OriginatingHostId", out var hostId) && Guid.TryParse(hostId, out var parsedHostId))
202+
{
203+
endpoint.HostId = parsedHostId;
204+
}
205+
206+
return !string.IsNullOrEmpty(endpoint.Name) ? endpoint : null;
207+
}
208+
209+
static EndpointDetails? ExtractReceivingEndpoint(IReadOnlyDictionary<string, string> headers)
210+
{
211+
var endpoint = new EndpointDetails();
212+
213+
if (headers.TryGetValue("NServiceBus.ProcessingEndpoint", out var name))
214+
{
215+
endpoint.Name = name;
216+
}
217+
218+
if (headers.TryGetValue("NServiceBus.HostDisplayName", out var host))
219+
{
220+
endpoint.Host = host;
221+
}
222+
else if (headers.TryGetValue("NServiceBus.ProcessingMachine", out var machine))
223+
{
224+
endpoint.Host = machine;
225+
}
226+
227+
if (headers.TryGetValue("NServiceBus.HostId", out var hostId) && Guid.TryParse(hostId, out var parsedHostId))
228+
{
229+
endpoint.HostId = parsedHostId;
230+
}
231+
232+
return !string.IsNullOrEmpty(endpoint.Name) ? endpoint : null;
233+
}
234+
}

0 commit comments

Comments
 (0)