Skip to content

Commit c6dd35f

Browse files
committed
TMP mgid
1 parent 0b4e59f commit c6dd35f

22 files changed

+360
-102
lines changed

app/celery/letters_pdf_tasks.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,10 @@ def get_pdf_for_templated_letter(self, notification_id):
9292
encoded_data = signing.encode(letter_data)
9393

9494
notify_celery.send_task(
95-
name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER, args=(encoded_data,), queue=QueueNames.SANITISE_LETTERS
95+
name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER,
96+
args=(encoded_data,),
97+
queue=QueueNames.SANITISE_LETTERS,
98+
MessageGroupId=getattr(self, "message_group_id", None),
9699
)
97100
except Exception as e:
98101
try:
@@ -245,7 +248,7 @@ def send_letters_volume_email_to_dvla(letters_volumes, date):
245248
reply_to_text=reply_to,
246249
)
247250

248-
send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY)
251+
send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY, origin="scheduled")
249252

250253

251254
def send_dvla_letters_via_api(print_run_deadline_local, batch_size=100):
@@ -294,6 +297,7 @@ def sanitise_letter(self, filename):
294297
"allow_international_letters": notification.service.has_permission(INTERNATIONAL_LETTERS),
295298
},
296299
queue=QueueNames.SANITISE_LETTERS,
300+
MessageGroupId=getattr(self, "message_group_id", None),
297301
)
298302
except Exception:
299303
try:
@@ -541,8 +545,8 @@ def replay_letters_in_error(filename=None):
541545
sanitise_letter.apply_async([filename], queue=QueueNames.LETTERS)
542546

543547

544-
@notify_celery.task(name="resanitise-pdf")
545-
def resanitise_pdf(notification_id):
548+
@notify_celery.task(name="resanitise-pdf", bind=True)
549+
def resanitise_pdf(self, notification_id):
546550
"""
547551
`notification_id` is the notification id for a PDF letter which was either uploaded or sent using the API.
548552
@@ -570,11 +574,12 @@ def resanitise_pdf(notification_id):
570574
"allow_international_letters": notification.service.has_permission(INTERNATIONAL_LETTERS),
571575
},
572576
queue=QueueNames.SANITISE_LETTERS,
577+
MessageGroupId=getattr(self, "message_group_id", None),
573578
)
574579

575580

576-
@notify_celery.task(name="resanitise-letter-attachment")
577-
def resanitise_letter_attachment(service_id, attachment_id, original_filename):
581+
@notify_celery.task(name="resanitise-letter-attachment", bind=True)
582+
def resanitise_letter_attachment(self, service_id, attachment_id, original_filename):
578583
"""
579584
`service_id` is the service id for a PDF letter attachment/template.
580585
`attachment_id` is the attachment id for a PDF letter attachment which was uploaded for a template.
@@ -593,4 +598,5 @@ def resanitise_letter_attachment(service_id, attachment_id, original_filename):
593598
"original_filename": original_filename,
594599
},
595600
queue=QueueNames.SANITISE_LETTERS,
601+
MessageGroupId=getattr(self, "message_group_id", None),
596602
)

app/celery/scheduled_tasks.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,11 @@
9696
def run_scheduled_jobs():
9797
try:
9898
for job in dao_set_scheduled_jobs_to_pending():
99-
process_job.apply_async([str(job.id)], queue=QueueNames.JOBS)
99+
process_job.apply_async(
100+
[str(job.id)],
101+
queue=QueueNames.JOBS,
102+
MessageGroupId="#".join((str(job.service_id), job.template.template_type, "normal", "scheduled")),
103+
)
100104
current_app.logger.info("Job ID %s added to process job queue", job.id, extra={"job_id": job.id})
101105
except SQLAlchemyError:
102106
current_app.logger.exception("Failed to run scheduled jobs")
@@ -323,7 +327,7 @@ def replay_created_notifications() -> None:
323327
extra,
324328
extra=extra,
325329
)
326-
send_notification_to_queue(notification=n)
330+
send_notification_to_queue(notification=n, origin="")
327331

328332
# if the letter has not be sent after an hour, then create a zendesk ticket
329333
letters = letters_missing_from_sending_bucket(grace_period, session=db.session_bulk, inner_retry_attempts=2)
@@ -335,7 +339,11 @@ def replay_created_notifications() -> None:
335339
letter.id,
336340
extra={"notification_id": letter.id},
337341
)
338-
get_pdf_for_templated_letter.apply_async([str(letter.id)], queue=QueueNames.CREATE_LETTERS_PDF)
342+
get_pdf_for_templated_letter.apply_async(
343+
[str(letter.id)],
344+
queue=QueueNames.CREATE_LETTERS_PDF,
345+
MessageGroupId="#".join((str(letter.service_id), letter.notification_type, letter.key_type, "")),
346+
)
339347

340348

341349
@notify_celery.task(name="check-if-letters-still-pending-virus-check")
@@ -438,7 +446,11 @@ def check_for_missing_rows_in_completed_jobs():
438446

439447
extra = {"job_row_number": row_to_process.missing_row, "job_id": job.id}
440448
current_app.logger.info("Processing missing row %(job_row_number)s for job %(job_id)s", extra, extra=extra)
441-
process_job_row(template.template_type, task_args_kwargs)
449+
process_job_row(
450+
template.template_type,
451+
task_args_kwargs,
452+
"#".join((str(job.service_id), template.template_type, "normal", "")),
453+
)
442454

443455

444456
@notify_celery.task(name="update-status-of-fully-processed-jobs")

app/celery/tasks.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ class ProcessReportRequestException(Exception):
6868
pass
6969

7070

71-
@notify_celery.task(name="process-job")
72-
def process_job(job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_BATCH_SIZE):
71+
@notify_celery.task(name="process-job", bind=True)
72+
def process_job(self, job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_BATCH_SIZE):
7373
start = datetime.utcnow()
7474
job = dao_get_job_by_id(job_id)
7575
current_app.logger.info(
@@ -116,19 +116,22 @@ def process_job(job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_R
116116
get_id_task_args_kwargs_for_job_row(row, template, job, service, sender_id=sender_id)[1]
117117
for row in shatter_batch
118118
]
119-
_shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs)
119+
_shatter_job_rows_with_subdivision(
120+
template.template_type, batch_args_kwargs, getattr(self, "message_group_id", None)
121+
)
120122

121123
job_complete(job, start=start)
122124

123125

124-
def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level=True):
126+
def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, message_group_id, top_level=True):
125127
try:
126128
shatter_job_rows.apply_async(
127129
(
128130
template_type,
129131
args_kwargs_seq,
130132
),
131133
queue=QueueNames.JOBS,
134+
MessageGroupId=message_group_id,
132135
)
133136
except BotoClientError as e:
134137
# this information is helpfully not preserved outside the message string of the exception, so
@@ -146,7 +149,7 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level
146149
raise UnprocessableJobRow from e
147150

148151
for sub_batch in (args_kwargs_seq[:split_batch_size], args_kwargs_seq[split_batch_size:]):
149-
_shatter_job_rows_with_subdivision(template_type, sub_batch, top_level=False)
152+
_shatter_job_rows_with_subdivision(template_type, sub_batch, message_group_id, top_level=False)
150153

151154
else:
152155
if not top_level:
@@ -156,16 +159,17 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level
156159
)
157160

158161

159-
@notify_celery.task(name="shatter-job-rows")
162+
@notify_celery.task(name="shatter-job-rows", bind=True)
160163
def shatter_job_rows(
164+
self,
161165
template_type: str,
162166
args_kwargs_seq: Sequence,
163167
):
164168
for task_args_kwargs in args_kwargs_seq:
165-
process_job_row(template_type, task_args_kwargs)
169+
process_job_row(template_type, task_args_kwargs, getattr(self, "message_group_id", None))
166170

167171

168-
def process_job_row(template_type, task_args_kwargs):
172+
def process_job_row(template_type, task_args_kwargs, message_group_id):
169173
send_fn = {
170174
SMS_TYPE: save_sms,
171175
EMAIL_TYPE: save_email,
@@ -175,6 +179,7 @@ def process_job_row(template_type, task_args_kwargs):
175179
send_fn.apply_async(
176180
*task_args_kwargs,
177181
queue=QueueNames.DATABASE,
182+
MessageGroupId=message_group_id,
178183
)
179184

180185

@@ -353,6 +358,7 @@ def save_sms(
353358
provider_tasks.deliver_sms.apply_async(
354359
[str(saved_notification.id)],
355360
queue=QueueNames.SEND_SMS,
361+
MessageGroupId=getattr(self, "message_group_id", None),
356362
)
357363
else:
358364
extra = {
@@ -437,6 +443,7 @@ def save_email(self, service_id, notification_id, encoded_notification, sender_i
437443
provider_tasks.deliver_email.apply_async(
438444
[str(saved_notification.id)],
439445
queue=QueueNames.SEND_EMAIL,
446+
MessageGroupId=getattr(self, "message_group_id", None),
440447
)
441448

442449
extra = {
@@ -494,7 +501,9 @@ def save_letter(
494501
)
495502

496503
letters_pdf_tasks.get_pdf_for_templated_letter.apply_async(
497-
[str(saved_notification.id)], queue=QueueNames.CREATE_LETTERS_PDF
504+
[str(saved_notification.id)],
505+
queue=QueueNames.CREATE_LETTERS_PDF,
506+
MessageGroupId=getattr(self, "message_group_id", None),
498507
)
499508

500509
extra = {
@@ -580,7 +589,11 @@ def process_incomplete_job(job_id, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_B
580589
get_id_task_args_kwargs_for_job_row(row, template, job, job.service, sender_id=sender_id)[1]
581590
for row in shatter_batch
582591
]
583-
_shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs)
592+
_shatter_job_rows_with_subdivision(
593+
template.template_type,
594+
batch_args_kwargs,
595+
"#".join((str(job.service_id), job.template.template_type, "normal", "")),
596+
)
584597

585598
job_complete(job, resumed=True)
586599

@@ -625,7 +638,11 @@ def _check_and_queue_returned_letter_callback_task(notification_id, service_id):
625638
# queue callback task only if the service_callback_api exists
626639
if service_callback_api := get_returned_letter_callback_api_for_service(service_id=service_id):
627640
returned_letter_data = create_returned_letter_callback_data(notification_id, service_id, service_callback_api)
628-
send_returned_letter_to_service.apply_async([returned_letter_data], queue=QueueNames.CALLBACKS)
641+
send_returned_letter_to_service.apply_async(
642+
[returned_letter_data],
643+
queue=QueueNames.CALLBACKS,
644+
MessageGroupId="#".join((str(service_id), LETTER_TYPE, "normal", "")),
645+
)
629646

630647

631648
@notify_celery.task(bind=True, name="process-report-request")

app/job/rest.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,12 @@ def create_job(service_id):
176176
sender_id = data.get("sender_id")
177177

178178
if job.job_status == JOB_STATUS_PENDING:
179-
process_job.apply_async([str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS)
179+
process_job.apply_async(
180+
[str(job.id)],
181+
{"sender_id": sender_id},
182+
queue=QueueNames.JOBS,
183+
MessageGroupId="#".join((str(job.service_id), template.template_type, "normal", "dashboard")),
184+
)
180185

181186
job_json = job_schema.dump(job)
182187
job_json["statistics"] = []

app/notifications/notifications_ses_callback.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,11 @@ def check_and_queue_callback_task(notification):
7474
if service_callback_api:
7575
notification_data = create_delivery_status_callback_data(notification, service_callback_api)
7676
send_delivery_status_to_service.apply_async(
77-
[str(notification.id), notification_data], queue=QueueNames.CALLBACKS
77+
[str(notification.id), notification_data],
78+
queue=QueueNames.CALLBACKS,
79+
MessageGroupId="#".join(
80+
(str(notification.service_id), notification.notification_type, notification.key_type, "")
81+
),
7882
)
7983

8084

@@ -83,4 +87,10 @@ def _check_and_queue_complaint_callback_task(complaint, notification, recipient)
8387
service_callback_api = get_complaint_callback_api_for_service(service_id=notification.service_id)
8488
if service_callback_api:
8589
complaint_data = create_complaint_callback_data(complaint, notification, service_callback_api, recipient)
86-
send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS)
90+
send_complaint_to_service.apply_async(
91+
[complaint_data],
92+
queue=QueueNames.CALLBACKS,
93+
MessageGroupId="#".join(
94+
(str(notification.service_id), notification.notification_type, notification.key_type, "")
95+
),
96+
)

app/notifications/process_notifications.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def increment_daily_limit_cache(service_id, notification_type):
218218
redis_store.incr(cache_key)
219219

220220

221-
def send_notification_to_queue_detached(key_type, notification_type, notification_id, queue=None):
221+
def send_notification_to_queue_detached(key_type, notification_type, notification_id, message_group_id, queue=None):
222222
if key_type == KEY_TYPE_TEST:
223223
queue = QueueNames.RESEARCH_MODE
224224

@@ -236,14 +236,20 @@ def send_notification_to_queue_detached(key_type, notification_type, notificatio
236236
deliver_task = get_pdf_for_templated_letter
237237

238238
try:
239-
deliver_task.apply_async([str(notification_id)], queue=queue)
239+
deliver_task.apply_async([str(notification_id)], queue=queue, MessageGroupId=message_group_id)
240240
except Exception:
241241
dao_delete_notifications_by_id(notification_id)
242242
raise
243243

244244

245-
def send_notification_to_queue(notification, queue=None):
246-
send_notification_to_queue_detached(notification.key_type, notification.notification_type, notification.id, queue)
245+
def send_notification_to_queue(notification, queue=None, origin="dashboard"):
246+
send_notification_to_queue_detached(
247+
notification.key_type,
248+
notification.notification_type,
249+
notification.id,
250+
"#".join((str(notification.service_id), notification.notification_type, notification.key_type, origin)),
251+
queue,
252+
)
247253

248254

249255
def simulated_recipient(to_address, notification_type):

app/notifications/receive_notifications.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ def receive_mmg_sms():
7070
)
7171

7272
service_callback_tasks.send_inbound_sms_to_service.apply_async(
73-
[str(inbound.id), str(service.id)], queue=QueueNames.CALLBACKS
73+
[str(inbound.id), str(service.id)],
74+
queue=QueueNames.CALLBACKS,
75+
MessageGroupId="#".join((str(service.id), INBOUND_SMS_TYPE, "normal", "")),
7476
)
7577

7678
current_app.logger.info(
@@ -116,7 +118,9 @@ def receive_firetext_sms():
116118
INBOUND_SMS_COUNTER.labels("firetext").inc()
117119

118120
service_callback_tasks.send_inbound_sms_to_service.apply_async(
119-
[str(inbound.id), str(service.id)], queue=QueueNames.CALLBACKS
121+
[str(inbound.id), str(service.id)],
122+
queue=QueueNames.CALLBACKS,
123+
MessageGroupId="#".join((str(service.id), INBOUND_SMS_TYPE, "normal", "")),
120124
)
121125
current_app.logger.info(
122126
"%s received inbound SMS with reference %s from Firetext",

app/v2/notifications/post_notifications.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ def process_sms_or_email_notification(
228228
key_type=api_user.key_type,
229229
notification_type=notification_type,
230230
notification_id=notification_id,
231+
message_group_id="#".join((str(service.id), notification_type, api_user.key_type, "api")),
231232
)
232233
else:
233234
current_app.logger.info(
@@ -297,9 +298,16 @@ def process_letter_notification(
297298
if service.restricted and api_key.key_type != KEY_TYPE_TEST:
298299
raise BadRequestError(message="Cannot send letters when service is in trial mode", status_code=403)
299300

301+
message_group_id = "#".join((str(service.id), LETTER_TYPE, api_key.key_type, "api"))
302+
300303
if precompiled:
301304
return process_precompiled_letter_notifications(
302-
letter_data=letter_data, api_key=api_key, service=service, template=template, reply_to_text=reply_to_text
305+
letter_data=letter_data,
306+
api_key=api_key,
307+
service=service,
308+
template=template,
309+
reply_to_text=reply_to_text,
310+
message_group_id=message_group_id,
303311
)
304312

305313
postage = validate_address(service, letter_data["personalisation"])
@@ -330,12 +338,13 @@ def process_letter_notification(
330338
postage=postage,
331339
)
332340

333-
get_pdf_for_templated_letter.apply_async([str(notification.id)], queue=queue)
341+
get_pdf_for_templated_letter.apply_async([str(notification.id)], queue=queue, MessageGroupId=message_group_id)
334342

335343
if test_key and current_app.config["TEST_LETTERS_FAKE_DELIVERY"]:
336344
create_fake_letter_callback.apply_async(
337345
[notification.id, notification.billable_units, notification.postage],
338346
queue=queue,
347+
MessageGroupId=message_group_id,
339348
)
340349

341350
resp = create_response_for_post_notification(
@@ -351,7 +360,9 @@ def process_letter_notification(
351360
return resp
352361

353362

354-
def process_precompiled_letter_notifications(*, letter_data, api_key, service, template, reply_to_text):
363+
def process_precompiled_letter_notifications(
364+
*, letter_data, api_key, service, template, reply_to_text, message_group_id
365+
):
355366
try:
356367
status = NOTIFICATION_PENDING_VIRUS_CHECK
357368
letter_content = base64.b64decode(letter_data["content"])
@@ -385,10 +396,11 @@ def process_precompiled_letter_notifications(*, letter_data, api_key, service, t
385396
name=TaskNames.SCAN_FILE,
386397
kwargs={"filename": filename},
387398
queue=QueueNames.ANTIVIRUS,
399+
MessageGroupId=message_group_id,
388400
)
389401
else:
390402
# stub out antivirus in dev
391-
sanitise_letter.apply_async([filename], queue=QueueNames.LETTERS)
403+
sanitise_letter.apply_async([filename], queue=QueueNames.LETTERS, MessageGroupId=message_group_id)
392404

393405
return resp
394406

0 commit comments

Comments
 (0)