Skip to content

Commit 908e527

Browse files
committed
feat: Add MessageGroupId to job and save task queueing
- Pass MessageGroupId (service id) when calling shatter_job_rows.apply_async so batches use the same group id - Add _service_id_from_job_row_args_kwargs helper and use it in _shatter_job_rows_with_subdivision and process_job_row - Update tests to expect MessageGroupId on shatter, deliver, and retry calls; add tests for the new helper
1 parent c8bffa8 commit 908e527

File tree

3 files changed

+124
-20
lines changed

3 files changed

+124
-20
lines changed

app/celery/tasks.py

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,16 @@ def process_job(job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_R
122122

123123

124124
def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level=True):
125+
service_id = str(_service_id_from_job_row_args_kwargs(args_kwargs_seq[0])) if args_kwargs_seq else None
126+
125127
try:
126128
shatter_job_rows.apply_async(
127129
(
128130
template_type,
129131
args_kwargs_seq,
130132
),
131133
queue=QueueNames.JOBS,
134+
MessageGroupId=service_id,
132135
)
133136
except BotoClientError as e:
134137
# this information is helpfully not preserved outside the message string of the exception, so
@@ -172,10 +175,9 @@ def process_job_row(template_type, task_args_kwargs):
172175
LETTER_TYPE: save_letter,
173176
}[template_type]
174177

175-
send_fn.apply_async(
176-
*task_args_kwargs,
177-
queue=QueueNames.DATABASE,
178-
)
178+
service_id = _service_id_from_job_row_args_kwargs(task_args_kwargs)
179+
apply_kwargs = {"queue": QueueNames.DATABASE, "MessageGroupId": str(service_id) if service_id else None}
180+
send_fn.apply_async(*task_args_kwargs, **apply_kwargs)
179181

180182

181183
def job_complete(job, resumed=False, start=None):
@@ -242,6 +244,12 @@ def get_id_task_args_kwargs_for_job_row(row, template, job, service, sender_id=N
242244
return notification_id, (task_args, task_kwargs)
243245

244246

247+
def _service_id_from_job_row_args_kwargs(task_args_kwargs):
248+
"""Extract service_id from (task_args, task_kwargs) produced by get_id_task_args_kwargs_for_job_row."""
249+
task_args, _ = task_args_kwargs
250+
return task_args[0]
251+
252+
245253
def __sending_limits_for_job_exceeded(service, job, job_id):
246254
try:
247255
check_service_over_daily_message_limit(
@@ -353,6 +361,7 @@ def save_sms(
353361
provider_tasks.deliver_sms.apply_async(
354362
[str(saved_notification.id)],
355363
queue=QueueNames.SEND_SMS,
364+
MessageGroupId=str(service.id),
356365
)
357366
else:
358367
extra = {
@@ -437,6 +446,7 @@ def save_email(self, service_id, notification_id, encoded_notification, sender_i
437446
provider_tasks.deliver_email.apply_async(
438447
[str(saved_notification.id)],
439448
queue=QueueNames.SEND_EMAIL,
449+
MessageGroupId=str(service.id),
440450
)
441451

442452
extra = {
@@ -494,7 +504,9 @@ def save_letter(
494504
)
495505

496506
letters_pdf_tasks.get_pdf_for_templated_letter.apply_async(
497-
[str(saved_notification.id)], queue=QueueNames.CREATE_LETTERS_PDF
507+
[str(saved_notification.id)],
508+
queue=QueueNames.CREATE_LETTERS_PDF,
509+
MessageGroupId=str(service.id),
498510
)
499511

500512
extra = {
@@ -529,7 +541,12 @@ def handle_exception(task, notification, notification_id, exc):
529541
# send to the retry queue.
530542
current_app.logger.exception("Retry: " + base_msg, extra, extra=extra) # noqa
531543
try:
532-
task.retry(queue=QueueNames.RETRY, exc=exc)
544+
retry_kwargs = {
545+
"queue": QueueNames.RETRY,
546+
"exc": exc,
547+
"MessageGroupId": getattr(task, "message_group_id", None),
548+
}
549+
task.retry(**retry_kwargs)
533550
except task.MaxRetriesExceededError:
534551
current_app.logger.error("Max retry failed: " + base_msg, extra, extra=extra) # noqa
535552

@@ -625,7 +642,11 @@ def _check_and_queue_returned_letter_callback_task(notification_id, service_id):
625642
# queue callback task only if the service_callback_api exists
626643
if service_callback_api := get_returned_letter_callback_api_for_service(service_id=service_id):
627644
returned_letter_data = create_returned_letter_callback_data(notification_id, service_id, service_callback_api)
628-
send_returned_letter_to_service.apply_async([returned_letter_data], queue=QueueNames.CALLBACKS)
645+
send_returned_letter_to_service.apply_async(
646+
[returned_letter_data],
647+
queue=QueueNames.CALLBACKS,
648+
MessageGroupId=str(service_id),
649+
)
629650

630651

631652
@notify_celery.task(bind=True, name="process-report-request")

tests/app/celery/test_scheduled_tasks.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,12 @@ def test_check_for_missing_rows_in_completed_jobs(mocker, sample_email_template,
724724
)
725725
]
726726
assert mock_save_email.mock_calls == [
727-
mock.call((str(job.service_id), "some-uuid", "something_encoded"), {}, queue="database-tasks")
727+
mock.call(
728+
(str(job.service_id), "some-uuid", "something_encoded"),
729+
{},
730+
queue="database-tasks",
731+
MessageGroupId=str(job.service_id),
732+
)
728733
]
729734

730735

@@ -765,7 +770,10 @@ def test_check_for_missing_rows_in_completed_jobs_uses_sender_id(
765770
]
766771
assert mock_save_email.mock_calls == [
767772
mock.call(
768-
(str(job.service_id), "some-uuid", "something_encoded"), {"sender_id": fake_uuid}, queue="database-tasks"
773+
(str(job.service_id), "some-uuid", "something_encoded"),
774+
{"sender_id": fake_uuid},
775+
queue="database-tasks",
776+
MessageGroupId=str(job.service_id),
769777
)
770778
]
771779

0 commit comments

Comments
 (0)