Skip to content
15 changes: 14 additions & 1 deletion app/celery/letters_pdf_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from notifications_utils.letter_timings import LETTER_PROCESSING_DEADLINE
from notifications_utils.recipient_validation.postal_address import PostalAddress
from notifications_utils.timezones import convert_bst_to_utc, convert_utc_to_bst
from app.celery.queue_utils import get_message_group_id_for_queue

from app import notify_celery, signing
from app.aws import s3
Expand Down Expand Up @@ -262,8 +263,20 @@ def shatter_deliver_letter_tasks(notification_ids):
# If the number or size of arguments to this function change, then the default
# `batch_size` argument of `send_dvla_letters_via_api` needs updating to keep
# within SQS’s maximum message size
queue_name = QueueNames.SEND_LETTER

for id in notification_ids:
deliver_letter.apply_async(kwargs={"notification_id": id}, queue=QueueNames.SEND_LETTER)
message_group_kwargs = {}
if (current_app.config.get("ENABLE_SQS_FAIR_GROUPING", False)):
notification = get_notification_by_id(id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This celery task was explicitly written so it didn't have to hit the database and this perfectly illustrates my objection to implementing group ids in this way, as mentioned here #4676 (comment) and in many other places.

message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(notification.service_id),
origin=notification.template.origin,
key_type=notification.key_type,
)

deliver_letter.apply_async(kwargs={"notification_id": id}, queue=queue_name, **message_group_kwargs)


@notify_celery.task(bind=True, name="sanitise-letter", max_retries=15, default_retry_delay=300)
Expand Down
20 changes: 18 additions & 2 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import date, datetime, timedelta

import jinja2
from app.celery.queue_utils import get_message_group_id_for_queue
import sentry_sdk
from flask import current_app
from notifications_utils.clients.zendesk.zendesk_client import (
Expand Down Expand Up @@ -96,7 +97,14 @@
def run_scheduled_jobs():
try:
for job in dao_set_scheduled_jobs_to_pending():
process_job.apply_async([str(job.id)], queue=QueueNames.JOBS)
queue_name = QueueNames.JOBS
message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(job.service_id),
notification_type=KEY_TYPE_NORMAL,
)

process_job.apply_async([str(job.id)], queue=queue_name, **message_group_kwargs)
current_app.logger.info("Job ID %s added to process job queue", job.id, extra={"job_id": job.id})
except SQLAlchemyError:
current_app.logger.exception("Failed to run scheduled jobs")
Expand Down Expand Up @@ -332,7 +340,15 @@ def replay_created_notifications():
letter.id,
extra={"notification_id": letter.id},
)
get_pdf_for_templated_letter.apply_async([str(letter.id)], queue=QueueNames.CREATE_LETTERS_PDF)

queue_name = QueueNames.CREATE_LETTERS_PDF
message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(letter.service_id),
key_type=letter.key_type,
)

get_pdf_for_templated_letter.apply_async([str(letter.id)], queue=queue_name, **message_group_kwargs)


@notify_celery.task(name="check-if-letters-still-pending-virus-check")
Expand Down
41 changes: 37 additions & 4 deletions app/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import datetime
from uuid import UUID

from app.celery.queue_utils import get_message_group_id_for_queue
from botocore.exceptions import ClientError as BotoClientError
from flask import current_app
from notifications_utils.insensitive_dict import InsensitiveDict
Expand Down Expand Up @@ -350,9 +351,18 @@ def save_sms(
)

if saved_notification.status != NOTIFICATION_VALIDATION_FAILED:
queue_name = QueueNames.SEND_SMS
message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(service.id),
origin=template.origin,
key_type=KEY_TYPE_NORMAL,
)

provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND_SMS,
queue=queue_name,
**message_group_kwargs,
)
else:
extra = {
Expand Down Expand Up @@ -434,9 +444,18 @@ def save_email(self, service_id, notification_id, encoded_notification, sender_i
client_reference=notification.get("client_reference", None),
)

queue_name = QueueNames.SEND_EMAIL
message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(service.id),
origin=template.origin,
key_type=KEY_TYPE_NORMAL,
)

provider_tasks.deliver_email.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND_EMAIL,
queue=queue_name,
**message_group_kwargs,
)

extra = {
Expand Down Expand Up @@ -493,8 +512,15 @@ def save_letter(
status=NOTIFICATION_CREATED,
)

queue_name = QueueNames.CREATE_LETTERS_PDF
message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(service.id),
key_type=KEY_TYPE_NORMAL,
)

letters_pdf_tasks.get_pdf_for_templated_letter.apply_async(
[str(saved_notification.id)], queue=QueueNames.CREATE_LETTERS_PDF
[str(saved_notification.id)], queue=queue_name, **message_group_kwargs
)

extra = {
Expand Down Expand Up @@ -624,8 +650,15 @@ def _process_returned_letters_callback(notification_references):
def _check_and_queue_returned_letter_callback_task(notification_id, service_id):
# queue callback task only if the service_callback_api exists
if service_callback_api := get_returned_letter_callback_api_for_service(service_id=service_id):
queue_name = QueueNames.CALLBACKS
message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(service_id),
notification_type=NOTIFICATION_RETURNED_LETTER,
)

returned_letter_data = create_returned_letter_callback_data(notification_id, service_id, service_callback_api)
send_returned_letter_to_service.apply_async([returned_letter_data], queue=QueueNames.CALLBACKS)
send_returned_letter_to_service.apply_async([returned_letter_data], queue=queue_name, **message_group_kwargs)


@notify_celery.task(bind=True, name="process-report-request")
Expand Down
30 changes: 28 additions & 2 deletions app/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import click
import flask
from app.celery.queue_utils import get_message_group_id_for_queue
from click_datetime import Datetime as click_dt
from dateutil import rrule
from flask import current_app, json
Expand Down Expand Up @@ -292,7 +293,20 @@ def insert_inbound_numbers_from_file(file_name):
)
def replay_create_pdf_for_templated_letter(notification_id):
print(f"Create task to get_pdf_for_templated_letter for notification: {notification_id}")
get_pdf_for_templated_letter.apply_async([str(notification_id)], queue=QueueNames.CREATE_LETTERS_PDF)

queue_name = QueueNames.CREATE_LETTERS_PDF
message_group_kwargs = {}

if (current_app.config.get("ENABLE_SQS_FAIR_GROUPING", False)):
notification = Notification.query.filter(Notification.id == notification_id).one()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to do this (calling the database) - this is put in as an example

message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(notification.service_id),
origin=notification.template.origin,
key_type=notification.key_type,
)

get_pdf_for_templated_letter.apply_async([str(notification_id)], queue=queue_name, **message_group_kwargs)


@notify_command(name="recreate-pdf-for-precompiled-or-uploaded-letter")
Expand All @@ -305,7 +319,19 @@ def replay_create_pdf_for_templated_letter(notification_id):
)
def recreate_pdf_for_precompiled_or_uploaded_letter(notification_id):
print(f"Call resanitise_pdf task for notification: {notification_id}")
resanitise_pdf.apply_async([str(notification_id)], queue=QueueNames.LETTERS)

queue_name = QueueNames.LETTERS
message_group_kwargs = {}
if (current_app.config.get("ENABLE_SQS_FAIR_GROUPING", False)):
notification = Notification.query.filter(Notification.id == notification_id).one()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to do this (calling the database) - this is put in as an example

message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(notification.service_id),
origin=notification.template.origin,
key_type=notification.key_type,
)

resanitise_pdf.apply_async([str(notification_id)], queue=queue_name, **message_group_kwargs)


def setup_commands(application):
Expand Down
11 changes: 10 additions & 1 deletion app/job/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from flask import Blueprint, current_app, jsonify, request

from app.aws.s3 import get_job_metadata_from_s3
from app.celery.queue_utils import get_message_group_id_for_queue
from app.celery.tasks import process_job
from app.config import QueueNames
from app.constants import (
Expand Down Expand Up @@ -176,7 +177,15 @@ def create_job(service_id):
sender_id = data.get("sender_id")

if job.job_status == JOB_STATUS_PENDING:
process_job.apply_async([str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS)
queue_name = QueueNames.JOBS
message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(service_id),
notification_type=template.template_type,
origin=template.origin,
)

process_job.apply_async([str(job.id)], {"sender_id": sender_id}, queue=queue_name, **message_group_kwargs)

job_json = job_schema.dump(job)
job_json["statistics"] = []
Expand Down
19 changes: 17 additions & 2 deletions app/notifications/notifications_ses_callback.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from flask import current_app

from app.celery.queue_utils import get_message_group_id_for_queue
from app.celery.service_callback_tasks import (
create_complaint_callback_data,
create_delivery_status_callback_data,
Expand Down Expand Up @@ -73,8 +74,15 @@ def check_and_queue_callback_task(notification):
service_callback_api = get_delivery_status_callback_api_for_service(service_id=notification.service_id)
if service_callback_api:
notification_data = create_delivery_status_callback_data(notification, service_callback_api)
queue_name = QueueNames.CALLBACKS
message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(notification.service_id),
notification_type=notification.notification_type,
)

send_delivery_status_to_service.apply_async(
[str(notification.id), notification_data], queue=QueueNames.CALLBACKS
[str(notification.id), notification_data], queue=queue_name, **message_group_kwargs
)


Expand All @@ -83,4 +91,11 @@ def _check_and_queue_complaint_callback_task(complaint, notification, recipient)
service_callback_api = get_complaint_callback_api_for_service(service_id=notification.service_id)
if service_callback_api:
complaint_data = create_complaint_callback_data(complaint, notification, service_callback_api, recipient)
send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS)
queue_name = QueueNames.CALLBACKS
message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(notification.service_id),
notification_type=notification.notification_type,
)

send_complaint_to_service.apply_async([complaint_data], queue=queue_name, **message_group_kwargs)
14 changes: 13 additions & 1 deletion app/notifications/process_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from datetime import datetime

from flask import current_app
from app.celery.queue_utils import get_message_group_id_for_queue
from app.dao import notifications_dao
from gds_metrics import Histogram
from notifications_utils.clients import redis
from notifications_utils.formatters import strip_and_remove_obscure_whitespace
Expand Down Expand Up @@ -232,8 +234,18 @@ def send_notification_to_queue_detached(key_type, notification_type, notificatio
queue = QueueNames.CREATE_LETTERS_PDF
deliver_task = get_pdf_for_templated_letter

message_group_kwargs = {}
if (current_app.config.get("ENABLE_SQS_FAIR_GROUPING", False)):
notification = notifications_dao.get_notification_by_id(notification_id)
Copy link
Contributor Author

@DilwoarH DilwoarH Dec 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to do this (calling the database) - this is put in as an example

message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue,
service_id=str(notification.service_id),
origin=notification.template.origin,
key_type=key_type,
)

try:
deliver_task.apply_async([str(notification_id)], queue=queue)
deliver_task.apply_async([str(notification_id)], queue=queue, **message_group_kwargs)
except Exception:
dao_delete_notifications_by_id(notification_id)
raise
Expand Down
19 changes: 17 additions & 2 deletions app/notifications/receive_notifications.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime
from urllib.parse import unquote

from app.celery.queue_utils import get_message_group_id_for_queue
import iso8601
from flask import Blueprint, abort, current_app, jsonify, request
from gds_metrics.metrics import Counter
Expand Down Expand Up @@ -69,8 +70,15 @@ def receive_mmg_sms():
provider_name="mmg",
)

queue_name = QueueNames.CALLBACKS
message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(service.id),
notification_type=INBOUND_SMS_TYPE,
)

service_callback_tasks.send_inbound_sms_to_service.apply_async(
[str(inbound.id), str(service.id)], queue=QueueNames.CALLBACKS
[str(inbound.id), str(service.id)], queue=queue_name, **message_group_kwargs
)

current_app.logger.info(
Expand Down Expand Up @@ -115,8 +123,15 @@ def receive_firetext_sms():

INBOUND_SMS_COUNTER.labels("firetext").inc()

queue_name = QueueNames.CALLBACKS
message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue_name,
service_id=str(service.id),
notification_type=INBOUND_SMS_TYPE,
)

service_callback_tasks.send_inbound_sms_to_service.apply_async(
[str(inbound.id), str(service.id)], queue=QueueNames.CALLBACKS
[str(inbound.id), str(service.id)], queue=queue_name, **message_group_kwargs
)
current_app.logger.info(
"%s received inbound SMS with reference %s from Firetext",
Expand Down
9 changes: 8 additions & 1 deletion app/v2/notifications/post_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime

from flask import abort, current_app, jsonify, request
from app.celery.queue_utils import get_message_group_id_for_queue
from gds_metrics import Histogram

from app import (
Expand Down Expand Up @@ -318,6 +319,11 @@ def process_letter_notification(
updated_at = datetime.utcnow()

queue = QueueNames.CREATE_LETTERS_PDF if not test_key else QueueNames.RESEARCH_MODE
message_group_kwargs = get_message_group_id_for_queue(
queue_name=queue,
service_id=str(service.id),
key_type=api_key.key_type,
)

notification = create_letter_notification(
letter_data=letter_data,
Expand All @@ -330,12 +336,13 @@ def process_letter_notification(
postage=postage,
)

get_pdf_for_templated_letter.apply_async([str(notification.id)], queue=queue)
get_pdf_for_templated_letter.apply_async([str(notification.id)], queue=queue, **message_group_kwargs)

if test_key and current_app.config["TEST_LETTERS_FAKE_DELIVERY"]:
create_fake_letter_callback.apply_async(
[notification.id, notification.billable_units, notification.postage],
queue=queue,
**message_group_kwargs,
)

resp = create_response_for_post_notification(
Expand Down