Skip to content

Commit 3bd0901

Browse files
committed
Added more throttling to ingest and analysis
1 parent e2377a3 commit 3bd0901

File tree

4 files changed

+308
-259
lines changed

4 files changed

+308
-259
lines changed

src/KeeperData.Bridge/appsettings.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
{
8585
"JobType": "CleanseReportJob",
8686
"Enabled": true,
87-
"CronSchedule": "0 0 8 * * ?"
87+
"CronSchedule": "0 0 2 * * ?"
8888
}
8989
]
9090
},

src/KeeperData.Core.Reports/Cleanse/Analysis/Command/Abstract/CleanseAnalysisEngineBase.cs

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ namespace KeeperData.Core.Reports.Cleanse.Analysis.Command.Abstract;
88

99
public abstract class CleanseAnalysisEngineBase(ICtsSamQueryService dataService, IIssueCommandService issueCommandService)
1010
{
11-
private const int BatchSize = 100;
11+
private const int ThrottlingPumpDelayMs = 400;
12+
private const int BatchSize = 70;
1213
private const int ProgressUpdateInterval = 100;
1314
private const string DateTimeFormat = "yyyy-MM-dd HH:mm:ss";
1415

@@ -34,35 +35,11 @@ protected abstract Task ProcessCtsPrimaryRecordAsync(string id,
3435
protected abstract Task ProcessSamPrimaryRecordAsync(string id,
3536
string operationId, AnalysisMetrics metrics, CancellationToken ct);
3637

37-
public async Task<AnalysisMetrics> ExecuteAsync(string operationId, ProgressCallback progressCallback, CancellationToken ct)
38-
{
39-
var metrics = new AnalysisMetrics();
40-
41-
var ctsTotalRecords = await dataService.GetCtsCphHoldingsCountAsync(ct);
42-
var samTotalRecords = await dataService.GetSamCphHoldingsCountAsync(ct);
43-
var totalRecords = ctsTotalRecords + samTotalRecords;
44-
45-
await progressCallback(0, totalRecords, 0, 0);
46-
47-
// iterate CTS CPH records
48-
await PumpAsync(new PumpContext(totalRecords, operationId, metrics, progressCallback,
49-
dataService.ListCtsCphHoldingsAsync, ProcessCtsPrimaryRecordAsync,
50-
DataFields.CtsCphHoldingFields.LidFullIdentifier), ct);
51-
52-
// iterate SAM CPH records
53-
await PumpAsync(new PumpContext(totalRecords, operationId, metrics, progressCallback,
54-
dataService.ListSamCphHoldingsAsync, ProcessSamPrimaryRecordAsync,
55-
DataFields.SamCphHoldingFields.Cph), ct);
56-
57-
return metrics;
58-
}
59-
60-
6138

6239
protected async Task PumpAsync(PumpContext context, CancellationToken ct)
6340
{
6441
var skip = 0;
65-
while (true && !ct.IsCancellationRequested)
42+
while (!ct.IsCancellationRequested)
6643
{
6744
var batch = await context.Fetcher(skip, BatchSize, ct);
6845

@@ -80,9 +57,34 @@ protected async Task PumpAsync(PumpContext context, CancellationToken ct)
8057
{
8158
await context.ProgressCallback(context.Metrics.RecordsAnalyzed, context.TotalRecords, context.Metrics.IssuesFound, context.Metrics.IssuesResolved);
8259
}
60+
61+
await Task.Delay(ThrottlingPumpDelayMs, ct);
8362
}
8463
}
8564

65+
public async Task<AnalysisMetrics> ExecuteAsync(string operationId, ProgressCallback progressCallback, CancellationToken ct)
66+
{
67+
var metrics = new AnalysisMetrics();
68+
69+
var ctsTotalRecords = await dataService.GetCtsCphHoldingsCountAsync(ct);
70+
var samTotalRecords = await dataService.GetSamCphHoldingsCountAsync(ct);
71+
var totalRecords = ctsTotalRecords + samTotalRecords;
72+
73+
await progressCallback(0, totalRecords, 0, 0);
74+
75+
// iterate CTS CPH records
76+
await PumpAsync(new PumpContext(totalRecords, operationId, metrics, progressCallback,
77+
dataService.ListCtsCphHoldingsAsync, ProcessCtsPrimaryRecordAsync,
78+
DataFields.CtsCphHoldingFields.LidFullIdentifier), ct);
79+
80+
// iterate SAM CPH records
81+
await PumpAsync(new PumpContext(totalRecords, operationId, metrics, progressCallback,
82+
dataService.ListSamCphHoldingsAsync, ProcessSamPrimaryRecordAsync,
83+
DataFields.SamCphHoldingFields.Cph), ct);
84+
85+
return metrics;
86+
}
87+
8688
protected static async Task ProcessBatchAsync(QueryResult batch, PumpContext context, CancellationToken ct)
8789
{
8890
foreach (var record in batch.Data)

src/KeeperData.Core.Reports/Cleanse/Analysis/Command/Impl/CleanseAnalysisEngine.cs

Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ public class CleanseAnalysisEngine(ICtsSamQueryService dataService, IIssueComman
1515
private readonly RecordIdGenerator _recordIdGenerator = new();
1616
private readonly ICtsSamQueryService _dataService = dataService;
1717

18+
private const int ThrottleDelayMs = 100;
19+
1820
private async Task ProcessCtsPrimaryRecordInternalAsync(LidFullIdentifier lidFullIdentifier, string operationId, AnalysisMetrics metrics, CancellationToken ct)
1921
{
2022
var samCphHolding = await _dataService.GetSamCphHoldingAsync(lidFullIdentifier.Cph, ct);
@@ -38,13 +40,19 @@ private async Task ProcessCtsPrimaryRecordInternalAsync(LidFullIdentifier lidFul
3840
}
3941

4042
private static void EvaluateCtsSamRules(CtsCphHoldingModel ctsHolding, SamCphHoldingModel samCphHolding, List<RuleResult> results)
43+
{
44+
EvaluateEmailRules(ctsHolding, samCphHolding, results);
45+
EvaluatePhoneRules(ctsHolding, samCphHolding, results);
46+
EvaluateCattleUnitRule(ctsHolding, samCphHolding, results);
47+
EvaluateLocationConsistencyRule(ctsHolding, samCphHolding, results);
48+
}
49+
50+
private static void EvaluateEmailRules(CtsCphHoldingModel ctsHolding, SamCphHoldingModel samCphHolding, List<RuleResult> results)
4151
{
4252
var ctsEmails = ctsHolding.GetEmailAddresses();
4353
var samEmails = samCphHolding.GetEmailAddresses();
44-
var ctsPhones = ctsHolding.GetPhoneNumbers();
45-
var samPhones = samCphHolding.GetPhoneNumbers();
4654

47-
// PRIORITY 2: RULE 4: CPH present in both CTS and SAM but no email addresses in either system
55+
// PRIORITY 2: RULE 4 - CPH present in both CTS and SAM but no email addresses in either system
4856
if (ctsEmails.Length + samEmails.Length == 0)
4957
{
5058
results.Add(RuleResult.Issue(RuleDescriptors.CtsSamNoEmailAddresses, ctsHolding.Id, samCphHolding.Cph));
@@ -53,21 +61,28 @@ private static void EvaluateCtsSamRules(CtsCphHoldingModel ctsHolding, SamCphHol
5361
var missingEmails = ctsEmails.Except(samEmails).ToArray();
5462
if (missingEmails.Length > 0)
5563
{
56-
if (samEmails.Length == 0) // PRIORITY 3: RULE 12 - Email addresses in CTS missing from SAM
64+
if (samEmails.Length == 0) // PRIORITY 3: RULE 12 - Email addresses in CTS missing from SAM
5765
{
58-
results.Add(RuleResult.Issue(RuleDescriptors.SamMissingEmailAddresses, ctsHolding.Id, samCphHolding.Cph, x => x.EmailCTS = missingEmails)); }
66+
results.Add(RuleResult.Issue(RuleDescriptors.SamMissingEmailAddresses, ctsHolding.Id, samCphHolding.Cph,
67+
x => x.EmailCTS = missingEmails));
68+
}
5969
}
60-
else // PRIORITY 7: RULE 6 - Email addresses inconsistent between CTS and SAM
70+
else // PRIORITY 7: RULE 6 - Email addresses inconsistent between CTS and SAM
6171
{
6272
results.Add(RuleResult.Issue(RuleDescriptors.CtsSamEmailAddressesInconsistent, ctsHolding.Id, samCphHolding.Cph, x =>
6373
{
6474
x.EmailCTS = missingEmails;
65-
x.EmailSAM = string.Join(";",samEmails);
75+
x.EmailSAM = string.Join("; ", samEmails);
6676
}));
6777
}
68-
78+
}
79+
80+
private static void EvaluatePhoneRules(CtsCphHoldingModel ctsHolding, SamCphHoldingModel samCphHolding, List<RuleResult> results)
81+
{
82+
var ctsPhones = ctsHolding.GetPhoneNumbers();
83+
var samPhones = samCphHolding.GetPhoneNumbers();
6984

70-
// PRIORITY 4: RULE 5 - CPH present in both CTS and SAM but no phone numbers in either system
85+
// PRIORITY 4: RULE 5 - CPH present in both CTS and SAM but no phone numbers in either system
7186
if (ctsPhones.Length + samPhones.Length == 0)
7287
{
7388
results.Add(RuleResult.Issue(RuleDescriptors.CtsSamNoPhoneNumbers, ctsHolding.Id, samCphHolding.Cph));
@@ -76,30 +91,37 @@ private static void EvaluateCtsSamRules(CtsCphHoldingModel ctsHolding, SamCphHol
7691
var missingPhones = ctsPhones.Except(samPhones).ToArray();
7792
if (missingPhones.Length > 0)
7893
{
79-
if (samPhones.Length == 0) // // PRIORITY 5: RULE 11 - CTS phone numbers missing from SAM
94+
if (samPhones.Length == 0) // PRIORITY 5: RULE 11 - CTS phone numbers missing from SAM
8095
{
81-
results.Add(RuleResult.Issue(RuleDescriptors.SamMissingPhoneNumbers, ctsHolding.Id, samCphHolding.Cph, x => x.TelCTS = missingPhones));
96+
results.Add(RuleResult.Issue(RuleDescriptors.SamMissingPhoneNumbers, ctsHolding.Id, samCphHolding.Cph,
97+
x => x.TelCTS = missingPhones));
8298
}
8399
else
84100
{
85-
// PRIORITY 8: RULE 7 - Phone numbers inconsistent between CTS and SAM
101+
// PRIORITY 8: RULE 7 - Phone numbers inconsistent between CTS and SAM
86102
results.Add(RuleResult.Issue(RuleDescriptors.CtsSamPhoneNosInconsistent, ctsHolding.Id, samCphHolding.Cph, x =>
87103
{
88104
x.TelCTS = missingPhones;
89-
x.TelSAM = string.Join(";", samPhones);
105+
x.TelSAM = string.Join("; ", samPhones);
90106
}));
91107
}
92108
}
109+
}
93110

94-
// PRIORITY 6: RULE 1 - No cattle unit defined in SAM
111+
private static void EvaluateCattleUnitRule(CtsCphHoldingModel ctsHolding, SamCphHoldingModel samCphHolding, List<RuleResult> results)
112+
{
113+
// PRIORITY 6: RULE 1 - No cattle unit defined in SAM
95114
var asc = samCphHolding.Holding[DataFields.SamCphHoldingFields.AnimalSpeciesCode]?.ToString();
96115
if (asc != "CTT")
97116
{
98-
results.Add(RuleResult.Issue(RuleDescriptors.SamNoCattleUnit, ctsHolding.Id, samCphHolding.Cph, x => x.AnimalSpeciesCode = asc));
117+
results.Add(RuleResult.Issue(RuleDescriptors.SamNoCattleUnit, ctsHolding.Id, samCphHolding.Cph,
118+
x => x.AnimalSpeciesCode = asc));
99119
}
120+
}
100121

101-
// PRIORITY 10: RULE 3 - Cattle-related CPHs in SAM (e.g. those with relevant animal species or purpose codes) that are not present in CTS
102-
// aka: where ANIMAL_SPECIES_CODE=CTT - if SAM.FEATURE_NAME=['Unknown','Not known','Notknown','',null] OR CTS.ADR_NAME != SAM.FEATURE_NAME then raise issue
122+
private static void EvaluateLocationConsistencyRule(CtsCphHoldingModel ctsHolding, SamCphHoldingModel samCphHolding, List<RuleResult> results)
123+
{
124+
// PRIORITY 10: RULE 3 - Cattle-related CPHs in SAM with mismatched or unknown location names
103125
if (samCphHolding.AnimalSpeciesCode == "CTT" && IsLocationMismatch(ctsHolding, samCphHolding))
104126
{
105127
results.Add(RuleResult.Issue(RuleDescriptors.CtsSamLocationsDiffer, ctsHolding.Id, samCphHolding.Cph,
@@ -120,7 +142,6 @@ private static bool IsLocationMismatch(CtsCphHoldingModel ctsHolding, SamCphHold
120142
|| !string.Equals(ctsHolding.LocationName, samCphHolding.LocationName, StringComparison.OrdinalIgnoreCase);
121143
}
122144

123-
124145
private async Task ProcessSamPrimaryRecordInternalAsync(Cph cph, string operationId, AnalysisMetrics metrics, CancellationToken ct)
125146
{
126147
var results = new List<RuleResult>();
@@ -135,8 +156,6 @@ private async Task ProcessSamPrimaryRecordInternalAsync(Cph cph, string operatio
135156
await RecordResultsAsync(cph.Value, cph, operationId, metrics, results, ct);
136157
}
137158

138-
#region Overrides
139-
140159
protected override async Task ProcessCtsPrimaryRecordAsync(string id, string operationId, AnalysisMetrics metrics, CancellationToken ct)
141160
{
142161
var lidFullIdentifier = LidFullIdentifier.TryParse(id);
@@ -157,10 +176,6 @@ protected override async Task ProcessSamPrimaryRecordAsync(string id, string ope
157176
}
158177
}
159178

160-
#endregion
161-
162-
#region Helpers
163-
164179
private async Task RecordResultsAsync(string primaryRecordId, Cph cph, string operationId,
165180
AnalysisMetrics metrics, List<RuleResult> results, CancellationToken ct)
166181
{
@@ -182,6 +197,8 @@ private async Task RecordResultsAsync(string primaryRecordId, Cph cph, string op
182197
{
183198
metrics.IssuesFound++;
184199
}
200+
201+
await Task.Delay(ThrottleDelayMs, ct);
185202
}
186203
}
187204

@@ -193,8 +210,6 @@ protected static bool IsValidCountyCode(LidFullIdentifier lidFullIdentifier)
193210

194211
protected string GenerateThumbprint(string primaryRecordId, string ruleId)
195212
=> _recordIdGenerator.GenerateId($"{primaryRecordId}:{ruleId}");
196-
197-
#endregion
198213
}
199214

200215

0 commit comments

Comments
 (0)