diff --git a/infrastructure/modules/container-apps/alerts.tf b/infrastructure/modules/container-apps/alerts.tf
index a9ea412bd..111094fe5 100644
--- a/infrastructure/modules/container-apps/alerts.tf
+++ b/infrastructure/modules/container-apps/alerts.tf
@@ -36,3 +36,61 @@ resource "azurerm_monitor_scheduled_query_rules_alert_v2" "failure_event" {
     }
   }
 }
+
+# Alerts when the average length of a notifications storage queue exceeds
+# var.queue_length_alert_threshold. The collect_metrics job publishes each
+# queue length as a custom gauge named "queue_size_<queue name>" with an
+# "environment" dimension, so the KQL filter below must use that same name.
+#
+# IMPORTANT:
+# Enable metrics store with all dimensions: https://docs.azure.cn/en-us/azure-monitor/app/metrics-overview?tabs=standard#custom-metrics-dimensions-and-preaggregation
+# currently this feature is in preview.
+resource "azurerm_monitor_scheduled_query_rules_alert_v2" "queue_length_high" {
+  for_each = var.enable_alerting ? toset([
+    "notifications-message-status-updates",
+    "notifications-message-batch-retries"
+  ]) : []
+
+  name                = "${var.app_short_name}-${each.key}-${var.environment}-queue-length-high-alert"
+  location            = var.region
+  resource_group_name = azurerm_resource_group.main.name
+
+  auto_mitigation_enabled = true
+  description             = "Alert when queue length exceeds ${var.queue_length_alert_threshold}"
+  display_name            = "${var.app_short_name} Notifications Queue Length High Alert"
+  enabled                 = true
+  severity                = 2
+  evaluation_frequency    = "PT10M"
+  window_duration         = "PT10M"
+  scopes                  = [var.app_insights_id]
+
+  criteria {
+    query = <<-KQL
+      customMetrics
+      | where name == "queue_size_${each.key}"
+      | extend environment = tostring(customDimensions.environment)
+      | where environment == "${var.environment}"
+      | extend value = toreal(value)
+      | summarize avg_value = avg(value) by bin(timestamp, 5m)
+      | where avg_value > ${var.queue_length_alert_threshold}
+    KQL
+
+    metric_measure_column   = "avg_value"
+    time_aggregation_method = "Average"
+    operator                = "GreaterThan"
+    threshold               = 0
+
+    failing_periods {
+      minimum_failing_periods_to_trigger_alert = 1
+      number_of_evaluation_periods             = 1
+    }
+  }
+
+  action {
+    action_groups = [var.action_group_id]
+  }
+
+  tags = {
+    environment = var.environment
+  }
+}
diff --git 
a/infrastructure/modules/container-apps/jobs.tf b/infrastructure/modules/container-apps/jobs.tf
index 556a0ad7f..681d972ab 100644
--- a/infrastructure/modules/container-apps/jobs.tf
+++ b/infrastructure/modules/container-apps/jobs.tf
@@ -17,6 +17,7 @@ locals {
       job_container_args = "create_appointments"
     }
     send_message_batch = {
+      # cron_expression = "0,30 9 * * 1-5"
       cron_expression = null
       environment_variables = {
 
@@ -43,6 +44,7 @@ locals {
       cron_expression = null
       environment_variables = {
         STATUS_UPDATES_QUEUE_NAME = "notifications-message-status-updates"
+        ENVIRONMENT               = var.environment
       }
       job_short_name     = "sms"
       job_container_args = "save_message_status"
@@ -64,6 +66,18 @@ locals {
       job_short_name     = "smk"
       job_container_args = "create_reports --smoke-test"
     }
+    # Runs the collect_metrics management command, which reports the length
+    # of the two queues named below as gauge metrics tagged with ENVIRONMENT.
+    collect_metrics = {
+      cron_expression = "*/5 * * * *"
+      environment_variables = {
+        RETRY_QUEUE_NAME          = "notifications-message-batch-retries"
+        STATUS_UPDATES_QUEUE_NAME = "notifications-message-status-updates"
+        ENVIRONMENT               = var.environment
+      }
+      job_short_name     = "clm"
+      job_container_args = "collect_metrics"
+    }
   }
 }
 
diff --git a/infrastructure/modules/container-apps/variables.tf b/infrastructure/modules/container-apps/variables.tf
index e7f1f9120..c48166817 100644
--- a/infrastructure/modules/container-apps/variables.tf
+++ b/infrastructure/modules/container-apps/variables.tf
@@ -200,6 +200,16 @@ variable "app_insights_id" {
   type        = string
 }
 
+variable "queue_length_alert_threshold" {
+  description = "If alerting is enabled, alert if storage account queues are greater than this threshold."
+  type        = number
+
+  # A negative threshold would make the alert fire for any reported queue length.
+  validation {
+    condition     = var.queue_length_alert_threshold >= 0
+    error_message = "The queue_length_alert_threshold value must be zero or greater."
+  }
+}
+
 variable "enable_notifications_jobs_schedule" {
   description = "Whether we apply the cron schedules for the notifications container app jobs"
   type        = bool
diff --git a/infrastructure/terraform/main.tf b/infrastructure/terraform/main.tf
index 22d1f1a2a..d909dd7fb 100644
--- a/infrastructure/terraform/main.tf
+++ b/infrastructure/terraform/main.tf
@@ -78,4 +78,5 @@ module "container-apps" {
   target_url                         = var.deploy_container_apps ? "${module.container-apps[0].external_url}healthcheck" : null
   resource_group_name_infra          = local.resource_group_name
   enable_notifications_jobs_schedule = var.enable_notifications_jobs_schedule
+  queue_length_alert_threshold       = var.queue_length_alert_threshold
 }
diff --git a/infrastructure/terraform/variables.tf b/infrastructure/terraform/variables.tf
index cd6e78767..6ac36a1f8 100644
--- a/infrastructure/terraform/variables.tf
+++ b/infrastructure/terraform/variables.tf
@@ -184,6 +184,12 @@ variable "run_notifications_smoke_test" {
   type        = bool
 }
 
+variable "queue_length_alert_threshold" {
+  description = "If alerting is enabled, alert if storage account queues are greater than this threshold."
+  type        = number
+  default     = 5
+}
+
 locals {
   region = "uksouth"
 
diff --git a/manage_breast_screening/notifications/management/commands/collect_metrics.py b/manage_breast_screening/notifications/management/commands/collect_metrics.py
new file mode 100644
index 000000000..b6f1ac90b
--- /dev/null
+++ b/manage_breast_screening/notifications/management/commands/collect_metrics.py
@@ -0,0 +1,33 @@
+import logging
+
+from django.core.management.base import BaseCommand, CommandError
+
+from manage_breast_screening.notifications.services.metrics import Metrics
+from manage_breast_screening.notifications.services.queue import Queue
+
+logger = logging.getLogger(__name__)
+
+
+class Command(BaseCommand):
+    """Publish the length of each notifications queue as a gauge metric."""
+
+    def handle(self, *args, **options):
+        try:
+            queues = [Queue.RetryMessageBatches(), Queue.MessageStatusUpdates()]
+
+            # Metrics.__init__ rebuilds the exporter and meter provider on every
+            # Metrics() call, so instantiate it once rather than once per queue.
+            metrics = Metrics()
+
+            # Set queue_size metrics
+            for queue in queues:
+                metrics.set_gauge_value(
+                    f"queue_size_{queue.queue_name}",
+                    "messages",
+                    "Queue length",
+                    queue.get_message_count(),
+                )
+
+        except Exception as e:
+            logger.error(e, exc_info=True)
+            raise CommandError(e) from e
diff --git a/manage_breast_screening/notifications/services/metrics.py b/manage_breast_screening/notifications/services/metrics.py
new file mode 100644
index 000000000..1f075abe0
--- /dev/null
+++ b/manage_breast_screening/notifications/services/metrics.py
@@ -0,0 +1,62 @@
+import logging
+import os
+
+from azure.monitor.opentelemetry.exporter import AzureMonitorMetricExporter
+from opentelemetry import metrics
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
+
+logger = logging.getLogger(__name__)
+
+
+class Metrics:
+    """Send gauge metrics to Azure Monitor through OpenTelemetry.
+
+    ``Metrics()`` always returns the same object, but ``__init__`` still runs
+    on every call, building a new exporter and calling ``set_meter_provider``
+    again. Callers should create it once and reuse the instance.
+    """
+
+    _instance = None
+
+    def __new__(cls, *args, **kwargs):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+        return cls._instance
+
+    def __init__(self):
+        environment = os.getenv("ENVIRONMENT")
+        logger.debug(f"Initialising Metrics(environment: {environment})")
+
+        exporter = AzureMonitorMetricExporter(
+            connection_string=os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
+        )
+        metrics.set_meter_provider(
+            MeterProvider(metric_readers=[PeriodicExportingMetricReader(exporter)])
+        )
+        self.meter = metrics.get_meter(__name__)
+        self.environment = environment
+
+    def set_gauge_value(self, metric_name, units, description, value):
+        """Record ``value`` on the gauge named ``metric_name``.
+
+        The reading is tagged with an ``environment`` attribute holding the
+        ENVIRONMENT value read when the instance was initialised.
+        """
+        logger.debug(
+            (
+                f"Metrics: set_gauge_value(metric_name: {metric_name} "
+                f"units: {units}, description: {description}, value: {value})"
+            )
+        )
+
+        # Create gauge metric
+        gauge = self.meter.create_gauge(
+            metric_name, unit=units, description=description
+        )
+
+        # Set metric value
+        gauge.set(
+            value,
+            {"environment": self.environment},
+        )
diff --git a/manage_breast_screening/notifications/services/queue.py b/manage_breast_screening/notifications/services/queue.py
index edc294431..b31002590 100644
--- a/manage_breast_screening/notifications/services/queue.py
+++ b/manage_breast_screening/notifications/services/queue.py
@@ -1,9 +1,12 @@
+import logging
 import os
 
 from azure.core.exceptions import ResourceExistsError
 from azure.identity import ManagedIdentityCredential
 from azure.storage.queue import QueueClient, QueueMessage
 
+logger = logging.getLogger(__name__)
+
 
 class QueueConfigurationError(Exception):
     """Raised when queue is not properly configured"""
@@ -16,6 +19,7 @@ def __init__(self, queue_name):
         storage_account_name = os.getenv("STORAGE_ACCOUNT_NAME")
         queue_mi_client_id = os.getenv("QUEUE_MI_CLIENT_ID")
         connection_string = os.getenv("QUEUE_STORAGE_CONNECTION_STRING")
+        self.queue_name = queue_name
 
         if storage_account_name and queue_mi_client_id:
             self.client = QueueClient(
@@ -55,6 +59,9 @@ def peek(self):
     def item(self):
         return self.client.receive_message()
 
+    def get_message_count(self):
+        return self.client.get_queue_properties().approximate_message_count
+
     @classmethod
     def MessageStatusUpdates(cls):
         return cls(
diff --git a/manage_breast_screening/notifications/tests/management/commands/test_collect_metrics.py 
b/manage_breast_screening/notifications/tests/management/commands/test_collect_metrics.py
new file mode 100644
index 000000000..511874678
--- /dev/null
+++ b/manage_breast_screening/notifications/tests/management/commands/test_collect_metrics.py
@@ -0,0 +1,46 @@
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from manage_breast_screening.notifications.management.commands.collect_metrics import (
+    Command,
+)
+
+
+class TestCollectMetrics:
+    @pytest.fixture(autouse=True)
+    def setup(self, monkeypatch):
+        monkeypatch.setenv("ENVIRONMENT", "test")
+
+    @patch(f"{Command.__module__}.Queue")
+    @patch(f"{Command.__module__}.Metrics")
+    def test_handle_sends_queue_lengths(self, mock_metrics_class, mock_queue):
+        mock_retry = MagicMock()
+        mock_retry.queue_name = "retry_queue"
+        mock_retry.get_message_count.return_value = 8
+
+        mock_status = MagicMock()
+        mock_status.queue_name = "status_queue"
+        mock_status.get_message_count.return_value = 2
+
+        mock_queue.RetryMessageBatches.return_value = mock_retry
+        mock_queue.MessageStatusUpdates.return_value = mock_status
+
+        Command().handle()
+
+        metrics_instance = mock_metrics_class.return_value
+
+        metrics_instance.set_gauge_value.assert_any_call(
+            "queue_size_retry_queue",
+            "messages",
+            "Queue length",
+            8,
+        )
+        metrics_instance.set_gauge_value.assert_any_call(
+            "queue_size_status_queue",
+            "messages",
+            "Queue length",
+            2,
+        )
+
+        assert metrics_instance.set_gauge_value.call_count == 2
diff --git a/manage_breast_screening/notifications/tests/services/test_metrics.py b/manage_breast_screening/notifications/tests/services/test_metrics.py
new file mode 100644
index 000000000..b6d033163
--- /dev/null
+++ b/manage_breast_screening/notifications/tests/services/test_metrics.py
@@ -0,0 +1,96 @@
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from manage_breast_screening.notifications.services.metrics import Metrics
+
+
+@patch(f"{Metrics.__module__}.AzureMonitorMetricExporter")
+@patch(f"{Metrics.__module__}.PeriodicExportingMetricReader")
+@patch(f"{Metrics.__module__}.metrics")
+@patch(f"{Metrics.__module__}.MeterProvider")
+class TestMetrics:
+    @pytest.fixture
+    def conn_string(self):
+        return "InstrumentationKey=00000000-0000-0000-0000-000000000000"
+
+    @pytest.fixture(autouse=True)
+    def setup(self, monkeypatch, conn_string):
+        monkeypatch.setenv("APPLICATIONINSIGHTS_CONNECTION_STRING", conn_string)
+        monkeypatch.setenv("ENVIRONMENT", "dev")
+        # Metrics caches its instance on the class; clear it so every test
+        # starts from a fresh object instead of one built by an earlier test.
+        monkeypatch.setattr(Metrics, "_instance", None)
+
+    def test_metrics_initialisation(
+        self,
+        mock_meter_provider,
+        mock_metrics,
+        mock_metric_reader,
+        mock_metric_exporter,
+        conn_string,
+    ):
+        mock_meter = MagicMock()
+        mock_metrics.get_meter.return_value = mock_meter
+
+        subject = Metrics()
+
+        mock_metric_exporter.assert_called_once_with(connection_string=str(conn_string))
+        mock_metric_reader.assert_called_once_with(mock_metric_exporter.return_value)
+        mock_meter_provider.assert_called_once_with(
+            metric_readers=[mock_metric_reader.return_value]
+        )
+        mock_metrics.set_meter_provider.assert_called_once_with(
+            mock_meter_provider.return_value
+        )
+        mock_metrics.get_meter.assert_called_once_with(
+            "manage_breast_screening.notifications.services.metrics"
+        )
+
+        assert subject.meter == mock_meter
+        assert subject.environment == "dev"
+
+    def test_metrics_is_a_singleton(
+        self,
+        mock_meter_provider,
+        mock_metrics,
+        mock_metric_reader,
+        mock_metric_exporter,
+    ):
+        subject = Metrics()
+        the_same_instance = Metrics()
+        # Identity, not equality, is what makes this a singleton.
+        assert subject is the_same_instance
+
+    def test_set_gauge_value(
+        self,
+        mock_meter_provider,
+        mock_metrics,
+        mock_metric_reader,
+        mock_metric_exporter,
+    ):
+        mock_meter = MagicMock()
+        mock_gauge = MagicMock()
+
+        mock_metrics.get_meter.return_value = mock_meter
+        mock_meter.create_gauge.return_value = mock_gauge
+
+        subject = Metrics()
+
+        subject.set_gauge_value(
+            metric_name="queue_depth",
+            units="messages",
+            description="Number of messages",
+            value=999,
+        )
+
+        mock_meter.create_gauge.assert_called_once_with(
+            "queue_depth",
+            unit="messages",
+            description="Number of messages",
+        )
+
+        mock_gauge.set.assert_called_once_with(
+            999,
+            {"environment": "dev"},
+        )
diff --git a/manage_breast_screening/notifications/tests/services/test_queue.py b/manage_breast_screening/notifications/tests/services/test_queue.py
index 67ef4445f..d8c4e939e 100644
--- a/manage_breast_screening/notifications/tests/services/test_queue.py
+++ b/manage_breast_screening/notifications/tests/services/test_queue.py
@@ -150,3 +150,15 @@ def test_queue_raises_configuration_error_when_no_config_provided(
             Queue("test-queue")
 
         assert "Queue not configured" in str(exc_info.value)
+
+    def test_get_message_count_returns_correct_value(self, mock_queue_client):
+        with patch(
+            "manage_breast_screening.notifications.services.queue.QueueClient"
+        ) as queue_client:
+            queue_client.get_queue_properties.return_value = MagicMock(
+                approximate_message_count=666
+            )
+            subject = Queue("A-Q")
+            subject.client = queue_client
+
+            assert subject.get_message_count() == 666