Skip to content
Merged
1 change: 1 addition & 0 deletions application/CohortManager/compose.cohort-distribution.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ services:
- CohortDistributionDataServiceURL=http://cohort-distribution-data-service:7992/api/CohortDistributionDataService/
- BsSelectRequestAuditDataService=http://bs-request-audit-data-service:7989/api/BsSelectRequestAuditDataService/
- AcceptableLatencyThresholdMs=500
- RetrieveSupersededRecordsLast=false

retrieve-cohort-request-audit:
container_name: retrieve-cohort-request-audit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
.ConfigureFunctionsWorkerDefaults()
.ConfigureServices(services =>
{
services.AddTransient<IExtractCohortDistributionRecordsStrategy, ExtractCohortDistributionRecords>();
services.AddTransient<ICreateCohortDistributionData, CreateCohortDistributionData>();
services.AddSingleton<ICreateResponse, CreateResponse>();
services.AddSingleton<IDatabaseHelper, DatabaseHelper>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public async Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Anonymou
List<CohortDistributionParticipantDto> cohortDistributionParticipants;
try
{

int rowCount = _config.MaxRowCount;
bool retrieveSupersededRecordsLast = _config.RetrieveSupersededRecordsLast;
if (!string.IsNullOrEmpty(req.Query["rowCount"]) && int.TryParse(req.Query["rowCount"], out int rowCountParam))
{
rowCount = Math.Min(rowCount, rowCountParam);
Expand All @@ -67,7 +67,7 @@ public async Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Anonymou
if (string.IsNullOrEmpty(req.Query["requestId"]))
{
cohortDistributionParticipants = await _createCohortDistributionData
.GetUnextractedCohortDistributionParticipants(rowCount);
.GetUnextractedCohortDistributionParticipants(rowCount, retrieveSupersededRecordsLast);

return CreateResponse(cohortDistributionParticipants, req);
}
Expand All @@ -87,7 +87,7 @@ public async Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Anonymou
return CreateResponse(cohortDistributionParticipants, req);
}

cohortDistributionParticipants = await _createCohortDistributionData.GetUnextractedCohortDistributionParticipants(rowCount);
cohortDistributionParticipants = await _createCohortDistributionData.GetUnextractedCohortDistributionParticipants(rowCount, retrieveSupersededRecordsLast);

return CreateResponse(cohortDistributionParticipants, req);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ public class RetrieveCohortDistributionConfig
[Required]
public string BsSelectRequestAuditDataService { get; set; }
public int MaxRowCount { get; set; } = 1_000;
public bool RetrieveSupersededRecordsLast { get; set; } = false;

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Common.Interfaces;

public interface ICreateCohortDistributionData
{
Task<List<CohortDistributionParticipantDto>> GetUnextractedCohortDistributionParticipants(int rowCount);
Task<List<CohortDistributionParticipantDto>> GetUnextractedCohortDistributionParticipants(int rowCount, bool retrieveSupersededRecordsLast);
Task<List<CohortDistributionParticipantDto>> GetCohortDistributionParticipantsByRequestId(Guid requestId);
Task<List<CohortRequestAudit>> GetCohortRequestAudit(string? requestId, string? statusCode, DateTime? dateFrom);
Task<CohortRequestAudit> GetNextCohortRequestAudit(Guid requestId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Common.Interfaces;

using Model;

/// <summary>
/// Strategy interface for extracting cohort distribution participants.
/// </summary>
public interface IExtractCohortDistributionRecordsStrategy
{
/// <summary>
/// Gets unextracted cohort distribution participants using the strategy's specific logic.
/// </summary>
/// <param name="rowCount">Maximum number of participants to extract</param>
/// <param name="retrieveSupersededRecordsLast">Flag to determine if superseded records should be retrieved last</param>
/// <returns>List of cohort distribution entities to be extracted</returns>
Task<List<CohortDistribution>> GetUnextractedParticipants(int rowCount, bool retrieveSupersededRecordsLast);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,35 @@ namespace Data.Database;
public class CreateCohortDistributionData : ICreateCohortDistributionData
{
private readonly IDataServiceClient<CohortDistribution> _cohortDistributionDataServiceClient;

private readonly IDataServiceClient<BsSelectRequestAudit> _bsSelectRequestAuditDataServiceClient;
private readonly IExtractCohortDistributionRecordsStrategy? _extractionStrategy;

public CreateCohortDistributionData(IDataServiceClient<CohortDistribution> cohortDistributionDataServiceClient, IDataServiceClient<BsSelectRequestAudit> bsSelectRequestAuditDataServiceClient)
public CreateCohortDistributionData(
IDataServiceClient<CohortDistribution> cohortDistributionDataServiceClient,
IDataServiceClient<BsSelectRequestAudit> bsSelectRequestAuditDataServiceClient,
IExtractCohortDistributionRecordsStrategy? extractionStrategy = null)
{
_cohortDistributionDataServiceClient = cohortDistributionDataServiceClient;
_bsSelectRequestAuditDataServiceClient = bsSelectRequestAuditDataServiceClient;
_extractionStrategy = extractionStrategy;

}


public async Task<List<CohortDistributionParticipantDto>> GetUnextractedCohortDistributionParticipants(int rowCount)
public async Task<List<CohortDistributionParticipantDto>> GetUnextractedCohortDistributionParticipants(int rowCount, bool retrieveSupersededRecordsLast)
{
var participantsList = await _cohortDistributionDataServiceClient.GetByFilter(x => x.IsExtracted.Equals(0) && x.RequestId == Guid.Empty);
List<CohortDistribution> participantsToBeExtracted;
// Use new extraction logic if environment variable is set and strategy is injected
if (retrieveSupersededRecordsLast && _extractionStrategy != null)
{
participantsToBeExtracted = await _extractionStrategy.GetUnextractedParticipants(rowCount, retrieveSupersededRecordsLast);
}
else
{
var participantsList = await _cohortDistributionDataServiceClient.GetByFilter(x => x.IsExtracted.Equals(0) && x.RequestId == Guid.Empty);
participantsToBeExtracted = participantsList.OrderBy(x => x.RecordUpdateDateTime ?? x.RecordInsertDateTime).Take(rowCount).ToList();
}

var participantsToBeExtracted = participantsList.OrderBy(x => x.RecordUpdateDateTime ?? x.RecordInsertDateTime).Take(rowCount).ToList();
//TODO do this filtering on the data services
var CohortDistributionParticipantList = participantsToBeExtracted.Select(x => new CohortDistributionParticipant(x)).ToList();

Expand All @@ -49,7 +63,6 @@ public async Task<List<CohortDistributionParticipantDto>> GetUnextractedCohortDi

public async Task<List<CohortDistributionParticipantDto>> GetCohortDistributionParticipantsByRequestId(Guid requestId)
{
var requestIdString = requestId.ToString();
if (requestId == Guid.Empty)
{
CohortDistributionParticipantDto(new List<CohortDistributionParticipant>());
Expand Down Expand Up @@ -141,16 +154,14 @@ private async Task<bool> MarkCohortDistributionParticipantsAsExtracted(List<Coho

var extractedParticipants = cohortParticipants.Select(x => x.CohortDistributionId);



foreach (var participantId in extractedParticipants)
{

var participant = await _cohortDistributionDataServiceClient.GetSingle(participantId.ToString());
participant.IsExtracted = 1;
participant.RequestId = requestId;

var updatedRecord = await _cohortDistributionDataServiceClient.Update(participant);

if (!updatedRecord)
{
return false;
Expand Down Expand Up @@ -205,8 +216,6 @@ private List<CohortRequestAudit> BuildCohortRequestAudits(IEnumerable<BsSelectRe
}).OrderBy(x => x.CreatedDateTime).ToList();
}



private static Expression<Func<BsSelectRequestAudit, bool>> BuildCohortRequestAuditQuery(string? requestId, string? statusCode, DateTime? dateFrom)
{
var conditions = new List<Expression<Func<BsSelectRequestAudit, bool>>>();
Expand All @@ -231,7 +240,6 @@ private static Expression<Func<BsSelectRequestAudit, bool>> BuildCohortRequestAu
conditions.Add(predicate);
}


Expression<Func<BsSelectRequestAudit, bool>> finalPredicate;
if (conditions.Count > 0)
{
Expand All @@ -253,7 +261,6 @@ private static Expression<Func<T, bool>> CombineWithAnd<T>(List<Expression<Func<
return firstExpression;
}


var body = expressions
.Skip(1)
.Aggregate(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
namespace Data.Database;

using Common.Interfaces;
using DataServices.Client;
using Model;

/// <summary>
/// Extract Cohort Distribution Records without superseded nhs by nhs number first, if none found, get records with superseded by nhs number
/// </summary>
public class ExtractCohortDistributionRecords : IExtractCohortDistributionRecordsStrategy
{
private readonly IDataServiceClient<CohortDistribution> _cohortDistributionDataServiceClient;

public async Task<List<CohortDistribution>> GetUnextractedParticipants(int rowCount, bool retrieveSupersededRecordsLast)
{
// Try unextracted participants without superseded by nhs number first, if none found, get records with superseded by nhs number
return await GetRegularUnextractedParticipants(rowCount)
?? await GetSupersededParticipants(rowCount);
}

public ExtractCohortDistributionRecords(IDataServiceClient<CohortDistribution> cohortDistributionDataServiceClient)
{
_cohortDistributionDataServiceClient = cohortDistributionDataServiceClient;
}

private async Task<List<CohortDistribution>?> GetRegularUnextractedParticipants(int rowCount)
{
var unextractedParticipants = await _cohortDistributionDataServiceClient.GetByFilter(
x => x.IsExtracted.Equals(0) && x.RequestId == Guid.Empty && x.SupersededNHSNumber == null);

return unextractedParticipants.Any()
? OrderAndTakeParticipants(unextractedParticipants, rowCount)
: null;
}

private async Task<List<CohortDistribution>> GetSupersededParticipants(int rowCount)
{
var supersededParticipants = await _cohortDistributionDataServiceClient.GetByFilter(
x => x.IsExtracted.Equals(0) && x.RequestId == Guid.Empty && x.SupersededNHSNumber != null);

// Get distinct non-null superseded NHS numbers
var supersededNhsNumbers = supersededParticipants
.Select(sp => sp.SupersededNHSNumber.Value)

Check warning on line 43 in application/CohortManager/src/Functions/Shared/Data/Database/ExtractCohortDistributionRecords.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Nullable value type may be null.

See more on https://sonarcloud.io/project/issues?id=NHSDigital_dtos-cohort-manager&issues=AZrg2d1RfMa30m0oLRV6&open=AZrg2d1RfMa30m0oLRV6&pullRequest=1762
.Distinct()
.ToList();

// Find matching extracted participants
var matchingParticipants = new List<CohortDistribution>();
foreach (var nhsNumber in supersededNhsNumbers)
{
var matches = await _cohortDistributionDataServiceClient.GetByFilter(
x => x.NHSNumber == nhsNumber && x.IsExtracted.Equals(1));
matchingParticipants.AddRange(matches);
}

// Filter superseded participants that have matching records
var filteredParticipants = supersededParticipants
.Where(sp => matchingParticipants.Any(mp => mp.NHSNumber == sp.SupersededNHSNumber))
.ToList();

return OrderAndTakeParticipants(filteredParticipants, rowCount);
}

private static List<CohortDistribution> OrderAndTakeParticipants(IEnumerable<CohortDistribution> participants, int rowCount)
{
return participants
.OrderBy(x => (x.RecordUpdateDateTime ?? x.RecordInsertDateTime) ?? DateTime.MinValue)
.Take(rowCount)
.ToList();
}
}
1 change: 1 addition & 0 deletions infrastructure/tf-core/environments/development.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ function_apps = {
]
env_vars_static = {
AcceptableLatencyThresholdMs = "500"
RetrieveSupersededRecordsLast = "false"
}
}

Expand Down
1 change: 1 addition & 0 deletions infrastructure/tf-core/environments/integration.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ function_apps = {
]
env_vars_static = {
AcceptableLatencyThresholdMs = "500"
RetrieveSupersededRecordsLast = "false"
}
}

Expand Down
1 change: 1 addition & 0 deletions infrastructure/tf-core/environments/nft.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ function_apps = {
]
env_vars_static = {
AcceptableLatencyThresholdMs = "500"
RetrieveSupersededRecordsLast = "false"
}
}

Expand Down
1 change: 1 addition & 0 deletions infrastructure/tf-core/environments/preprod.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ function_apps = {
]
env_vars_static = {
AcceptableLatencyThresholdMs = "500"
RetrieveSupersededRecordsLast = "false"
}
}

Expand Down
1 change: 1 addition & 0 deletions infrastructure/tf-core/environments/production.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ function_apps = {
]
env_vars_static = {
AcceptableLatencyThresholdMs = "500"
RetrieveSupersededRecordsLast = "false"
}
}

Expand Down
1 change: 1 addition & 0 deletions infrastructure/tf-core/environments/sandbox.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ function_apps = {
]
env_vars_static = {
AcceptableLatencyThresholdMs = "500"
RetrieveSupersededRecordsLast = "false"
}
}

Expand Down
Loading
Loading