Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using Microsoft.Health.Fhir.Core.Features.Metrics;
using Microsoft.Health.Fhir.ValueSets;

namespace Microsoft.Health.Fhir.Core.Features.Operations.Reindex
{
/// <summary>
/// Represents the metrics notification payload for a completed reindex job.
/// </summary>
public class ReindexJobMetricsNotification : IMetricsNotification
{
/// <summary>
/// Initializes a new instance of the <see cref="ReindexJobMetricsNotification"/> class.
/// </summary>
/// <param name="id">The reindex job ID.</param>
/// <param name="status">The final reindex job status.</param>
/// <param name="createTime">The time the reindex job was created.</param>
/// <param name="endTime">The time the reindex job completed.</param>
/// <param name="succeededCount">The number of successfully reindexed resources.</param>
/// <param name="failedCount">The number of resources that failed reindexing.</param>
/// <param name="createdChildJobs">The number of child processing jobs created.</param>
/// <param name="completedChildJobs">The number of child processing jobs completed.</param>
/// <param name="searchParams">The search parameter URIs included in reindexing.</param>
/// <param name="resourceTypes">The resource types included in reindexing.</param>
public ReindexJobMetricsNotification(
string id,
string status,
DateTimeOffset createTime,
DateTimeOffset endTime,
long? succeededCount,
long? failedCount,
int createdChildJobs,
int completedChildJobs,
IReadOnlyCollection<string> searchParams,
IReadOnlyCollection<string> resourceTypes)
{
FhirOperation = AuditEventSubType.Reindex;
ResourceType = null;

Id = id;
Status = status;
CreateTime = createTime;
EndTime = endTime;
SucceededCount = succeededCount;
FailedCount = failedCount;
CreatedChildJobs = createdChildJobs;
CompletedChildJobs = completedChildJobs;
SearchParams = searchParams;
ResourceTypes = resourceTypes;
}

/// <summary>
/// Gets the FHIR operation name associated with this metrics notification.
/// </summary>
public string FhirOperation { get; }

/// <summary>
/// Gets the resource type for this metrics notification.
/// </summary>
public string ResourceType { get; }

/// <summary>
/// Gets the reindex job ID.
/// </summary>
public string Id { get; }

/// <summary>
/// Gets the final reindex job status.
/// </summary>
public string Status { get; }

/// <summary>
/// Gets the time the reindex job was created.
/// </summary>
public DateTimeOffset CreateTime { get; }

/// <summary>
/// Gets the time the reindex job completed.
/// </summary>
public DateTimeOffset EndTime { get; }

/// <summary>
/// Gets the number of successfully reindexed resources.
/// </summary>
public long? SucceededCount { get; }

/// <summary>
/// Gets the number of resources that failed reindexing.
/// </summary>
public long? FailedCount { get; }

/// <summary>
/// Gets the number of child processing jobs created.
/// </summary>
public int CreatedChildJobs { get; }

/// <summary>
/// Gets the number of child processing jobs completed.
/// </summary>
public int CompletedChildJobs { get; }

/// <summary>
/// Gets the search parameter URIs included in reindexing.
/// </summary>
public IReadOnlyCollection<string> SearchParams { get; }

/// <summary>
/// Gets the resource types included in reindexing.
/// </summary>
public IReadOnlyCollection<string> ResourceTypes { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading.Tasks;
using EnsureThat;
using Hl7.Fhir.Model;
using MediatR;
using Microsoft.AspNetCore.JsonPatch.Internal;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -42,6 +43,7 @@ public sealed class ReindexOrchestratorJob : IJob
private readonly ISearchParameterStatusManager _searchParameterStatusManager;
private readonly IModelInfoProvider _modelInfoProvider;
private readonly ISearchParameterOperations _searchParameterOperations;
private readonly IMediator _mediator;
private readonly bool _isSurrogateIdRangingSupported;
private readonly CoreFeatureConfiguration _coreFeatureConfiguration;
private readonly OperationsConfiguration _operationsConfiguration;
Expand Down Expand Up @@ -91,6 +93,7 @@ public ReindexOrchestratorJob(
ISearchParameterOperations searchParameterOperations,
IFhirRuntimeConfiguration fhirRuntimeConfiguration,
ILoggerFactory loggerFactory,
IMediator mediator,
IOptions<CoreFeatureConfiguration> coreFeatureConfiguration,
IOptions<OperationsConfiguration> operationsConfiguration)
{
Expand All @@ -101,6 +104,7 @@ public ReindexOrchestratorJob(
EnsureArg.IsNotNull(modelInfoProvider, nameof(modelInfoProvider));
EnsureArg.IsNotNull(searchParameterStatusManager, nameof(searchParameterStatusManager));
EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations));
EnsureArg.IsNotNull(mediator, nameof(mediator));
EnsureArg.IsNotNull(coreFeatureConfiguration, nameof(coreFeatureConfiguration));
EnsureArg.IsNotNull(coreFeatureConfiguration.Value, nameof(coreFeatureConfiguration.Value));
EnsureArg.IsNotNull(operationsConfiguration, nameof(operationsConfiguration));
Expand All @@ -113,6 +117,7 @@ public ReindexOrchestratorJob(
_modelInfoProvider = modelInfoProvider;
_searchParameterStatusManager = searchParameterStatusManager;
_searchParameterOperations = searchParameterOperations;
_mediator = mediator;
_coreFeatureConfiguration = coreFeatureConfiguration.Value;
_operationsConfiguration = operationsConfiguration.Value;

Expand Down Expand Up @@ -180,6 +185,8 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancel
HandleException(ex);
}

await PublishReindexMetricsNotification();

return JsonConvert.SerializeObject(_currentResult);
}

Expand Down Expand Up @@ -751,6 +758,34 @@ private void HandleException(Exception ex)
_logger.LogJobError(ex, _jobInfo, "ReindexJob Failed and didn't complete.");
}

/// <summary>
/// Publishes a metrics notification with the final reindex job results.
/// Wrapped in try/catch to prevent metrics failures from affecting the job outcome.
/// </summary>
private async Task PublishReindexMetricsNotification()
{
try
{
var notification = new ReindexJobMetricsNotification(
_jobInfo.Id.ToString(),
_reindexJobRecord.Status.ToString(),
_jobInfo.CreateDate,
Clock.UtcNow,
_currentResult.SucceededResources,
_currentResult.FailedResources,
_currentResult.CreatedJobs,
_currentResult.CompletedJobs,
_reindexJobRecord.SearchParams?.ToList() ?? new List<string>(),
_reindexJobRecord.Resources?.ToList() ?? new List<string>());

await _mediator.Publish(notification, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogJobWarning(ex, _jobInfo, "Failed to publish reindex job metrics notification.");
}
}

private async Task UpdateSearchParameterStatus(List<JobInfo> completedJobs, List<string> readySearchParameters, CancellationToken cancellationToken)
{
// Check if all the resource types which are base types of the search parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using MediatR;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using Microsoft.Health.Abstractions.Exceptions;
using Microsoft.Health.Core;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Exceptions;
using Microsoft.Health.Fhir.Core.Extensions;
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search;
Expand Down Expand Up @@ -62,6 +65,7 @@ public class ReindexProcessingJob : IJob
private readonly IResourceWrapperFactory _resourceWrapperFactory;
private readonly Func<IScoped<IFhirDataStore>> _fhirDataStoreFactory;
private readonly ILogger<ReindexProcessingJob> _logger;
private readonly IMediator _mediator;
private readonly ISearchParameterStatusManager _searchParameterStatusManager;
private readonly ISearchParameterOperations _searchParameterOperations;

Expand All @@ -87,21 +91,24 @@ public ReindexProcessingJob(
IResourceWrapperFactory resourceWrapperFactory,
ISearchParameterOperations searchParameterOperations,
ISearchParameterStatusManager searchParameterStatusManager,
ILogger<ReindexProcessingJob> logger)
ILogger<ReindexProcessingJob> logger,
IMediator mediator)
{
EnsureArg.IsNotNull(searchServiceFactory, nameof(searchServiceFactory));
EnsureArg.IsNotNull(fhirDataStoreFactory, nameof(fhirDataStoreFactory));
EnsureArg.IsNotNull(resourceWrapperFactory, nameof(resourceWrapperFactory));
EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations));
EnsureArg.IsNotNull(searchParameterStatusManager, nameof(searchParameterStatusManager));
EnsureArg.IsNotNull(logger, nameof(logger));
EnsureArg.IsNotNull(mediator, nameof(mediator));

_searchServiceFactory = searchServiceFactory;
_fhirDataStoreFactory = fhirDataStoreFactory;
_resourceWrapperFactory = resourceWrapperFactory;
_searchParameterStatusManager = searchParameterStatusManager;
_searchParameterOperations = searchParameterOperations;
_logger = logger;
_mediator = mediator;
}

public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancellationToken)
Expand All @@ -120,6 +127,8 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancel

await ProcessQueryAsync(cancellationToken);

await PublishProcessingJobMetricsNotification();

return JsonConvert.SerializeObject(_reindexProcessingJobResult);
}

Expand Down Expand Up @@ -696,6 +705,34 @@ private async Task TryLogEvent(string process, string status, string text, DateT
await store.Value.TryLogEvent(process, status, text, startDate, cancellationToken);
}

/// <summary>
/// Publishes a metrics notification with the processing job results.
/// Wrapped in try/catch to prevent metrics failures from affecting the job outcome.
/// </summary>
private async Task PublishProcessingJobMetricsNotification()
{
try
{
var notification = new ReindexJobMetricsNotification(
_jobInfo.Id.ToString(),
_reindexProcessingJobResult.Error == null ? OperationStatus.Completed.ToString() : OperationStatus.Failed.ToString(),
_jobInfo.CreateDate,
Clock.UtcNow,
_reindexProcessingJobResult.SucceededResourceCount,
_reindexProcessingJobResult.FailedResourceCount,
0,
0,
_reindexProcessingJobDefinition.SearchParameterUrls?.ToList() ?? new List<string>(),
new List<string> { _reindexProcessingJobDefinition.ResourceType });

await _mediator.Publish(notification, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogJobWarning(ex, _jobInfo, "Failed to publish reindex processing job metrics notification.");
}
}

private static ReindexProcessingJobDefinition DeserializeJobDefinition(JobInfo jobInfo)
{
return JsonConvert.DeserializeObject<ReindexProcessingJobDefinition>(jobInfo.Definition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using EnsureThat.Enforcers;
using Hl7.Fhir.Rest;
using Hl7.Fhir.Utility;
using MediatR;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Microsoft.Health.Extensions.DependencyInjection;
Expand Down Expand Up @@ -76,9 +77,10 @@ private void Dispose()
_cancellationTokenSource?.Dispose();
}

private ReindexOrchestratorJob CreateReindexOrchestratorJob(IFhirRuntimeConfiguration runtimeConfig = null, int waitMultiplier = 0)
private ReindexOrchestratorJob CreateReindexOrchestratorJob(IFhirRuntimeConfiguration runtimeConfig = null, int waitMultiplier = 0, IMediator mediator = null)
{
runtimeConfig ??= new AzureHealthDataServicesRuntimeConfiguration();
mediator ??= Substitute.For<IMediator>();

var coreFeatureConfig = Substitute.For<IOptions<CoreFeatureConfiguration>>();
coreFeatureConfig.Value.Returns(new CoreFeatureConfiguration());
Expand All @@ -97,6 +99,7 @@ private ReindexOrchestratorJob CreateReindexOrchestratorJob(IFhirRuntimeConfigur
_searchParameterOperations,
runtimeConfig,
NullLoggerFactory.Instance,
mediator,
coreFeatureConfig,
operationsConfig);
}
Expand Down Expand Up @@ -277,6 +280,49 @@ public async Task ExecuteAsync_WhenCancellationRequested_ReturnsJobCancelledResu
Assert.Contains(jobResult.Error, e => e.Diagnostics.Contains("cancelled"));
}

[Fact]
public async Task ExecuteAsync_WhenCompleted_PublishesMetricsNotification()
{
var mediator = Substitute.For<IMediator>();
_searchParameterStatusManager.GetAllSearchParameterStatus(Arg.Any<CancellationToken>())
.Returns(new List<ResourceSearchParameterStatus>());

var orchestrator = CreateReindexOrchestratorJob(mediator: mediator);
var jobInfo = await CreateReindexJobRecord();

await orchestrator.ExecuteAsync(jobInfo, _cancellationToken);

_ = mediator.Received(1).Publish(
Arg.Is<ReindexJobMetricsNotification>(
notification => notification.Id == jobInfo.Id.ToString() &&
notification.FhirOperation == "reindex" &&
notification.SucceededCount == 0 &&
notification.FailedCount == 0),
Arg.Any<CancellationToken>());
}

[Fact]
public async Task ExecuteAsync_WhenCancelled_StillPublishesMetricsNotification()
{
var mediator = Substitute.For<IMediator>();
var cts = new CancellationTokenSource();
cts.Cancel();

_searchParameterStatusManager.GetAllSearchParameterStatus(Arg.Any<CancellationToken>())
.Returns(new List<ResourceSearchParameterStatus>());

var orchestrator = CreateReindexOrchestratorJob(mediator: mediator);
var jobInfo = await CreateReindexJobRecord();

await orchestrator.ExecuteAsync(jobInfo, cts.Token);

_ = mediator.Received(1).Publish(
Arg.Is<ReindexJobMetricsNotification>(
notification => notification.Id == jobInfo.Id.ToString() &&
notification.FhirOperation == "reindex"),
Arg.Any<CancellationToken>());
}

[Fact]
public async Task ExecuteAsync_WithExceptionInProcessing_ReturnsErrorResult()
{
Expand Down
Loading
Loading