Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions application/CohortManager/compose.cohort-distribution.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ services:
- CohortDistributionDataServiceURL=http://cohort-distribution-data-service:7992/api/CohortDistributionDataService/
- BsSelectRequestAuditDataService=http://bs-request-audit-data-service:7989/api/BsSelectRequestAuditDataService/
- AcceptableLatencyThresholdMs=500
- RetrieveSupersededRecordsLast=false

retrieve-cohort-request-audit:
container_name: retrieve-cohort-request-audit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
.ConfigureFunctionsWorkerDefaults()
.ConfigureServices(services =>
{
services.AddTransient<IExtractCohortDistributionRecordsStrategy, ExtractCohortDistributionRecords>();
services.AddTransient<ICreateCohortDistributionData, CreateCohortDistributionData>();
services.AddSingleton<ICreateResponse, CreateResponse>();
services.AddSingleton<IDatabaseHelper, DatabaseHelper>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public async Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Anonymou
List<CohortDistributionParticipantDto> cohortDistributionParticipants;
try
{

int rowCount = _config.MaxRowCount;
bool retrieveSupersededRecordsLast = _config.RetrieveSupersededRecordsLast;
if (!string.IsNullOrEmpty(req.Query["rowCount"]) && int.TryParse(req.Query["rowCount"], out int rowCountParam))
{
rowCount = Math.Min(rowCount, rowCountParam);
Expand All @@ -67,7 +67,7 @@ public async Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Anonymou
if (string.IsNullOrEmpty(req.Query["requestId"]))
{
cohortDistributionParticipants = await _createCohortDistributionData
.GetUnextractedCohortDistributionParticipants(rowCount);
.GetUnextractedCohortDistributionParticipants(rowCount, retrieveSupersededRecordsLast);

return CreateResponse(cohortDistributionParticipants, req);
}
Expand All @@ -87,7 +87,7 @@ public async Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Anonymou
return CreateResponse(cohortDistributionParticipants, req);
}

cohortDistributionParticipants = await _createCohortDistributionData.GetUnextractedCohortDistributionParticipants(rowCount);
cohortDistributionParticipants = await _createCohortDistributionData.GetUnextractedCohortDistributionParticipants(rowCount, retrieveSupersededRecordsLast);

return CreateResponse(cohortDistributionParticipants, req);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ public class RetrieveCohortDistributionConfig
[Required]
public string BsSelectRequestAuditDataService { get; set; }
public int MaxRowCount { get; set; } = 1_000;
public bool RetrieveSupersededRecordsLast { get; set; } = false;

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Common.Interfaces;

public interface ICreateCohortDistributionData
{
Task<List<CohortDistributionParticipantDto>> GetUnextractedCohortDistributionParticipants(int rowCount);
Task<List<CohortDistributionParticipantDto>> GetUnextractedCohortDistributionParticipants(int rowCount, bool retrieveSupersededRecordsLast);
Task<List<CohortDistributionParticipantDto>> GetCohortDistributionParticipantsByRequestId(Guid requestId);
Task<List<CohortRequestAudit>> GetCohortRequestAudit(string? requestId, string? statusCode, DateTime? dateFrom);
Task<CohortRequestAudit> GetNextCohortRequestAudit(Guid requestId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Common.Interfaces;

using Model;

/// <summary>
/// Strategy interface for extracting cohort distribution participants.
/// </summary>
public interface IExtractCohortDistributionRecordsStrategy
{
/// <summary>
/// Gets unextracted cohort distribution participants using the strategy's specific logic.
/// </summary>
/// <param name="rowCount">Maximum number of participants to extract</param>
/// <param name="retrieveSupersededRecordsLast">Flag to determine if superseded records should be retrieved last</param>
/// <returns>List of cohort distribution entities to be extracted</returns>
Task<List<CohortDistribution>> GetUnextractedParticipants(int rowCount, bool retrieveSupersededRecordsLast);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,35 @@ namespace Data.Database;
public class CreateCohortDistributionData : ICreateCohortDistributionData
{
private readonly IDataServiceClient<CohortDistribution> _cohortDistributionDataServiceClient;

private readonly IDataServiceClient<BsSelectRequestAudit> _bsSelectRequestAuditDataServiceClient;
private readonly IExtractCohortDistributionRecordsStrategy? _extractionStrategy;

public CreateCohortDistributionData(IDataServiceClient<CohortDistribution> cohortDistributionDataServiceClient, IDataServiceClient<BsSelectRequestAudit> bsSelectRequestAuditDataServiceClient)
public CreateCohortDistributionData(
IDataServiceClient<CohortDistribution> cohortDistributionDataServiceClient,
IDataServiceClient<BsSelectRequestAudit> bsSelectRequestAuditDataServiceClient,
IExtractCohortDistributionRecordsStrategy? extractionStrategy = null)
{
_cohortDistributionDataServiceClient = cohortDistributionDataServiceClient;
_bsSelectRequestAuditDataServiceClient = bsSelectRequestAuditDataServiceClient;
_extractionStrategy = extractionStrategy;

}


public async Task<List<CohortDistributionParticipantDto>> GetUnextractedCohortDistributionParticipants(int rowCount)
public async Task<List<CohortDistributionParticipantDto>> GetUnextractedCohortDistributionParticipants(int rowCount, bool retrieveSupersededRecordsLast)
{
var participantsList = await _cohortDistributionDataServiceClient.GetByFilter(x => x.IsExtracted.Equals(0) && x.RequestId == Guid.Empty);
List<CohortDistribution> participantsToBeExtracted;
// Use new extraction logic if environment variable is set and strategy is injected
if (retrieveSupersededRecordsLast && _extractionStrategy != null)
{
participantsToBeExtracted = await _extractionStrategy.GetUnextractedParticipants(rowCount, retrieveSupersededRecordsLast);
}
else
{
var participantsList = await _cohortDistributionDataServiceClient.GetByFilter(x => x.IsExtracted.Equals(0) && x.RequestId == Guid.Empty);
participantsToBeExtracted = participantsList.OrderBy(x => x.RecordUpdateDateTime ?? x.RecordInsertDateTime).Take(rowCount).ToList();
}

var participantsToBeExtracted = participantsList.OrderBy(x => x.RecordUpdateDateTime ?? x.RecordInsertDateTime).Take(rowCount).ToList();
//TODO do this filtering on the data services
var CohortDistributionParticipantList = participantsToBeExtracted.Select(x => new CohortDistributionParticipant(x)).ToList();

Expand All @@ -49,7 +63,6 @@ public async Task<List<CohortDistributionParticipantDto>> GetUnextractedCohortDi

public async Task<List<CohortDistributionParticipantDto>> GetCohortDistributionParticipantsByRequestId(Guid requestId)
{
var requestIdString = requestId.ToString();
if (requestId == Guid.Empty)
{
CohortDistributionParticipantDto(new List<CohortDistributionParticipant>());
Expand Down Expand Up @@ -141,16 +154,14 @@ private async Task<bool> MarkCohortDistributionParticipantsAsExtracted(List<Coho

var extractedParticipants = cohortParticipants.Select(x => x.CohortDistributionId);



foreach (var participantId in extractedParticipants)
{

var participant = await _cohortDistributionDataServiceClient.GetSingle(participantId.ToString());
participant.IsExtracted = 1;
participant.RequestId = requestId;

var updatedRecord = await _cohortDistributionDataServiceClient.Update(participant);

if (!updatedRecord)
{
return false;
Expand Down Expand Up @@ -205,8 +216,6 @@ private List<CohortRequestAudit> BuildCohortRequestAudits(IEnumerable<BsSelectRe
}).OrderBy(x => x.CreatedDateTime).ToList();
}



private static Expression<Func<BsSelectRequestAudit, bool>> BuildCohortRequestAuditQuery(string? requestId, string? statusCode, DateTime? dateFrom)
{
var conditions = new List<Expression<Func<BsSelectRequestAudit, bool>>>();
Expand All @@ -231,7 +240,6 @@ private static Expression<Func<BsSelectRequestAudit, bool>> BuildCohortRequestAu
conditions.Add(predicate);
}


Expression<Func<BsSelectRequestAudit, bool>> finalPredicate;
if (conditions.Count > 0)
{
Expand All @@ -253,7 +261,6 @@ private static Expression<Func<T, bool>> CombineWithAnd<T>(List<Expression<Func<
return firstExpression;
}


var body = expressions
.Skip(1)
.Aggregate(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
namespace Data.Database;

using Common.Interfaces;
using DataServices.Client;
using Model;

/// <summary>
/// Extract Cohort Distribution Records without superseded nhs by nhs number first, if none found, get records with superseded by nhs number
/// </summary>
public class ExtractCohortDistributionRecords : IExtractCohortDistributionRecordsStrategy
{
private readonly IDataServiceClient<CohortDistribution> _cohortDistributionDataServiceClient;

public async Task<List<CohortDistribution>> GetUnextractedParticipants(int rowCount, bool retrieveSupersededRecordsLast)
{
// Try unextracted participants without superseded by nhs number first, if none found, get records with superseded by nhs number
return await GetRegularUnextractedParticipants(rowCount)
?? await GetSupersededParticipants(rowCount);
}

public ExtractCohortDistributionRecords(IDataServiceClient<CohortDistribution> cohortDistributionDataServiceClient)
{
_cohortDistributionDataServiceClient = cohortDistributionDataServiceClient;
}

private async Task<List<CohortDistribution>?> GetRegularUnextractedParticipants(int rowCount)
{
var unextractedParticipants = await _cohortDistributionDataServiceClient.GetByFilter(
participant => participant.IsExtracted.Equals(0) && participant.RequestId == Guid.Empty && participant.SupersededNHSNumber == null);

return unextractedParticipants.Any()
? OrderAndTakeParticipants(unextractedParticipants, rowCount)
: null;
}

private async Task<List<CohortDistribution>> GetSupersededParticipants(int rowCount)
{
var supersededParticipants = await _cohortDistributionDataServiceClient.GetByFilter(
participant => participant.IsExtracted.Equals(0) && participant.RequestId == Guid.Empty && participant.SupersededNHSNumber != null);

// Get distinct non-null superseded NHS numbers
var supersededNhsNumbers = supersededParticipants
.Select(sp => sp.SupersededNHSNumber.Value)

Check warning on line 43 in application/CohortManager/src/Functions/Shared/Data/Database/ExtractCohortDistributionRecords.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Nullable value type may be null.

See more on https://sonarcloud.io/project/issues?id=NHSDigital_dtos-cohort-manager&issues=AZrg2d1RfMa30m0oLRV6&open=AZrg2d1RfMa30m0oLRV6&pullRequest=1762
.Distinct()
.ToList();

// Find matching extracted participants
var matchingParticipants = new List<CohortDistribution>();
foreach (var nhsNumber in supersededNhsNumbers)
{
var matches = await _cohortDistributionDataServiceClient.GetByFilter(
participant => participant.NHSNumber == nhsNumber && participant.IsExtracted.Equals(1));
matchingParticipants.AddRange(matches);
}

// Filter superseded participants that have matching records
var filteredParticipants = supersededParticipants
.Where(sp => matchingParticipants.Any(mp => mp.NHSNumber == sp.SupersededNHSNumber))
.ToList();

return OrderAndTakeParticipants(filteredParticipants, rowCount);
}

private static List<CohortDistribution> OrderAndTakeParticipants(IEnumerable<CohortDistribution> participants, int rowCount)
{
return participants
.OrderBy(participant => (participant.RecordUpdateDateTime ?? participant.RecordInsertDateTime) ?? DateTime.MinValue)
.Take(rowCount)
.ToList();
}
}
1 change: 1 addition & 0 deletions infrastructure/tf-core/environments/development.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ function_apps = {
]
env_vars_static = {
AcceptableLatencyThresholdMs = "500"
RetrieveSupersededRecordsLast = "false"
}
}

Expand Down
1 change: 1 addition & 0 deletions infrastructure/tf-core/environments/integration.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ function_apps = {
]
env_vars_static = {
AcceptableLatencyThresholdMs = "500"
RetrieveSupersededRecordsLast = "false"
}
}

Expand Down
1 change: 1 addition & 0 deletions infrastructure/tf-core/environments/nft.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ function_apps = {
]
env_vars_static = {
AcceptableLatencyThresholdMs = "500"
RetrieveSupersededRecordsLast = "false"
}
}

Expand Down
1 change: 1 addition & 0 deletions infrastructure/tf-core/environments/preprod.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ function_apps = {
]
env_vars_static = {
AcceptableLatencyThresholdMs = "500"
RetrieveSupersededRecordsLast = "false"
}
}

Expand Down
1 change: 1 addition & 0 deletions infrastructure/tf-core/environments/production.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ function_apps = {
]
env_vars_static = {
AcceptableLatencyThresholdMs = "500"
RetrieveSupersededRecordsLast = "false"
}
}

Expand Down
1 change: 1 addition & 0 deletions infrastructure/tf-core/environments/sandbox.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ function_apps = {
]
env_vars_static = {
AcceptableLatencyThresholdMs = "500"
RetrieveSupersededRecordsLast = "false"
}
}

Expand Down
Loading
Loading