diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexJobMetricsNotification.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexJobMetricsNotification.cs new file mode 100644 index 0000000000..097935006e --- /dev/null +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexJobMetricsNotification.cs @@ -0,0 +1,118 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using Microsoft.Health.Fhir.Core.Features.Metrics; +using Microsoft.Health.Fhir.ValueSets; + +namespace Microsoft.Health.Fhir.Core.Features.Operations.Reindex +{ + /// + /// Represents the metrics notification payload for a completed reindex job. + /// + public class ReindexJobMetricsNotification : IMetricsNotification + { + /// + /// Initializes a new instance of the class. + /// + /// The reindex job ID. + /// The final reindex job status. + /// The time the reindex job was created. + /// The time the reindex job completed. + /// The number of successfully reindexed resources. + /// The number of resources that failed reindexing. + /// The number of child processing jobs created. + /// The number of child processing jobs completed. + /// The search parameter URIs included in reindexing. + /// The resource types included in reindexing. + public ReindexJobMetricsNotification( + string id, + string status, + DateTimeOffset createTime, + DateTimeOffset endTime, + long? succeededCount, + long? failedCount, + int createdChildJobs, + int completedChildJobs, + IReadOnlyCollection searchParams, + IReadOnlyCollection resourceTypes) + { + FhirOperation = AuditEventSubType.Reindex; + ResourceType = null; + + Id = id; + Status = status; + CreateTime = createTime; + EndTime = endTime; + SucceededCount = succeededCount; + FailedCount = failedCount; + CreatedChildJobs = createdChildJobs; + CompletedChildJobs = completedChildJobs; + SearchParams = searchParams; + ResourceTypes = resourceTypes; + } + + /// + /// Gets the FHIR operation name associated with this metrics notification. + /// + public string FhirOperation { get; } + + /// + /// Gets the resource type for this metrics notification. + /// + public string ResourceType { get; } + + /// + /// Gets the reindex job ID. + /// + public string Id { get; } + + /// + /// Gets the final reindex job status. + /// + public string Status { get; } + + /// + /// Gets the time the reindex job was created. + /// + public DateTimeOffset CreateTime { get; } + + /// + /// Gets the time the reindex job completed. + /// + public DateTimeOffset EndTime { get; } + + /// + /// Gets the number of successfully reindexed resources. + /// + public long? SucceededCount { get; } + + /// + /// Gets the number of resources that failed reindexing. + /// + public long? FailedCount { get; } + + /// + /// Gets the number of child processing jobs created. + /// + public int CreatedChildJobs { get; } + + /// + /// Gets the number of child processing jobs completed. + /// + public int CompletedChildJobs { get; } + + /// + /// Gets the search parameter URIs included in reindexing. + /// + public IReadOnlyCollection SearchParams { get; } + + /// + /// Gets the resource types included in reindexing. + /// + public IReadOnlyCollection ResourceTypes { get; } + } +} diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexOrchestratorJob.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexOrchestratorJob.cs index eb80deebdf..c2425b5456 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexOrchestratorJob.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexOrchestratorJob.cs @@ -12,6 +12,7 @@ using System.Threading.Tasks; using EnsureThat; using Hl7.Fhir.Model; +using MediatR; using Microsoft.AspNetCore.JsonPatch.Internal; using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; @@ -42,6 +43,7 @@ public sealed class ReindexOrchestratorJob : IJob private readonly ISearchParameterStatusManager _searchParameterStatusManager; private readonly IModelInfoProvider _modelInfoProvider; private readonly ISearchParameterOperations _searchParameterOperations; + private readonly IMediator _mediator; private readonly bool _isSurrogateIdRangingSupported; private readonly CoreFeatureConfiguration _coreFeatureConfiguration; private readonly OperationsConfiguration _operationsConfiguration; @@ -91,6 +93,7 @@ public ReindexOrchestratorJob( ISearchParameterOperations searchParameterOperations, IFhirRuntimeConfiguration fhirRuntimeConfiguration, ILoggerFactory loggerFactory, + IMediator mediator, IOptions coreFeatureConfiguration, IOptions operationsConfiguration) { @@ -101,6 +104,7 @@ public ReindexOrchestratorJob( EnsureArg.IsNotNull(modelInfoProvider, nameof(modelInfoProvider)); EnsureArg.IsNotNull(searchParameterStatusManager, nameof(searchParameterStatusManager)); EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations)); + EnsureArg.IsNotNull(mediator, nameof(mediator)); EnsureArg.IsNotNull(coreFeatureConfiguration, nameof(coreFeatureConfiguration)); EnsureArg.IsNotNull(coreFeatureConfiguration.Value, nameof(coreFeatureConfiguration.Value)); EnsureArg.IsNotNull(operationsConfiguration, nameof(operationsConfiguration)); @@ -113,6 +117,7 @@ public ReindexOrchestratorJob( _modelInfoProvider = modelInfoProvider; _searchParameterStatusManager = searchParameterStatusManager; _searchParameterOperations = searchParameterOperations; + _mediator = mediator; _coreFeatureConfiguration = coreFeatureConfiguration.Value; _operationsConfiguration = operationsConfiguration.Value; @@ -180,6 +185,8 @@ public async Task ExecuteAsync(JobInfo jobInfo, CancellationToken cancel HandleException(ex); } + await PublishReindexMetricsNotification(); + return JsonConvert.SerializeObject(_currentResult); } @@ -751,6 +758,34 @@ private void HandleException(Exception ex) _logger.LogJobError(ex, _jobInfo, "ReindexJob Failed and didn't complete."); } + /// + /// Publishes a metrics notification with the final reindex job results. + /// Wrapped in try/catch to prevent metrics failures from affecting the job outcome. + /// + private async Task PublishReindexMetricsNotification() + { + try + { + var notification = new ReindexJobMetricsNotification( + _jobInfo.Id.ToString(), + _reindexJobRecord.Status.ToString(), + _jobInfo.CreateDate, + Clock.UtcNow, + _currentResult.SucceededResources, + _currentResult.FailedResources, + _currentResult.CreatedJobs, + _currentResult.CompletedJobs, + _reindexJobRecord.SearchParams?.ToList() ?? new List(), + _reindexJobRecord.Resources?.ToList() ?? new List()); + + await _mediator.Publish(notification, CancellationToken.None); + } + catch (Exception ex) + { + _logger.LogJobWarning(ex, _jobInfo, "Failed to publish reindex job metrics notification."); + } + } + private async Task UpdateSearchParameterStatus(List completedJobs, List readySearchParameters, CancellationToken cancellationToken) { // Check if all the resource types which are base types of the search parameter diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexProcessingJob.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexProcessingJob.cs index d6faf25957..79954c00dc 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexProcessingJob.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexProcessingJob.cs @@ -13,12 +13,15 @@ using System.Threading; using System.Threading.Tasks; using EnsureThat; +using MediatR; using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; using Microsoft.Health.Abstractions.Exceptions; +using Microsoft.Health.Core; using Microsoft.Health.Extensions.DependencyInjection; using Microsoft.Health.Fhir.Core.Exceptions; using Microsoft.Health.Fhir.Core.Extensions; +using Microsoft.Health.Fhir.Core.Features.Operations; using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models; using Microsoft.Health.Fhir.Core.Features.Persistence; using Microsoft.Health.Fhir.Core.Features.Search; @@ -62,6 +65,7 @@ public class ReindexProcessingJob : IJob private readonly IResourceWrapperFactory _resourceWrapperFactory; private readonly Func> _fhirDataStoreFactory; private readonly ILogger _logger; + private readonly IMediator _mediator; private readonly ISearchParameterStatusManager _searchParameterStatusManager; private readonly ISearchParameterOperations _searchParameterOperations; @@ -87,7 +91,8 @@ public ReindexProcessingJob( IResourceWrapperFactory resourceWrapperFactory, ISearchParameterOperations searchParameterOperations, ISearchParameterStatusManager searchParameterStatusManager, - ILogger logger) + ILogger logger, + IMediator mediator) { EnsureArg.IsNotNull(searchServiceFactory, nameof(searchServiceFactory)); EnsureArg.IsNotNull(fhirDataStoreFactory, nameof(fhirDataStoreFactory)); @@ -95,6 +100,7 @@ public ReindexProcessingJob( EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations)); EnsureArg.IsNotNull(searchParameterStatusManager, nameof(searchParameterStatusManager)); EnsureArg.IsNotNull(logger, nameof(logger)); + EnsureArg.IsNotNull(mediator, nameof(mediator)); _searchServiceFactory = searchServiceFactory; _fhirDataStoreFactory = fhirDataStoreFactory; @@ -102,6 +108,7 @@ public ReindexProcessingJob( _searchParameterStatusManager = searchParameterStatusManager; _searchParameterOperations = searchParameterOperations; _logger = logger; + _mediator = mediator; } public async Task ExecuteAsync(JobInfo jobInfo, CancellationToken cancellationToken) @@ -120,6 +127,8 @@ public async Task ExecuteAsync(JobInfo jobInfo, CancellationToken cancel await ProcessQueryAsync(cancellationToken); + await PublishProcessingJobMetricsNotification(); + return JsonConvert.SerializeObject(_reindexProcessingJobResult); } @@ -696,6 +705,34 @@ private async Task TryLogEvent(string process, string status, string text, DateT await store.Value.TryLogEvent(process, status, text, startDate, cancellationToken); } + /// + /// Publishes a metrics notification with the processing job results. + /// Wrapped in try/catch to prevent metrics failures from affecting the job outcome. + /// + private async Task PublishProcessingJobMetricsNotification() + { + try + { + var notification = new ReindexJobMetricsNotification( + _jobInfo.Id.ToString(), + _reindexProcessingJobResult.Error == null ? OperationStatus.Completed.ToString() : OperationStatus.Failed.ToString(), + _jobInfo.CreateDate, + Clock.UtcNow, + _reindexProcessingJobResult.SucceededResourceCount, + _reindexProcessingJobResult.FailedResourceCount, + 0, + 0, + _reindexProcessingJobDefinition.SearchParameterUrls?.ToList() ?? new List(), + new List { _reindexProcessingJobDefinition.ResourceType }); + + await _mediator.Publish(notification, CancellationToken.None); + } + catch (Exception ex) + { + _logger.LogJobWarning(ex, _jobInfo, "Failed to publish reindex processing job metrics notification."); + } + } + private static ReindexProcessingJobDefinition DeserializeJobDefinition(JobInfo jobInfo) { return JsonConvert.DeserializeObject(jobInfo.Definition); diff --git a/src/Microsoft.Health.Fhir.Shared.Core.UnitTests/Features/Operations/Reindex/ReindexOrchestratorJobTests.cs b/src/Microsoft.Health.Fhir.Shared.Core.UnitTests/Features/Operations/Reindex/ReindexOrchestratorJobTests.cs index b1830b0c36..c1bb8315b1 100644 --- a/src/Microsoft.Health.Fhir.Shared.Core.UnitTests/Features/Operations/Reindex/ReindexOrchestratorJobTests.cs +++ b/src/Microsoft.Health.Fhir.Shared.Core.UnitTests/Features/Operations/Reindex/ReindexOrchestratorJobTests.cs @@ -12,6 +12,7 @@ using EnsureThat.Enforcers; using Hl7.Fhir.Rest; using Hl7.Fhir.Utility; +using MediatR; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Microsoft.Health.Extensions.DependencyInjection; @@ -76,9 +77,10 @@ private void Dispose() _cancellationTokenSource?.Dispose(); } - private ReindexOrchestratorJob CreateReindexOrchestratorJob(IFhirRuntimeConfiguration runtimeConfig = null, int waitMultiplier = 0) + private ReindexOrchestratorJob CreateReindexOrchestratorJob(IFhirRuntimeConfiguration runtimeConfig = null, int waitMultiplier = 0, IMediator mediator = null) { runtimeConfig ??= new AzureHealthDataServicesRuntimeConfiguration(); + mediator ??= Substitute.For(); var coreFeatureConfig = Substitute.For>(); coreFeatureConfig.Value.Returns(new CoreFeatureConfiguration()); @@ -97,6 +99,7 @@ private ReindexOrchestratorJob CreateReindexOrchestratorJob(IFhirRuntimeConfigur _searchParameterOperations, runtimeConfig, NullLoggerFactory.Instance, + mediator, coreFeatureConfig, operationsConfig); } @@ -277,6 +280,49 @@ public async Task ExecuteAsync_WhenCancellationRequested_ReturnsJobCancelledResu Assert.Contains(jobResult.Error, e => e.Diagnostics.Contains("cancelled")); } + [Fact] + public async Task ExecuteAsync_WhenCompleted_PublishesMetricsNotification() + { + var mediator = Substitute.For(); + _searchParameterStatusManager.GetAllSearchParameterStatus(Arg.Any()) + .Returns(new List()); + + var orchestrator = CreateReindexOrchestratorJob(mediator: mediator); + var jobInfo = await CreateReindexJobRecord(); + + await orchestrator.ExecuteAsync(jobInfo, _cancellationToken); + + _ = mediator.Received(1).Publish( + Arg.Is( + notification => notification.Id == jobInfo.Id.ToString() && + notification.FhirOperation == "reindex" && + notification.SucceededCount == 0 && + notification.FailedCount == 0), + Arg.Any()); + } + + [Fact] + public async Task ExecuteAsync_WhenCancelled_StillPublishesMetricsNotification() + { + var mediator = Substitute.For(); + var cts = new CancellationTokenSource(); + cts.Cancel(); + + _searchParameterStatusManager.GetAllSearchParameterStatus(Arg.Any()) + .Returns(new List()); + + var orchestrator = CreateReindexOrchestratorJob(mediator: mediator); + var jobInfo = await CreateReindexJobRecord(); + + await orchestrator.ExecuteAsync(jobInfo, cts.Token); + + _ = mediator.Received(1).Publish( + Arg.Is( + notification => notification.Id == jobInfo.Id.ToString() && + notification.FhirOperation == "reindex"), + Arg.Any()); + } + [Fact] public async Task ExecuteAsync_WithExceptionInProcessing_ReturnsErrorResult() { diff --git a/src/Microsoft.Health.Fhir.Shared.Core.UnitTests/Features/Operations/Reindex/ReindexProcessingJobTests.cs b/src/Microsoft.Health.Fhir.Shared.Core.UnitTests/Features/Operations/Reindex/ReindexProcessingJobTests.cs index 1e3804b07b..d6efa317cc 100644 --- a/src/Microsoft.Health.Fhir.Shared.Core.UnitTests/Features/Operations/Reindex/ReindexProcessingJobTests.cs +++ b/src/Microsoft.Health.Fhir.Shared.Core.UnitTests/Features/Operations/Reindex/ReindexProcessingJobTests.cs @@ -8,6 +8,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using MediatR; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Health.Fhir.Core.Exceptions; using Microsoft.Health.Fhir.Core.Features.Operations; @@ -42,11 +43,13 @@ public class ReindexProcessingJobTests private readonly Func _reindexProcessingJobTaskFactory; private readonly CancellationToken _cancellationToken; private readonly ISearchParameterStatusManager _searchParameterStatusManager = Substitute.For(); + private IMediator _mediator; public ReindexProcessingJobTests() { Func> fhirDataStoreScope = () => _fhirDataStore.CreateMockScope(); _cancellationToken = _cancellationTokenSource.Token; + _mediator = Substitute.For(); _reindexProcessingJobTaskFactory = () => new ReindexProcessingJob( () => _searchService.CreateMockScope(), @@ -54,7 +57,8 @@ public ReindexProcessingJobTests() _resourceWrapperFactory, _searchParameterOperations, _searchParameterStatusManager, - NullLogger.Instance); + NullLogger.Instance, + _mediator); } [Fact] @@ -113,6 +117,63 @@ public async Task GivenAProcessingJob_WhenExecuted_ThenCorrectCountIsProcessed() Assert.Equal(_mockedSearchCount, result.SucceededResourceCount); } + [Fact] + public async Task ExecuteAsync_WhenCompleted_PublishesMetricsNotification() + { + var mediator = Substitute.For(); + _mediator = mediator; + + var expectedResourceType = "Account"; + ReindexProcessingJobDefinition job = new ReindexProcessingJobDefinition() + { + MaximumNumberOfResourcesPerQuery = 100, + MaximumNumberOfResourcesPerWrite = 100, + ResourceType = expectedResourceType, + ResourceCount = new SearchResultReindex() + { + Count = 0, + EndResourceSurrogateId = 0, + StartResourceSurrogateId = 0, + ContinuationToken = null, + }, + SearchParameterHash = "accountHash", + SearchParameterUrls = new List() { "http://hl7.org/fhir/SearchParam/Account-status" }, + TypeId = (int)JobType.ReindexProcessing, + }; + + _searchParameterOperations.GetSearchParameterHash(Arg.Any()).Returns(job.SearchParameterHash); + + JobInfo jobInfo = new JobInfo() + { + Id = 1, + Definition = JsonConvert.SerializeObject(job), + QueueType = (byte)QueueType.Reindex, + GroupId = 1, + CreateDate = DateTime.UtcNow, + Status = JobStatus.Running, + }; + + _searchService.SearchForReindexAsync( + Arg.Any>>(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .Returns(new SearchResult(Enumerable.Empty(), null, null, new List>())); + + var processingJob = _reindexProcessingJobTaskFactory(); + await processingJob.ExecuteAsync(jobInfo, _cancellationToken); + + _ = mediator.Received(1).Publish( + Arg.Is( + notification => notification.Id == jobInfo.Id.ToString() && + notification.FhirOperation == "reindex" && + notification.SucceededCount == 0 && + notification.FailedCount == 0 && + notification.ResourceTypes.Contains(expectedResourceType)), + Arg.Any()); + } + private SearchResultEntry CreateSearchResultEntry(string id, string type) { return new SearchResultEntry( diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Features/Operations/Reindex/ReindexJobTests.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Features/Operations/Reindex/ReindexJobTests.cs index 998b489181..7844c19904 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Features/Operations/Reindex/ReindexJobTests.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Features/Operations/Reindex/ReindexJobTests.cs @@ -239,6 +239,7 @@ private void InitializeJobHosting() _searchParameterOperations, _fixture.FhirRuntimeConfiguration, NullLoggerFactory.Instance, + Substitute.For(), _coreFeatureConfig, _operationsConfig); } @@ -251,7 +252,8 @@ private void InitializeJobHosting() _resourceWrapperFactory, _searchParameterOperations, _searchParameterStatusManager, - NullLogger.Instance); + NullLogger.Instance, + Substitute.For()); } else { @@ -1109,6 +1111,7 @@ public async Task GivenFailedProcessingJobs_WhenOrchestratorProcessesResults_The _searchParameterOperations, runtimeConfiguration, NullLoggerFactory.Instance, + Substitute.For(), _coreFeatureConfig, _operationsConfig); @@ -1261,7 +1264,8 @@ public async Task GivenSurrogateRangeFetchOom_WhenProcessingJobRuns_ThenSplitUse _resourceWrapperFactory, _searchParameterOperations, _searchParameterStatusManager, - NullLogger.Instance); + NullLogger.Instance, + Substitute.For()); string resultJson = await processingJob.ExecuteAsync(jobInfo, CancellationToken.None); var result = JsonConvert.DeserializeObject(resultJson);