Skip to content

Commit 802e5e1

Browse files
committed
feat: message group id for job pipeline (bulk upload, tasks, scheduled)
1 parent 8ce34c5 commit 802e5e1

File tree

6 files changed

+430
-211
lines changed

6 files changed

+430
-211
lines changed

app/celery/scheduled_tasks.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,11 @@
9696
def run_scheduled_jobs():
9797
try:
9898
for job in dao_set_scheduled_jobs_to_pending():
99-
process_job.apply_async([str(job.id)], queue=QueueNames.JOBS)
99+
process_job.apply_async(
100+
[str(job.id)],
101+
queue=QueueNames.JOBS,
102+
MessageGroupId=str(job.service_id),
103+
)
100104
current_app.logger.info("Job ID %s added to process job queue", job.id, extra={"job_id": job.id})
101105
except SQLAlchemyError:
102106
current_app.logger.exception("Failed to run scheduled jobs")
@@ -335,7 +339,11 @@ def replay_created_notifications() -> None:
335339
letter.id,
336340
extra={"notification_id": letter.id},
337341
)
338-
get_pdf_for_templated_letter.apply_async([str(letter.id)], queue=QueueNames.CREATE_LETTERS_PDF)
342+
get_pdf_for_templated_letter.apply_async(
343+
[str(letter.id)],
344+
queue=QueueNames.CREATE_LETTERS_PDF,
345+
MessageGroupId=str(letter.service_id),
346+
)
339347

340348

341349
@notify_celery.task(name="check-if-letters-still-pending-virus-check")
@@ -359,6 +367,7 @@ def check_if_letters_still_pending_virus_check(max_minutes_ago_to_check: int = 3
359367
name=TaskNames.SCAN_FILE,
360368
kwargs={"filename": filename},
361369
queue=QueueNames.ANTIVIRUS,
370+
MessageGroupId=str(letter.service_id),
362371
)
363372
else:
364373
current_app.logger.warning(
@@ -438,7 +447,7 @@ def check_for_missing_rows_in_completed_jobs():
438447

439448
extra = {"job_row_number": row_to_process.missing_row, "job_id": job.id}
440449
current_app.logger.info("Processing missing row %(job_row_number)s for job %(job_id)s", extra, extra=extra)
441-
process_job_row(template.template_type, task_args_kwargs)
450+
process_job_row(template.template_type, task_args_kwargs, str(job.service_id))
442451

443452

444453
@notify_celery.task(name="update-status-of-fully-processed-jobs")

app/celery/tasks.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ class ProcessReportRequestException(Exception):
6868
pass
6969

7070

71-
@notify_celery.task(name="process-job")
72-
def process_job(job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_BATCH_SIZE):
71+
@notify_celery.task(bind=True, name="process-job")
72+
def process_job(self, job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_BATCH_SIZE):
7373
start = datetime.utcnow()
7474
job = dao_get_job_by_id(job_id)
7575
current_app.logger.info(
@@ -116,19 +116,20 @@ def process_job(job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_R
116116
get_id_task_args_kwargs_for_job_row(row, template, job, service, sender_id=sender_id)[1]
117117
for row in shatter_batch
118118
]
119-
_shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs)
119+
_shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs, self.message_group_id)
120120

121121
job_complete(job, start=start)
122122

123123

124-
def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level=True):
124+
def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, message_group_id, top_level=True):
125125
try:
126126
shatter_job_rows.apply_async(
127127
(
128128
template_type,
129129
args_kwargs_seq,
130130
),
131131
queue=QueueNames.JOBS,
132+
MessageGroupId=message_group_id,
132133
)
133134
except BotoClientError as e:
134135
# this information is helpfully not preserved outside the message string of the exception, so
@@ -146,7 +147,7 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level
146147
raise UnprocessableJobRow from e
147148

148149
for sub_batch in (args_kwargs_seq[:split_batch_size], args_kwargs_seq[split_batch_size:]):
149-
_shatter_job_rows_with_subdivision(template_type, sub_batch, top_level=False)
150+
_shatter_job_rows_with_subdivision(template_type, sub_batch, message_group_id, top_level=False)
150151

151152
else:
152153
if not top_level:
@@ -156,16 +157,17 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level
156157
)
157158

158159

159-
@notify_celery.task(name="shatter-job-rows")
160+
@notify_celery.task(bind=True, name="shatter-job-rows")
160161
def shatter_job_rows(
162+
self,
161163
template_type: str,
162164
args_kwargs_seq: Sequence,
163165
):
164166
for task_args_kwargs in args_kwargs_seq:
165-
process_job_row(template_type, task_args_kwargs)
167+
process_job_row(template_type, task_args_kwargs, self.message_group_id)
166168

167169

168-
def process_job_row(template_type, task_args_kwargs):
170+
def process_job_row(template_type, task_args_kwargs, message_group_id=None):
169171
send_fn = {
170172
SMS_TYPE: save_sms,
171173
EMAIL_TYPE: save_email,
@@ -175,6 +177,7 @@ def process_job_row(template_type, task_args_kwargs):
175177
send_fn.apply_async(
176178
*task_args_kwargs,
177179
queue=QueueNames.DATABASE,
180+
MessageGroupId=message_group_id,
178181
)
179182

180183

@@ -353,6 +356,7 @@ def save_sms(
353356
provider_tasks.deliver_sms.apply_async(
354357
[str(saved_notification.id)],
355358
queue=QueueNames.SEND_SMS,
359+
MessageGroupId=self.message_group_id,
356360
)
357361
else:
358362
extra = {
@@ -437,6 +441,7 @@ def save_email(self, service_id, notification_id, encoded_notification, sender_i
437441
provider_tasks.deliver_email.apply_async(
438442
[str(saved_notification.id)],
439443
queue=QueueNames.SEND_EMAIL,
444+
MessageGroupId=self.message_group_id,
440445
)
441446

442447
extra = {
@@ -494,7 +499,9 @@ def save_letter(
494499
)
495500

496501
letters_pdf_tasks.get_pdf_for_templated_letter.apply_async(
497-
[str(saved_notification.id)], queue=QueueNames.CREATE_LETTERS_PDF
502+
[str(saved_notification.id)],
503+
queue=QueueNames.CREATE_LETTERS_PDF,
504+
MessageGroupId=self.message_group_id,
498505
)
499506

500507
extra = {
@@ -529,7 +536,12 @@ def handle_exception(task, notification, notification_id, exc):
529536
# send to the retry queue.
530537
current_app.logger.exception("Retry: " + base_msg, extra, extra=extra) # noqa
531538
try:
532-
task.retry(queue=QueueNames.RETRY, exc=exc)
539+
retry_kwargs = {
540+
"queue": QueueNames.RETRY,
541+
"exc": exc,
542+
"MessageGroupId": task.message_group_id,
543+
}
544+
task.retry(**retry_kwargs)
533545
except task.MaxRetriesExceededError:
534546
current_app.logger.error("Max retry failed: " + base_msg, extra, extra=extra) # noqa
535547

@@ -580,7 +592,7 @@ def process_incomplete_job(job_id, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_B
580592
get_id_task_args_kwargs_for_job_row(row, template, job, job.service, sender_id=sender_id)[1]
581593
for row in shatter_batch
582594
]
583-
_shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs)
595+
_shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs, str(job.service_id))
584596

585597
job_complete(job, resumed=True)
586598

@@ -625,7 +637,11 @@ def _check_and_queue_returned_letter_callback_task(notification_id, service_id):
625637
# queue callback task only if the service_callback_api exists
626638
if service_callback_api := get_returned_letter_callback_api_for_service(service_id=service_id):
627639
returned_letter_data = create_returned_letter_callback_data(notification_id, service_id, service_callback_api)
628-
send_returned_letter_to_service.apply_async([returned_letter_data], queue=QueueNames.CALLBACKS)
640+
send_returned_letter_to_service.apply_async(
641+
[returned_letter_data],
642+
queue=QueueNames.CALLBACKS,
643+
MessageGroupId=str(service_id),
644+
)
629645

630646

631647
@notify_celery.task(bind=True, name="process-report-request")

app/job/rest.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,12 @@ def create_job(service_id):
176176
sender_id = data.get("sender_id")
177177

178178
if job.job_status == JOB_STATUS_PENDING:
179-
process_job.apply_async([str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS)
179+
process_job.apply_async(
180+
[str(job.id)],
181+
{"sender_id": sender_id},
182+
queue=QueueNames.JOBS,
183+
MessageGroupId=str(job.service_id),
184+
)
180185

181186
job_json = job_schema.dump(job)
182187
job_json["statistics"] = []

tests/app/celery/test_scheduled_tasks.py

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
create_template,
7474
create_user,
7575
)
76-
from tests.conftest import set_config, set_config_values
76+
from tests.conftest import _with_message_group_id, set_config, set_config_values
7777

7878

7979
def test_should_call_delete_codes_on_delete_verify_codes_task(notify_db_session, mocker):
@@ -98,7 +98,11 @@ def test_should_update_scheduled_jobs_and_put_on_queue(mock_celery_task, sample_
9898

9999
updated_job = dao_get_job_by_id(job.id)
100100
assert updated_job.job_status == "pending"
101-
mocked.assert_called_with([str(job.id)], queue="job-tasks")
101+
mocked.assert_called_with(
102+
[str(job.id)],
103+
queue="job-tasks",
104+
MessageGroupId=str(job.service_id),
105+
)
102106

103107

104108
def test_should_update_all_scheduled_jobs_and_put_on_queue(sample_template, mock_celery_task):
@@ -117,11 +121,12 @@ def test_should_update_all_scheduled_jobs_and_put_on_queue(sample_template, mock
117121
assert dao_get_job_by_id(job_2.id).job_status == "pending"
118122
assert dao_get_job_by_id(job_2.id).job_status == "pending"
119123

124+
service_id = str(sample_template.service_id)
120125
mocked.assert_has_calls(
121126
[
122-
call([str(job_3.id)], queue="job-tasks"),
123-
call([str(job_2.id)], queue="job-tasks"),
124-
call([str(job_1.id)], queue="job-tasks"),
127+
call([str(job_3.id)], queue="job-tasks", MessageGroupId=service_id),
128+
call([str(job_2.id)], queue="job-tasks", MessageGroupId=service_id),
129+
call([str(job_1.id)], queue="job-tasks", MessageGroupId=service_id),
125130
]
126131
)
127132

@@ -406,8 +411,11 @@ def test_check_job_status_task_sets_jobs_to_error(mock_celery_task, sample_templ
406411

407412

408413
def test_replay_created_notifications(sample_service, mock_celery_task):
409-
email_delivery_queue = mock_celery_task(deliver_email)
410-
sms_delivery_queue = mock_celery_task(deliver_sms)
414+
with _with_message_group_id(deliver_email, str(sample_service.id)):
415+
email_delivery_queue = mock_celery_task(deliver_email)
416+
417+
with _with_message_group_id(deliver_sms, str(sample_service.id)):
418+
sms_delivery_queue = mock_celery_task(deliver_sms)
411419

412420
sms_template = create_template(service=sample_service, template_type="sms")
413421
email_template = create_template(service=sample_service, template_type="email")
@@ -430,8 +438,12 @@ def test_replay_created_notifications(sample_service, mock_celery_task):
430438
create_notification(template=email_template, created_at=datetime.utcnow(), status="created")
431439

432440
replay_created_notifications()
433-
email_delivery_queue.assert_called_once_with([str(old_email.id)], queue="send-email-tasks")
434-
sms_delivery_queue.assert_called_once_with([str(old_sms.id)], queue="send-sms-tasks")
441+
email_delivery_queue.assert_called_once_with(
442+
[str(old_email.id)], queue="send-email-tasks", MessageGroupId=str(sample_service.id)
443+
)
444+
sms_delivery_queue.assert_called_once_with(
445+
[str(old_sms.id)], queue="send-sms-tasks", MessageGroupId=str(sample_service.id)
446+
)
435447

436448

437449
def test_replay_created_notifications_get_pdf_for_templated_letter_tasks_for_letters_not_ready_to_send(
@@ -454,9 +466,10 @@ def test_replay_created_notifications_get_pdf_for_templated_letter_tasks_for_let
454466

455467
replay_created_notifications()
456468

469+
service_id = str(sample_letter_template.service_id)
457470
calls = [
458-
call([str(notification_1.id)], queue=QueueNames.CREATE_LETTERS_PDF),
459-
call([str(notification_2.id)], queue=QueueNames.CREATE_LETTERS_PDF),
471+
call([str(notification_1.id)], queue=QueueNames.CREATE_LETTERS_PDF, MessageGroupId=service_id),
472+
call([str(notification_2.id)], queue=QueueNames.CREATE_LETTERS_PDF, MessageGroupId=service_id),
460473
]
461474
mock_task.assert_has_calls(calls, any_order=True)
462475

@@ -512,7 +525,10 @@ def test_check_if_letters_still_pending_virus_check_restarts_scan_for_stuck_lett
512525
mock_file_exists.assert_called_once_with("test-letters-scan", expected_filename)
513526

514527
mock_celery.assert_called_once_with(
515-
name=TaskNames.SCAN_FILE, kwargs={"filename": expected_filename}, queue=QueueNames.ANTIVIRUS
528+
name=TaskNames.SCAN_FILE,
529+
kwargs={"filename": expected_filename},
530+
queue=QueueNames.ANTIVIRUS,
531+
MessageGroupId=str(sample_letter_template.service_id),
516532
)
517533

518534
assert mock_create_ticket.called is False
@@ -724,7 +740,12 @@ def test_check_for_missing_rows_in_completed_jobs(mocker, sample_email_template,
724740
)
725741
]
726742
assert mock_save_email.mock_calls == [
727-
mock.call((str(job.service_id), "some-uuid", "something_encoded"), {}, queue="database-tasks")
743+
mock.call(
744+
(str(job.service_id), "some-uuid", "something_encoded"),
745+
{},
746+
queue="database-tasks",
747+
MessageGroupId=str(job.service_id),
748+
)
728749
]
729750

730751

@@ -765,7 +786,10 @@ def test_check_for_missing_rows_in_completed_jobs_uses_sender_id(
765786
]
766787
assert mock_save_email.mock_calls == [
767788
mock.call(
768-
(str(job.service_id), "some-uuid", "something_encoded"), {"sender_id": fake_uuid}, queue="database-tasks"
789+
(str(job.service_id), "some-uuid", "something_encoded"),
790+
{"sender_id": fake_uuid},
791+
queue="database-tasks",
792+
MessageGroupId=str(job.service_id),
769793
)
770794
]
771795

0 commit comments

Comments (0)