Skip to content

Commit 7ecb3ee

Browse files
committed
feat: message group id for nightly, letters PDF and research mode
1 parent 02b3dff commit 7ecb3ee

File tree

7 files changed

+97
-32
lines changed

7 files changed

+97
-32
lines changed

app/celery/letters_pdf_tasks.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,10 @@ def get_pdf_for_templated_letter(self, notification_id):
9292
encoded_data = signing.encode(letter_data)
9393

9494
notify_celery.send_task(
95-
name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER, args=(encoded_data,), queue=QueueNames.SANITISE_LETTERS
95+
name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER,
96+
args=(encoded_data,),
97+
queue=QueueNames.SANITISE_LETTERS,
98+
MessageGroupId=self.message_group_id if self.message_group_id is not None else str(notification.service_id),
9699
)
97100
except Exception as e:
98101
try:
@@ -294,6 +297,7 @@ def sanitise_letter(self, filename):
294297
"allow_international_letters": notification.service.has_permission(INTERNATIONAL_LETTERS),
295298
},
296299
queue=QueueNames.SANITISE_LETTERS,
300+
MessageGroupId=self.message_group_id,
297301
)
298302
except Exception:
299303
try:
@@ -541,8 +545,8 @@ def replay_letters_in_error(filename=None):
541545
sanitise_letter.apply_async([filename], queue=QueueNames.LETTERS)
542546

543547

544-
@notify_celery.task(name="resanitise-pdf")
545-
def resanitise_pdf(notification_id):
548+
@notify_celery.task(bind=True, name="resanitise-pdf")
549+
def resanitise_pdf(self, notification_id):
546550
"""
547551
`notification_id` is the notification id for a PDF letter which was either uploaded or sent using the API.
548552
@@ -570,11 +574,12 @@ def resanitise_pdf(notification_id):
570574
"allow_international_letters": notification.service.has_permission(INTERNATIONAL_LETTERS),
571575
},
572576
queue=QueueNames.SANITISE_LETTERS,
577+
MessageGroupId=self.message_group_id if self.message_group_id is not None else str(notification.service_id),
573578
)
574579

575580

576-
@notify_celery.task(name="resanitise-letter-attachment")
577-
def resanitise_letter_attachment(service_id, attachment_id, original_filename):
581+
@notify_celery.task(bind=True, name="resanitise-letter-attachment")
582+
def resanitise_letter_attachment(self, service_id, attachment_id, original_filename):
578583
"""
579584
`service_id` is the service id for a PDF letter attachment/template.
580585
`attachment_id` is the attachment id for a PDF letter attachment which was uploaded for a template.
@@ -593,4 +598,5 @@ def resanitise_letter_attachment(service_id, attachment_id, original_filename):
593598
"original_filename": original_filename,
594599
},
595600
queue=QueueNames.SANITISE_LETTERS,
601+
MessageGroupId=self.message_group_id if self.message_group_id is not None else str(service_id),
596602
)

app/celery/nightly_tasks.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,16 @@ def _remove_csv_files(job_types):
8282
@notify_celery.task(name="archive-unsubscribe-requests")
8383
def archive_unsubscribe_requests():
8484
for service_id in get_service_ids_with_unsubscribe_requests():
85-
archive_batched_unsubscribe_requests.apply_async(queue=QueueNames.REPORTING, args=[service_id])
86-
archive_old_unsubscribe_requests.apply_async(queue=QueueNames.REPORTING, args=[service_id])
85+
archive_batched_unsubscribe_requests.apply_async(
86+
queue=QueueNames.REPORTING,
87+
args=[service_id],
88+
MessageGroupId=str(service_id),
89+
)
90+
archive_old_unsubscribe_requests.apply_async(
91+
queue=QueueNames.REPORTING,
92+
args=[service_id],
93+
MessageGroupId=str(service_id),
94+
)
8795

8896

8997
@notify_celery.task(name="archive-batched-unsubscribe-requests")
@@ -163,6 +171,7 @@ def _delete_notifications_older_than_retention_by_type(
163171
"datetime_to_delete_before": day_to_delete_backwards_from,
164172
},
165173
countdown=(i / len(flexible_data_retention)) * stagger_total_period.seconds,
174+
MessageGroupId=str(f.service_id),
166175
)
167176

168177
seven_days_ago = get_london_midnight_in_utc(convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=7))
@@ -185,6 +194,7 @@ def _delete_notifications_older_than_retention_by_type(
185194
"datetime_to_delete_before": seven_days_ago,
186195
},
187196
countdown=(i / len(service_ids_to_purge)) * stagger_total_period.seconds,
197+
MessageGroupId=str(service_id),
188198
)
189199

190200
extra = {
@@ -204,8 +214,8 @@ def _delete_notifications_older_than_retention_by_type(
204214
)
205215

206216

207-
@notify_celery.task(name="delete-notifications-for-service-and-type")
208-
def delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before):
217+
@notify_celery.task(bind=True, name="delete-notifications-for-service-and-type")
218+
def delete_notifications_for_service_and_type(self, service_id, notification_type, datetime_to_delete_before):
209219
start = datetime.utcnow()
210220
num_deleted = move_notifications_to_notification_history(
211221
notification_type,
@@ -237,12 +247,14 @@ def delete_notifications_for_service_and_type(service_id, notification_type, dat
237247
delete_notifications_for_service_and_type.apply_async(
238248
args=(service_id, notification_type, datetime_to_delete_before),
239249
queue=QueueNames.REPORTING,
250+
MessageGroupId=self.message_group_id,
240251
)
241252
else:
242253
# now we've deleted all the real notifications, clean up the test notifications
243254
delete_test_notifications_for_service_and_type.apply_async(
244255
args=(service_id, notification_type, datetime_to_delete_before),
245256
queue=QueueNames.REPORTING,
257+
MessageGroupId=self.message_group_id,
246258
)
247259

248260

@@ -254,6 +266,7 @@ def delete_test_notifications_for_service_and_type(service_id, notification_type
254266
delete_test_notifications_for_service_and_type.apply_async(
255267
args=(service_id, notification_type, datetime_to_delete_before),
256268
queue=QueueNames.REPORTING,
269+
MessageGroupId=str(service_id),
257270
)
258271

259272

app/celery/research_mode_tasks.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,19 @@ def send_sms_response(provider, reference, to):
5454
make_request(SMS_TYPE, provider, body, headers)
5555

5656

57-
def send_email_response(reference, to):
57+
def send_email_response(reference, to, service_id):
5858
if to == perm_fail_email:
5959
body = ses_hard_bounce_callback(reference)
6060
elif to == temp_fail_email:
6161
body = ses_soft_bounce_callback(reference)
6262
else:
6363
body = ses_notification_callback(reference)
6464

65-
process_ses_results.apply_async([body], queue=QueueNames.RESEARCH_MODE)
65+
process_ses_results.apply_async(
66+
[body],
67+
queue=QueueNames.RESEARCH_MODE,
68+
MessageGroupId=str(service_id),
69+
)
6670

6771

6872
def send_letter_response(notification_id: uuid.UUID, billable_units: int, postage: str):

app/delivery/send_to_providers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ def send_email_to_provider(notification):
163163
if notification.key_type == KEY_TYPE_TEST:
164164
notification.reference = str(create_uuid())
165165
update_notification_to_sending(notification, provider)
166-
send_email_response(notification.reference, notification.to)
166+
send_email_response(notification.reference, notification.to, notification.service_id)
167167
else:
168168
email_sender_name = service.custom_email_sender_name or service.name
169169
from_address = (

tests/app/celery/test_letters_pdf_tasks.py

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
create_notification,
5656
create_service,
5757
)
58+
from tests.conftest import _with_message_group_id
5859

5960

6061
def test_should_have_decorated_tasks_functions():
@@ -75,7 +76,8 @@ def test_get_pdf_for_templated_letter_happy_path(mocker, sample_letter_notificat
7576
mock_generate_letter_pdf_filename = mocker.patch(
7677
"app.celery.letters_pdf_tasks.generate_letter_pdf_filename", return_value="LETTER.PDF"
7778
)
78-
get_pdf_for_templated_letter(sample_letter_notification.id)
79+
with _with_message_group_id(get_pdf_for_templated_letter, str(sample_letter_notification.service_id)):
80+
get_pdf_for_templated_letter(sample_letter_notification.id)
7981

8082
letter_data = {
8183
"letter_contact_block": sample_letter_notification.reply_to_text,
@@ -97,7 +99,10 @@ def test_get_pdf_for_templated_letter_happy_path(mocker, sample_letter_notificat
9799
}
98100

99101
mock_celery.assert_called_once_with(
100-
name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER, args=(ANY,), queue=QueueNames.SANITISE_LETTERS
102+
name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER,
103+
args=(ANY,),
104+
queue=QueueNames.SANITISE_LETTERS,
105+
MessageGroupId=str(sample_letter_notification.service_id),
101106
)
102107

103108
actual_data = signing.decode(mock_celery.call_args.kwargs["args"][0])
@@ -117,7 +122,8 @@ def test_get_pdf_for_templated_letter_with_letter_attachment(mocker, sample_lett
117122

118123
mock_celery = mocker.patch("app.celery.letters_pdf_tasks.notify_celery.send_task")
119124
mocker.patch("app.celery.letters_pdf_tasks.generate_letter_pdf_filename", return_value="LETTER.PDF")
120-
get_pdf_for_templated_letter(sample_letter_notification.id)
125+
with _with_message_group_id(get_pdf_for_templated_letter, str(sample_letter_notification.service_id)):
126+
get_pdf_for_templated_letter(sample_letter_notification.id)
121127

122128
actual_data = signing.decode(mock_celery.call_args.kwargs["args"][0])
123129

@@ -134,8 +140,9 @@ def test_get_pdf_for_templated_letter_retries_upon_error(mocker, sample_letter_n
134140
mocker.patch("app.celery.letters_pdf_tasks.generate_letter_pdf_filename", return_value="LETTER.PDF")
135141
mock_retry = mocker.patch("app.celery.letters_pdf_tasks.get_pdf_for_templated_letter.retry")
136142

137-
with caplog.at_level("ERROR"):
138-
get_pdf_for_templated_letter(sample_letter_notification.id)
143+
with _with_message_group_id(get_pdf_for_templated_letter, str(sample_letter_notification.service_id)):
144+
with caplog.at_level("ERROR"):
145+
get_pdf_for_templated_letter(sample_letter_notification.id)
139146

140147
assert mock_celery.called
141148
assert mock_retry.called
@@ -153,8 +160,9 @@ def test_get_pdf_for_templated_letter_sets_technical_failure_max_retries(mocker,
153160
)
154161
mock_update_noti = mocker.patch("app.celery.letters_pdf_tasks.update_notification_status_by_id")
155162

156-
with pytest.raises(NotificationTechnicalFailureException) as e:
157-
get_pdf_for_templated_letter(sample_letter_notification.id)
163+
with _with_message_group_id(get_pdf_for_templated_letter, str(sample_letter_notification.service_id)):
164+
with pytest.raises(NotificationTechnicalFailureException) as e:
165+
get_pdf_for_templated_letter(sample_letter_notification.id)
158166

159167
assert (
160168
e.value.args[0] == f"RETRY FAILED: Max retries reached. "
@@ -390,9 +398,17 @@ def test_send_letters_volume_email_to_dvla(notify_db_session, mock_celery_task,
390398

391399
emails_to_dvla = Notification.query.all()
392400
assert len(emails_to_dvla) == 2
393-
send_mock.called = 2
394-
send_mock.assert_any_call([str(emails_to_dvla[0].id)], queue=QueueNames.NOTIFY)
395-
send_mock.assert_any_call([str(emails_to_dvla[1].id)], queue=QueueNames.NOTIFY)
401+
assert send_mock.call_count == 2
402+
send_mock.assert_any_call(
403+
[str(emails_to_dvla[0].id)],
404+
queue=QueueNames.NOTIFY,
405+
MessageGroupId=str(emails_to_dvla[0].service_id),
406+
)
407+
send_mock.assert_any_call(
408+
[str(emails_to_dvla[1].id)],
409+
queue=QueueNames.NOTIFY,
410+
MessageGroupId=str(emails_to_dvla[1].service_id),
411+
)
396412
for email in emails_to_dvla:
397413
assert str(email.template_id) == current_app.config["LETTERS_VOLUME_EMAIL_TEMPLATE_ID"]
398414
assert email.to in current_app.config["DVLA_EMAIL_ADDRESSES"]
@@ -451,7 +467,8 @@ def test_sanitise_letter_calls_template_preview_sanitise_task(
451467
sample_letter_notification.service = create_service(service_permissions=permissions)
452468
sample_letter_notification.status = NOTIFICATION_PENDING_VIRUS_CHECK
453469

454-
sanitise_letter(filename)
470+
with _with_message_group_id(sanitise_letter, str(sample_letter_notification.service_id)):
471+
sanitise_letter(filename)
455472

456473
mock_celery.assert_called_once_with(
457474
name=TaskNames.SANITISE_LETTER,
@@ -461,6 +478,7 @@ def test_sanitise_letter_calls_template_preview_sanitise_task(
461478
"allow_international_letters": expected_international_letters_allowed,
462479
},
463480
queue=QueueNames.SANITISE_LETTERS,
481+
MessageGroupId=str(sample_letter_notification.service_id),
464482
)
465483

466484

@@ -881,7 +899,8 @@ def test_resanitise_pdf_calls_template_preview_with_letter_details(
881899
sample_letter_notification.created_at = datetime(2021, 2, 7, 12)
882900
sample_letter_notification.service = create_service(service_permissions=permissions)
883901

884-
resanitise_pdf(sample_letter_notification.id)
902+
with _with_message_group_id(resanitise_pdf, str(sample_letter_notification.service_id)):
903+
resanitise_pdf(sample_letter_notification.id)
885904

886905
mock_celery.assert_called_once_with(
887906
name=TaskNames.RECREATE_PDF_FOR_PRECOMPILED_LETTER,
@@ -904,7 +923,8 @@ def test_resanitise_letter_attachment_calls_template_preview_with_attachment_det
904923
attachment_id = str(uuid.uuid4())
905924
original_filename = "test-123abc.pdf"
906925

907-
resanitise_letter_attachment(service_id, attachment_id, original_filename)
926+
with _with_message_group_id(resanitise_letter_attachment, service_id):
927+
resanitise_letter_attachment(service_id, attachment_id, original_filename)
908928

909929
mock_celery.assert_called_once_with(
910930
name=TaskNames.RECREATE_PDF_FOR_TEMPLATE_LETTER_ATTACHMENTS,
@@ -914,4 +934,5 @@ def test_resanitise_letter_attachment_calls_template_preview_with_attachment_det
914934
"original_filename": original_filename,
915935
},
916936
queue=QueueNames.SANITISE_LETTERS,
937+
MessageGroupId=service_id,
917938
)

tests/app/celery/test_nightly_tasks.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
create_unsubscribe_request,
6464
create_unsubscribe_request_report,
6565
)
66+
from tests.conftest import _with_message_group_id
6667

6768

6869
@freeze_time("2016-10-18T10:00:00")
@@ -175,6 +176,12 @@ def test_archive_unsubscribe_requests(notify_db_session, mock_celery_task):
175176
== {service.id for service in services_with_requests}
176177
)
177178

179+
assert (
180+
{call[1]["MessageGroupId"] for call in mock_archive_processed.call_args_list}
181+
== {call[1]["MessageGroupId"] for call in mock_archive_old.call_args_list}
182+
== {str(service.id) for service in services_with_requests}
183+
)
184+
178185
assert (
179186
[call[1]["queue"] for call in mock_archive_processed.call_args_list]
180187
== [call[1]["queue"] for call in mock_archive_old.call_args_list]
@@ -570,6 +577,7 @@ def test_delete_notifications_task_calls_task_for_services_with_data_retention_o
570577
"datetime_to_delete_before": datetime(2021, 6, 1, 23, 0),
571578
},
572579
countdown=0.0,
580+
MessageGroupId=str(sms_service.id),
573581
)
574582

575583

@@ -598,6 +606,7 @@ def test_delete_notifications_task_calls_task_for_services_with_data_retention_b
598606
"datetime_to_delete_before": datetime(2021, 3, 22, 0, 0),
599607
},
600608
countdown=ANY,
609+
MessageGroupId=str(service_14_days.id),
601610
),
602611
call(
603612
queue=ANY,
@@ -607,6 +616,7 @@ def test_delete_notifications_task_calls_task_for_services_with_data_retention_b
607616
"datetime_to_delete_before": datetime(2021, 4, 1, 23, 0),
608617
},
609618
countdown=ANY,
619+
MessageGroupId=str(service_3_days.id),
610620
),
611621
],
612622
)
@@ -655,6 +665,7 @@ def test_delete_notifications_task_calls_task_for_services_that_have_sent_notifi
655665
"datetime_to_delete_before": datetime(2021, 3, 27, 0, 0),
656666
},
657667
countdown=ANY,
668+
MessageGroupId=str(service_will_delete_1.id),
658669
),
659670
call(
660671
queue=ANY,
@@ -664,6 +675,7 @@ def test_delete_notifications_task_calls_task_for_services_that_have_sent_notifi
664675
"datetime_to_delete_before": datetime(2021, 3, 27, 0, 0),
665676
},
666677
countdown=ANY,
678+
MessageGroupId=str(service_will_delete_2.id),
667679
),
668680
],
669681
)
@@ -682,12 +694,15 @@ def test_delete_notifications_for_service_and_type_queues_up_second_task_if_thin
682694
notification_type = "some-str"
683695
datetime_to_delete_before = datetime.utcnow()
684696

685-
delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before)
697+
with _with_message_group_id(delete_notifications_for_service_and_type, str(service_id)):
698+
delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before)
686699

687700
mock_move.assert_called_once_with(notification_type, service_id, datetime_to_delete_before)
688701
# the next task is queued up with the exact same args
689702
mock_task_call.assert_called_once_with(
690-
args=(service_id, notification_type, datetime_to_delete_before), queue="reporting-tasks"
703+
args=(service_id, notification_type, datetime_to_delete_before),
704+
queue="reporting-tasks",
705+
MessageGroupId=str(service_id),
691706
)
692707
assert not mock_delete_tests.called
693708

@@ -703,13 +718,16 @@ def test_delete_notifications_for_service_and_type_removes_test_notifications_if
703718
notification_type = "some-str"
704719
datetime_to_delete_before = datetime.utcnow()
705720

706-
delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before)
721+
with _with_message_group_id(delete_notifications_for_service_and_type, str(service_id)):
722+
delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before)
707723

708724
mock_move.assert_called_once_with(notification_type, service_id, datetime_to_delete_before)
709725
# the next task is not queued up
710726
assert not mock_delete_live_notis_task_call.called
711727
mock_delete_tests_task_call.assert_called_once_with(
712-
args=(service_id, notification_type, datetime_to_delete_before), queue="reporting-tasks"
728+
args=(service_id, notification_type, datetime_to_delete_before),
729+
queue="reporting-tasks",
730+
MessageGroupId=str(service_id),
713731
)
714732

715733

@@ -727,7 +745,9 @@ def test_delete_test_notifications_for_service_and_type_queues_up_second_task_if
727745

728746
mock_delete.assert_called_once_with(notification_type, service_id, datetime_to_delete_before)
729747
mock_task_call.assert_called_once_with(
730-
args=(service_id, notification_type, datetime_to_delete_before), queue="reporting-tasks"
748+
args=(service_id, notification_type, datetime_to_delete_before),
749+
queue="reporting-tasks",
750+
MessageGroupId=str(service_id),
731751
)
732752

733753

0 commit comments

Comments
 (0)