diff --git a/app/celery/letters_pdf_tasks.py b/app/celery/letters_pdf_tasks.py index 087e7d67c9..2d6fe49d32 100644 --- a/app/celery/letters_pdf_tasks.py +++ b/app/celery/letters_pdf_tasks.py @@ -92,7 +92,10 @@ def get_pdf_for_templated_letter(self, notification_id): encoded_data = signing.encode(letter_data) notify_celery.send_task( - name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER, args=(encoded_data,), queue=QueueNames.SANITISE_LETTERS + name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER, + args=(encoded_data,), + queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=self.message_group_id if self.message_group_id is not None else str(notification.service_id), ) except Exception as e: try: @@ -294,6 +297,7 @@ def sanitise_letter(self, filename): "allow_international_letters": notification.service.has_permission(INTERNATIONAL_LETTERS), }, queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=self.message_group_id, ) except Exception: try: @@ -541,8 +545,8 @@ def replay_letters_in_error(filename=None): sanitise_letter.apply_async([filename], queue=QueueNames.LETTERS) -@notify_celery.task(name="resanitise-pdf") -def resanitise_pdf(notification_id): +@notify_celery.task(bind=True, name="resanitise-pdf") +def resanitise_pdf(self, notification_id): """ `notification_id` is the notification id for a PDF letter which was either uploaded or sent using the API. @@ -570,11 +574,12 @@ def resanitise_pdf(notification_id): "allow_international_letters": notification.service.has_permission(INTERNATIONAL_LETTERS), }, queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=self.message_group_id if self.message_group_id is not None else str(notification.service_id), ) -@notify_celery.task(name="resanitise-letter-attachment") -def resanitise_letter_attachment(service_id, attachment_id, original_filename): +@notify_celery.task(bind=True, name="resanitise-letter-attachment") +def resanitise_letter_attachment(self, service_id, attachment_id, original_filename): """ `service_id` is the service id for a PDF letter attachment/template. `attachment_id` is the attachment id for a PDF letter attachment which was uploaded for a template. @@ -593,4 +598,5 @@ def resanitise_letter_attachment(service_id, attachment_id, original_filename): "original_filename": original_filename, }, queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=self.message_group_id if self.message_group_id is not None else str(service_id), ) diff --git a/app/celery/nightly_tasks.py b/app/celery/nightly_tasks.py index 33336c7096..ecc13a68b9 100644 --- a/app/celery/nightly_tasks.py +++ b/app/celery/nightly_tasks.py @@ -82,8 +82,16 @@ def _remove_csv_files(job_types): @notify_celery.task(name="archive-unsubscribe-requests") def archive_unsubscribe_requests(): for service_id in get_service_ids_with_unsubscribe_requests(): - archive_batched_unsubscribe_requests.apply_async(queue=QueueNames.REPORTING, args=[service_id]) - archive_old_unsubscribe_requests.apply_async(queue=QueueNames.REPORTING, args=[service_id]) + archive_batched_unsubscribe_requests.apply_async( + queue=QueueNames.REPORTING, + args=[service_id], + MessageGroupId=str(service_id), + ) + archive_old_unsubscribe_requests.apply_async( + queue=QueueNames.REPORTING, + args=[service_id], + MessageGroupId=str(service_id), + ) @notify_celery.task(name="archive-batched-unsubscribe-requests") @@ -163,6 +171,7 @@ def _delete_notifications_older_than_retention_by_type( "datetime_to_delete_before": day_to_delete_backwards_from, }, countdown=(i / len(flexible_data_retention)) * stagger_total_period.seconds, + MessageGroupId=str(f.service_id), ) seven_days_ago = get_london_midnight_in_utc(convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=7)) @@ -185,6 +194,7 @@ def _delete_notifications_older_than_retention_by_type( "datetime_to_delete_before": seven_days_ago, }, countdown=(i / len(service_ids_to_purge)) * stagger_total_period.seconds, + MessageGroupId=str(service_id), ) extra = { @@ -204,8 +214,8 @@ def _delete_notifications_older_than_retention_by_type( ) -@notify_celery.task(name="delete-notifications-for-service-and-type") -def delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before): +@notify_celery.task(bind=True, name="delete-notifications-for-service-and-type") +def delete_notifications_for_service_and_type(self, service_id, notification_type, datetime_to_delete_before): start = datetime.utcnow() num_deleted = move_notifications_to_notification_history( notification_type, @@ -237,12 +247,14 @@ def delete_notifications_for_service_and_type(service_id, notification_type, dat delete_notifications_for_service_and_type.apply_async( args=(service_id, notification_type, datetime_to_delete_before), queue=QueueNames.REPORTING, + MessageGroupId=self.message_group_id, ) else: # now we've deleted all the real notifications, clean up the test notifications delete_test_notifications_for_service_and_type.apply_async( args=(service_id, notification_type, datetime_to_delete_before), queue=QueueNames.REPORTING, + MessageGroupId=self.message_group_id, ) @@ -254,6 +266,7 @@ def delete_test_notifications_for_service_and_type(service_id, notification_type delete_test_notifications_for_service_and_type.apply_async( args=(service_id, notification_type, datetime_to_delete_before), queue=QueueNames.REPORTING, + MessageGroupId=str(service_id), ) diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py index c60ca11dd8..6867edc5ef 100644 --- a/app/celery/reporting_tasks.py +++ b/app/celery/reporting_tasks.py @@ -174,6 +174,7 @@ def create_nightly_notification_status(): "service_id": service_id, }, queue=QueueNames.REPORTING, + MessageGroupId=str(service_id), ) diff --git a/app/celery/research_mode_tasks.py b/app/celery/research_mode_tasks.py index d326cd5fab..295c788f74 100644 --- a/app/celery/research_mode_tasks.py +++ b/app/celery/research_mode_tasks.py @@ -54,7 +54,7 @@ def send_sms_response(provider, reference, to): make_request(SMS_TYPE, provider, body, headers) -def send_email_response(reference, to): +def send_email_response(reference, to, service_id): if to == perm_fail_email: body = ses_hard_bounce_callback(reference) elif to == temp_fail_email: @@ -62,7 +62,11 @@ def send_email_response(reference, to): else: body = ses_notification_callback(reference) - process_ses_results.apply_async([body], queue=QueueNames.RESEARCH_MODE) + process_ses_results.apply_async( + [body], + queue=QueueNames.RESEARCH_MODE, + MessageGroupId=str(service_id), + ) def send_letter_response(notification_id: uuid.UUID, billable_units: int, postage: str): diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 6192ed2051..82a78729e6 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -96,7 +96,11 @@ def run_scheduled_jobs(): try: for job in dao_set_scheduled_jobs_to_pending(): - process_job.apply_async([str(job.id)], queue=QueueNames.JOBS) + process_job.apply_async( + [str(job.id)], + queue=QueueNames.JOBS, + MessageGroupId=str(job.service_id), + ) current_app.logger.info("Job ID %s added to process job queue", job.id, extra={"job_id": job.id}) except SQLAlchemyError: current_app.logger.exception("Failed to run scheduled jobs") @@ -335,7 +339,11 @@ def replay_created_notifications() -> None: letter.id, extra={"notification_id": letter.id}, ) - get_pdf_for_templated_letter.apply_async([str(letter.id)], queue=QueueNames.CREATE_LETTERS_PDF) + get_pdf_for_templated_letter.apply_async( + [str(letter.id)], + queue=QueueNames.CREATE_LETTERS_PDF, + MessageGroupId=str(letter.service_id), + ) @notify_celery.task(name="check-if-letters-still-pending-virus-check") @@ -359,6 +367,7 @@ def check_if_letters_still_pending_virus_check(max_minutes_ago_to_check: int = 3 name=TaskNames.SCAN_FILE, kwargs={"filename": filename}, queue=QueueNames.ANTIVIRUS, + MessageGroupId=str(letter.service_id), ) else: current_app.logger.warning( @@ -438,7 +447,7 @@ def check_for_missing_rows_in_completed_jobs(): extra = {"job_row_number": row_to_process.missing_row, "job_id": job.id} current_app.logger.info("Processing missing row %(job_row_number)s for job %(job_id)s", extra, extra=extra) - process_job_row(template.template_type, task_args_kwargs) + process_job_row(template.template_type, task_args_kwargs, str(job.service_id)) @notify_celery.task(name="update-status-of-fully-processed-jobs") diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 33f6a97a02..3891738797 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -68,8 +68,8 @@ class ProcessReportRequestException(Exception): pass -@notify_celery.task(name="process-job") -def process_job(job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_BATCH_SIZE): +@notify_celery.task(bind=True, name="process-job") +def process_job(self, job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_BATCH_SIZE): start = datetime.utcnow() job = dao_get_job_by_id(job_id) current_app.logger.info( @@ -116,12 +116,12 @@ def process_job(job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_R get_id_task_args_kwargs_for_job_row(row, template, job, service, sender_id=sender_id)[1] for row in shatter_batch ] - _shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs) + _shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs, self.message_group_id) job_complete(job, start=start) -def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level=True): +def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, message_group_id, top_level=True): try: shatter_job_rows.apply_async( ( @@ -129,6 +129,7 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level args_kwargs_seq, ), queue=QueueNames.JOBS, + MessageGroupId=message_group_id, ) except BotoClientError as e: # this information is helpfully not preserved outside the message string of the exception, so @@ -146,7 +147,7 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level raise UnprocessableJobRow from e for sub_batch in (args_kwargs_seq[:split_batch_size], args_kwargs_seq[split_batch_size:]): - _shatter_job_rows_with_subdivision(template_type, sub_batch, top_level=False) + _shatter_job_rows_with_subdivision(template_type, sub_batch, message_group_id, top_level=False) else: if not top_level: @@ -156,16 +157,17 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level ) -@notify_celery.task(name="shatter-job-rows") +@notify_celery.task(bind=True, name="shatter-job-rows") def shatter_job_rows( + self, template_type: str, args_kwargs_seq: Sequence, ): for task_args_kwargs in args_kwargs_seq: - process_job_row(template_type, task_args_kwargs) + process_job_row(template_type, task_args_kwargs, self.message_group_id) -def process_job_row(template_type, task_args_kwargs): +def process_job_row(template_type, task_args_kwargs, message_group_id=None): send_fn = { SMS_TYPE: save_sms, EMAIL_TYPE: save_email, @@ -175,6 +177,7 @@ def process_job_row(template_type, task_args_kwargs): send_fn.apply_async( *task_args_kwargs, queue=QueueNames.DATABASE, + MessageGroupId=message_group_id, ) @@ -353,6 +356,7 @@ def save_sms( provider_tasks.deliver_sms.apply_async( [str(saved_notification.id)], queue=QueueNames.SEND_SMS, + MessageGroupId=self.message_group_id, ) else: extra = { @@ -437,6 +441,7 @@ def save_email(self, service_id, notification_id, encoded_notification, sender_i provider_tasks.deliver_email.apply_async( [str(saved_notification.id)], queue=QueueNames.SEND_EMAIL, + MessageGroupId=self.message_group_id, ) extra = { @@ -494,7 +499,9 @@ def save_letter( ) letters_pdf_tasks.get_pdf_for_templated_letter.apply_async( - [str(saved_notification.id)], queue=QueueNames.CREATE_LETTERS_PDF + [str(saved_notification.id)], + queue=QueueNames.CREATE_LETTERS_PDF, + MessageGroupId=self.message_group_id, ) extra = { @@ -529,7 +536,12 @@ def handle_exception(task, notification, notification_id, exc): # send to the retry queue. current_app.logger.exception("Retry: " + base_msg, extra, extra=extra) # noqa try: - task.retry(queue=QueueNames.RETRY, exc=exc) + retry_kwargs = { + "queue": QueueNames.RETRY, + "exc": exc, + "MessageGroupId": task.message_group_id, + } + task.retry(**retry_kwargs) except task.MaxRetriesExceededError: current_app.logger.error("Max retry failed: " + base_msg, extra, extra=extra) # noqa @@ -580,7 +592,7 @@ def process_incomplete_job(job_id, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_B get_id_task_args_kwargs_for_job_row(row, template, job, job.service, sender_id=sender_id)[1] for row in shatter_batch ] - _shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs) + _shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs, str(job.service_id)) job_complete(job, resumed=True) @@ -625,7 +637,11 @@ def _check_and_queue_returned_letter_callback_task(notification_id, service_id): # queue callback task only if the service_callback_api exists if service_callback_api := get_returned_letter_callback_api_for_service(service_id=service_id): returned_letter_data = create_returned_letter_callback_data(notification_id, service_id, service_callback_api) - send_returned_letter_to_service.apply_async([returned_letter_data], queue=QueueNames.CALLBACKS) + send_returned_letter_to_service.apply_async( + [returned_letter_data], + queue=QueueNames.CALLBACKS, + MessageGroupId=str(service_id), + ) @notify_celery.task(bind=True, name="process-report-request") diff --git a/app/config.py b/app/config.py index c5bbd2597f..bcb647e61b 100644 --- a/app/config.py +++ b/app/config.py @@ -139,6 +139,9 @@ class Config: # URL of redis instance REDIS_URL = os.getenv("REDIS_URL") REDIS_ENABLED = False if os.environ.get("REDIS_ENABLED") == "0" else True + + ENABLE_SQS_MESSAGE_GROUP_IDS = os.environ.get("ENABLE_SQS_MESSAGE_GROUP_IDS", "1") == "1" + EXPIRE_CACHE_TEN_MINUTES = 600 EXPIRE_CACHE_EIGHT_DAYS = 8 * 24 * 60 * 60 @@ -587,6 +590,7 @@ class Development(Config): SERVER_NAME = os.getenv("SERVER_NAME") REDIS_ENABLED = os.getenv("REDIS_ENABLED") == "1" + ENABLE_SQS_MESSAGE_GROUP_IDS = os.environ.get("ENABLE_SQS_MESSAGE_GROUP_IDS", "1") == "1" S3_BUCKET_CSV_UPLOAD = "development-notifications-csv-upload" S3_BUCKET_CONTACT_LIST = "development-contact-list" diff --git a/app/delivery/send_to_providers.py b/app/delivery/send_to_providers.py index c8834af641..4434e0d596 100644 --- a/app/delivery/send_to_providers.py +++ b/app/delivery/send_to_providers.py @@ -163,7 +163,7 @@ def send_email_to_provider(notification): if notification.key_type == KEY_TYPE_TEST: notification.reference = str(create_uuid()) update_notification_to_sending(notification, provider) - send_email_response(notification.reference, notification.to) + send_email_response(notification.reference, notification.to, notification.service_id) else: email_sender_name = service.custom_email_sender_name or service.name from_address = ( diff --git a/app/job/rest.py b/app/job/rest.py index eb13487e6b..fc66c24056 100644 --- a/app/job/rest.py +++ b/app/job/rest.py @@ -176,7 +176,12 @@ def create_job(service_id): sender_id = data.get("sender_id") if job.job_status == JOB_STATUS_PENDING: - process_job.apply_async([str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS) + process_job.apply_async( + [str(job.id)], + {"sender_id": sender_id}, + queue=QueueNames.JOBS, + MessageGroupId=str(job.service_id), + ) job_json = job_schema.dump(job) job_json["statistics"] = [] diff --git a/app/notifications/notifications_ses_callback.py b/app/notifications/notifications_ses_callback.py index b57ea7cab6..9d46c0f30e 100644 --- a/app/notifications/notifications_ses_callback.py +++ b/app/notifications/notifications_ses_callback.py @@ -74,7 +74,9 @@ def check_and_queue_callback_task(notification): if service_callback_api: notification_data = create_delivery_status_callback_data(notification, service_callback_api) send_delivery_status_to_service.apply_async( - [str(notification.id), notification_data], queue=QueueNames.CALLBACKS + [str(notification.id), notification_data], + queue=QueueNames.CALLBACKS, + MessageGroupId=str(notification.service_id), ) @@ -83,4 +85,8 @@ def _check_and_queue_complaint_callback_task(complaint, notification, recipient) service_callback_api = get_complaint_callback_api_for_service(service_id=notification.service_id) if service_callback_api: complaint_data = create_complaint_callback_data(complaint, notification, service_callback_api, recipient) - send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS) + send_complaint_to_service.apply_async( + [complaint_data], + queue=QueueNames.CALLBACKS, + MessageGroupId=str(notification.service_id), + ) diff --git a/app/notifications/process_notifications.py b/app/notifications/process_notifications.py index 8cd0f8c39f..636e88d772 100644 --- a/app/notifications/process_notifications.py +++ b/app/notifications/process_notifications.py @@ -218,7 +218,9 @@ def increment_daily_limit_cache(service_id, notification_type): redis_store.incr(cache_key) -def send_notification_to_queue_detached(key_type, notification_type, notification_id, queue=None): +def send_notification_to_queue_detached( + key_type, notification_type, notification_id, queue=None, message_group_id=None +): if key_type == KEY_TYPE_TEST: queue = QueueNames.RESEARCH_MODE @@ -236,14 +238,20 @@ def send_notification_to_queue_detached(key_type, notification_type, notificatio deliver_task = get_pdf_for_templated_letter try: - deliver_task.apply_async([str(notification_id)], queue=queue) + deliver_task.apply_async([str(notification_id)], queue=queue, MessageGroupId=message_group_id) except Exception: dao_delete_notifications_by_id(notification_id) raise def send_notification_to_queue(notification, queue=None): - send_notification_to_queue_detached(notification.key_type, notification.notification_type, notification.id, queue) + send_notification_to_queue_detached( + notification.key_type, + notification.notification_type, + notification.id, + queue, + message_group_id=str(notification.service_id), + ) def simulated_recipient(to_address, notification_type): diff --git a/app/notifications/receive_notifications.py b/app/notifications/receive_notifications.py index bda86501fe..e5a28050fa 100644 --- a/app/notifications/receive_notifications.py +++ b/app/notifications/receive_notifications.py @@ -69,7 +69,9 @@ def receive_mmg_sms(): ) service_callback_tasks.send_inbound_sms_to_service.apply_async( - [str(inbound.id), str(service.id)], queue=QueueNames.CALLBACKS + [str(inbound.id), str(service.id)], + queue=QueueNames.CALLBACKS, + MessageGroupId=str(service.id), ) current_app.logger.info( @@ -115,7 +117,9 @@ def receive_firetext_sms(): INBOUND_SMS_COUNTER.labels("firetext").inc() service_callback_tasks.send_inbound_sms_to_service.apply_async( - [str(inbound.id), str(service.id)], queue=QueueNames.CALLBACKS + [str(inbound.id), str(service.id)], + queue=QueueNames.CALLBACKS, + MessageGroupId=str(service.id), ) current_app.logger.info( "%s received inbound SMS with reference %s from Firetext", diff --git a/app/service/rest.py b/app/service/rest.py index 86b13fa12f..269bddcb55 100644 --- a/app/service/rest.py +++ b/app/service/rest.py @@ -1610,6 +1610,7 @@ def create_report_request_by_type(service_id): process_report_request.apply_async( kwargs={"service_id": report_request.service_id, "report_request_id": report_request.id}, queue=QueueNames.REPORT_REQUESTS_NOTIFICATIONS, + MessageGroupId=str(report_request.service_id), ) return jsonify(data=created_request.serialize()), 201 diff --git a/app/v2/notifications/post_notifications.py b/app/v2/notifications/post_notifications.py index fc991bc072..b251f7b8b1 100644 --- a/app/v2/notifications/post_notifications.py +++ b/app/v2/notifications/post_notifications.py @@ -271,6 +271,7 @@ def process_sms_or_email_notification( key_type=api_user.key_type, notification_type=notification_type, notification_id=notification_id, + message_group_id=str(service.id), ) else: current_app.logger.info( @@ -373,12 +374,17 @@ def process_letter_notification( postage=postage, ) - get_pdf_for_templated_letter.apply_async([str(notification.id)], queue=queue) + get_pdf_for_templated_letter.apply_async( + [str(notification.id)], + queue=queue, + MessageGroupId=str(service.id), + ) if test_key and current_app.config["TEST_LETTERS_FAKE_DELIVERY"]: create_fake_letter_callback.apply_async( [notification.id, notification.billable_units, notification.postage], queue=queue, + MessageGroupId=str(service.id), ) resp = create_response_for_post_notification( @@ -428,10 +434,15 @@ def process_precompiled_letter_notifications(*, letter_data, api_key, service, t name=TaskNames.SCAN_FILE, kwargs={"filename": filename}, queue=QueueNames.ANTIVIRUS, + MessageGroupId=str(service.id), ) else: # stub out antivirus in dev - sanitise_letter.apply_async([filename], queue=QueueNames.LETTERS) + sanitise_letter.apply_async( + [filename], + queue=QueueNames.LETTERS, + MessageGroupId=str(service.id), + ) return resp diff --git a/tests/app/celery/test_letters_pdf_tasks.py b/tests/app/celery/test_letters_pdf_tasks.py index efdf78f151..562e1a3e99 100644 --- a/tests/app/celery/test_letters_pdf_tasks.py +++ b/tests/app/celery/test_letters_pdf_tasks.py @@ -55,6 +55,7 @@ create_notification, create_service, ) +from tests.conftest import _with_message_group_id def test_should_have_decorated_tasks_functions(): @@ -75,7 +76,8 @@ def test_get_pdf_for_templated_letter_happy_path(mocker, sample_letter_notificat mock_generate_letter_pdf_filename = mocker.patch( "app.celery.letters_pdf_tasks.generate_letter_pdf_filename", return_value="LETTER.PDF" ) - get_pdf_for_templated_letter(sample_letter_notification.id) + with _with_message_group_id(get_pdf_for_templated_letter, str(sample_letter_notification.service_id)): + get_pdf_for_templated_letter(sample_letter_notification.id) letter_data = { "letter_contact_block": sample_letter_notification.reply_to_text, @@ -97,7 +99,10 @@ def test_get_pdf_for_templated_letter_happy_path(mocker, sample_letter_notificat } mock_celery.assert_called_once_with( - name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER, args=(ANY,), queue=QueueNames.SANITISE_LETTERS + name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER, + args=(ANY,), + queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=str(sample_letter_notification.service_id), ) actual_data = signing.decode(mock_celery.call_args.kwargs["args"][0]) @@ -117,7 +122,8 @@ def test_get_pdf_for_templated_letter_with_letter_attachment(mocker, sample_lett mock_celery = mocker.patch("app.celery.letters_pdf_tasks.notify_celery.send_task") mocker.patch("app.celery.letters_pdf_tasks.generate_letter_pdf_filename", return_value="LETTER.PDF") - get_pdf_for_templated_letter(sample_letter_notification.id) + with _with_message_group_id(get_pdf_for_templated_letter, str(sample_letter_notification.service_id)): + get_pdf_for_templated_letter(sample_letter_notification.id) actual_data = signing.decode(mock_celery.call_args.kwargs["args"][0]) @@ -134,8 +140,9 @@ def test_get_pdf_for_templated_letter_retries_upon_error(mocker, sample_letter_n mocker.patch("app.celery.letters_pdf_tasks.generate_letter_pdf_filename", return_value="LETTER.PDF") mock_retry = mocker.patch("app.celery.letters_pdf_tasks.get_pdf_for_templated_letter.retry") - with caplog.at_level("ERROR"): - get_pdf_for_templated_letter(sample_letter_notification.id) + with _with_message_group_id(get_pdf_for_templated_letter, str(sample_letter_notification.service_id)): + with caplog.at_level("ERROR"): + get_pdf_for_templated_letter(sample_letter_notification.id) assert mock_celery.called assert mock_retry.called @@ -153,8 +160,9 @@ def test_get_pdf_for_templated_letter_sets_technical_failure_max_retries(mocker, ) mock_update_noti = mocker.patch("app.celery.letters_pdf_tasks.update_notification_status_by_id") - with pytest.raises(NotificationTechnicalFailureException) as e: - get_pdf_for_templated_letter(sample_letter_notification.id) + with _with_message_group_id(get_pdf_for_templated_letter, str(sample_letter_notification.service_id)): + with pytest.raises(NotificationTechnicalFailureException) as e: + get_pdf_for_templated_letter(sample_letter_notification.id) assert ( e.value.args[0] == f"RETRY FAILED: Max retries reached. " @@ -390,9 +398,17 @@ def test_send_letters_volume_email_to_dvla(notify_db_session, mock_celery_task, emails_to_dvla = Notification.query.all() assert len(emails_to_dvla) == 2 - send_mock.called = 2 - send_mock.assert_any_call([str(emails_to_dvla[0].id)], queue=QueueNames.NOTIFY) - send_mock.assert_any_call([str(emails_to_dvla[1].id)], queue=QueueNames.NOTIFY) + assert send_mock.call_count == 2 + send_mock.assert_any_call( + [str(emails_to_dvla[0].id)], + queue=QueueNames.NOTIFY, + MessageGroupId=str(emails_to_dvla[0].service_id), + ) + send_mock.assert_any_call( + [str(emails_to_dvla[1].id)], + queue=QueueNames.NOTIFY, + MessageGroupId=str(emails_to_dvla[1].service_id), + ) for email in emails_to_dvla: assert str(email.template_id) == current_app.config["LETTERS_VOLUME_EMAIL_TEMPLATE_ID"] assert email.to in current_app.config["DVLA_EMAIL_ADDRESSES"] @@ -451,7 +467,8 @@ def test_sanitise_letter_calls_template_preview_sanitise_task( sample_letter_notification.service = create_service(service_permissions=permissions) sample_letter_notification.status = NOTIFICATION_PENDING_VIRUS_CHECK - sanitise_letter(filename) + with _with_message_group_id(sanitise_letter, str(sample_letter_notification.service_id)): + sanitise_letter(filename) mock_celery.assert_called_once_with( name=TaskNames.SANITISE_LETTER, @@ -461,6 +478,7 @@ def test_sanitise_letter_calls_template_preview_sanitise_task( "allow_international_letters": expected_international_letters_allowed, }, queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=str(sample_letter_notification.service_id), ) @@ -881,7 +899,8 @@ def test_resanitise_pdf_calls_template_preview_with_letter_details( sample_letter_notification.created_at = datetime(2021, 2, 7, 12) sample_letter_notification.service = create_service(service_permissions=permissions) - resanitise_pdf(sample_letter_notification.id) + with _with_message_group_id(resanitise_pdf, str(sample_letter_notification.service_id)): + resanitise_pdf(sample_letter_notification.id) mock_celery.assert_called_once_with( name=TaskNames.RECREATE_PDF_FOR_PRECOMPILED_LETTER, @@ -891,6 +910,7 @@ def test_resanitise_pdf_calls_template_preview_with_letter_details( "allow_international_letters": expected_international_letters_allowed, }, queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=str(sample_letter_notification.service_id), ) @@ -903,7 +923,8 @@ def test_resanitise_letter_attachment_calls_template_preview_with_attachment_det attachment_id = str(uuid.uuid4()) original_filename = "test-123abc.pdf" - resanitise_letter_attachment(service_id, attachment_id, original_filename) + with _with_message_group_id(resanitise_letter_attachment, service_id): + resanitise_letter_attachment(service_id, attachment_id, original_filename) mock_celery.assert_called_once_with( name=TaskNames.RECREATE_PDF_FOR_TEMPLATE_LETTER_ATTACHMENTS, @@ -913,4 +934,5 @@ def test_resanitise_letter_attachment_calls_template_preview_with_attachment_det "original_filename": original_filename, }, queue=QueueNames.SANITISE_LETTERS, + MessageGroupId=service_id, ) diff --git a/tests/app/celery/test_nightly_tasks.py b/tests/app/celery/test_nightly_tasks.py index 3f8482b67b..8eab52e763 100644 --- a/tests/app/celery/test_nightly_tasks.py +++ b/tests/app/celery/test_nightly_tasks.py @@ -63,6 +63,7 @@ create_unsubscribe_request, create_unsubscribe_request_report, ) +from tests.conftest import _with_message_group_id @freeze_time("2016-10-18T10:00:00") @@ -175,6 +176,12 @@ def test_archive_unsubscribe_requests(notify_db_session, mock_celery_task): == {service.id for service in services_with_requests} ) + assert ( + {call[1]["MessageGroupId"] for call in mock_archive_processed.call_args_list} + == {call[1]["MessageGroupId"] for call in mock_archive_old.call_args_list} + == {str(service.id) for service in services_with_requests} + ) + assert ( [call[1]["queue"] for call in mock_archive_processed.call_args_list] == [call[1]["queue"] for call in mock_archive_old.call_args_list] @@ -570,6 +577,7 @@ def test_delete_notifications_task_calls_task_for_services_with_data_retention_o "datetime_to_delete_before": datetime(2021, 6, 1, 23, 0), }, countdown=0.0, + MessageGroupId=str(sms_service.id), ) @@ -598,6 +606,7 @@ def test_delete_notifications_task_calls_task_for_services_with_data_retention_b "datetime_to_delete_before": datetime(2021, 3, 22, 0, 0), }, countdown=ANY, + MessageGroupId=str(service_14_days.id), ), call( queue=ANY, @@ -607,6 +616,7 @@ def test_delete_notifications_task_calls_task_for_services_with_data_retention_b "datetime_to_delete_before": datetime(2021, 4, 1, 23, 0), }, countdown=ANY, + MessageGroupId=str(service_3_days.id), ), ], ) @@ -655,6 +665,7 @@ def test_delete_notifications_task_calls_task_for_services_that_have_sent_notifi "datetime_to_delete_before": datetime(2021, 3, 27, 0, 0), }, countdown=ANY, + MessageGroupId=str(service_will_delete_1.id), ), call( queue=ANY, @@ -664,6 +675,7 @@ def test_delete_notifications_task_calls_task_for_services_that_have_sent_notifi "datetime_to_delete_before": datetime(2021, 3, 27, 0, 0), }, countdown=ANY, + MessageGroupId=str(service_will_delete_2.id), ), ], ) @@ -682,12 +694,15 @@ def test_delete_notifications_for_service_and_type_queues_up_second_task_if_thin notification_type = "some-str" datetime_to_delete_before = datetime.utcnow() - delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before) + with _with_message_group_id(delete_notifications_for_service_and_type, str(service_id)): + delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before) mock_move.assert_called_once_with(notification_type, service_id, datetime_to_delete_before) # the next task is queued up with the exact same args mock_task_call.assert_called_once_with( - args=(service_id, notification_type, datetime_to_delete_before), queue="reporting-tasks" + args=(service_id, notification_type, datetime_to_delete_before), + queue="reporting-tasks", + MessageGroupId=str(service_id), ) assert not mock_delete_tests.called @@ -703,13 +718,16 @@ def test_delete_notifications_for_service_and_type_removes_test_notifications_if notification_type = "some-str" datetime_to_delete_before = datetime.utcnow() - delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before) + with _with_message_group_id(delete_notifications_for_service_and_type, str(service_id)): + delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before) mock_move.assert_called_once_with(notification_type, service_id, datetime_to_delete_before) # the next task is not queued up assert not mock_delete_live_notis_task_call.called mock_delete_tests_task_call.assert_called_once_with( - args=(service_id, notification_type, datetime_to_delete_before), queue="reporting-tasks" + args=(service_id, notification_type, datetime_to_delete_before), + queue="reporting-tasks", + MessageGroupId=str(service_id), ) @@ -727,7 +745,9 @@ def test_delete_test_notifications_for_service_and_type_queues_up_second_task_if mock_delete.assert_called_once_with(notification_type, service_id, datetime_to_delete_before) mock_task_call.assert_called_once_with( - args=(service_id, notification_type, datetime_to_delete_before), queue="reporting-tasks" + args=(service_id, notification_type, datetime_to_delete_before), + queue="reporting-tasks", + MessageGroupId=str(service_id), ) diff --git a/tests/app/celery/test_reporting_tasks.py b/tests/app/celery/test_reporting_tasks.py index 988559c6d3..af741bfcbc 100644 --- a/tests/app/celery/test_reporting_tasks.py +++ b/tests/app/celery/test_reporting_tasks.py @@ -78,6 +78,7 @@ def test_create_nightly_notification_status_triggers_tasks( mock_celery.assert_called_with( kwargs={"service_id": sample_service.id, "process_day": "2019-07-31", "notification_type": SMS_TYPE}, queue=QueueNames.REPORTING, + MessageGroupId=str(sample_service.id), ) diff --git a/tests/app/celery/test_research_mode_tasks.py b/tests/app/celery/test_research_mode_tasks.py index b3e168bdbf..247fb605dc 100644 --- a/tests/app/celery/test_research_mode_tasks.py +++ b/tests/app/celery/test_research_mode_tasks.py @@ -71,9 +71,10 @@ def test_make_ses_callback(notify_api, mock_celery_task): mock_task = mock_celery_task(process_ses_results) some_ref = str(uuid.uuid4()) - send_email_response(reference=some_ref, to="test@test.com") + service_id = uuid.uuid4() + send_email_response(reference=some_ref, to="test@test.com", service_id=service_id) - mock_task.assert_called_once_with(ANY, queue=QueueNames.RESEARCH_MODE) + mock_task.assert_called_once_with(ANY, queue=QueueNames.RESEARCH_MODE, MessageGroupId=str(service_id)) assert mock_task.call_args[0][0][0] == ses_notification_callback(some_ref) diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index e00aff14d2..49da3d4b44 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -73,7 +73,7 @@ create_template, create_user, ) -from tests.conftest import set_config, set_config_values +from tests.conftest import _with_message_group_id, set_config, set_config_values def test_should_call_delete_codes_on_delete_verify_codes_task(notify_db_session, mocker): @@ -98,7 +98,11 @@ def test_should_update_scheduled_jobs_and_put_on_queue(mock_celery_task, sample_ updated_job = dao_get_job_by_id(job.id) assert updated_job.job_status == "pending" - mocked.assert_called_with([str(job.id)], queue="job-tasks") + mocked.assert_called_with( + [str(job.id)], + queue="job-tasks", + MessageGroupId=str(job.service_id), + ) def test_should_update_all_scheduled_jobs_and_put_on_queue(sample_template, mock_celery_task): @@ -117,11 +121,12 @@ def test_should_update_all_scheduled_jobs_and_put_on_queue(sample_template, mock assert dao_get_job_by_id(job_2.id).job_status == "pending" assert dao_get_job_by_id(job_2.id).job_status == "pending" + service_id = str(sample_template.service_id) mocked.assert_has_calls( [ - call([str(job_3.id)], queue="job-tasks"), - call([str(job_2.id)], queue="job-tasks"), - call([str(job_1.id)], queue="job-tasks"), + call([str(job_3.id)], queue="job-tasks", MessageGroupId=service_id), + call([str(job_2.id)], queue="job-tasks", MessageGroupId=service_id), + call([str(job_1.id)], queue="job-tasks", MessageGroupId=service_id), ] ) @@ -406,8 +411,11 @@ def test_check_job_status_task_sets_jobs_to_error(mock_celery_task, sample_templ def test_replay_created_notifications(sample_service, mock_celery_task): - email_delivery_queue = mock_celery_task(deliver_email) - sms_delivery_queue = mock_celery_task(deliver_sms) + with _with_message_group_id(deliver_email, str(sample_service.id)): + email_delivery_queue = mock_celery_task(deliver_email) + + with _with_message_group_id(deliver_sms, str(sample_service.id)): + sms_delivery_queue = mock_celery_task(deliver_sms) sms_template = create_template(service=sample_service, template_type="sms") email_template = create_template(service=sample_service, template_type="email") @@ -430,8 +438,12 @@ def test_replay_created_notifications(sample_service, mock_celery_task): create_notification(template=email_template, created_at=datetime.utcnow(), status="created") replay_created_notifications() - email_delivery_queue.assert_called_once_with([str(old_email.id)], queue="send-email-tasks") - sms_delivery_queue.assert_called_once_with([str(old_sms.id)], queue="send-sms-tasks") + email_delivery_queue.assert_called_once_with( + [str(old_email.id)], queue="send-email-tasks", MessageGroupId=str(sample_service.id) + ) + sms_delivery_queue.assert_called_once_with( + [str(old_sms.id)], queue="send-sms-tasks", MessageGroupId=str(sample_service.id) + ) def test_replay_created_notifications_get_pdf_for_templated_letter_tasks_for_letters_not_ready_to_send( @@ -454,9 +466,10 @@ def test_replay_created_notifications_get_pdf_for_templated_letter_tasks_for_let replay_created_notifications() + service_id = str(sample_letter_template.service_id) calls = [ - call([str(notification_1.id)], queue=QueueNames.CREATE_LETTERS_PDF), - call([str(notification_2.id)], queue=QueueNames.CREATE_LETTERS_PDF), + call([str(notification_1.id)], queue=QueueNames.CREATE_LETTERS_PDF, MessageGroupId=service_id), + call([str(notification_2.id)], queue=QueueNames.CREATE_LETTERS_PDF, MessageGroupId=service_id), ] mock_task.assert_has_calls(calls, any_order=True) @@ -512,7 +525,10 @@ def test_check_if_letters_still_pending_virus_check_restarts_scan_for_stuck_lett mock_file_exists.assert_called_once_with("test-letters-scan", expected_filename) mock_celery.assert_called_once_with( - name=TaskNames.SCAN_FILE, kwargs={"filename": expected_filename}, queue=QueueNames.ANTIVIRUS + name=TaskNames.SCAN_FILE, + kwargs={"filename": expected_filename}, + queue=QueueNames.ANTIVIRUS, + MessageGroupId=str(sample_letter_template.service_id), ) assert mock_create_ticket.called is False @@ -724,7 +740,12 @@ def test_check_for_missing_rows_in_completed_jobs(mocker, sample_email_template, ) ] assert mock_save_email.mock_calls == [ - mock.call((str(job.service_id), "some-uuid", "something_encoded"), {}, queue="database-tasks") + mock.call( + (str(job.service_id), "some-uuid", "something_encoded"), + {}, + queue="database-tasks", + MessageGroupId=str(job.service_id), + ) ] @@ -765,7 +786,10 @@ def test_check_for_missing_rows_in_completed_jobs_uses_sender_id( ] assert mock_save_email.mock_calls == [ mock.call( - (str(job.service_id), "some-uuid", "something_encoded"), {"sender_id": fake_uuid}, queue="database-tasks" + (str(job.service_id), "some-uuid", "something_encoded"), + {"sender_id": fake_uuid}, + queue="database-tasks", + MessageGroupId=str(job.service_id), ) ] diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 65909a8d55..895a31e3ee 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -73,6 +73,7 @@ create_template_email_file, create_user, ) +from tests.conftest import _with_message_group_id class AnyStringWith(str): @@ -112,6 +113,10 @@ def email_job_with_placeholders(notify_db_session, sample_email_template_with_pl # -------------- process_job tests -------------- # +# +# Most tests simulate the broker by setting message_group_id on the task (via _with_message_group_id) +# so we assert the real MessageGroupId is passed. One test (test_should_process_sms_job_passes_none_...) +# does not simulate and asserts MessageGroupId=None when not set by broker. def test_should_process_sms_job(sample_job, mocker, mock_celery_task): @@ -123,7 +128,8 @@ def test_should_process_sms_job(sample_job, mocker, mock_celery_task): mock_encode = mocker.patch("app.signing.encode", return_value="something_encoded") mocker.patch("app.celery.tasks.create_uuid", return_value="uuid") - process_job(sample_job.id) + with _with_message_group_id(process_job, str(sample_job.service_id)): + process_job(sample_job.id) s3.get_job_and_metadata_from_s3.assert_called_once_with( service_id=str(sample_job.service.id), job_id=str(sample_job.id) @@ -155,6 +161,41 @@ def test_should_process_sms_job(sample_job, mocker, mock_celery_task): ], ), queue="job-tasks", + MessageGroupId=str(sample_job.service_id), + ) + ] + assert job.job_status == "finished" + + +def test_should_process_sms_job_passes_none_message_group_id_when_not_set_by_broker( + sample_job, mocker, mock_celery_task +): + # When the task is run with message_group_id=None (e.g. broker did not set it), None is passed. + mocker.patch( + "app.celery.tasks.s3.get_job_and_metadata_from_s3", + return_value=(load_example_csv("sms"), {"sender_id": None}), + ) + mock_task = mock_celery_task(shatter_job_rows) + mocker.patch("app.signing.encode", return_value="something_encoded") + mocker.patch("app.celery.tasks.create_uuid", return_value="uuid") + + with _with_message_group_id(process_job, None): + process_job(sample_job.id) + + job = jobs_dao.dao_get_job_by_id(sample_job.id) + assert mock_task.mock_calls == [ + call( + ( + job.template.template_type, + [ + ( + (str(sample_job.service_id), "uuid", "something_encoded"), + {}, + ) + ], + ), + queue="job-tasks", + MessageGroupId=None, ) ] assert job.job_status == "finished" @@ -169,7 +210,8 @@ def test_should_process_sms_job_with_sender_id(sample_job, mocker, mock_celery_t mock_encode = mocker.patch("app.signing.encode", return_value="something_encoded") mocker.patch("app.celery.tasks.create_uuid", return_value="uuid") - process_job(sample_job.id, sender_id=fake_uuid) + with _with_message_group_id(process_job, str(sample_job.service_id)): + process_job(sample_job.id, sender_id=fake_uuid) s3.get_job_and_metadata_from_s3.assert_called_once_with( service_id=str(sample_job.service.id), job_id=str(sample_job.id) @@ -201,6 +243,7 @@ def test_should_process_sms_job_with_sender_id(sample_job, mocker, mock_celery_t ], ), queue="job-tasks", + MessageGroupId=str(sample_job.service_id), ) ] assert job.job_status == "finished" @@ -284,7 +327,8 @@ def test_should_process_job_if_send_limits_are_not_exceeded(notify_api, notify_d "app.celery.tasks.check_service_over_daily_message_limit", return_value=None ) - process_job(job.id, shatter_batch_size=3) + with _with_message_group_id(process_job, str(job.service_id)): + process_job(job.id, shatter_batch_size=3) s3.get_job_and_metadata_from_s3.assert_called_once_with(service_id=str(job.service.id), job_id=str(job.id)) job = jobs_dao.dao_get_job_by_id(job.id) @@ -299,6 +343,7 @@ def test_should_process_job_if_send_limits_are_not_exceeded(notify_api, notify_d ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), call( ( @@ -310,6 +355,7 @@ def test_should_process_job_if_send_limits_are_not_exceeded(notify_api, notify_d ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), call( ( @@ -321,6 +367,7 @@ def test_should_process_job_if_send_limits_are_not_exceeded(notify_api, notify_d ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), call( ( @@ -330,6 +377,7 @@ def test_should_process_job_if_send_limits_are_not_exceeded(notify_api, notify_d ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), ] assert mock_check_message_limit.mock_calls == [ @@ -368,7 +416,8 @@ def test_should_process_email_job(email_job_with_placeholders, mocker, mock_cele mock_encode = mocker.patch("app.signing.encode", return_value="something_encoded") mocker.patch("app.celery.tasks.create_uuid", return_value="some_uuid") - process_job(email_job_with_placeholders.id) + with _with_message_group_id(process_job, str(email_job_with_placeholders.service_id)): + process_job(email_job_with_placeholders.id) s3.get_job_and_metadata_from_s3.assert_called_once_with( service_id=str(email_job_with_placeholders.service.id), @@ -405,6 +454,7 @@ def test_should_process_email_job(email_job_with_placeholders, mocker, mock_cele ], ), queue="job-tasks", + MessageGroupId=str(email_job_with_placeholders.service_id), ) ] @@ -424,7 +474,8 @@ def test_should_process_email_job_with_sender_id(email_job_with_placeholders, mo mock_encode = mocker.patch("app.signing.encode", return_value="something_encoded") mocker.patch("app.celery.tasks.create_uuid", return_value="some_uuid") - process_job(email_job_with_placeholders.id, sender_id=fake_uuid) + with _with_message_group_id(process_job, str(email_job_with_placeholders.service_id)): + process_job(email_job_with_placeholders.id, sender_id=fake_uuid) s3.get_job_and_metadata_from_s3.assert_called_once_with( service_id=str(email_job_with_placeholders.service.id), @@ -461,6 +512,7 @@ def test_should_process_email_job_with_sender_id(email_job_with_placeholders, mo ], ), queue="job-tasks", + MessageGroupId=str(email_job_with_placeholders.service_id), ) ] @@ -481,7 +533,8 @@ def test_should_process_letter_job(sample_letter_job, mocker, mock_celery_task): mock_shatter_job_rows = mock_celery_task(shatter_job_rows) mocker.patch("app.celery.tasks.create_uuid", return_value="uuid") - process_job(sample_letter_job.id) + with _with_message_group_id(process_job, str(sample_letter_job.service_id)): + process_job(sample_letter_job.id) s3_mock.assert_called_once_with(service_id=str(sample_letter_job.service.id), job_id=str(sample_letter_job.id)) @@ -521,6 +574,7 @@ def test_should_process_letter_job(sample_letter_job, mocker, mock_celery_task): ], ), queue="job-tasks", + MessageGroupId=str(sample_letter_job.service_id), ) ] @@ -537,7 +591,8 @@ def test_should_process_all_sms_job(sample_job_with_placeholdered_template, mock mock_encode = mocker.patch("app.signing.encode", side_effect=(f"something-encoded-{i}" for i in count())) mocker.patch("app.celery.tasks.create_uuid", side_effect=(f"uuid-{i}" for i in count())) - process_job(sample_job_with_placeholdered_template.id, shatter_batch_size=5) + with _with_message_group_id(process_job, str(sample_job_with_placeholdered_template.service_id)): + process_job(sample_job_with_placeholdered_template.id, shatter_batch_size=5) s3.get_job_and_metadata_from_s3.assert_called_once_with( service_id=str(sample_job_with_placeholdered_template.service.id), @@ -607,6 +662,7 @@ def test_should_process_all_sms_job(sample_job_with_placeholdered_template, mock ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), call( ( @@ -620,6 +676,7 @@ def test_should_process_all_sms_job(sample_job_with_placeholdered_template, mock ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), ] @@ -644,8 +701,9 @@ def shatter_job_rows_side_effect(*args, **kwargs): mock_encode = mocker.patch("app.signing.encode", side_effect=(f"something-encoded-{i}" for i in count())) mocker.patch("app.celery.tasks.create_uuid", side_effect=(f"uuid-{i}" for i in count())) - with pytest.raises(UnprocessableJobRow): - process_job(sample_job_with_placeholdered_template.id, shatter_batch_size=5) + with _with_message_group_id(process_job, str(sample_job_with_placeholdered_template.service_id)): + with pytest.raises(UnprocessableJobRow): + process_job(sample_job_with_placeholdered_template.id, shatter_batch_size=5) s3.get_job_and_metadata_from_s3.assert_called_once_with( service_id=str(sample_job_with_placeholdered_template.service.id), @@ -710,6 +768,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), # fails - splits & retries first half call( @@ -721,6 +780,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), # succeeds - retries second half call( @@ -733,6 +793,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), # fails - splits & retries first half call( @@ -743,6 +804,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), # succeeds - retries second half call( @@ -754,6 +816,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), # fails - splits & retries first half call( @@ -764,6 +827,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), # fails & gives up because we can't split any further. # doesn't proceed to second top-level batch. @@ -796,7 +860,8 @@ def shatter_job_rows_side_effect(*args, **kwargs): mock_encode = mocker.patch("app.signing.encode", side_effect=(f"something-encoded-{i}" for i in count())) mocker.patch("app.celery.tasks.create_uuid", side_effect=(f"uuid-{i}" for i in count())) - process_job(sample_job_with_placeholdered_template.id, shatter_batch_size=5) + with _with_message_group_id(process_job, str(sample_job_with_placeholdered_template.service_id)): + process_job(sample_job_with_placeholdered_template.id, shatter_batch_size=5) s3.get_job_and_metadata_from_s3.assert_called_once_with( service_id=str(sample_job_with_placeholdered_template.service.id), @@ -866,6 +931,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), # fails - splits & retries first half call( @@ -877,6 +943,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), # fails - splits & retries first half call( @@ -887,6 +954,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), # succeeds - retries second half call( @@ -897,6 +965,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), # succeeds - unwinds and retries second half call( @@ -909,6 +978,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), # succeeds, proceeds to second top-level batch call( @@ -923,6 +993,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(service_id), ), # succeeds ] @@ -1075,43 +1146,68 @@ def test_get_id_task_args_kwargs_for_job_row_when_reference_is_provided(mocker, ) def test_shatter_job_rows(template_type, send_fn, mock_celery_task, mocker): mock_send_fn = mock_celery_task(send_fn) + expected_group_id = "service-id-0" - shatter_job_rows( - template_type, - [ - ( - ("service-id-0", "notification-id-0", "encoded-0"), - {} if template_type == LETTER_TYPE else {"sender_id": "0"}, - ), - ( - ("service-id-1", "notification-id-1", "encoded-1"), - {} if template_type == LETTER_TYPE else {"sender_id": "1"}, - ), - ( - ("service-id-2", "notification-id-2", "encoded-2"), - {} if template_type == LETTER_TYPE else {"sender_id": "2"}, - ), - ], - ) + with _with_message_group_id(shatter_job_rows, expected_group_id): + shatter_job_rows( + template_type, + [ + ( + ("service-id-0", "notification-id-0", "encoded-0"), + {} if template_type == LETTER_TYPE else {"sender_id": "0"}, + ), + ( + ("service-id-1", "notification-id-1", "encoded-1"), + {} if template_type == LETTER_TYPE else {"sender_id": "1"}, + ), + ( + ("service-id-2", "notification-id-2", "encoded-2"), + {} if template_type == LETTER_TYPE else {"sender_id": "2"}, + ), + ], + ) assert mock_send_fn.mock_calls == [ call( ("service-id-0", "notification-id-0", "encoded-0"), {} if template_type == LETTER_TYPE else {"sender_id": "0"}, queue="database-tasks", + MessageGroupId=expected_group_id, ), call( ("service-id-1", "notification-id-1", "encoded-1"), {} if template_type == LETTER_TYPE else {"sender_id": "1"}, queue="database-tasks", + MessageGroupId=expected_group_id, ), call( ("service-id-2", "notification-id-2", "encoded-2"), {} if template_type == LETTER_TYPE else {"sender_id": "2"}, queue="database-tasks", + MessageGroupId=expected_group_id, ), ] +def test_shatter_job_rows_passes_none_message_group_id_when_not_set_by_broker(mock_celery_task): + ## When shatter_job_rows runs with message_group_id=None, None is passed to send_fn. + mock_save_sms = mock_celery_task(save_sms) + + with _with_message_group_id(shatter_job_rows, None): + shatter_job_rows( + SMS_TYPE, + [ + (("service-id-0", "notification-id-0", "encoded-0"), {}), + ], + ) + + mock_save_sms.assert_called_once_with( + ("service-id-0", "notification-id-0", "encoded-0"), + {}, + queue="database-tasks", + MessageGroupId=None, + ) + + # -------- save_sms and save_email tests -------- # @@ -1128,11 +1224,12 @@ def test_should_send_template_to_correct_sms_task_and_persist( mocked_task = mock_celery_task(provider_tasks.deliver_sms) - save_sms( - sample_template_with_placeholders.service_id, - uuid.uuid4(), - signing.encode(notification), - ) + with _with_message_group_id(save_sms, str(sample_template_with_placeholders.service_id)): + save_sms( + sample_template_with_placeholders.service_id, + uuid.uuid4(), + signing.encode(notification), + ) persisted_notification = Notification.query.one() assert persisted_notification.to == "+447234123123" @@ -1147,7 +1244,11 @@ def test_should_send_template_to_correct_sms_task_and_persist( assert persisted_notification._personalisation == signing.encode({"name": "Jo"}) assert persisted_notification.notification_type == "sms" assert persisted_notification.client_reference == client_reference - mocked_task.assert_called_once_with([str(persisted_notification.id)], queue="send-sms-tasks") + mocked_task.assert_called_once_with( + [str(persisted_notification.id)], + queue="send-sms-tasks", + MessageGroupId=str(persisted_notification.service_id), + ) @pytest.mark.parametrize("client_reference", [None, "ab1234"]) @@ -1164,7 +1265,8 @@ def test_notification_belonging_to_a_job_with_incorrect_number_saved_as_validati mock_deliver_sms_task = mock_celery_task(provider_tasks.deliver_sms) - save_sms(sample_template_with_placeholders.service_id, uuid.uuid4(), signing.encode(notification)) + with _with_message_group_id(save_sms, str(sample_template_with_placeholders.service_id)): + save_sms(sample_template_with_placeholders.service_id, uuid.uuid4(), signing.encode(notification)) persisted_notification = Notification.query.one() assert persisted_notification.to == "+447234123122343253243425324233" @@ -1185,11 +1287,12 @@ def test_should_save_sms_if_restricted_service_and_valid_number(notify_db_sessio notification_id = uuid.uuid4() encode_notification = signing.encode(notification) - save_sms( - service.id, - notification_id, - encode_notification, - ) + with _with_message_group_id(save_sms, str(service.id)): + save_sms( + service.id, + notification_id, + encode_notification, + ) persisted_notification = Notification.query.one() assert persisted_notification.to == "+447700900890" @@ -1203,7 +1306,9 @@ def test_should_save_sms_if_restricted_service_and_valid_number(notify_db_sessio assert not persisted_notification.personalisation assert persisted_notification.notification_type == "sms" provider_tasks.deliver_sms.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-sms-tasks" + [str(persisted_notification.id)], + queue="send-sms-tasks", + MessageGroupId=str(persisted_notification.service_id), ) @@ -1216,11 +1321,12 @@ def test_save_email_should_save_default_email_reply_to_text_on_notification(noti mock_celery_task(provider_tasks.deliver_email) notification_id = uuid.uuid4() - save_email( - service.id, - notification_id, - signing.encode(notification), - ) + with _with_message_group_id(save_email, str(service.id)): + save_email( + service.id, + notification_id, + signing.encode(notification), + ) persisted_notification = Notification.query.one() assert persisted_notification.reply_to_text == "reply_to@digital.gov.uk" @@ -1234,11 +1340,12 @@ def test_save_sms_should_save_default_smm_sender_notification_reply_to_text_on(n mock_celery_task(provider_tasks.deliver_sms) notification_id = uuid.uuid4() - save_sms( - service.id, - notification_id, - signing.encode(notification), - ) + with _with_message_group_id(save_sms, str(service.id)): + save_sms( + service.id, + notification_id, + signing.encode(notification), + ) persisted_notification = Notification.query.one() assert persisted_notification.reply_to_text == "12345" @@ -1253,11 +1360,12 @@ def test_should_not_save_sms_if_restricted_service_and_invalid_number(notify_db_ mock_celery_task(provider_tasks.deliver_sms) notification_id = uuid.uuid4() - save_sms( - service.id, - notification_id, - signing.encode(notification), - ) + with _with_message_group_id(save_sms, str(service.id)): + save_sms( + service.id, + notification_id, + signing.encode(notification), + ) assert provider_tasks.deliver_sms.apply_async.called is False assert Notification.query.count() == 0 @@ -1269,11 +1377,12 @@ def test_should_not_save_email_if_restricted_service_and_invalid_email_address(n notification = _notification_json(template, to="test@example.com") notification_id = uuid.uuid4() - save_email( - service.id, - notification_id, - signing.encode(notification), - ) + with _with_message_group_id(save_email, str(service.id)): + save_email( + service.id, + notification_id, + signing.encode(notification), + ) assert Notification.query.count() == 0 @@ -1284,11 +1393,12 @@ def test_should_save_sms_template_to_and_persist_with_job_id(sample_job, mock_ce notification_id = uuid.uuid4() now = datetime.utcnow() - save_sms( - sample_job.service.id, - notification_id, - signing.encode(notification), - ) + with _with_message_group_id(save_sms, str(sample_job.service_id)): + save_sms( + sample_job.service.id, + notification_id, + signing.encode(notification), + ) persisted_notification = Notification.query.one() assert persisted_notification.to == "+447234123123" assert persisted_notification.job_id == sample_job.id @@ -1303,7 +1413,9 @@ def test_should_save_sms_template_to_and_persist_with_job_id(sample_job, mock_ce assert persisted_notification.notification_type == "sms" provider_tasks.deliver_sms.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-sms-tasks" + [str(persisted_notification.id)], + queue="send-sms-tasks", + MessageGroupId=str(persisted_notification.service_id), ) @@ -1351,11 +1463,12 @@ def test_should_use_email_template_and_persist( ) with freeze_time("2016-01-01 11:10:00.00000"): - save_email( - sample_email_template_with_placeholders.service_id, - notification_id, - signing.encode(notification), - ) + with _with_message_group_id(save_email, str(sample_email_template_with_placeholders.service_id)): + save_email( + sample_email_template_with_placeholders.service_id, + notification_id, + signing.encode(notification), + ) persisted_notification = Notification.query.one() assert persisted_notification.to == "my_email@my_email.com" @@ -1374,7 +1487,9 @@ def test_should_use_email_template_and_persist( assert persisted_notification.client_reference == client_reference provider_tasks.deliver_email.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-email-tasks" + [str(persisted_notification.id)], + queue="send-email-tasks", + MessageGroupId=str(persisted_notification.service_id), ) @@ -1393,11 +1508,12 @@ def test_save_email_should_use_template_version_from_job_not_latest(sample_email t = dao_get_template_by_id(sample_email_template.id) assert t.version > version_on_notification now = datetime.utcnow() - save_email( - sample_email_template.service_id, - uuid.uuid4(), - signing.encode(notification), - ) + with _with_message_group_id(save_email, str(sample_email_template.service_id)): + save_email( + sample_email_template.service_id, + uuid.uuid4(), + signing.encode(notification), + ) persisted_notification = Notification.query.one() assert persisted_notification.to == "my_email@my_email.com" @@ -1409,7 +1525,9 @@ def test_save_email_should_use_template_version_from_job_not_latest(sample_email assert not persisted_notification.sent_by assert persisted_notification.notification_type == "email" provider_tasks.deliver_email.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-email-tasks" + [str(persisted_notification.id)], + queue="send-email-tasks", + MessageGroupId=str(persisted_notification.service_id), ) @@ -1419,11 +1537,12 @@ def test_should_use_email_template_subject_placeholders(sample_email_template_wi notification_id = uuid.uuid4() now = datetime.utcnow() - save_email( - sample_email_template_with_placeholders.service_id, - notification_id, - signing.encode(notification), - ) + with _with_message_group_id(save_email, str(sample_email_template_with_placeholders.service_id)): + save_email( + sample_email_template_with_placeholders.service_id, + notification_id, + signing.encode(notification), + ) persisted_notification = Notification.query.one() assert persisted_notification.to == "my_email@my_email.com" assert persisted_notification.template_id == sample_email_template_with_placeholders.id @@ -1434,7 +1553,9 @@ def test_should_use_email_template_subject_placeholders(sample_email_template_wi assert not persisted_notification.reference assert persisted_notification.notification_type == "email" provider_tasks.deliver_email.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-email-tasks" + [str(persisted_notification.id)], + queue="send-email-tasks", + MessageGroupId=str(persisted_notification.service_id), ) @@ -1449,12 +1570,13 @@ def test_save_email_uses_the_reply_to_text_when_provided(sample_email_template, service.id, "other@example.com", False ) - save_email( - sample_email_template.service_id, - notification_id, - signing.encode(notification), - sender_id=other_email_reply_to.id, - ) + with _with_message_group_id(save_email, str(sample_email_template.service_id)): + save_email( + sample_email_template.service_id, + notification_id, + signing.encode(notification), + sender_id=other_email_reply_to.id, + ) persisted_notification = Notification.query.one() assert persisted_notification.notification_type == "email" assert persisted_notification.reply_to_text == "other@example.com" @@ -1468,12 +1590,13 @@ def test_save_email_uses_the_default_reply_to_text_if_sender_id_is_none(sample_e notification_id = uuid.uuid4() service_email_reply_to_dao.add_reply_to_email_address_for_service(service.id, "default@example.com", True) - save_email( - sample_email_template.service_id, - notification_id, - signing.encode(notification), - sender_id=None, - ) + with _with_message_group_id(save_email, str(sample_email_template.service_id)): + save_email( + sample_email_template.service_id, + notification_id, + signing.encode(notification), + sender_id=None, + ) persisted_notification = Notification.query.one() assert persisted_notification.notification_type == "email" assert persisted_notification.reply_to_text == "default@example.com" @@ -1486,11 +1609,12 @@ def test_should_use_email_template_and_persist_without_personalisation(sample_em notification_id = uuid.uuid4() now = datetime.utcnow() - save_email( - sample_email_template.service_id, - notification_id, - signing.encode(notification), - ) + with _with_message_group_id(save_email, str(sample_email_template.service_id)): + save_email( + sample_email_template.service_id, + notification_id, + signing.encode(notification), + ) persisted_notification = Notification.query.one() assert persisted_notification.to == "my_email@my_email.com" assert persisted_notification.template_id == sample_email_template.id @@ -1502,7 +1626,9 @@ def test_should_use_email_template_and_persist_without_personalisation(sample_em assert not persisted_notification.reference assert persisted_notification.notification_type == "email" provider_tasks.deliver_email.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-email-tasks" + [str(persisted_notification.id)], + queue="send-email-tasks", + MessageGroupId=str(persisted_notification.service_id), ) @@ -1560,12 +1686,13 @@ def test_send_email_with_template_email_files( notification_id = uuid.uuid4() mock_celery_task(provider_tasks.deliver_email) - save_email( - sample_service.id, - notification_id, - signing.encode(notification), - sender_id=None, - ) + with _with_message_group_id(save_email, str(sample_service.id)): + save_email( + sample_service.id, + notification_id, + signing.encode(notification), + sender_id=None, + ) persisted_notification = Notification.query.one() assert persisted_notification.template_id == template.id @@ -1602,11 +1729,12 @@ def test_send_email_with_template_email_files_from_old_template_version( assert one_of_email_files.template_version > version_on_notification assert one_of_email_files.retention_period == 5 - save_email( - template.service_id, - uuid.uuid4(), - signing.encode(notification), - ) + with _with_message_group_id(save_email, str(template.service_id)): + save_email( + template.service_id, + uuid.uuid4(), + signing.encode(notification), + ) persisted_notification = Notification.query.one() assert persisted_notification.to == "anne@example.com" @@ -1636,7 +1764,9 @@ def test_send_email_with_template_email_files_from_old_template_version( ] provider_tasks.deliver_email.apply_async.assert_called_once_with( - [str(persisted_notification.id)], queue="send-email-tasks" + [str(persisted_notification.id)], + queue="send-email-tasks", + MessageGroupId=str(persisted_notification.service_id), ) @@ -1655,13 +1785,14 @@ def test_save_sms_should_go_to_retry_queue_if_database_errors(sample_template, m notification_id = uuid.uuid4() with pytest.raises(Retry): - save_sms( - sample_template.service_id, - notification_id, - signing.encode(notification), - ) + with _with_message_group_id(save_sms, None): + save_sms( + sample_template.service_id, + notification_id, + signing.encode(notification), + ) assert provider_tasks.deliver_sms.apply_async.called is False - tasks.save_sms.retry.assert_called_with(exc=expected_exception, queue="retry-tasks") + tasks.save_sms.retry.assert_called_with(exc=expected_exception, queue="retry-tasks", MessageGroupId=None) assert Notification.query.count() == 0 @@ -1681,13 +1812,14 @@ def test_save_email_should_go_to_retry_queue_if_database_errors(sample_email_tem notification_id = uuid.uuid4() with pytest.raises(Retry): - save_email( - sample_email_template.service_id, - notification_id, - signing.encode(notification), - ) + with _with_message_group_id(save_email, None): + save_email( + sample_email_template.service_id, + notification_id, + signing.encode(notification), + ) assert not provider_tasks.deliver_email.apply_async.called - tasks.save_email.retry.assert_called_with(exc=expected_exception, queue="retry-tasks") + tasks.save_email.retry.assert_called_with(exc=expected_exception, queue="retry-tasks", MessageGroupId=None) assert Notification.query.count() == 0 @@ -1803,11 +1935,12 @@ def test_save_letter_saves_letter_to_database( notification_id = uuid.uuid4() created_at = datetime.utcnow() - save_letter( - job.service_id, - notification_id, - signing.encode(notification_json), - ) + with _with_message_group_id(save_letter, str(job.service_id)): + save_letter( + job.service_id, + notification_id, + signing.encode(notification_json), + ) notification_db = Notification.query.one() assert notification_db.id == notification_id @@ -1863,11 +1996,12 @@ def test_save_letter_saves_letter_to_database_with_correct_postage( row_number=1, ) notification_id = uuid.uuid4() - save_letter( - letter_job.service_id, - notification_id, - signing.encode(notification_json), - ) + with _with_message_group_id(save_letter, str(letter_job.service_id)): + save_letter( + letter_job.service_id, + notification_id, + signing.encode(notification_json), + ) notification_db = Notification.query.one() assert notification_db.id == notification_id @@ -1893,11 +2027,12 @@ def test_save_letter_saves_letter_to_database_with_formatted_postcode(mocker, mo row_number=1, ) notification_id = uuid.uuid4() - save_letter( - letter_job.service_id, - notification_id, - signing.encode(notification_json), - ) + with _with_message_group_id(save_letter, str(letter_job.service_id)): + save_letter( + letter_job.service_id, + notification_id, + signing.encode(notification_json), + ) notification_db = Notification.query.one() assert notification_db.id == notification_id @@ -1935,11 +2070,12 @@ def test_save_letter_saves_letter_to_database_right_reply_to(mocker, mock_celery notification_id = uuid.uuid4() created_at = datetime.utcnow() - save_letter( - job.service_id, - notification_id, - signing.encode(notification_json), - ) + with _with_message_group_id(save_letter, str(job.service_id)): + save_letter( + job.service_id, + notification_id, + signing.encode(notification_json), + ) notification_db = Notification.query.one() assert notification_db.id == notification_id @@ -1986,11 +2122,12 @@ def test_save_letter_uses_template_reply_to_text(mocker, mock_celery_task, notif row_number=1, ) - save_letter( - job.service_id, - uuid.uuid4(), - signing.encode(notification_json), - ) + with _with_message_group_id(save_letter, str(job.service_id)): + save_letter( + job.service_id, + uuid.uuid4(), + signing.encode(notification_json), + ) notification_db = Notification.query.one() assert notification_db.reply_to_text == "Template address contact" @@ -2004,11 +2141,12 @@ def test_save_sms_uses_sms_sender_reply_to_text(mocker, mock_celery_task, notify mock_celery_task(provider_tasks.deliver_sms) notification_id = uuid.uuid4() - save_sms( - service.id, - notification_id, - signing.encode(notification), - ) + with _with_message_group_id(save_sms, str(service.id)): + save_sms( + service.id, + notification_id, + signing.encode(notification), + ) persisted_notification = Notification.query.one() assert persisted_notification.reply_to_text == "447123123123" @@ -2023,12 +2161,13 @@ def test_save_sms_uses_non_default_sms_sender_reply_to_text_if_provided(mocker, mock_celery_task(provider_tasks.deliver_sms) notification_id = uuid.uuid4() - save_sms( - service.id, - notification_id, - signing.encode(notification), - sender_id=new_sender.id, - ) + with _with_message_group_id(save_sms, str(service.id)): + save_sms( + service.id, + notification_id, + signing.encode(notification), + sender_id=new_sender.id, + ) persisted_notification = Notification.query.one() assert persisted_notification.reply_to_text == "new-sender" @@ -2051,11 +2190,12 @@ def test_save_sms_doesnt_check_international_sms_limit( from tests.conftest import set_config with set_config(notify_api, "REDIS_ENABLED", True): - save_sms( - service.id, - notification_id, - signing.encode(notification), - ) + with _with_message_group_id(save_sms, str(service.id)): + save_sms( + service.id, + notification_id, + signing.encode(notification), + ) persisted_notification = Notification.query.one() assert persisted_notification.normalised_to == "48697894044" @@ -2080,14 +2220,19 @@ def test_save_letter_calls_get_pdf_for_templated_letter_task( ) notification_id = uuid.uuid4() - save_letter( - sample_letter_job.service_id, - notification_id, - signing.encode(notification_json), - ) + with _with_message_group_id(save_letter, str(sample_letter_job.service_id)): + save_letter( + sample_letter_job.service_id, + notification_id, + signing.encode(notification_json), + ) assert mock_create_letters_pdf.called - mock_create_letters_pdf.assert_called_once_with([str(notification_id)], queue=QueueNames.CREATE_LETTERS_PDF) + mock_create_letters_pdf.assert_called_once_with( + [str(notification_id)], + queue=QueueNames.CREATE_LETTERS_PDF, + MessageGroupId=str(sample_letter_job.service_id), + ) def test_should_cancel_job_if_service_is_inactive(sample_service, sample_job, mocker, mock_celery_task): @@ -2249,6 +2394,7 @@ def test_process_incomplete_job_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), call( ( @@ -2260,6 +2406,7 @@ def test_process_incomplete_job_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), call( ( @@ -2270,6 +2417,7 @@ def test_process_incomplete_job_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), ] @@ -2403,6 +2551,7 @@ def test_process_incomplete_jobs_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), call( ( @@ -2435,6 +2584,7 @@ def test_process_incomplete_jobs_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), call( ( @@ -2475,6 +2625,7 @@ def test_process_incomplete_jobs_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), call( ( @@ -2491,6 +2642,7 @@ def test_process_incomplete_jobs_sms(mocker, mock_celery_task, sample_template): ], ), queue="job-tasks", + MessageGroupId=str(job2.service_id), ), ] @@ -2588,6 +2740,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), # fails - splits & retries first half call( @@ -2605,6 +2758,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), # succeeds - retries second half call( @@ -2630,6 +2784,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), # fails - splits & retries first half call( @@ -2647,6 +2802,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(job.service_id), ), # fails - gives up on this job & proceeds to next call( @@ -2680,6 +2836,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(job2.service_id), ), call( ( @@ -2704,6 +2861,7 @@ def shatter_job_rows_side_effect(*args, **kwargs): ], ), queue="job-tasks", + MessageGroupId=str(job2.service_id), ), ] @@ -3002,12 +3160,13 @@ def test_save_tasks_use_cached_service_and_template( wraps=SerialisedTemplate.get_dict, ) - for _ in range(3): - task_function( - service.id, - uuid.uuid4(), - signing.encode(notification), - ) + with _with_message_group_id(task_function, str(service.id)): + for _ in range(3): + task_function( + service.id, + uuid.uuid4(), + signing.encode(notification), + ) # We talk to the database once for the service and once for the # template; subsequent calls are caught by the in memory cache diff --git a/tests/app/delivery/test_send_to_providers.py b/tests/app/delivery/test_send_to_providers.py index 545fa17018..b042135ce0 100644 --- a/tests/app/delivery/test_send_to_providers.py +++ b/tests/app/delivery/test_send_to_providers.py @@ -366,7 +366,9 @@ def test_send_email_to_provider_should_call_response_task_if_test_key(sample_ema send_to_providers.send_email_to_provider(notification) assert not app.aws_ses_client.send_email.called - app.delivery.send_to_providers.send_email_response.assert_called_once_with(str(reference), "john@smith.com") + app.delivery.send_to_providers.send_email_response.assert_called_once_with( + str(reference), "john@smith.com", notification.service_id + ) persisted_notification = Notification.query.filter_by(id=notification.id).one() assert persisted_notification.to == "john@smith.com" assert persisted_notification.template_id == sample_email_template.id diff --git a/tests/app/job/test_rest.py b/tests/app/job/test_rest.py index d1ccef03a9..353c67ebbb 100644 --- a/tests/app/job/test_rest.py +++ b/tests/app/job/test_rest.py @@ -152,7 +152,10 @@ def test_create_unscheduled_job(client, sample_template, mocker, mock_celery_tas assert response.status_code == 201 app.celery.tasks.process_job.apply_async.assert_called_once_with( - ([str(fake_uuid)]), {"sender_id": None}, queue="job-tasks" + [str(fake_uuid)], + {"sender_id": None}, + queue="job-tasks", + MessageGroupId=str(sample_template.service_id), ) resp_json = json.loads(response.get_data(as_text=True)) @@ -193,7 +196,10 @@ def test_create_unscheduled_job_with_sender_id_in_metadata( assert response.status_code == 201 app.celery.tasks.process_job.apply_async.assert_called_once_with( - ([str(fake_uuid)]), {"sender_id": fake_uuid}, queue="job-tasks" + [str(fake_uuid)], + {"sender_id": fake_uuid}, + queue="job-tasks", + MessageGroupId=str(sample_template.service_id), ) diff --git a/tests/app/notifications/test_notifications_ses_callback.py b/tests/app/notifications/test_notifications_ses_callback.py index 2b4327ad95..174f3ce97a 100644 --- a/tests/app/notifications/test_notifications_ses_callback.py +++ b/tests/app/notifications/test_notifications_ses_callback.py @@ -91,7 +91,9 @@ def test_check_and_queue_callback_task(mocker, mock_celery_task, sample_notifica assert mock_create_args[1].id == callback_api.id mock_send.assert_called_once_with( - [str(sample_notification.id), mock_create.return_value], queue="service-callbacks" + [str(sample_notification.id), mock_create.return_value], + queue="service-callbacks", + MessageGroupId=str(sample_notification.service_id), ) diff --git a/tests/app/notifications/test_process_notification.py b/tests/app/notifications/test_process_notification.py index 0991f9cb09..84f95be16d 100644 --- a/tests/app/notifications/test_process_notification.py +++ b/tests/app/notifications/test_process_notification.py @@ -667,17 +667,19 @@ def test_send_notification_to_queue( mocker, ): mocked = mocker.patch(f"app.celery.{expected_task}.apply_async") - Notification = namedtuple("Notification", ["id", "key_type", "notification_type", "created_at"]) + Notification = namedtuple("Notification", ["id", "key_type", "notification_type", "created_at", "service_id"]) + service_id = uuid.uuid4() notification = Notification( id=uuid.uuid4(), key_type=key_type, notification_type=notification_type, created_at=datetime.datetime(2016, 11, 11, 16, 8, 18), + service_id=service_id, ) send_notification_to_queue(notification=notification, queue=requested_queue) - mocked.assert_called_once_with([str(notification.id)], queue=expected_queue) + mocked.assert_called_once_with([str(notification.id)], queue=expected_queue, MessageGroupId=str(service_id)) def test_send_notification_to_queue_throws_exception_deletes_notification(sample_notification, mocker): @@ -687,7 +689,11 @@ def test_send_notification_to_queue_throws_exception_deletes_notification(sample ) with pytest.raises(Boto3Error): send_notification_to_queue(sample_notification) - mocked.assert_called_once_with([str(sample_notification.id)], queue="send-sms-tasks") + mocked.assert_called_once_with( + [str(sample_notification.id)], + queue="send-sms-tasks", + MessageGroupId=str(sample_notification.service_id), + ) assert Notification.query.count() == 0 assert NotificationHistory.query.count() == 0 diff --git a/tests/app/notifications/test_receive_notification.py b/tests/app/notifications/test_receive_notification.py index 2ac1d230b7..21f351cf33 100644 --- a/tests/app/notifications/test_receive_notification.py +++ b/tests/app/notifications/test_receive_notification.py @@ -72,7 +72,9 @@ def test_receive_notification_returns_received_to_mmg(client, mocker, sample_ser inbound_sms_id = InboundSms.query.all()[0].id mocked.assert_called_once_with( - [str(inbound_sms_id), str(sample_service_full_permissions.id)], queue="service-callbacks" + [str(inbound_sms_id), str(sample_service_full_permissions.id)], + queue="service-callbacks", + MessageGroupId=str(sample_service_full_permissions.id), ) @@ -340,7 +342,11 @@ def test_receive_notification_returns_received_to_firetext(notify_db_session, cl assert result["status"] == "ok" inbound_sms_id = InboundSms.query.all()[0].id - mocked.assert_called_once_with([str(inbound_sms_id), str(service.id)], queue="service-callbacks") + mocked.assert_called_once_with( + [str(inbound_sms_id), str(service.id)], + queue="service-callbacks", + MessageGroupId=str(service.id), + ) def test_receive_notification_from_firetext_persists_message(notify_db_session, client, mocker): @@ -368,7 +374,11 @@ def test_receive_notification_from_firetext_persists_message(notify_db_session, assert persisted.content == "this is a message" assert persisted.provider == "firetext" assert persisted.provider_date == datetime(2017, 1, 1, 12, 0, 0, 0) - mocked.assert_called_once_with([str(persisted.id), str(service.id)], queue="service-callbacks") + mocked.assert_called_once_with( + [str(persisted.id), str(service.id)], + queue="service-callbacks", + MessageGroupId=str(service.id), + ) def test_receive_notification_from_firetext_persists_message_with_normalized_phone(notify_db_session, client, mocker): diff --git a/tests/app/organisation/test_invite_rest.py b/tests/app/organisation/test_invite_rest.py index 06ed5c0000..0874aebea4 100644 --- a/tests/app/organisation/test_invite_rest.py +++ b/tests/app/organisation/test_invite_rest.py @@ -69,7 +69,11 @@ def test_create_invited_org_user( assert notification.personalisation["url"].startswith(expected_start_of_invite_url.format(hostnames=hostnames)) assert len(notification.personalisation["url"]) > len(expected_start_of_invite_url.format(hostnames=hostnames)) - mocked.assert_called_once_with([(str(notification.id))], queue="notify-internal-tasks") + mocked.assert_called_once_with( + [(str(notification.id))], + queue="notify-internal-tasks", + MessageGroupId=str(notification.service_id), + ) def test_create_invited_user_invalid_email(admin_request, sample_organisation, sample_user, mocker): diff --git a/tests/app/service/test_rest.py b/tests/app/service/test_rest.py index c4d129fd52..46bf3cfc14 100644 --- a/tests/app/service/test_rest.py +++ b/tests/app/service/test_rest.py @@ -2598,7 +2598,11 @@ def test_verify_reply_to_email_address_should_send_verification_email( notification = Notification.query.first() assert notification.template_id == verify_reply_to_address_email_template.id assert response["data"] == {"id": str(notification.id)} - mocked.assert_called_once_with([str(notification.id)], queue="notify-internal-tasks") + mocked.assert_called_once_with( + [str(notification.id)], + queue="notify-internal-tasks", + MessageGroupId=str(notification.service_id), + ) assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() @@ -4594,7 +4598,11 @@ def test_update_service_join_request_by_id_notification_sent( ) notification = Notification.query.first() - mock_deliver_email_task.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks") + mock_deliver_email_task.assert_called_once_with( + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=str(notification.service_id), + ) assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() assert notification.to == f"{requester_id}@digital.cabinet-office.gov.uk" @@ -4640,7 +4648,11 @@ def test_update_service_join_request_get_template( ) notification = Notification.query.first() - mock_deliver_email_task.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks") + mock_deliver_email_task.assert_called_once_with( + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=str(notification.service_id), + ) assert notification.template.version == template.version assert notification.template_id == template.id @@ -4786,6 +4798,7 @@ def test_create_report_request_by_type( "service_id": sample_service.id, }, queue="report-requests-notifications-tasks", + MessageGroupId=str(sample_service.id), ) assert json_resp["data"]["id"] @@ -4869,6 +4882,7 @@ def test_create_report_request_by_type_creates_new_when_no_existing(admin_reques "service_id": sample_service.id, }, queue="report-requests-notifications-tasks", + MessageGroupId=str(sample_service.id), ) @@ -4917,4 +4931,5 @@ def test_create_report_request_by_type_creates_new_if_existing_is_stale( "service_id": sample_service.id, }, queue="report-requests-notifications-tasks", + MessageGroupId=str(sample_service.id), ) diff --git a/tests/app/service_invite/test_service_invite_rest.py b/tests/app/service_invite/test_service_invite_rest.py index 47c9ac6dc0..293042490a 100644 --- a/tests/app/service_invite/test_service_invite_rest.py +++ b/tests/app/service_invite/test_service_invite_rest.py @@ -65,7 +65,11 @@ def test_create_invited_user( assert len(notification.personalisation["url"]) > len(expected_start_of_invite_url.format(hostnames=hostnames)) assert str(notification.template_id) == current_app.config["INVITATION_EMAIL_TEMPLATE_ID"] - mocked.assert_called_once_with([(str(notification.id))], queue="notify-internal-tasks") + mocked.assert_called_once_with( + [(str(notification.id))], + queue="notify-internal-tasks", + MessageGroupId=str(notification.service_id), + ) def test_create_invited_user_without_auth_type(admin_request, sample_service, mocker, invitation_email_template): diff --git a/tests/app/user/test_rest.py b/tests/app/user/test_rest.py index afb1b341d1..c9f1456b60 100644 --- a/tests/app/user/test_rest.py +++ b/tests/app/user/test_rest.py @@ -659,7 +659,11 @@ def test_send_user_reset_password_should_send_reset_password_link( ) notification = Notification.query.first() - mocked.assert_called_once_with([str(notification.id)], queue="notify-internal-tasks") + mocked.assert_called_once_with( + [str(notification.id)], + queue="notify-internal-tasks", + MessageGroupId=str(notification.service_id), + ) assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() @@ -716,7 +720,11 @@ def test_send_user_reset_password_reset_password_link_contains_redirect_link_if_ notification = Notification.query.first() assert "?next=blob" in notification.content - mocked.assert_called_once_with([str(notification.id)], queue="notify-internal-tasks") + mocked.assert_called_once_with( + [str(notification.id)], + queue="notify-internal-tasks", + MessageGroupId=str(notification.service_id), + ) def test_send_user_reset_password_should_return_400_when_email_is_missing(admin_request, mocker): @@ -775,7 +783,11 @@ def test_send_already_registered_email(admin_request, sample_user, already_regis ) notification = Notification.query.first() - mocked.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks") + mocked.assert_called_once_with( + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=str(notification.service_id), + ) assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() @@ -807,7 +819,11 @@ def test_send_user_confirm_new_email_returns_204( ) notification = Notification.query.first() - mocked.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks") + mocked.assert_called_once_with( + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=str(notification.service_id), + ) assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() diff --git a/tests/app/user/test_rest_verify.py b/tests/app/user/test_rest_verify.py index e457f61eec..6a1e474b24 100644 --- a/tests/app/user/test_rest_verify.py +++ b/tests/app/user/test_rest_verify.py @@ -211,7 +211,9 @@ def test_send_user_sms_code(client, sample_user, sms_code_template, mocker): assert notification.reply_to_text == notify_service.get_default_sms_sender() app.celery.provider_tasks.deliver_sms.apply_async.assert_called_once_with( - ([str(notification.id)]), queue="notify-internal-tasks" + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=str(notification.service_id), ) @@ -236,7 +238,9 @@ def test_send_user_code_for_sms_with_optional_to_field(client, sample_user, sms_ notification = Notification.query.first() assert notification.to == to_number app.celery.provider_tasks.deliver_sms.apply_async.assert_called_once_with( - ([str(notification.id)]), queue="notify-internal-tasks" + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=str(notification.service_id), ) @@ -307,7 +311,11 @@ def test_send_new_user_email_verification( assert resp.status_code == 204 notification = Notification.query.first() assert VerifyCode.query.count() == 0 - mocked.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks") + mocked.assert_called_once_with( + ([str(notification.id)]), + queue="notify-internal-tasks", + MessageGroupId=str(notification.service_id), + ) assert notification.reply_to_text == notify_service.get_default_reply_to_email_address() assert notification.personalisation["name"] == "Test User" assert notification.personalisation["url"].startswith(expected_url_starts_with.format(hostnames=hostnames)) @@ -413,7 +421,11 @@ def test_send_user_email_code( assert str(noti.template_id) == current_app.config["EMAIL_2FA_TEMPLATE_ID"] assert noti.personalisation["name"] == "Test User" assert noti.personalisation["url"].startswith(expected_auth_url.format(hostnames=hostnames)) - deliver_email.assert_called_once_with([str(noti.id)], queue="notify-internal-tasks") + deliver_email.assert_called_once_with( + [str(noti.id)], + queue="notify-internal-tasks", + MessageGroupId=str(noti.service_id), + ) def test_send_user_email_code_with_urlencoded_next_param(admin_request, mocker, sample_user, email_2fa_code_template): diff --git a/tests/app/v2/notifications/test_post_letter_notifications.py b/tests/app/v2/notifications/test_post_letter_notifications.py index 94e172c3b9..18aca4616e 100644 --- a/tests/app/v2/notifications/test_post_letter_notifications.py +++ b/tests/app/v2/notifications/test_post_letter_notifications.py @@ -71,7 +71,11 @@ def test_post_letter_notification_returns_201(api_client_request, sample_letter_ ) assert not resp_json["scheduled_for"] assert not notification.reply_to_text - mock.assert_called_once_with([str(notification.id)], queue=QueueNames.CREATE_LETTERS_PDF) + mock.assert_called_once_with( + [str(notification.id)], + queue=QueueNames.CREATE_LETTERS_PDF, + MessageGroupId=str(sample_letter_template.service_id), + ) def test_post_letter_notification_sets_postage(api_client_request, notify_db_session, mocker): @@ -288,7 +292,11 @@ def test_post_letter_notification_with_test_key_creates_pdf_and_sets_status_to_d notification = Notification.query.one() - fake_create_letter_task.assert_called_once_with([str(notification.id)], queue="research-mode-tasks") + fake_create_letter_task.assert_called_once_with( + [str(notification.id)], + queue="research-mode-tasks", + MessageGroupId=str(sample_letter_template.service_id), + ) assert not fake_create_dvla_response_task.called assert notification.status == NOTIFICATION_DELIVERED assert notification.updated_at is not None @@ -322,7 +330,11 @@ def test_post_letter_notification_with_test_key_creates_pdf_and_sets_status_to_s notification = Notification.query.one() - fake_create_letter_task.assert_called_once_with([str(notification.id)], queue="research-mode-tasks") + fake_create_letter_task.assert_called_once_with( + [str(notification.id)], + queue="research-mode-tasks", + MessageGroupId=str(sample_letter_template.service_id), + ) assert fake_create_dvla_response_task.called assert notification.status == NOTIFICATION_SENDING @@ -563,7 +575,11 @@ def test_post_letter_notification_is_delivered_but_still_creates_pdf_if_in_trial notification = Notification.query.one() assert notification.status == NOTIFICATION_DELIVERED - fake_create_letter_task.assert_called_once_with([str(notification.id)], queue="research-mode-tasks") + fake_create_letter_task.assert_called_once_with( + [str(notification.id)], + queue="research-mode-tasks", + MessageGroupId=str(sample_trial_letter_template.service_id), + ) def test_post_letter_notification_is_delivered_and_has_pdf_uploaded_to_test_letters_bucket_using_test_key( diff --git a/tests/app/v2/notifications/test_post_notifications.py b/tests/app/v2/notifications/test_post_notifications.py index 99475461e4..fca55c45fe 100644 --- a/tests/app/v2/notifications/test_post_notifications.py +++ b/tests/app/v2/notifications/test_post_notifications.py @@ -94,7 +94,11 @@ def test_post_sms_notification_uses_inbound_number_as_sender(api_client_request, assert resp_json["id"] == str(notification_id) assert resp_json["content"]["from_number"] == "1" assert notifications[0].reply_to_text == "1" - mocked.assert_called_once_with([str(notification_id)], queue="send-sms-tasks") + mocked.assert_called_once_with( + [str(notification_id)], + queue="send-sms-tasks", + MessageGroupId=str(service.id), + ) def test_post_sms_notification_uses_inbound_number_reply_to_as_sender(api_client_request, notify_db_session, mocker): @@ -118,7 +122,11 @@ def test_post_sms_notification_uses_inbound_number_reply_to_as_sender(api_client assert resp_json["id"] == str(notification_id) assert resp_json["content"]["from_number"] == "447123123123" assert notifications[0].reply_to_text == "447123123123" - mocked.assert_called_once_with([str(notification_id)], queue="send-sms-tasks") + mocked.assert_called_once_with( + [str(notification_id)], + queue="send-sms-tasks", + MessageGroupId=str(service.id), + ) def test_post_sms_notification_returns_201_with_sms_sender_id( @@ -145,7 +153,11 @@ def test_post_sms_notification_returns_201_with_sms_sender_id( notifications = Notification.query.all() assert len(notifications) == 1 assert notifications[0].reply_to_text == sms_sender.sms_sender - mocked.assert_called_once_with([resp_json["id"]], queue="send-sms-tasks") + mocked.assert_called_once_with( + [resp_json["id"]], + queue="send-sms-tasks", + MessageGroupId=str(sample_template_with_placeholders.service_id), + ) def test_post_sms_notification_uses_sms_sender_id_reply_to( @@ -173,7 +185,11 @@ def test_post_sms_notification_uses_sms_sender_id_reply_to( notifications = Notification.query.all() assert len(notifications) == 1 assert notifications[0].reply_to_text == "447123123123" - mocked.assert_called_once_with([resp_json["id"]], queue="send-sms-tasks") + mocked.assert_called_once_with( + [resp_json["id"]], + queue="send-sms-tasks", + MessageGroupId=str(sample_template_with_placeholders.service_id), + ) def test_notification_reply_to_text_is_original_value_if_sender_is_changed_after_post_notification( diff --git a/tests/conftest.py b/tests/conftest.py index 4987e52a43..7497cc565f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,6 +2,7 @@ import subprocess from collections import namedtuple from contextlib import contextmanager +from unittest.mock import patch from urllib.parse import urlparse import freezegun @@ -265,6 +266,12 @@ def set_config_values(app, dict): app.config[key] = old_values[key] +@contextmanager +def _with_message_group_id(task, value): + with patch("notifications_utils.celery.NotifyTask.message_group_id", new=value, create=True): + yield + + class Matcher: def __init__(self, description, key): self.description = description