Skip to content

Commit d39660e

Browse files
committed
Revert "Remove retries for failed but recoverable message batches"
This reverts commit fa1dd50.
1 parent 508a07c commit d39660e

File tree

11 files changed

+835
-50
lines changed

11 files changed

+835
-50
lines changed

infrastructure/modules/container-apps/jobs.tf

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,29 @@ locals {
1616
job_short_name = "cap"
1717
job_container_args = "create_appointments"
1818
}
19+
send_message_batch = {
20+
21+
# cron_expression = "0,30 9 * * 1-5"
22+
cron_expression = null
23+
environment_variables = {
24+
API_OAUTH_TOKEN_URL = var.api_oauth_token_url
25+
NHS_NOTIFY_API_MESSAGE_BATCH_URL = var.nhs_notify_api_message_batch_url
26+
RETRY_QUEUE_NAME = "notifications-message-batch-retries"
27+
}
28+
job_short_name = "smb"
29+
job_container_args = "send_message_batch"
30+
}
31+
retry_failed_message_batch = {
32+
# cron_expression = "0,30 9-12 * * 1-5"
33+
cron_expression = null
34+
environment_variables = {
35+
API_OAUTH_TOKEN_URL = var.api_oauth_token_url
36+
NHS_NOTIFY_API_MESSAGE_BATCH_URL = var.nhs_notify_api_message_batch_url
37+
RETRY_QUEUE_NAME = "notifications-message-batch-retries"
38+
}
39+
job_short_name = "rmb"
40+
job_container_args = "retry_failed_message_batch"
41+
}
1942
save_message_status = {
2043
# cron_expression = "0,30 * * * *"
2144
cron_expression = null
@@ -46,6 +69,7 @@ locals {
4669
collect_metrics = {
4770
cron_expression = "*/5 * * * *"
4871
environment_variables = {
72+
RETRY_QUEUE_NAME = "notifications-message-batch-retries"
4973
STATUS_UPDATES_QUEUE_NAME = "notifications-message-status-updates"
5074
ENVIRONMENT = var.environment
5175
}

manage_breast_screening/notifications/management/commands/collect_metrics.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@
1111
class Command(BaseCommand):
1212
def handle(self, *args, **options):
1313
try:
14-
queue = Queue.MessageStatusUpdates()
15-
Metrics().set_gauge_value(
16-
f"queue_size_{queue.queue_name}",
17-
"messages",
18-
"Queue length",
19-
queue.get_message_count(),
20-
)
14+
# Set queue_size metrics
15+
for queue in [Queue.RetryMessageBatches(), Queue.MessageStatusUpdates()]:
16+
Metrics().set_gauge_value(
17+
f"queue_size_{queue.queue_name}",
18+
"messages",
19+
"Queue length",
20+
queue.get_message_count(),
21+
)
2122

2223
except Exception as e:
2324
logger.error(e, exc_info=True)

manage_breast_screening/notifications/management/commands/helpers/message_batch_helpers.py

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import logging
23
import re
34
from datetime import datetime
@@ -14,6 +15,7 @@
1415
from manage_breast_screening.notifications.services.application_insights_logging import (
1516
ApplicationInsightsLogging,
1617
)
18+
from manage_breast_screening.notifications.services.queue import Queue
1719

1820
logger = logging.getLogger(__name__)
1921

@@ -98,14 +100,23 @@ def process_validation_errors(message_batch: MessageBatch, retry_count: int = 0)
98100

99101
message.save()
100102

101-
message_batch.status = MessageBatchStatusChoices.FAILED_UNRECOVERABLE.value
103+
message_batch.status = MessageBatchStatusChoices.FAILED_RECOVERABLE.value
102104
message_batch.save()
103105

104-
logger.error(
105-
"MessageBatch %s failed to send.",
106+
logger.info(
107+
"Adding MessageBatch %s to retry queue after validation failure",
106108
message_batch.id,
107109
)
108110

111+
Queue.RetryMessageBatches().add(
112+
json.dumps(
113+
{
114+
"message_batch_id": str(message_batch.id),
115+
"retry_count": retry_count,
116+
}
117+
)
118+
)
119+
109120
@staticmethod
110121
def process_unrecoverable_batch(message_batch: MessageBatch):
111122
message_batch.status = MessageBatchStatusChoices.FAILED_UNRECOVERABLE.value
@@ -121,15 +132,19 @@ def process_unrecoverable_batch(message_batch: MessageBatch):
121132

122133
@staticmethod
123134
def process_recoverable_batch(message_batch: MessageBatch, retry_count: int):
124-
message_batch.status = MessageBatchStatusChoices.FAILED_UNRECOVERABLE.value
135+
message_batch.status = MessageBatchStatusChoices.FAILED_RECOVERABLE.value
125136
message_batch.save()
126137

127-
for message in message_batch.messages.all():
128-
message.status = MessageStatusChoices.FAILED.value
129-
message.sent_at = datetime.now(tz=ZONE_INFO)
130-
message.save()
131-
132-
logger.error(
133-
"MessageBatch %s failed to send.",
138+
logger.info(
139+
"Adding MessageBatch %s to retry queue after recoverable failure",
134140
message_batch.id,
135141
)
142+
143+
Queue.RetryMessageBatches().add(
144+
json.dumps(
145+
{
146+
"message_batch_id": str(message_batch.id),
147+
"retry_count": retry_count,
148+
}
149+
)
150+
)
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import json
2+
import os
3+
from logging import getLogger
4+
5+
from django.core.management.base import BaseCommand, CommandError
6+
7+
from manage_breast_screening.notifications.management.commands.helpers.command_handler import (
8+
CommandHandler,
9+
)
10+
from manage_breast_screening.notifications.management.commands.helpers.message_batch_helpers import (
11+
MessageBatchHelpers,
12+
)
13+
from manage_breast_screening.notifications.models import (
14+
MessageBatch,
15+
MessageBatchStatusChoices,
16+
)
17+
from manage_breast_screening.notifications.services.api_client import ApiClient
18+
from manage_breast_screening.notifications.services.queue import Queue
19+
20+
logger = getLogger(__name__)
21+
INSIGHTS_JOB_NAME = "RetryFailedMessageBatch"
22+
23+
24+
class Command(BaseCommand):
25+
"""
26+
Django Admin command which takes an ID of a MessageBatch with
27+
a failed status and retries sending it to the Communications API.
28+
"""
29+
30+
def handle(self, *args, **options):
31+
with CommandHandler.handle(INSIGHTS_JOB_NAME):
32+
logger.info("Retry Failed Message Batch Command started")
33+
queue = Queue.RetryMessageBatches()
34+
logger.debug("Retry queue items: %s", queue.peek())
35+
queue_message = queue.item()
36+
37+
if queue_message is None:
38+
logger.info("No messages on queue")
39+
return
40+
41+
message_batch_id = json.loads(queue_message.content)["message_batch_id"]
42+
message_batch = MessageBatch.objects.filter(
43+
id=message_batch_id,
44+
status=MessageBatchStatusChoices.FAILED_RECOVERABLE.value,
45+
).first()
46+
47+
queue.delete(queue_message)
48+
logger.info(
49+
"Queue message %s for MessageBatch with id %s deleted from queue",
50+
queue_message.id,
51+
message_batch_id,
52+
)
53+
54+
if message_batch is None:
55+
raise CommandError(
56+
(
57+
f"Message Batch with id {message_batch_id} and status of "
58+
f"'{MessageBatchStatusChoices.FAILED_RECOVERABLE.value}' not found"
59+
)
60+
)
61+
62+
retry_count = int(json.loads(queue_message.content)["retry_count"])
63+
if retry_count < int(os.getenv("NOTIFICATIONS_BATCH_RETRY_LIMIT", "5")):
64+
logger.info(
65+
"Retrying Message Batch with id %s with retry count %s",
66+
message_batch_id,
67+
retry_count,
68+
)
69+
70+
response = ApiClient().send_message_batch(message_batch)
71+
72+
if response.status_code == 201:
73+
MessageBatchHelpers.mark_batch_as_sent(
74+
message_batch=message_batch, response_json=response.json()
75+
)
76+
else:
77+
MessageBatchHelpers.mark_batch_as_failed(
78+
message_batch=message_batch,
79+
response=response,
80+
retry_count=(retry_count + 1),
81+
)
82+
else:
83+
logger.error(
84+
"Failed Message Batch with id %s not sent: Retry limit exceeded",
85+
message_batch_id,
86+
)
87+
message_batch.status = (
88+
MessageBatchStatusChoices.FAILED_UNRECOVERABLE.value
89+
)
90+
message_batch.save()
91+
raise CommandError(
92+
f"Message Batch with id {message_batch_id} not sent: Retry limit exceeded"
93+
)

manage_breast_screening/notifications/services/queue.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,7 @@ def MessageStatusUpdates(cls):
6969
"STATUS_UPDATES_QUEUE_NAME", "notifications-message-status-updates"
7070
)
7171
)
72+
73+
@classmethod
74+
def RetryMessageBatches(cls):
75+
return cls(os.getenv("RETRY_QUEUE_NAME", "notifications-message-batch-retries"))

0 commit comments

Comments
 (0)