Skip to content

Commit 717bf53

Browse files
authored
Merge pull request #84 from DEFRA/feature/cleanse-report-optimisations
Added more mongo indexes
2 parents 2a541e8 + 52ad926 commit 717bf53

File tree

21 files changed: +997 additions, -109 deletions

src/KeeperData.Bridge.Worker/Configuration/ScheduledJobConfiguration.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@ namespace KeeperData.Bridge.Worker.Configuration;
33
public class ScheduledJobConfiguration
44
{
55
public string JobType { get; set; } = string.Empty;
6-
public bool Enabled { get; set; }
6+
public bool IsEnabled { get; set; }
77
public string CronSchedule { get; set; } = string.Empty;
88
}

src/KeeperData.Bridge.Worker/Jobs/ImportDeltaFilesJob.cs

Lines changed: 0 additions & 30 deletions
This file was deleted.

src/KeeperData.Bridge.Worker/Setup/ServiceCollectionExtensions.cs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ private static IServiceCollection AddQuartz(this IServiceCollection services, IC
2727
services.AddQuartz(q =>
2828
{
2929
var importBulkFilesConfig = scheduledJobConfiguration.FirstOrDefault(x => x.JobType == nameof(ImportBulkFilesJob));
30-
if (importBulkFilesConfig?.CronSchedule != null)
30+
if (!string.IsNullOrWhiteSpace(importBulkFilesConfig?.CronSchedule) && importBulkFilesConfig.IsEnabled)
3131
{
3232
q.AddJob<ImportBulkFilesJob>(opts => opts.WithIdentity(importBulkFilesConfig.JobType));
3333

@@ -37,19 +37,8 @@ private static IServiceCollection AddQuartz(this IServiceCollection services, IC
3737
.WithCronSchedule(importBulkFilesConfig.CronSchedule));
3838
}
3939

40-
var importDeltaFilesConfig = scheduledJobConfiguration.FirstOrDefault(x => x.JobType == nameof(ImportDeltaFilesJob));
41-
if (importDeltaFilesConfig?.CronSchedule != null)
42-
{
43-
q.AddJob<ImportDeltaFilesJob>(opts => opts.WithIdentity(importDeltaFilesConfig.JobType));
44-
45-
q.AddTrigger(opts => opts
46-
.ForJob(importDeltaFilesConfig.JobType)
47-
.WithIdentity($"{importDeltaFilesConfig.JobType}-trigger")
48-
.WithCronSchedule(importDeltaFilesConfig.CronSchedule));
49-
}
50-
5140
var cleanseReportConfig = scheduledJobConfiguration.FirstOrDefault(x => x.JobType == nameof(CleanseReportJob));
52-
if (cleanseReportConfig?.CronSchedule != null)
41+
if (!string.IsNullOrWhiteSpace(cleanseReportConfig?.CronSchedule) && cleanseReportConfig.IsEnabled)
5342
{
5443
q.AddJob<CleanseReportJob>(opts => opts.WithIdentity(cleanseReportConfig.JobType));
5544

@@ -71,7 +60,6 @@ private static IServiceCollection AddQuartz(this IServiceCollection services, IC
7160
private static IServiceCollection AddJobs(this IServiceCollection services)
7261
{
7362
services.AddScoped<ImportBulkFilesJob>();
74-
services.AddScoped<ImportDeltaFilesJob>();
7563
services.AddScoped<CleanseReportJob>();
7664

7765
return services;
@@ -80,7 +68,6 @@ private static IServiceCollection AddJobs(this IServiceCollection services)
8068
private static IServiceCollection AddTasks(this IServiceCollection services)
8169
{
8270
services.AddScoped<ITaskProcessBulkFiles, TaskProcessBulkFiles>();
83-
services.AddScoped<ITaskProcessDeltaFiles, TaskProcessDeltaFiles>();
8471
services.AddScoped<ITaskRunCleanseReport, TaskRunCleanseReport>();
8572

8673
return services;

src/KeeperData.Bridge.Worker/Tasks/ITaskProcessDeltaFiles.cs

Lines changed: 0 additions & 3 deletions
This file was deleted.

src/KeeperData.Bridge.Worker/Tasks/Implementations/TaskProcessDeltaFiles.cs

Lines changed: 0 additions & 30 deletions
This file was deleted.
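
src/KeeperData.Bridge/Setup/QuartzJobsHealthCheck.cs (new file; header lost in extraction, path inferred from the namespace and class name)

Lines changed: 26 additions & 0 deletions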
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using KeeperData.Bridge.Worker.Configuration;
2+
using Microsoft.Extensions.Diagnostics.HealthChecks;
3+
4+
namespace KeeperData.Bridge.Setup;
5+
6+
public class QuartzJobsHealthCheck(IConfiguration configuration) : IHealthCheck
7+
{
8+
public Task<HealthCheckResult> CheckHealthAsync(
9+
HealthCheckContext context,
10+
CancellationToken cancellationToken = default)
11+
{
12+
var jobs = configuration.GetSection("Quartz:Jobs").Get<List<ScheduledJobConfiguration>>() ?? [];
13+
14+
var data = new Dictionary<string, object>();
15+
foreach (var job in jobs)
16+
{
17+
data[job.JobType] = new
18+
{
19+
job.IsEnabled,
20+
job.CronSchedule
21+
};
22+
}
23+
24+
return Task.FromResult(HealthCheckResult.Healthy("Quartz scheduled jobs", data));
25+
}
26+
}

src/KeeperData.Bridge/Setup/ServiceCollectionExtensions.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public static void ConfigureApi(this IServiceCollection services, IConfiguration
7676

7777
private static void ConfigureHealthChecks(this IServiceCollection services)
7878
{
79-
services.AddHealthChecks();
79+
services.AddHealthChecks()
80+
.AddCheck<QuartzJobsHealthCheck>("quartz_jobs", tags: ["quartz", "jobs"]);
8081
services.AddSingleton<IHealthCheckPublisher, HealthCheckMetricsPublisher>();
8182
}
8283

src/KeeperData.Bridge/appsettings.json

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,12 @@
7373
"Jobs": [
7474
{
7575
"JobType": "ImportBulkFilesJob",
76-
"Enabled": true,
76+
"IsEnabled": true,
7777
"CronSchedule": "0 0 8 * * ?"
7878
},
79-
{
80-
"JobType": "ImportDeltaFilesJob",
81-
"Enabled": true,
82-
"CronSchedule": "0 0 9 * * ?"
83-
},
8479
{
8580
"JobType": "CleanseReportJob",
86-
"Enabled": true,
81+
"IsEnabled": false,
8782
"CronSchedule": "0 0 2 * * ?"
8883
}
8984
]

src/KeeperData.Core.Reports/Cleanse/Analysis/Command/Abstract/CleanseAnalysisEngineBase.cs

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,28 @@
1+
using System.Diagnostics;
12
using KeeperData.Core.Reports.Cleanse.Analysis.Command.Domain;
23
using KeeperData.Core.Reports.Domain;
34
using KeeperData.Core.Reports.Issues.Command.Abstract;
45
using KeeperData.Core.Reports.SamCtsHoldings.Query.Abstract;
56
using KeeperData.Core.Reports.SamCtsHoldings.Query.Domain;
7+
using Microsoft.Extensions.Logging;
68

79
namespace KeeperData.Core.Reports.Cleanse.Analysis.Command.Abstract;
810

9-
public abstract class CleanseAnalysisEngineBase(ICtsSamQueryService dataService, IIssueCommandService issueCommandService)
11+
public abstract class CleanseAnalysisEngineBase(ICtsSamQueryService dataService, IIssueCommandService issueCommandService, ILogger logger)
1012
{
11-
private const int ThrottlingPumpDelayMs = 400;
12-
private const int BatchSize = 70;
13+
private const int ThrottlingPumpDelayMs = 150;
14+
private const int BatchSize = 100;
1315
private const int ProgressUpdateInterval = 100;
1416
private const string DateTimeFormat = "yyyy-MM-dd HH:mm:ss";
1517

1618
protected IIssueCommandService IssueCommandService { get; } = issueCommandService;
1719

20+
/// <summary>
21+
/// In-memory lookup mapping CPH values to their full LID_FULL_IDENTIFIER strings.
22+
/// Populated before the SAM pump runs so that CPH→LID resolution avoids regex queries on MongoDB.
23+
/// </summary>
24+
protected Dictionary<string, string> CphToLidLookup { get; private set; } = [];
25+
1826
protected delegate Task RecordProcessor(string id,
1927
string operationId, AnalysisMetrics metrics, CancellationToken ct);
2028

@@ -77,6 +85,10 @@ await PumpAsync(new PumpContext(totalRecords, operationId, metrics, progressCall
7785
dataService.ListCtsCphHoldingsAsync, ProcessCtsPrimaryRecordAsync,
7886
DataFields.CtsCphHoldingFields.LidFullIdentifier), ct);
7987

88+
// Pre-load all CTS LID_FULL_IDENTIFIER values so CPH→LID resolution
89+
// can be done in-memory instead of via regex queries on MongoDB.
90+
CphToLidLookup = await BuildCphToLidLookupAsync(ct);
91+
8092
// iterate SAM CPH records
8193
await PumpAsync(new PumpContext(totalRecords, operationId, metrics, progressCallback,
8294
dataService.ListSamCphHoldingsAsync, ProcessSamPrimaryRecordAsync,
@@ -117,6 +129,47 @@ protected static bool IsCtsCphHoldingRecordActive(IDictionary<string, object?> r
117129
};
118130

119131

132+
/// <summary>
133+
/// Pages through all CTS CPH Holding records in throttled batches,
134+
/// building a dictionary that maps each CPH value to its full LID_FULL_IDENTIFIER.
135+
/// </summary>
136+
private async Task<Dictionary<string, string>> BuildCphToLidLookupAsync(CancellationToken ct)
137+
{
138+
logger.LogInformation("Building CPH to LID lookup: starting");
139+
var stopwatch = Stopwatch.StartNew();
140+
141+
var lookup = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
142+
var skip = 0;
143+
144+
while (!ct.IsCancellationRequested)
145+
{
146+
var batch = await dataService.ListCtsCphHoldingsAsync(skip, BatchSize, ct);
147+
148+
if (batch.Data.Count == 0)
149+
{
150+
break;
151+
}
152+
153+
foreach (var record in batch.Data)
154+
{
155+
var lid = LidFullIdentifier.TryParse(record[DataFields.CtsCphHoldingFields.LidFullIdentifier]?.ToString());
156+
if (lid is not null)
157+
{
158+
lookup.TryAdd(lid.Cph.Value, lid.Value);
159+
}
160+
}
161+
162+
skip += batch.Data.Count;
163+
await Task.Delay(ThrottlingPumpDelayMs, ct);
164+
}
165+
166+
stopwatch.Stop();
167+
logger.LogInformation("Building CPH to LID lookup: completed. Records={RecordCount}, Duration={DurationMs}ms ({DurationSeconds}s)",
168+
lookup.Count, stopwatch.ElapsedMilliseconds, stopwatch.Elapsed.TotalSeconds);
169+
170+
return lookup;
171+
}
172+
120173
protected static bool ShouldUpdateProgress(int recordsAnalyzed)
121174
=> recordsAnalyzed % ProgressUpdateInterval == 0;
122175
}

src/KeeperData.Core.Reports/Cleanse/Analysis/Command/Impl/CleanseAnalysisEngine.cs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,17 @@
66
using KeeperData.Core.Reports.Issues.Command.Abstract;
77
using KeeperData.Core.Reports.SamCtsHoldings.Query.Abstract;
88
using KeeperData.Core.Reports.SamCtsHoldings.Query.Domain;
9+
using Microsoft.Extensions.Logging;
910

1011
namespace KeeperData.Core.Reports.Cleanse.Analysis.Command.Impl;
1112

12-
public class CleanseAnalysisEngine(ICtsSamQueryService dataService, IIssueCommandService issueCommandService)
13-
: CleanseAnalysisEngineBase(dataService, issueCommandService), ICleanseAnalysisEngine
13+
public class CleanseAnalysisEngine(ICtsSamQueryService dataService, IIssueCommandService issueCommandService, ILogger<CleanseAnalysisEngine> logger)
14+
: CleanseAnalysisEngineBase(dataService, issueCommandService, logger), ICleanseAnalysisEngine
1415
{
1516
private readonly RecordIdGenerator _recordIdGenerator = new();
1617
private readonly ICtsSamQueryService _dataService = dataService;
1718

18-
private const int ThrottleDelayMs = 100;
19+
private const int ThrottleDelayMs = 50;
1920

2021
private async Task ProcessCtsPrimaryRecordInternalAsync(LidFullIdentifier lidFullIdentifier, string operationId, AnalysisMetrics metrics, CancellationToken ct)
2122
{
@@ -146,8 +147,15 @@ private async Task ProcessSamPrimaryRecordInternalAsync(Cph cph, string operatio
146147
{
147148
var results = new List<RuleResult>();
148149

149-
// get the CTS CPH Holding...
150-
var ctsCphHolding = await _dataService.GetCtsCphHoldingAsync(cph, ct);
150+
// Resolve CPH to full LID_FULL_IDENTIFIER via the in-memory lookup
151+
// so we can query by equals instead of a regex endswith on MongoDB.
152+
CtsCphHoldingModel? ctsCphHolding = null;
153+
if (CphToLidLookup.TryGetValue(cph.Value, out var lidValue))
154+
{
155+
var lid = LidFullIdentifier.Parse(lidValue);
156+
ctsCphHolding = await _dataService.GetCtsCphHoldingAsync(lid, ct);
157+
}
158+
151159
if (ctsCphHolding is null) // does not exist
152160
{
153161
results.Add(RuleResult.Issue(RuleDescriptors.SamCphNotInCts, cph)); // PRIORITY 1B: RULE 2B - CPH present in SAM but missing in CTS

0 commit comments

Comments
 (0)