Skip to content

Commit e4a78d4

Browse files
committed
Implement core EF Core data stores
1 parent 52639dd commit e4a78d4

24 files changed

+3522
-0
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
namespace ServiceControl.Persistence.Sql.Core.Implementation;
2+
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Linq;
6+
using System.Threading.Tasks;
7+
using DbContexts;
8+
using Entities;
9+
using Microsoft.EntityFrameworkCore;
10+
using Microsoft.Extensions.DependencyInjection;
11+
using Microsoft.Extensions.Logging;
12+
using ServiceControl.Infrastructure.DomainEvents;
13+
using ServiceControl.Persistence.Recoverability;
14+
using ServiceControl.Recoverability;
15+
16+
public class ArchiveMessages : DataStoreBase, IArchiveMessages
17+
{
18+
readonly IDomainEvents domainEvents;
19+
readonly ILogger<ArchiveMessages> logger;
20+
21+
public ArchiveMessages(
22+
IServiceProvider serviceProvider,
23+
IDomainEvents domainEvents,
24+
ILogger<ArchiveMessages> logger) : base(serviceProvider)
25+
{
26+
this.domainEvents = domainEvents;
27+
this.logger = logger;
28+
}
29+
30+
public async Task ArchiveAllInGroup(string groupId)
31+
{
32+
// This would update all failed messages in the group to archived status
33+
// For now, this is a placeholder that would need the failed message infrastructure
34+
logger.LogInformation("Archiving all messages in group {GroupId}", groupId);
35+
await Task.CompletedTask;
36+
}
37+
38+
public async Task UnarchiveAllInGroup(string groupId)
39+
{
40+
logger.LogInformation("Unarchiving all messages in group {GroupId}", groupId);
41+
await Task.CompletedTask;
42+
}
43+
44+
public bool IsOperationInProgressFor(string groupId, ArchiveType archiveType)
45+
{
46+
return ExecuteWithDbContext(dbContext =>
47+
{
48+
var operationId = MakeOperationId(groupId, archiveType);
49+
var operation = dbContext.ArchiveOperations
50+
.AsNoTracking()
51+
.FirstOrDefault(a => a.Id == Guid.Parse(operationId));
52+
53+
if (operation == null)
54+
{
55+
return Task.FromResult(false);
56+
}
57+
58+
return Task.FromResult(operation.ArchiveState != (int)ArchiveState.ArchiveCompleted);
59+
}).Result;
60+
}
61+
62+
public bool IsArchiveInProgressFor(string groupId)
63+
{
64+
return IsOperationInProgressFor(groupId, ArchiveType.FailureGroup) ||
65+
IsOperationInProgressFor(groupId, ArchiveType.All);
66+
}
67+
68+
public void DismissArchiveOperation(string groupId, ArchiveType archiveType)
69+
{
70+
ExecuteWithDbContext(dbContext =>
71+
{
72+
var operationId = Guid.Parse(MakeOperationId(groupId, archiveType));
73+
74+
dbContext.ArchiveOperations.Where(a => a.Id == operationId).ExecuteDelete();
75+
return Task.CompletedTask;
76+
}).Wait();
77+
}
78+
79+
public Task StartArchiving(string groupId, ArchiveType archiveType)
80+
{
81+
return ExecuteWithDbContext(async dbContext =>
82+
{
83+
var operation = new ArchiveOperationEntity
84+
{
85+
Id = Guid.Parse(MakeOperationId(groupId, archiveType)),
86+
RequestId = groupId,
87+
GroupName = groupId,
88+
ArchiveType = (int)archiveType,
89+
ArchiveState = (int)ArchiveState.ArchiveStarted,
90+
TotalNumberOfMessages = 0,
91+
NumberOfMessagesArchived = 0,
92+
NumberOfBatches = 0,
93+
CurrentBatch = 0,
94+
Started = DateTime.UtcNow
95+
};
96+
97+
await dbContext.ArchiveOperations.AddAsync(operation);
98+
await dbContext.SaveChangesAsync();
99+
100+
logger.LogInformation("Started archiving for group {GroupId}", groupId);
101+
});
102+
}
103+
104+
public Task StartUnarchiving(string groupId, ArchiveType archiveType)
105+
{
106+
return ExecuteWithDbContext(async dbContext =>
107+
{
108+
var operation = new ArchiveOperationEntity
109+
{
110+
Id = Guid.Parse(MakeOperationId(groupId, archiveType)),
111+
RequestId = groupId,
112+
GroupName = groupId,
113+
ArchiveType = (int)archiveType,
114+
ArchiveState = (int)ArchiveState.ArchiveStarted,
115+
TotalNumberOfMessages = 0,
116+
NumberOfMessagesArchived = 0,
117+
NumberOfBatches = 0,
118+
CurrentBatch = 0,
119+
Started = DateTime.UtcNow
120+
};
121+
122+
await dbContext.ArchiveOperations.AddAsync(operation);
123+
await dbContext.SaveChangesAsync();
124+
125+
logger.LogInformation("Started unarchiving for group {GroupId}", groupId);
126+
});
127+
}
128+
129+
public IEnumerable<InMemoryArchive> GetArchivalOperations()
130+
{
131+
// Note: IEnumerable methods need direct scope management as they yield results
132+
using var scope = serviceProvider.CreateScope();
133+
var dbContext = scope.ServiceProvider.GetRequiredService<ServiceControlDbContextBase>();
134+
135+
var operations = dbContext.ArchiveOperations
136+
.AsNoTracking()
137+
.AsEnumerable();
138+
139+
foreach (var op in operations)
140+
{
141+
yield return new InMemoryArchive(op.RequestId, (ArchiveType)op.ArchiveType, domainEvents)
142+
{
143+
GroupName = op.GroupName,
144+
ArchiveState = (ArchiveState)op.ArchiveState,
145+
TotalNumberOfMessages = op.TotalNumberOfMessages,
146+
NumberOfMessagesArchived = op.NumberOfMessagesArchived,
147+
NumberOfBatches = op.NumberOfBatches,
148+
CurrentBatch = op.CurrentBatch,
149+
Started = op.Started,
150+
Last = op.Last,
151+
CompletionTime = op.CompletionTime
152+
};
153+
}
154+
}
155+
156+
static string MakeOperationId(string groupId, ArchiveType archiveType)
157+
{
158+
return $"{archiveType}/{groupId}";
159+
}
160+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
namespace ServiceControl.Persistence.Sql.Core.Implementation;
2+
3+
using System;
4+
using System.IO;
5+
using System.Threading.Tasks;
6+
using Microsoft.EntityFrameworkCore;
7+
using ServiceControl.Operations.BodyStorage;
8+
9+
public class BodyStorage : DataStoreBase, IBodyStorage
10+
{
11+
public BodyStorage(IServiceProvider serviceProvider) : base(serviceProvider)
12+
{
13+
}
14+
15+
public Task<MessageBodyStreamResult> TryFetch(string bodyId)
16+
{
17+
return ExecuteWithDbContext(async dbContext =>
18+
{
19+
// Try to fetch the body directly by ID
20+
var messageBody = await dbContext.MessageBodies
21+
.AsNoTracking()
22+
.FirstOrDefaultAsync(mb => mb.Id == Guid.Parse(bodyId));
23+
24+
if (messageBody == null)
25+
{
26+
return new MessageBodyStreamResult { HasResult = false };
27+
}
28+
29+
return new MessageBodyStreamResult
30+
{
31+
HasResult = true,
32+
Stream = new MemoryStream(messageBody.Body),
33+
ContentType = messageBody.ContentType,
34+
BodySize = messageBody.BodySize,
35+
Etag = messageBody.Etag
36+
};
37+
});
38+
}
39+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
namespace ServiceControl.Persistence.Sql.Core.Implementation;
2+
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Linq;
6+
using System.Threading.Tasks;
7+
using Contracts.CustomChecks;
8+
using Entities;
9+
using Microsoft.EntityFrameworkCore;
10+
using Operations;
11+
using ServiceControl.Persistence;
12+
using ServiceControl.Persistence.Infrastructure;
13+
14+
public class CustomChecksDataStore : DataStoreBase, ICustomChecksDataStore
15+
{
16+
public CustomChecksDataStore(IServiceProvider serviceProvider) : base(serviceProvider)
17+
{
18+
}
19+
20+
public Task<CheckStateChange> UpdateCustomCheckStatus(CustomCheckDetail detail)
21+
{
22+
return ExecuteWithDbContext(async dbContext =>
23+
{
24+
var status = CheckStateChange.Unchanged;
25+
var id = detail.GetDeterministicId();
26+
27+
var customCheck = await dbContext.CustomChecks.FirstOrDefaultAsync(c => c.Id == id);
28+
29+
if (customCheck == null ||
30+
(customCheck.Status == (int)Status.Fail && !detail.HasFailed) ||
31+
(customCheck.Status == (int)Status.Pass && detail.HasFailed))
32+
{
33+
if (customCheck == null)
34+
{
35+
customCheck = new CustomCheckEntity { Id = id };
36+
await dbContext.CustomChecks.AddAsync(customCheck);
37+
}
38+
39+
status = CheckStateChange.Changed;
40+
}
41+
42+
customCheck.CustomCheckId = detail.CustomCheckId;
43+
customCheck.Category = detail.Category;
44+
customCheck.Status = detail.HasFailed ? (int)Status.Fail : (int)Status.Pass;
45+
customCheck.ReportedAt = detail.ReportedAt;
46+
customCheck.FailureReason = detail.FailureReason;
47+
customCheck.EndpointName = detail.OriginatingEndpoint.Name;
48+
customCheck.HostId = detail.OriginatingEndpoint.HostId;
49+
customCheck.Host = detail.OriginatingEndpoint.Host;
50+
51+
await dbContext.SaveChangesAsync();
52+
53+
return status;
54+
});
55+
}
56+
57+
public Task<QueryResult<IList<CustomCheck>>> GetStats(PagingInfo paging, string? status = null)
58+
{
59+
return ExecuteWithDbContext(async dbContext =>
60+
{
61+
var query = dbContext.CustomChecks.AsQueryable();
62+
63+
// Add status filter
64+
if (status == "fail")
65+
{
66+
query = query.Where(c => c.Status == (int)Status.Fail);
67+
}
68+
if (status == "pass")
69+
{
70+
query = query.Where(c => c.Status == (int)Status.Pass);
71+
}
72+
73+
var totalCount = await query.CountAsync();
74+
75+
var results = await query
76+
.OrderByDescending(c => c.ReportedAt)
77+
.Skip(paging.Offset)
78+
.Take(paging.Next)
79+
.AsNoTracking()
80+
.ToListAsync();
81+
82+
var customChecks = results.Select(e => new CustomCheck
83+
{
84+
Id = $"{e.Id}",
85+
CustomCheckId = e.CustomCheckId,
86+
Category = e.Category,
87+
Status = (Status)e.Status,
88+
ReportedAt = e.ReportedAt,
89+
FailureReason = e.FailureReason,
90+
OriginatingEndpoint = new EndpointDetails
91+
{
92+
Name = e.EndpointName,
93+
HostId = e.HostId,
94+
Host = e.Host
95+
}
96+
}).ToList();
97+
98+
var queryStats = new QueryStatsInfo(string.Empty, totalCount, false);
99+
return new QueryResult<IList<CustomCheck>>(customChecks, queryStats);
100+
});
101+
}
102+
103+
public Task DeleteCustomCheck(Guid id)
104+
{
105+
return ExecuteWithDbContext(async dbContext =>
106+
{
107+
await dbContext.CustomChecks.Where(c => c.Id == id).ExecuteDeleteAsync();
108+
});
109+
}
110+
111+
public Task<int> GetNumberOfFailedChecks()
112+
{
113+
return ExecuteWithDbContext(async dbContext =>
114+
{
115+
return await dbContext.CustomChecks.CountAsync(c => c.Status == (int)Status.Fail);
116+
});
117+
}
118+
}

0 commit comments

Comments
 (0)