Skip to content

Commit aa366fb

Browse files
authored
fix(referrals): Reprocess pending usages when program completion requirements are reduced (#1737)
* fix(referrals): Reprocess pending usages when program completion requirements are reduced * Add batched and throttled parallel processing for usage progress and expiration
1 parent 1e3fcc5 commit aa366fb

File tree

5 files changed

+103
-14
lines changed

5 files changed

+103
-14
lines changed

src/api/src/domain/Yoma.Core.Domain/Referral/Enumerations.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ public enum ReferralBlockReason
164164

165165
public enum ReferralTriggerSource
166166
{
167+
ProgramUpdated,
167168
IdentityAction,
168169
OpportunityCompletion
169170
}

src/api/src/domain/Yoma.Core.Domain/Referral/Events/ReferralProgressTriggerEventHandler.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,13 @@ public async Task Handle(ReferralProgressTriggerEvent notification, Cancellation
2828
{
2929
switch (notification.Entity.Source)
3030
{
31+
case ReferralTriggerSource.ProgramUpdated:
3132
case ReferralTriggerSource.IdentityAction:
32-
if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("Referral progress: handling identity trigger for user {UserId} ({Username})",
33-
notification.Entity.UserId,
34-
notification.Entity.Username);
33+
if (_logger.IsEnabled(LogLevel.Information))
34+
_logger.LogInformation("Referral progress: handling {Source} trigger for user {UserId} ({Username})",
35+
notification.Entity.Source,
36+
notification.Entity.UserId,
37+
notification.Entity.Username);
3538
break;
3639

3740
case ReferralTriggerSource.OpportunityCompletion:
@@ -42,7 +45,8 @@ public async Task Handle(ReferralProgressTriggerEvent notification, Cancellation
4245
throw new InvalidOperationException("OpportunityTitle must be provided for OpportunityCompletion source");
4346

4447
if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation(
45-
"Referral progress: handling Opportunity trigger for user {UserId} ({Username}) — opportunity {OpportunityId} '{OpportunityTitle}'",
48+
"Referral progress: handling {Source} trigger for user {UserId} ({Username}) — opportunity {OpportunityId} '{OpportunityTitle}'",
49+
notification.Entity.Source,
4650
notification.Entity.UserId,
4751
notification.Entity.Username,
4852
notification.Entity.OpportunityId,
@@ -59,7 +63,7 @@ public async Task Handle(ReferralProgressTriggerEvent notification, Cancellation
5963
{
6064
if (_logger.IsEnabled(LogLevel.Error)) _logger.LogError(ex,
6165
"Referral progress: failed processing {Source} trigger for user {UserId} ({Username}) — error: {ErrorMessage}",
62-
notification.Entity.Source.ToString(),
66+
notification.Entity.Source,
6367
notification.Entity.UserId,
6468
notification.Entity.Username,
6569
ex.Message);

src/api/src/domain/Yoma.Core.Domain/Referral/Interfaces/ILinkMaintenanceService.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,7 @@ public interface ILinkMaintenanceService
1818
Task CancelByProgramId(List<Guid> programIds, ILogger? logger = null);
1919

2020
Task ExpireByProgramId(List<Guid> programIds, ILogger? logger = null);
21+
22+
Task ProcessUsageProgressByProgramId(Guid programId, ILogger? logger = null);
2123
}
2224
}

src/api/src/domain/Yoma.Core.Domain/Referral/Services/LinkMaintenanceService.cs

Lines changed: 83 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
using MediatR;
12
using Microsoft.Extensions.Logging;
23
using Yoma.Core.Domain.Core.Exceptions;
34
using Yoma.Core.Domain.Core.Helpers;
45
using Yoma.Core.Domain.Core.Interfaces;
6+
using Yoma.Core.Domain.Referral.Events;
57
using Yoma.Core.Domain.Referral.Interfaces;
68
using Yoma.Core.Domain.Referral.Interfaces.Lookups;
79
using Yoma.Core.Domain.Referral.Models;
@@ -14,17 +16,22 @@ public class LinkMaintenanceService : ILinkMaintenanceService
1416
private readonly ILinkStatusService _linkStatusService;
1517
private readonly ILinkUsageStatusService _linkUsageStatusService;
1618

19+
private readonly IMediator _mediator;
1720
private readonly IExecutionStrategyService _executionStrategyService;
1821

1922
private readonly IRepositoryBatchedValueContainsWithNavigation<ReferralLink> _linkRepository;
2023
private readonly IRepositoryBatched<ReferralLinkUsage> _linkUsageRepository;
24+
25+
private const int Processing_BatchSize = 1000;
26+
private const int Processing_Parallelism = 10;
2127
#endregion
2228

2329
#region Constructor
2430
public LinkMaintenanceService(
2531
ILinkStatusService linkStatusService,
2632
ILinkUsageStatusService linkUsageStatusService,
2733

34+
IMediator mediator,
2835
IExecutionStrategyService executionStrategyService,
2936

3037
IRepositoryBatchedValueContainsWithNavigation<ReferralLink> linkRepository,
@@ -33,6 +40,7 @@ public LinkMaintenanceService(
3340
_linkStatusService = linkStatusService ?? throw new ArgumentNullException(nameof(linkStatusService));
3441
_linkUsageStatusService = linkUsageStatusService ?? throw new ArgumentNullException(nameof(linkUsageStatusService));
3542

43+
_mediator = mediator ?? throw new ArgumentNullException(nameof(mediator));
3644
_executionStrategyService = executionStrategyService ?? throw new ArgumentNullException(nameof(executionStrategyService));
3745

3846
_linkRepository = linkRepository ?? throw new ArgumentNullException(nameof(linkRepository));
@@ -214,6 +222,61 @@ await _executionStrategyService.ExecuteInExecutionStrategyAsync(async () =>
214222

215223
logger.LogInformation("Expired {Total} link(s) across {ProgramCount} program(s)", items.Count, byProgram.Count);
216224
}
225+
226+
/// <summary>
227+
/// Trigger sweep: reprocesses all pending link usages for a program when completion requirements are reduced
228+
/// (e.g. Proof of Personhood or Pathway removed), ensuring users are re-evaluated against updated rules.
229+
/// </summary>
230+
public async Task ProcessUsageProgressByProgramId(Guid programId, ILogger? logger = null)
231+
{
232+
if (programId == Guid.Empty) throw new ArgumentNullException(nameof(programId));
233+
234+
var statusPendingId = _linkUsageStatusService.GetByName(ReferralLinkUsageStatus.Pending.ToString()).Id;
235+
var totalProcessed = 0;
236+
var page = 0;
237+
238+
using var throttler = new SemaphoreSlim(Processing_Parallelism, Processing_Parallelism);
239+
240+
while (true)
241+
{
242+
var items = _linkUsageRepository.Query()
243+
.Where(o => o.ProgramId == programId && o.StatusId == statusPendingId)
244+
.OrderBy(o => o.Id)
245+
.Take(Processing_BatchSize)
246+
.Select(o => new { o.UserId, o.Username, o.UserDisplayName })
247+
.ToList();
248+
249+
if (items.Count == 0) break;
250+
251+
var tasks = items.Select(async item =>
252+
{
253+
await throttler.WaitAsync();
254+
try
255+
{
256+
await _mediator.Publish(new ReferralProgressTriggerEvent(new ReferralProgressTriggerMessage
257+
{
258+
Source = ReferralTriggerSource.ProgramUpdated,
259+
UserId = item.UserId,
260+
Username = item.Username,
261+
UserDisplayName = item.UserDisplayName
262+
}));
263+
}
264+
finally { throttler.Release(); }
265+
});
266+
267+
await Task.WhenAll(tasks);
268+
269+
totalProcessed += items.Count;
270+
page++;
271+
}
272+
273+
if (logger == null || !logger.IsEnabled(LogLevel.Information)) return;
274+
275+
if (totalProcessed == 0)
276+
logger.LogInformation("No pending link usages found for program {ProgramId}", programId);
277+
else
278+
logger.LogInformation("Processed {Count} pending link usage(s) for program {ProgramId}", totalProcessed, programId);
279+
}
217280
#endregion
218281

219282
#region Private Members
@@ -224,25 +287,36 @@ private async Task ExpireLinkUsagesByLinkId(List<Guid> linkIds, ILogger? logger
224287
{
225288
if (linkIds == null || linkIds.Count == 0 || linkIds.Any(o => o == Guid.Empty))
226289
throw new ArgumentNullException(nameof(linkIds));
290+
227291
linkIds = [.. linkIds.Distinct()];
228292

229293
var statusExpiredId = _linkUsageStatusService.GetByName(ReferralLinkUsageStatus.Expired.ToString()).Id;
230294
var statusExpirableIds = LinkUsageBackgroundService.Statuses_Expirable.Select(o => _linkUsageStatusService.GetByName(o.ToString()).Id).ToList();
231295

232-
var items = _linkUsageRepository.Query()
233-
.Where(o => linkIds.Contains(o.LinkId) && statusExpirableIds.Contains(o.StatusId))
234-
.ToList();
296+
var totalProcessed = 0;
235297

236-
if (items.Count == 0)
298+
while (true)
237299
{
238-
if (logger?.IsEnabled(LogLevel.Information) == true) logger.LogInformation("No expirable link usages found for {LinkCount} link(s)", linkIds.Count);
239-
return;
300+
var items = _linkUsageRepository.Query()
301+
.Where(o => linkIds.Contains(o.LinkId) && statusExpirableIds.Contains(o.StatusId))
302+
.OrderBy(o => o.Id)
303+
.Take(Processing_BatchSize)
304+
.ToList();
305+
306+
if (items.Count == 0) break;
307+
308+
items.ForEach(o => { o.StatusId = statusExpiredId; o.Status = ReferralLinkUsageStatus.Expired; });
309+
await _linkUsageRepository.Update(items);
310+
311+
totalProcessed += items.Count;
240312
}
241313

242-
items.ForEach(o => { o.StatusId = statusExpiredId; o.Status = ReferralLinkUsageStatus.Expired; });
243-
await _linkUsageRepository.Update(items);
314+
if (logger == null || !logger.IsEnabled(LogLevel.Information)) return;
244315

245-
if (logger?.IsEnabled(LogLevel.Information) == true) logger.LogInformation("Expired {Count} link usage(s) across {LinkCount} link(s)", items.Count, linkIds.Count);
316+
if (totalProcessed == 0)
317+
logger.LogInformation("No expirable link usages found for {LinkCount} link(s)", linkIds.Count);
318+
else
319+
logger.LogInformation("Expired {Count} link usage(s) across {LinkCount} link(s)", totalProcessed, linkIds.Count);
246320
}
247321
#endregion
248322
}

src/api/src/domain/Yoma.Core.Domain/Referral/Services/ProgramService.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,9 @@ public async Task<Program> Update(ProgramRequestUpdate request)
510510

511511
var user = _userService.GetByUsername(HttpContextAccessorHelper.GetUsername(_httpContextAccessor, false), false, false);
512512

513+
var pathwayRemoved = result.Pathway != null && request.Pathway == null;
514+
var popRemoved = result.ProofOfPersonhoodRequired && !request.ProofOfPersonhoodRequired;
515+
513516
result.Name = request.Name;
514517
result.Summary = request.Summary;
515518
result.Description = request.Description;
@@ -568,6 +571,11 @@ await _executionStrategyService.ExecuteInExecutionStrategyAsync(async () =>
568571

569572
if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("Program {ProgramId} updated. FinalStatus={Status}", result.Id, result.Status);
570573

574+
// Trigger sweep: reprocess pending usages when completion requirements are reduced (POP / Pathway removed)
575+
if (result.Status == ProgramStatus.Active && (popRemoved || pathwayRemoved))
576+
if (result.Status == ProgramStatus.Active && (popRemoved || pathwayRemoved))
577+
await _linkMaintenanceService.ProcessUsageProgressByProgramId(result.Id, _logger);
578+
571579
return result;
572580
}
573581

0 commit comments

Comments
 (0)