From 0b4e59fecd8b9a58cb5d449d53510fc337774cf8 Mon Sep 17 00:00:00 2001 From: Robert Scott Date: Wed, 31 Dec 2025 12:28:54 +0000 Subject: [PATCH 1/2] TMP utils --- requirements.in | 2 +- requirements.txt | 2 +- requirements_for_test.txt | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/requirements.in b/requirements.in index 22b8ad41a4..fb13a7248a 100644 --- a/requirements.in +++ b/requirements.in @@ -24,7 +24,7 @@ psutil>=6.0.0,<7.0.0 notifications-python-client==10.0.1 # Run `make bump-utils` to update to the latest version -notifications-utils @ git+https://github.com/alphagov/notifications-utils.git@104.3.2 +notifications-utils @ git+https://github.com/alphagov/notifications-utils.git@e5c5f3ecd4e6d7753eea03ff824d195a34992ee2 # gds-metrics requires prometheseus 0.2.0, override that requirement as 0.7.1 brings significant performance gains prometheus-client==0.14.1 diff --git a/requirements.txt b/requirements.txt index 0c6f4558d3..2b0787f171 100644 --- a/requirements.txt +++ b/requirements.txt @@ -148,7 +148,7 @@ mistune==0.8.4 # via notifications-utils notifications-python-client==10.0.1 # via -r requirements.in -notifications-utils @ git+https://github.com/alphagov/notifications-utils.git@4e6442f18c3b7656b8006abd2139eade40f64156 +notifications-utils @ git+https://github.com/alphagov/notifications-utils.git@e5c5f3ecd4e6d7753eea03ff824d195a34992ee2 # via -r requirements.in ordered-set==4.1.0 # via notifications-utils diff --git a/requirements_for_test.txt b/requirements_for_test.txt index 0053582bce..4770a6cb64 100644 --- a/requirements_for_test.txt +++ b/requirements_for_test.txt @@ -220,7 +220,7 @@ mypy-extensions==1.1.0 # via mypy notifications-python-client==10.0.1 # via -r requirements.txt -notifications-utils @ git+https://github.com/alphagov/notifications-utils.git@4e6442f18c3b7656b8006abd2139eade40f64156 +notifications-utils @ git+https://github.com/alphagov/notifications-utils.git@e5c5f3ecd4e6d7753eea03ff824d195a34992ee2 # via -r requirements.txt ordered-set==4.1.0 # via From c6dd35f6c681a12cec63718d77e9c7987999c924 Mon Sep 17 00:00:00 2001 From: Robert Scott Date: Thu, 1 Jan 2026 19:13:34 +0000 Subject: [PATCH 2/2] TMP mgid --- app/celery/letters_pdf_tasks.py | 18 +++-- app/celery/scheduled_tasks.py | 20 ++++- app/celery/tasks.py | 39 ++++++--- app/job/rest.py | 7 +- .../notifications_ses_callback.py | 14 +++- app/notifications/process_notifications.py | 14 +++- app/notifications/receive_notifications.py | 8 +- app/v2/notifications/post_notifications.py | 20 ++++- tests/app/celery/test_letters_pdf_tasks.py | 20 ++++- tests/app/celery/test_scheduled_tasks.py | 40 +++++++--- tests/app/celery/test_tasks.py | 79 ++++++++++++++++--- tests/app/job/test_rest.py | 10 ++- .../test_notifications_ses_callback.py | 4 +- .../test_process_notification.py | 47 ++++++----- .../test_receive_notification.py | 16 +++- tests/app/organisation/test_invite_rest.py | 6 +- tests/app/service/test_rest.py | 18 ++++- .../test_service_invite_rest.py | 6 +- tests/app/user/test_rest.py | 24 +++++- tests/app/user/test_rest_verify.py | 18 ++++- .../test_post_letter_notifications.py | 18 ++++- .../notifications/test_post_notifications.py | 16 +++- 22 files changed, 360 insertions(+), 102 deletions(-) diff --git a/app/celery/letters_pdf_tasks.py b/app/celery/letters_pdf_tasks.py index 087e7d67c9..9b2f3f03a9 100644 --- a/app/celery/letters_pdf_tasks.py +++ b/app/celery/letters_pdf_tasks.py @@ -92,7 +92,10 @@ def get_pdf_for_templated_letter(self, notification_id): encoded_data = signing.encode(letter_data) notify_celery.send_task( - name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER, args=(encoded_data,), queue=QueueNames.SANITISE_LETTERS + name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER, + args=(encoded_data,), + queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=getattr(self, "message_group_id", None), ) except Exception as e: try: @@ -245,7 +248,7 @@ def send_letters_volume_email_to_dvla(letters_volumes, date): reply_to_text=reply_to, ) - send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY) + send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY, origin="scheduled") def send_dvla_letters_via_api(print_run_deadline_local, batch_size=100): @@ -294,6 +297,7 @@ def sanitise_letter(self, filename): "allow_international_letters": notification.service.has_permission(INTERNATIONAL_LETTERS), }, queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=getattr(self, "message_group_id", None), ) except Exception: try: @@ -541,8 +545,8 @@ def replay_letters_in_error(filename=None): sanitise_letter.apply_async([filename], queue=QueueNames.LETTERS) -@notify_celery.task(name="resanitise-pdf") -def resanitise_pdf(notification_id): +@notify_celery.task(name="resanitise-pdf", bind=True) +def resanitise_pdf(self, notification_id): """ `notification_id` is the notification id for a PDF letter which was either uploaded or sent using the API. @@ -570,11 +574,12 @@ def resanitise_pdf(notification_id): "allow_international_letters": notification.service.has_permission(INTERNATIONAL_LETTERS), }, queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=getattr(self, "message_group_id", None), ) -@notify_celery.task(name="resanitise-letter-attachment") -def resanitise_letter_attachment(service_id, attachment_id, original_filename): +@notify_celery.task(name="resanitise-letter-attachment", bind=True) +def resanitise_letter_attachment(self, service_id, attachment_id, original_filename): """ `service_id` is the service id for a PDF letter attachment/template. `attachment_id` is the attachment id for a PDF letter attachment which was uploaded for a template. @@ -593,4 +598,5 @@ def resanitise_letter_attachment(service_id, attachment_id, original_filename): "original_filename": original_filename, }, queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=getattr(self, "message_group_id", None), ) diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 7286ebdff5..37d5aac67b 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -96,7 +96,11 @@ def run_scheduled_jobs(): try: for job in dao_set_scheduled_jobs_to_pending(): - process_job.apply_async([str(job.id)], queue=QueueNames.JOBS) + process_job.apply_async( + [str(job.id)], + queue=QueueNames.JOBS, + MessageGroupId="#".join((str(job.service_id), job.template.template_type, "normal", "scheduled")), + ) current_app.logger.info("Job ID %s added to process job queue", job.id, extra={"job_id": job.id}) except SQLAlchemyError: current_app.logger.exception("Failed to run scheduled jobs") @@ -323,7 +327,7 @@ def replay_created_notifications() -> None: extra, extra=extra, ) - send_notification_to_queue(notification=n) + send_notification_to_queue(notification=n, origin="") # if the letter has not be sent after an hour, then create a zendesk ticket letters = letters_missing_from_sending_bucket(grace_period, session=db.session_bulk, inner_retry_attempts=2) @@ -335,7 +339,11 @@ def replay_created_notifications() -> None: letter.id, extra={"notification_id": letter.id}, ) - get_pdf_for_templated_letter.apply_async([str(letter.id)], queue=QueueNames.CREATE_LETTERS_PDF) + get_pdf_for_templated_letter.apply_async( + [str(letter.id)], + queue=QueueNames.CREATE_LETTERS_PDF, + MessageGroupId="#".join((str(letter.service_id), letter.notification_type, letter.key_type, "")), + ) @notify_celery.task(name="check-if-letters-still-pending-virus-check") @@ -438,7 +446,11 @@ def check_for_missing_rows_in_completed_jobs(): extra = {"job_row_number": row_to_process.missing_row, "job_id": job.id} current_app.logger.info("Processing missing row %(job_row_number)s for job %(job_id)s", extra, extra=extra) - process_job_row(template.template_type, task_args_kwargs) + process_job_row( + template.template_type, + task_args_kwargs, + "#".join((str(job.service_id), template.template_type, "normal", "")), + ) @notify_celery.task(name="update-status-of-fully-processed-jobs") diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 33f6a97a02..2db2ac9150 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -68,8 +68,8 @@ class ProcessReportRequestException(Exception): pass -@notify_celery.task(name="process-job") -def process_job(job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_BATCH_SIZE): +@notify_celery.task(name="process-job", bind=True) +def process_job(self, job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_BATCH_SIZE): start = datetime.utcnow() job = dao_get_job_by_id(job_id) current_app.logger.info( @@ -116,12 +116,14 @@ def process_job(job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_R get_id_task_args_kwargs_for_job_row(row, template, job, service, sender_id=sender_id)[1] for row in shatter_batch ] - _shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs) + _shatter_job_rows_with_subdivision( + template.template_type, batch_args_kwargs, getattr(self, "message_group_id", None) + ) job_complete(job, start=start) -def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level=True): +def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, message_group_id, top_level=True): try: shatter_job_rows.apply_async( ( @@ -129,6 +131,7 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level args_kwargs_seq, ), queue=QueueNames.JOBS, + MessageGroupId=message_group_id, ) except BotoClientError as e: # this information is helpfully not preserved outside the message string of the exception, so @@ -146,7 +149,7 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level raise UnprocessableJobRow from e for sub_batch in (args_kwargs_seq[:split_batch_size], args_kwargs_seq[split_batch_size:]): - _shatter_job_rows_with_subdivision(template_type, sub_batch, top_level=False) + _shatter_job_rows_with_subdivision(template_type, sub_batch, message_group_id, top_level=False) else: if not top_level: @@ -156,16 +159,17 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level ) -@notify_celery.task(name="shatter-job-rows") +@notify_celery.task(name="shatter-job-rows", bind=True) def shatter_job_rows( + self, template_type: str, args_kwargs_seq: Sequence, ): for task_args_kwargs in args_kwargs_seq: - process_job_row(template_type, task_args_kwargs) + process_job_row(template_type, task_args_kwargs, getattr(self, "message_group_id", None)) -def process_job_row(template_type, task_args_kwargs): +def process_job_row(template_type, task_args_kwargs, message_group_id): send_fn = { SMS_TYPE: save_sms, EMAIL_TYPE: save_email, @@ -175,6 +179,7 @@ def process_job_row(template_type, task_args_kwargs): send_fn.apply_async( *task_args_kwargs, queue=QueueNames.DATABASE, + MessageGroupId=message_group_id, ) @@ -353,6 +358,7 @@ def save_sms( provider_tasks.deliver_sms.apply_async( [str(saved_notification.id)], queue=QueueNames.SEND_SMS, + MessageGroupId=getattr(self, "message_group_id", None), ) else: extra = { @@ -437,6 +443,7 @@ def save_email(self, service_id, notification_id, encoded_notification, sender_i provider_tasks.deliver_email.apply_async( [str(saved_notification.id)], queue=QueueNames.SEND_EMAIL, + MessageGroupId=getattr(self, "message_group_id", None), ) extra = { @@ -494,7 +501,9 @@ def save_letter( ) letters_pdf_tasks.get_pdf_for_templated_letter.apply_async( - [str(saved_notification.id)], queue=QueueNames.CREATE_LETTERS_PDF + [str(saved_notification.id)], + queue=QueueNames.CREATE_LETTERS_PDF, + MessageGroupId=getattr(self, "message_group_id", None), ) extra = { @@ -580,7 +589,11 @@ def process_incomplete_job(job_id, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_B get_id_task_args_kwargs_for_job_row(row, template, job, job.service, sender_id=sender_id)[1] for row in shatter_batch ] - _shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs) + _shatter_job_rows_with_subdivision( + template.template_type, + batch_args_kwargs, + "#".join((str(job.service_id), job.template.template_type, "normal", "")), + ) job_complete(job, resumed=True) @@ -625,7 +638,11 @@ def _check_and_queue_returned_letter_callback_task(notification_id, service_id): # queue callback task only if the service_callback_api exists if service_callback_api := get_returned_letter_callback_api_for_service(service_id=service_id): returned_letter_data = create_returned_letter_callback_data(notification_id, service_id, service_callback_api) - send_returned_letter_to_service.apply_async([returned_letter_data], queue=QueueNames.CALLBACKS) + send_returned_letter_to_service.apply_async( + [returned_letter_data], + queue=QueueNames.CALLBACKS, + MessageGroupId="#".join((str(service_id), LETTER_TYPE, "normal", "")), + ) @notify_celery.task(bind=True, name="process-report-request") diff --git a/app/job/rest.py b/app/job/rest.py index eb13487e6b..745ab47f07 100644 --- a/app/job/rest.py +++ b/app/job/rest.py @@ -176,7 +176,12 @@ def create_job(service_id): sender_id = data.get("sender_id") if job.job_status == JOB_STATUS_PENDING: - process_job.apply_async([str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS) + process_job.apply_async( + [str(job.id)], + {"sender_id": sender_id}, + queue=QueueNames.JOBS, + MessageGroupId="#".join((str(job.service_id), template.template_type, "normal", "dashboard")), + ) job_json = job_schema.dump(job) job_json["statistics"] = [] diff --git a/app/notifications/notifications_ses_callback.py b/app/notifications/notifications_ses_callback.py index b57ea7cab6..9a15e4b2c8 100644 --- a/app/notifications/notifications_ses_callback.py +++ b/app/notifications/notifications_ses_callback.py @@ -74,7 +74,11 @@ def check_and_queue_callback_task(notification): if service_callback_api: notification_data = create_delivery_status_callback_data(notification, service_callback_api) send_delivery_status_to_service.apply_async( - [str(notification.id), notification_data], queue=QueueNames.CALLBACKS + [str(notification.id), notification_data], + queue=QueueNames.CALLBACKS, + MessageGroupId="#".join( + (str(notification.service_id), notification.notification_type, notification.key_type, "") + ), ) @@ -83,4 +87,10 @@ def _check_and_queue_complaint_callback_task(complaint, notification, recipient) service_callback_api = get_complaint_callback_api_for_service(service_id=notification.service_id) if service_callback_api: complaint_data = create_complaint_callback_data(complaint, notification, service_callback_api, recipient) - send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS) + send_complaint_to_service.apply_async( + [complaint_data], + queue=QueueNames.CALLBACKS, + MessageGroupId="#".join( + (str(notification.service_id), notification.notification_type, notification.key_type, "") + ), + ) diff --git a/app/notifications/process_notifications.py b/app/notifications/process_notifications.py index 7ad06e8239..b911ee5b65 100644 --- a/app/notifications/process_notifications.py +++ b/app/notifications/process_notifications.py @@ -218,7 +218,7 @@ def increment_daily_limit_cache(service_id, notification_type): redis_store.incr(cache_key) -def send_notification_to_queue_detached(key_type, notification_type, notification_id, queue=None): +def send_notification_to_queue_detached(key_type, notification_type, notification_id, message_group_id, queue=None): if key_type == KEY_TYPE_TEST: queue = QueueNames.RESEARCH_MODE @@ -236,14 +236,20 @@ def send_notification_to_queue_detached(key_type, notification_type, notificatio deliver_task = get_pdf_for_templated_letter try: - deliver_task.apply_async([str(notification_id)], queue=queue) + deliver_task.apply_async([str(notification_id)], queue=queue, MessageGroupId=message_group_id) except Exception: dao_delete_notifications_by_id(notification_id) raise -def send_notification_to_queue(notification, queue=None): - send_notification_to_queue_detached(notification.key_type, notification.notification_type, notification.id, queue) +def send_notification_to_queue(notification, queue=None, origin="dashboard"): + send_notification_to_queue_detached( + notification.key_type, + notification.notification_type, + notification.id, + "#".join((str(notification.service_id), notification.notification_type, notification.key_type, origin)), + queue, + ) def simulated_recipient(to_address, notification_type): diff --git a/app/notifications/receive_notifications.py b/app/notifications/receive_notifications.py index dcfd5eb91b..a01be2eeb5 100644 --- a/app/notifications/receive_notifications.py +++ b/app/notifications/receive_notifications.py @@ -70,7 +70,9 @@ def receive_mmg_sms(): ) service_callback_tasks.send_inbound_sms_to_service.apply_async( - [str(inbound.id), str(service.id)], queue=QueueNames.CALLBACKS + [str(inbound.id), str(service.id)], + queue=QueueNames.CALLBACKS, + MessageGroupId="#".join((str(service.id), INBOUND_SMS_TYPE, "normal", "")), ) current_app.logger.info( @@ -116,7 +118,9 @@ def receive_firetext_sms(): INBOUND_SMS_COUNTER.labels("firetext").inc() service_callback_tasks.send_inbound_sms_to_service.apply_async( - [str(inbound.id), str(service.id)], queue=QueueNames.CALLBACKS + [str(inbound.id), str(service.id)], + queue=QueueNames.CALLBACKS, + MessageGroupId="#".join((str(service.id), INBOUND_SMS_TYPE, "normal", "")), ) current_app.logger.info( "%s received inbound SMS with reference %s from Firetext", diff --git a/app/v2/notifications/post_notifications.py b/app/v2/notifications/post_notifications.py index 90496cfaf2..8f319d4480 100644 --- a/app/v2/notifications/post_notifications.py +++ b/app/v2/notifications/post_notifications.py @@ -228,6 +228,7 @@ def process_sms_or_email_notification( key_type=api_user.key_type, notification_type=notification_type, notification_id=notification_id, + message_group_id="#".join((str(service.id), notification_type, api_user.key_type, "api")), ) else: current_app.logger.info( @@ -297,9 +298,16 @@ def process_letter_notification( if service.restricted and api_key.key_type != KEY_TYPE_TEST: raise BadRequestError(message="Cannot send letters when service is in trial mode", status_code=403) + message_group_id = "#".join((str(service.id), LETTER_TYPE, api_key.key_type, "api")) + if precompiled: return process_precompiled_letter_notifications( - letter_data=letter_data, api_key=api_key, service=service, template=template, reply_to_text=reply_to_text + letter_data=letter_data, + api_key=api_key, + service=service, + template=template, + reply_to_text=reply_to_text, + message_group_id=message_group_id, ) postage = validate_address(service, letter_data["personalisation"]) @@ -330,12 +338,13 @@ def process_letter_notification( postage=postage, ) - get_pdf_for_templated_letter.apply_async([str(notification.id)], queue=queue) + get_pdf_for_templated_letter.apply_async([str(notification.id)], queue=queue, MessageGroupId=message_group_id) if test_key and current_app.config["TEST_LETTERS_FAKE_DELIVERY"]: create_fake_letter_callback.apply_async( [notification.id, notification.billable_units, notification.postage], queue=queue, + MessageGroupId=message_group_id, ) resp = create_response_for_post_notification( @@ -351,7 +360,9 @@ def process_letter_notification( return resp -def process_precompiled_letter_notifications(*, letter_data, api_key, service, template, reply_to_text): +def process_precompiled_letter_notifications( + *, letter_data, api_key, service, template, reply_to_text, message_group_id +): try: status = NOTIFICATION_PENDING_VIRUS_CHECK letter_content = base64.b64decode(letter_data["content"]) @@ -385,10 +396,11 @@ def process_precompiled_letter_notifications(*, letter_data, api_key, service, t name=TaskNames.SCAN_FILE, kwargs={"filename": filename}, queue=QueueNames.ANTIVIRUS, + MessageGroupId=message_group_id, ) else: # stub out antivirus in dev - sanitise_letter.apply_async([filename], queue=QueueNames.LETTERS) + sanitise_letter.apply_async([filename], queue=QueueNames.LETTERS, MessageGroupId=message_group_id) return resp diff --git a/tests/app/celery/test_letters_pdf_tasks.py b/tests/app/celery/test_letters_pdf_tasks.py index efdf78f151..7038aa474a 100644 --- a/tests/app/celery/test_letters_pdf_tasks.py +++ b/tests/app/celery/test_letters_pdf_tasks.py @@ -97,7 +97,10 @@ def test_get_pdf_for_templated_letter_happy_path(mocker, sample_letter_notificat } mock_celery.assert_called_once_with( - name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER, args=(ANY,), queue=QueueNames.SANITISE_LETTERS + name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER, + args=(ANY,), + queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=None, ) actual_data = signing.decode(mock_celery.call_args.kwargs["args"][0]) @@ -391,8 +394,16 @@ def test_send_letters_volume_email_to_dvla(notify_db_session, mock_celery_task, emails_to_dvla = Notification.query.all() assert len(emails_to_dvla) == 2 send_mock.called = 2 - send_mock.assert_any_call([str(emails_to_dvla[0].id)], queue=QueueNames.NOTIFY) - send_mock.assert_any_call([str(emails_to_dvla[1].id)], queue=QueueNames.NOTIFY) + send_mock.assert_any_call( + [str(emails_to_dvla[0].id)], + queue=QueueNames.NOTIFY, + MessageGroupId=f"{current_app.config['NOTIFY_SERVICE_ID']}#email#normal#scheduled", + ) + send_mock.assert_any_call( + [str(emails_to_dvla[1].id)], + queue=QueueNames.NOTIFY, + MessageGroupId=f"{current_app.config['NOTIFY_SERVICE_ID']}#email#normal#scheduled", + ) for email in emails_to_dvla: assert str(email.template_id) == current_app.config["LETTERS_VOLUME_EMAIL_TEMPLATE_ID"] assert email.to in current_app.config["DVLA_EMAIL_ADDRESSES"] @@ -461,6 +472,7 @@ def test_sanitise_letter_calls_template_preview_sanitise_task( "allow_international_letters": expected_international_letters_allowed, }, queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=None, ) @@ -891,6 +903,7 @@ def test_resanitise_pdf_calls_template_preview_with_letter_details( "allow_international_letters": expected_international_letters_allowed, }, queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=None, ) @@ -913,4 +926,5 @@ def test_resanitise_letter_attachment_calls_template_preview_with_attachment_det "original_filename": original_filename, }, queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=None, ) diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 1ec5aca1b1..82dffd0be3 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -98,7 +98,7 @@ def test_should_update_scheduled_jobs_and_put_on_queue(mock_celery_task, sample_ updated_job = dao_get_job_by_id(job.id) assert updated_job.job_status == "pending" - mocked.assert_called_with([str(job.id)], queue="job-tasks") + mocked.assert_called_with([str(job.id)], queue="job-tasks", MessageGroupId=f"{job.service_id}#sms#normal#scheduled") def test_should_update_all_scheduled_jobs_and_put_on_queue(sample_template, mock_celery_task): @@ -119,9 +119,9 @@ def test_should_update_all_scheduled_jobs_and_put_on_queue(sample_template, mock mocked.assert_has_calls( [ - call([str(job_3.id)], queue="job-tasks"), - call([str(job_2.id)], queue="job-tasks"), - call([str(job_1.id)], queue="job-tasks"), + call([str(job_3.id)], queue="job-tasks", MessageGroupId=f"{job_3.service_id}#sms#normal#scheduled"), + call([str(job_2.id)], queue="job-tasks", MessageGroupId=f"{job_2.service_id}#sms#normal#scheduled"), + call([str(job_1.id)], queue="job-tasks", MessageGroupId=f"{job_1.service_id}#sms#normal#scheduled"), ] ) @@ -430,8 +430,12 @@ def test_replay_created_notifications(sample_service, mock_celery_task): create_notification(template=email_template, created_at=datetime.utcnow(), status="created") replay_created_notifications() - email_delivery_queue.assert_called_once_with([str(old_email.id)], queue="send-email-tasks") - sms_delivery_queue.assert_called_once_with([str(old_sms.id)], queue="send-sms-tasks") + email_delivery_queue.assert_called_once_with( + [str(old_email.id)], queue="send-email-tasks", MessageGroupId=f"{old_email.service_id}#email#normal#" + ) + sms_delivery_queue.assert_called_once_with( + [str(old_sms.id)], queue="send-sms-tasks", MessageGroupId=f"{old_sms.service_id}#sms#normal#" + ) def test_replay_created_notifications_get_pdf_for_templated_letter_tasks_for_letters_not_ready_to_send( @@ -455,8 +459,16 @@ def test_replay_created_notifications_get_pdf_for_templated_letter_tasks_for_let replay_created_notifications() calls = [ - call([str(notification_1.id)], queue=QueueNames.CREATE_LETTERS_PDF), - call([str(notification_2.id)], queue=QueueNames.CREATE_LETTERS_PDF), + call( + [str(notification_1.id)], + queue=QueueNames.CREATE_LETTERS_PDF, + MessageGroupId=f"{notification_1.service_id}#letter#normal#", + ), + call( + [str(notification_2.id)], + queue=QueueNames.CREATE_LETTERS_PDF, + MessageGroupId=f"{notification_2.service_id}#letter#normal#", + ), ] mock_task.assert_has_calls(calls, any_order=True) @@ -724,7 +736,12 @@ def test_check_for_missing_rows_in_completed_jobs(mocker, sample_email_template, ) ] assert mock_save_email.mock_calls == [ - mock.call((str(job.service_id), "some-uuid", "something_encoded"), {}, queue="database-tasks") + mock.call( + (str(job.service_id), "some-uuid", "something_encoded"), + {}, + queue="database-tasks", + MessageGroupId=f"{job.service_id}#email#normal#", + ) ] @@ -765,7 +782,10 @@ def test_check_for_missing_rows_in_completed_jobs_uses_sender_id( ] assert mock_save_email.mock_calls == [ mock.call( - (str(job.service_id), "some-uuid", "something_encoded"), {"sender_id": fake_uuid}, queue="database-tasks" + (str(job.service_id), "some-uuid", "something_encoded"), + {"sender_id": fake_uuid}, + queue="database-tasks", + MessageGroupId=f"{job.service_id}#email#normal#", ) ] diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 369b5cbd04..c1a1e05a95 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -154,6 +154,7 @@ def test_should_process_sms_job(sample_job, mocker, mock_celery_task): ], ), queue="job-tasks", + MessageGroupId=None, ) ] assert job.job_status == "finished" @@ -200,6 +201,7 @@ def test_should_process_sms_job_with_sender_id(sample_job, mocker, mock_celery_t ], ), queue="job-tasks", + MessageGroupId=None, ) ] assert job.job_status == "finished" @@ -298,6 +300,7 @@ def test_should_process_job_if_send_limits_are_not_exceeded(notify_api, notify_d ], ), queue="job-tasks", + MessageGroupId=None, ), call( ( @@ -309,6 +312,7 @@ def test_should_process_job_if_send_limits_are_not_exceeded(notify_api, notify_d ], ), queue="job-tasks", + MessageGroupId=None, ), call( ( @@ -320,6 +324,7 @@ def test_should_process_job_if_send_limits_are_not_exceeded(notify_api, notify_d ], ), queue="job-tasks", + MessageGroupId=None, ), call( ( @@ -329,6 +334,7 @@ def test_should_process_job_if_send_limits_are_not_exceeded(notify_api, notify_d ], ), queue="job-tasks", + MessageGroupId=None, ), ] assert mock_check_message_limit.mock_calls == [ @@ -404,6 +410,7 @@ def test_should_process_email_job(email_job_with_placeholders, mocker, mock_cele ], ), queue="job-tasks", + MessageGroupId=None, ) ] @@ -460,6 +467,7 @@ def test_should_process_email_job_with_sender_id(email_job_with_placeholders, mo ], ), queue="job-tasks", + MessageGroupId=None, ) ] @@ -520,6 +528,7 @@ def test_should_process_letter_job(sample_letter_job, mocker, mock_celery_task): ], ), queue="job-tasks", + MessageGroupId=None, ) ] @@ -606,6 +615,7 @@ def test_should_process_all_sms_job(sample_job_with_placeholdered_template, mock ], ), queue="job-tasks", + MessageGroupId=None, ), call( ( @@ -619,6 +629,7 @@ def test_should_process_all_sms_job(sample_job_with_placeholdered_template, mock ], ), queue="job-tasks", + MessageGroupId=None, ), ] @@ -709,6 +720,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=None, ), # fails - splits & retries first half call( @@ -720,6 +732,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=None, ), # succeeds - retries second half call( @@ -732,6 +745,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=None, ), # fails - splits & retries first half call( @@ -742,6 +756,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=None, ), # succeeds - retries second half call( @@ -753,6 +768,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=None, ), # fails - splits & retries first half call( @@ -763,6 +779,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=None, ), # fails & gives up because we can't split any further. # doesn't proceed to second top-level batch. @@ -865,6 +882,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=None, ), # fails - splits & retries first half call( @@ -876,6 +894,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=None, ), # fails - splits & retries first half call( @@ -886,6 +905,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=None, ), # succeeds - retries second half call( @@ -896,6 +916,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=None, ), # succeeds - unwinds and retries second half call( @@ -908,6 +929,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=None, ), # succeeds, proceeds to second top-level batch call( @@ -922,6 +944,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=None, ), # succeeds ] @@ -1097,16 +1120,19 @@ def test_shatter_job_rows(template_type, send_fn, mock_celery_task, mocker): ("service-id-0", "notification-id-0", "encoded-0"), {} if template_type == LETTER_TYPE else {"sender_id": "0"}, queue="database-tasks", + MessageGroupId=None, ), call( ("service-id-1", "notification-id-1", "encoded-1"), {} if template_type == LETTER_TYPE else {"sender_id": "1"}, queue="database-tasks", + MessageGroupId=None, ), call( ("service-id-2", "notification-id-2", "encoded-2"), {} if template_type == LETTER_TYPE else {"sender_id": "2"}, queue="database-tasks", + MessageGroupId=None, ), ] @@ -1146,7 +1172,11 @@ def test_should_send_template_to_correct_sms_task_and_persist( assert persisted_notification._personalisation == signing.encode({"name": "Jo"}) assert persisted_notification.notification_type == "sms" assert persisted_notification.client_reference == client_reference - mocked_task.assert_called_once_with([str(persisted_notification.id)], queue="send-sms-tasks") + mocked_task.assert_called_once_with( + [str(persisted_notification.id)], + queue="send-sms-tasks", + MessageGroupId=None, + ) @pytest.mark.parametrize("client_reference", [None, "ab1234"]) @@ -1202,7 +1232,9 @@ def test_should_save_sms_if_restricted_service_and_valid_number(notify_db_sessio assert not persisted_notification.personalisation assert persisted_notification.notification_type == "sms" provider_tasks.deliver_sms.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-sms-tasks" + [str(persisted_notification.id)], + queue="send-sms-tasks", + MessageGroupId=None, ) @@ -1302,7 +1334,9 @@ def test_should_save_sms_template_to_and_persist_with_job_id(sample_job, mock_ce assert persisted_notification.notification_type == "sms" provider_tasks.deliver_sms.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-sms-tasks" + [str(persisted_notification.id)], + queue="send-sms-tasks", + MessageGroupId=None, ) @@ -1373,7 +1407,9 @@ def test_should_use_email_template_and_persist( assert persisted_notification.client_reference == client_reference provider_tasks.deliver_email.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-email-tasks" + [str(persisted_notification.id)], + queue="send-email-tasks", + MessageGroupId=None, ) @@ -1408,7 +1444,9 @@ def test_save_email_should_use_template_version_from_job_not_latest(sample_email assert not persisted_notification.sent_by assert persisted_notification.notification_type == "email" provider_tasks.deliver_email.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-email-tasks" + [str(persisted_notification.id)], + queue="send-email-tasks", + MessageGroupId=None, ) @@ -1433,7 +1471,9 @@ def test_should_use_email_template_subject_placeholders(sample_email_template_wi assert not persisted_notification.reference assert persisted_notification.notification_type == "email" provider_tasks.deliver_email.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-email-tasks" + [str(persisted_notification.id)], + queue="send-email-tasks", + MessageGroupId=None, ) @@ -1501,7 +1541,9 @@ def test_should_use_email_template_and_persist_without_personalisation(sample_em assert not persisted_notification.reference assert persisted_notification.notification_type == "email" provider_tasks.deliver_email.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-email-tasks" + [str(persisted_notification.id)], + queue="send-email-tasks", + MessageGroupId=None, ) @@ -1634,7 +1676,9 @@ def test_send_email_with_template_email_files_from_old_template_version( ] provider_tasks.deliver_email.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-email-tasks" + [str(persisted_notification.id)], + queue="send-email-tasks", + MessageGroupId=None, ) @@ -2085,7 +2129,11 @@ def test_save_letter_calls_get_pdf_for_templated_letter_task( ) assert mock_create_letters_pdf.called - mock_create_letters_pdf.assert_called_once_with([str(notification_id)], queue=QueueNames.CREATE_LETTERS_PDF) + mock_create_letters_pdf.assert_called_once_with( + [str(notification_id)], + queue=QueueNames.CREATE_LETTERS_PDF, + MessageGroupId=None, + ) def test_should_cancel_job_if_service_is_inactive(sample_service, sample_job, mocker, mock_celery_task): @@ -2247,6 +2295,7 @@ def test_process_incomplete_job_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=f"{job.service_id}#sms#normal#", ), call( ( @@ -2258,6 +2307,7 @@ def test_process_incomplete_job_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=f"{job.service_id}#sms#normal#", ), call( ( @@ -2268,6 +2318,7 @@ def test_process_incomplete_job_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=f"{job.service_id}#sms#normal#", ), ] @@ -2401,6 +2452,7 @@ def test_process_incomplete_jobs_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=f"{job.service_id}#sms#normal#", ), call( ( @@ -2433,6 +2485,7 @@ def test_process_incomplete_jobs_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=f"{job.service_id}#sms#normal#", ), call( ( @@ -2473,6 +2526,7 @@ def test_process_incomplete_jobs_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=f"{job.service_id}#sms#normal#", ), call( ( @@ -2489,6 +2543,7 @@ def test_process_incomplete_jobs_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=f"{job.service_id}#sms#normal#", ), ] @@ -2586,6 +2641,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=f"{job.service_id}#sms#normal#", ), # fails - splits & retries first half call( @@ -2603,6 +2659,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=f"{job.service_id}#sms#normal#", ), # succeeds - retries second half call( @@ -2628,6 +2685,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=f"{job.service_id}#sms#normal#", ), # fails - splits & retries first half call( @@ -2645,6 +2703,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=f"{job.service_id}#sms#normal#", ), # fails - gives up on this job & proceeds to next call( @@ -2678,6 +2737,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=f"{job2.service_id}#sms#normal#", ), call( ( @@ -2702,6 +2762,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=f"{job2.service_id}#sms#normal#", ), ] diff --git a/tests/app/job/test_rest.py b/tests/app/job/test_rest.py index d1ccef03a9..829e7308a7 100644 --- a/tests/app/job/test_rest.py +++ b/tests/app/job/test_rest.py @@ -152,7 +152,10 @@ def test_create_unscheduled_job(client, sample_template, mocker, mock_celery_tas assert response.status_code == 201 app.celery.tasks.process_job.apply_async.assert_called_once_with( - ([str(fake_uuid)]), {"sender_id": None}, queue="job-tasks" + ([str(fake_uuid)]), + {"sender_id": None}, + queue="job-tasks", + MessageGroupId=f"{sample_template.service_id}#{sample_template.template_type}#normal#dashboard", ) resp_json = json.loads(response.get_data(as_text=True)) @@ -193,7 +196,10 @@ def test_create_unscheduled_job_with_sender_id_in_metadata( assert response.status_code == 201 app.celery.tasks.process_job.apply_async.assert_called_once_with( - ([str(fake_uuid)]), {"sender_id": fake_uuid}, queue="job-tasks" + ([str(fake_uuid)]), + {"sender_id": fake_uuid}, + queue="job-tasks", + MessageGroupId=f"{sample_template.service_id}#{sample_template.template_type}#normal#dashboard", ) diff --git a/tests/app/notifications/test_notifications_ses_callback.py b/tests/app/notifications/test_notifications_ses_callback.py index 2b4327ad95..c930d38edf 100644 --- a/tests/app/notifications/test_notifications_ses_callback.py +++ b/tests/app/notifications/test_notifications_ses_callback.py @@ -91,7 +91,9 @@ def test_check_and_queue_callback_task(mocker, mock_celery_task, sample_notifica assert mock_create_args[1].id == callback_api.id mock_send.assert_called_once_with( - [str(sample_notification.id), mock_create.return_value], queue="service-callbacks" + [str(sample_notification.id), mock_create.return_value], + queue="service-callbacks", + MessageGroupId=f"{sample_notification.service_id}#{sample_notification.notification_type}#{sample_notification.key_type}#", ) diff --git a/tests/app/notifications/test_process_notification.py b/tests/app/notifications/test_process_notification.py index 1575a6f632..42be6deb12 100644 --- a/tests/app/notifications/test_process_notification.py +++ b/tests/app/notifications/test_process_notification.py @@ -1,6 +1,5 @@ import datetime import uuid -from collections import namedtuple from unittest.mock import call import pytest @@ -11,6 +10,8 @@ ) from sqlalchemy.exc import SQLAlchemyError +from app.celery.letters_pdf_tasks import get_pdf_for_templated_letter +from app.celery.provider_tasks import deliver_email, deliver_sms from app.constants import ( EMAIL_TYPE, KEY_TYPE_NORMAL, @@ -626,42 +627,43 @@ def test_persist_notification_increments_cache_for_international_sms_if_the_cach @pytest.mark.parametrize( ("requested_queue, notification_type, key_type, expected_queue, expected_task"), [ - (None, SMS_TYPE, "normal", "send-sms-tasks", "provider_tasks.deliver_sms"), - (None, EMAIL_TYPE, "normal", "send-email-tasks", "provider_tasks.deliver_email"), - (None, SMS_TYPE, "team", "send-sms-tasks", "provider_tasks.deliver_sms"), + (None, SMS_TYPE, "normal", "send-sms-tasks", deliver_sms), + (None, EMAIL_TYPE, "normal", "send-email-tasks", deliver_email), + (None, SMS_TYPE, "team", "send-sms-tasks", deliver_sms), ( None, LETTER_TYPE, "normal", "create-letters-pdf-tasks", - "letters_pdf_tasks.get_pdf_for_templated_letter", + get_pdf_for_templated_letter, ), - (None, SMS_TYPE, "test", "research-mode-tasks", "provider_tasks.deliver_sms"), + (None, SMS_TYPE, "test", "research-mode-tasks", deliver_sms), ( "notify-internal-tasks", SMS_TYPE, "normal", "notify-internal-tasks", - "provider_tasks.deliver_sms", + deliver_sms, ), ( "notify-internal-tasks", EMAIL_TYPE, "normal", "notify-internal-tasks", - "provider_tasks.deliver_email", + deliver_email, ), ( "notify-internal-tasks", SMS_TYPE, "test", "research-mode-tasks", - "provider_tasks.deliver_sms", + deliver_sms, ), ], ) def test_send_notification_to_queue( notify_db_session, + mock_celery_task, requested_queue, notification_type, key_type, @@ -669,28 +671,35 @@ def test_send_notification_to_queue( expected_task, mocker, ): - mocked = mocker.patch(f"app.celery.{expected_task}.apply_async") - Notification = namedtuple("Notification", ["id", "key_type", "notification_type", "created_at"]) - notification = Notification( + mocked = mock_celery_task(expected_task) + notification = mocker.Mock(spec=Notification) + notification.configure_mock( id=uuid.uuid4(), key_type=key_type, notification_type=notification_type, created_at=datetime.datetime(2016, 11, 11, 16, 8, 18), + service_id=uuid.uuid4(), ) send_notification_to_queue(notification=notification, queue=requested_queue) - mocked.assert_called_once_with([str(notification.id)], queue=expected_queue) + mocked.assert_called_once_with( + [str(notification.id)], + queue=expected_queue, + MessageGroupId=f"{notification.service_id}#{notification_type}#{key_type}#dashboard", + ) -def test_send_notification_to_queue_throws_exception_deletes_notification(sample_notification, mocker): - mocked = mocker.patch( - "app.celery.provider_tasks.deliver_sms.apply_async", - side_effect=Boto3Error("EXPECTED"), - ) +def test_send_notification_to_queue_throws_exception_deletes_notification(sample_notification, mock_celery_task): + mocked = mock_celery_task(deliver_sms) + mocked.side_effect = Boto3Error("EXPECTED") with pytest.raises(Boto3Error): send_notification_to_queue(sample_notification) - mocked.assert_called_once_with([str(sample_notification.id)], queue="send-sms-tasks") + mocked.assert_called_once_with( + [str(sample_notification.id)], + queue="send-sms-tasks", + MessageGroupId=f"{sample_notification.service_id}#sms#{sample_notification.key_type}#dashboard", + ) assert Notification.query.count() == 0 assert NotificationHistory.query.count() == 0 diff --git a/tests/app/notifications/test_receive_notification.py b/tests/app/notifications/test_receive_notification.py index 2ac1d230b7..6eb409e38a 100644 --- a/tests/app/notifications/test_receive_notification.py +++ b/tests/app/notifications/test_receive_notification.py @@ -72,7 +72,9 @@ def test_receive_notification_returns_received_to_mmg(client, mocker, sample_ser inbound_sms_id = InboundSms.query.all()[0].id mocked.assert_called_once_with( - [str(inbound_sms_id), str(sample_service_full_permissions.id)], queue="service-callbacks" + [str(inbound_sms_id), str(sample_service_full_permissions.id)], + queue="service-callbacks", + MessageGroupId=f"{sample_service_full_permissions.id}#inbound_sms#normal#", ) @@ -340,7 +342,11 @@ def test_receive_notification_returns_received_to_firetext(notify_db_session, cl assert result["status"] == "ok" inbound_sms_id = InboundSms.query.all()[0].id - mocked.assert_called_once_with([str(inbound_sms_id), str(service.id)], queue="service-callbacks") + mocked.assert_called_once_with( + [str(inbound_sms_id), str(service.id)], + queue="service-callbacks", + MessageGroupId=f"{service.id}#inbound_sms#normal#", + ) def test_receive_notification_from_firetext_persists_message(notify_db_session, client, mocker): @@ -368,7 +374,11 @@ def test_receive_notification_from_firetext_persists_message(notify_db_session, assert persisted.content == "this is a message" assert persisted.provider == "firetext" assert persisted.provider_date == datetime(2017, 1, 1, 12, 0, 0, 0) - mocked.assert_called_once_with([str(persisted.id), str(service.id)], queue="service-callbacks") + mocked.assert_called_once_with( + [str(persisted.id), str(service.id)], + queue="service-callbacks", + MessageGroupId=f"{service.id}#inbound_sms#normal#", + ) def test_receive_notification_from_firetext_persists_message_with_normalized_phone(notify_db_session, client, mocker): diff --git a/tests/app/organisation/test_invite_rest.py b/tests/app/organisation/test_invite_rest.py index 06ed5c0000..ac811368e3 100644 --- a/tests/app/organisation/test_invite_rest.py +++ b/tests/app/organisation/test_invite_rest.py @@ -69,7 +69,11 @@ def test_create_invited_org_user( assert notification.personalisation["url"].startswith(expected_start_of_invite_url.format(hostnames=hostnames)) assert len(notification.personalisation["url"]) > len(expected_start_of_invite_url.format(hostnames=hostnames)) - mocked.assert_called_once_with([(str(notification.id))], queue="notify-internal-tasks") + mocked.assert_called_once_with( + [(str(notification.id))], + queue="notify-internal-tasks", + MessageGroupId=f"{notification.service_id}#email#normal#dashboard", + ) def test_create_invited_user_invalid_email(admin_request, sample_organisation, sample_user, mocker): diff --git a/tests/app/service/test_rest.py b/tests/app/service/test_rest.py index c4d129fd52..c80b037972 100644 --- a/tests/app/service/test_rest.py +++ b/tests/app/service/test_rest.py @@ -2598,7 +2598,11 @@ def test_verify_reply_to_email_address_should_send_verification_email( notification = Notification.query.first() assert notification.template_id == verify_reply_to_address_email_template.id assert response["data"] == {"id": str(notification.id)} - mocked.assert_called_once_with([str(notification.id)], queue="notify-internal-tasks") + mocked.assert_called_once_with( + [str(notification.id)], + queue="notify-internal-tasks", + MessageGroupId=f"{notification.service_id}#email#normal#dashboard", + ) assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() @@ -4594,7 +4598,11 @@ def test_update_service_join_request_by_id_notification_sent( ) notification = Notification.query.first() - mock_deliver_email_task.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks") + mock_deliver_email_task.assert_called_once_with( + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=f"{notification.service_id}#email#normal#dashboard", + ) assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() assert notification.to == f"{requester_id}@digital.cabinet-office.gov.uk" @@ -4640,7 +4648,11 @@ def test_update_service_join_request_get_template( ) notification = Notification.query.first() - mock_deliver_email_task.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks") + mock_deliver_email_task.assert_called_once_with( + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=f"{notification.service_id}#email#normal#dashboard", + ) assert notification.template.version == template.version assert notification.template_id == template.id diff --git a/tests/app/service_invite/test_service_invite_rest.py b/tests/app/service_invite/test_service_invite_rest.py index 47c9ac6dc0..e478d0abcf 100644 --- a/tests/app/service_invite/test_service_invite_rest.py +++ b/tests/app/service_invite/test_service_invite_rest.py @@ -65,7 +65,11 @@ def test_create_invited_user( assert len(notification.personalisation["url"]) > len(expected_start_of_invite_url.format(hostnames=hostnames)) assert str(notification.template_id) == current_app.config["INVITATION_EMAIL_TEMPLATE_ID"] - mocked.assert_called_once_with([(str(notification.id))], queue="notify-internal-tasks") + mocked.assert_called_once_with( + [(str(notification.id))], + queue="notify-internal-tasks", + MessageGroupId=f"{notification.service_id}#email#normal#dashboard", + ) def test_create_invited_user_without_auth_type(admin_request, sample_service, mocker, invitation_email_template): diff --git a/tests/app/user/test_rest.py b/tests/app/user/test_rest.py index afb1b341d1..ff85f6614b 100644 --- a/tests/app/user/test_rest.py +++ b/tests/app/user/test_rest.py @@ -659,7 +659,11 @@ def test_send_user_reset_password_should_send_reset_password_link( ) notification = Notification.query.first() - mocked.assert_called_once_with([str(notification.id)], queue="notify-internal-tasks") + mocked.assert_called_once_with( + [str(notification.id)], + queue="notify-internal-tasks", + MessageGroupId=f"{notification.service_id}#email#normal#dashboard", + ) assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() @@ -716,7 +720,11 @@ def test_send_user_reset_password_reset_password_link_contains_redirect_link_if_ notification = Notification.query.first() assert "?next=blob" in notification.content - mocked.assert_called_once_with([str(notification.id)], queue="notify-internal-tasks") + mocked.assert_called_once_with( + [str(notification.id)], + queue="notify-internal-tasks", + MessageGroupId=f"{notification.service_id}#email#normal#dashboard", + ) def test_send_user_reset_password_should_return_400_when_email_is_missing(admin_request, mocker): @@ -775,7 +783,11 @@ def test_send_already_registered_email(admin_request, sample_user, already_regis ) notification = Notification.query.first() - mocked.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks") + mocked.assert_called_once_with( + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=f"{notification.service_id}#email#normal#dashboard", + ) assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() @@ -807,7 +819,11 @@ def test_send_user_confirm_new_email_returns_204( ) notification = Notification.query.first() - mocked.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks") + mocked.assert_called_once_with( + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=f"{notification.service_id}#email#normal#dashboard", + ) assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() diff --git a/tests/app/user/test_rest_verify.py b/tests/app/user/test_rest_verify.py index e457f61eec..f967a856bd 100644 --- a/tests/app/user/test_rest_verify.py +++ b/tests/app/user/test_rest_verify.py @@ -211,7 +211,9 @@ def test_send_user_sms_code(client, sample_user, sms_code_template, mocker): assert notification.reply_to_text == notify_service.get_default_sms_sender() app.celery.provider_tasks.deliver_sms.apply_async.assert_called_once_with( - ([str(notification.id)]), queue="notify-internal-tasks" + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=f"{notification.service_id}#sms#normal#dashboard", ) @@ -236,7 +238,9 @@ def test_send_user_code_for_sms_with_optional_to_field(client, sample_user, sms_ notification = Notification.query.first() assert notification.to == to_number app.celery.provider_tasks.deliver_sms.apply_async.assert_called_once_with( - ([str(notification.id)]), queue="notify-internal-tasks" + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=f"{notification.service_id}#sms#normal#dashboard", ) @@ -307,7 +311,11 @@ def test_send_new_user_email_verification( assert resp.status_code == 204 notification = Notification.query.first() assert VerifyCode.query.count() == 0 - mocked.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks") + mocked.assert_called_once_with( + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=f"{notification.service_id}#email#normal#dashboard", + ) assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() assert notification.personalisation["name"] == "Test User" assert notification.personalisation["url"].startswith(expected_url_starts_with.format(hostnames=hostnames)) @@ -413,7 +421,9 @@ def test_send_user_email_code( assert str(noti.template_id) == current_app.config["EMAIL_2FA_TEMPLATE_ID"] assert noti.personalisation["name"] == "Test User" assert noti.personalisation["url"].startswith(expected_auth_url.format(hostnames=hostnames)) - deliver_email.assert_called_once_with([str(noti.id)], queue="notify-internal-tasks") + deliver_email.assert_called_once_with( + [str(noti.id)], queue="notify-internal-tasks", MessageGroupId=f"{noti.service_id}#email#normal#dashboard" + ) def test_send_user_email_code_with_urlencoded_next_param(admin_request, mocker, sample_user, email_2fa_code_template): diff --git a/tests/app/v2/notifications/test_post_letter_notifications.py b/tests/app/v2/notifications/test_post_letter_notifications.py index 751c72bc12..e3a9cc208b 100644 --- a/tests/app/v2/notifications/test_post_letter_notifications.py +++ b/tests/app/v2/notifications/test_post_letter_notifications.py @@ -71,7 +71,11 @@ def test_post_letter_notification_returns_201(api_client_request, sample_letter_ ) assert not resp_json["scheduled_for"] assert not notification.reply_to_text - mock.assert_called_once_with([str(notification.id)], queue=QueueNames.CREATE_LETTERS_PDF) + mock.assert_called_once_with( + [str(notification.id)], + queue=QueueNames.CREATE_LETTERS_PDF, + MessageGroupId=f"{notification.service_id}#letter#normal#api", + ) def test_post_letter_notification_sets_postage(api_client_request, notify_db_session, mocker): @@ -268,7 +272,9 @@ def test_post_letter_notification_with_test_key_creates_pdf_and_sets_status_to_d notification = Notification.query.one() - fake_create_letter_task.assert_called_once_with([str(notification.id)], queue="research-mode-tasks") + fake_create_letter_task.assert_called_once_with( + [str(notification.id)], queue="research-mode-tasks", MessageGroupId=f"{notification.service_id}#letter#test#api" + ) assert not fake_create_dvla_response_task.called assert notification.status == NOTIFICATION_DELIVERED assert notification.updated_at is not None @@ -302,7 +308,9 @@ def test_post_letter_notification_with_test_key_creates_pdf_and_sets_status_to_s notification = Notification.query.one() - fake_create_letter_task.assert_called_once_with([str(notification.id)], queue="research-mode-tasks") + fake_create_letter_task.assert_called_once_with( + [str(notification.id)], queue="research-mode-tasks", MessageGroupId=f"{notification.service_id}#letter#test#api" + ) assert fake_create_dvla_response_task.called assert notification.status == NOTIFICATION_SENDING @@ -543,7 +551,9 @@ def test_post_letter_notification_is_delivered_but_still_creates_pdf_if_in_trial notification = Notification.query.one() assert notification.status == NOTIFICATION_DELIVERED - fake_create_letter_task.assert_called_once_with([str(notification.id)], queue="research-mode-tasks") + fake_create_letter_task.assert_called_once_with( + [str(notification.id)], queue="research-mode-tasks", MessageGroupId=f"{notification.service_id}#letter#test#api" + ) def test_post_letter_notification_is_delivered_and_has_pdf_uploaded_to_test_letters_bucket_using_test_key( diff --git a/tests/app/v2/notifications/test_post_notifications.py b/tests/app/v2/notifications/test_post_notifications.py index 49e94fd212..45f5053c15 100644 --- a/tests/app/v2/notifications/test_post_notifications.py +++ b/tests/app/v2/notifications/test_post_notifications.py @@ -92,7 +92,9 @@ def test_post_sms_notification_uses_inbound_number_as_sender(api_client_request, assert resp_json["id"] == str(notification_id) assert resp_json["content"]["from_number"] == "1" assert notifications[0].reply_to_text == "1" - mocked.assert_called_once_with([str(notification_id)], queue="send-sms-tasks") + mocked.assert_called_once_with( + [str(notification_id)], queue="send-sms-tasks", MessageGroupId=f"{service.id}#sms#normal#api" + ) def test_post_sms_notification_uses_inbound_number_reply_to_as_sender(api_client_request, notify_db_session, mocker): @@ -116,7 +118,9 @@ def test_post_sms_notification_uses_inbound_number_reply_to_as_sender(api_client assert resp_json["id"] == str(notification_id) assert resp_json["content"]["from_number"] == "447123123123" assert notifications[0].reply_to_text == "447123123123" - mocked.assert_called_once_with([str(notification_id)], queue="send-sms-tasks") + mocked.assert_called_once_with( + [str(notification_id)], queue="send-sms-tasks", MessageGroupId=f"{service.id}#sms#normal#api" + ) def test_post_sms_notification_returns_201_with_sms_sender_id( @@ -143,7 +147,9 @@ def test_post_sms_notification_returns_201_with_sms_sender_id( notifications = Notification.query.all() assert len(notifications) == 1 assert notifications[0].reply_to_text == sms_sender.sms_sender - mocked.assert_called_once_with([resp_json["id"]], queue="send-sms-tasks") + mocked.assert_called_once_with( + [resp_json["id"]], queue="send-sms-tasks", MessageGroupId=f"{notifications[0].service_id}#sms#normal#api" + ) def test_post_sms_notification_uses_sms_sender_id_reply_to( @@ -171,7 +177,9 @@ def test_post_sms_notification_uses_sms_sender_id_reply_to( notifications = Notification.query.all() assert len(notifications) == 1 assert notifications[0].reply_to_text == "447123123123" - mocked.assert_called_once_with([resp_json["id"]], queue="send-sms-tasks") + mocked.assert_called_once_with( + [resp_json["id"]], queue="send-sms-tasks", MessageGroupId=f"{notifications[0].service_id}#sms#normal#api" + ) def test_notification_reply_to_text_is_original_value_if_sender_is_changed_after_post_notification(