Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions app/celery/letters_pdf_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ def get_pdf_for_templated_letter(self, notification_id):
encoded_data = signing.encode(letter_data)

notify_celery.send_task(
name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER, args=(encoded_data,), queue=QueueNames.SANITISE_LETTERS
name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER,
args=(encoded_data,),
queue=QueueNames.SANITISE_LETTERS,
MessageGroupId=getattr(self, "message_group_id", None),
)
except Exception as e:
try:
Expand Down Expand Up @@ -245,7 +248,7 @@ def send_letters_volume_email_to_dvla(letters_volumes, date):
reply_to_text=reply_to,
)

send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY)
send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY, origin="scheduled")


def send_dvla_letters_via_api(print_run_deadline_local, batch_size=100):
Expand Down Expand Up @@ -294,6 +297,7 @@ def sanitise_letter(self, filename):
"allow_international_letters": notification.service.has_permission(INTERNATIONAL_LETTERS),
},
queue=QueueNames.SANITISE_LETTERS,
MessageGroupId=getattr(self, "message_group_id", None),
)
except Exception:
try:
Expand Down Expand Up @@ -541,8 +545,8 @@ def replay_letters_in_error(filename=None):
sanitise_letter.apply_async([filename], queue=QueueNames.LETTERS)


@notify_celery.task(name="resanitise-pdf")
def resanitise_pdf(notification_id):
@notify_celery.task(name="resanitise-pdf", bind=True)
def resanitise_pdf(self, notification_id):
"""
`notification_id` is the notification id for a PDF letter which was either uploaded or sent using the API.

Expand Down Expand Up @@ -570,11 +574,12 @@ def resanitise_pdf(notification_id):
"allow_international_letters": notification.service.has_permission(INTERNATIONAL_LETTERS),
},
queue=QueueNames.SANITISE_LETTERS,
MessageGroupId=getattr(self, "message_group_id", None),
)


@notify_celery.task(name="resanitise-letter-attachment")
def resanitise_letter_attachment(service_id, attachment_id, original_filename):
@notify_celery.task(name="resanitise-letter-attachment", bind=True)
def resanitise_letter_attachment(self, service_id, attachment_id, original_filename):
"""
`service_id` is the service id for a PDF letter attachment/template.
`attachment_id` is the attachment id for a PDF letter attachment which was uploaded for a template.
Expand All @@ -593,4 +598,5 @@ def resanitise_letter_attachment(service_id, attachment_id, original_filename):
"original_filename": original_filename,
},
queue=QueueNames.SANITISE_LETTERS,
MessageGroupId=getattr(self, "message_group_id", None),
)
20 changes: 16 additions & 4 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@
def run_scheduled_jobs():
try:
for job in dao_set_scheduled_jobs_to_pending():
process_job.apply_async([str(job.id)], queue=QueueNames.JOBS)
process_job.apply_async(
[str(job.id)],
queue=QueueNames.JOBS,
MessageGroupId="#".join((str(job.service_id), job.template.template_type, "normal", "scheduled")),
)
current_app.logger.info("Job ID %s added to process job queue", job.id, extra={"job_id": job.id})
except SQLAlchemyError:
current_app.logger.exception("Failed to run scheduled jobs")
Expand Down Expand Up @@ -323,7 +327,7 @@ def replay_created_notifications() -> None:
extra,
extra=extra,
)
send_notification_to_queue(notification=n)
send_notification_to_queue(notification=n, origin="")

# if the letter has not be sent after an hour, then create a zendesk ticket
letters = letters_missing_from_sending_bucket(grace_period, session=db.session_bulk, inner_retry_attempts=2)
Expand All @@ -335,7 +339,11 @@ def replay_created_notifications() -> None:
letter.id,
extra={"notification_id": letter.id},
)
get_pdf_for_templated_letter.apply_async([str(letter.id)], queue=QueueNames.CREATE_LETTERS_PDF)
get_pdf_for_templated_letter.apply_async(
[str(letter.id)],
queue=QueueNames.CREATE_LETTERS_PDF,
MessageGroupId="#".join((str(letter.service_id), letter.notification_type, letter.key_type, "")),
)


@notify_celery.task(name="check-if-letters-still-pending-virus-check")
Expand Down Expand Up @@ -438,7 +446,11 @@ def check_for_missing_rows_in_completed_jobs():

extra = {"job_row_number": row_to_process.missing_row, "job_id": job.id}
current_app.logger.info("Processing missing row %(job_row_number)s for job %(job_id)s", extra, extra=extra)
process_job_row(template.template_type, task_args_kwargs)
process_job_row(
template.template_type,
task_args_kwargs,
"#".join((str(job.service_id), template.template_type, "normal", "")),
)


@notify_celery.task(name="update-status-of-fully-processed-jobs")
Expand Down
39 changes: 28 additions & 11 deletions app/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class ProcessReportRequestException(Exception):
pass


@notify_celery.task(name="process-job")
def process_job(job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_BATCH_SIZE):
@notify_celery.task(name="process-job", bind=True)
def process_job(self, job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_BATCH_SIZE):
start = datetime.utcnow()
job = dao_get_job_by_id(job_id)
current_app.logger.info(
Expand Down Expand Up @@ -116,19 +116,22 @@ def process_job(job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_R
get_id_task_args_kwargs_for_job_row(row, template, job, service, sender_id=sender_id)[1]
for row in shatter_batch
]
_shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs)
_shatter_job_rows_with_subdivision(
template.template_type, batch_args_kwargs, getattr(self, "message_group_id", None)
)

job_complete(job, start=start)


def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level=True):
def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, message_group_id, top_level=True):
try:
shatter_job_rows.apply_async(
(
template_type,
args_kwargs_seq,
),
queue=QueueNames.JOBS,
MessageGroupId=message_group_id,
)
except BotoClientError as e:
# this information is helpfully not preserved outside the message string of the exception, so
Expand All @@ -146,7 +149,7 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level
raise UnprocessableJobRow from e

for sub_batch in (args_kwargs_seq[:split_batch_size], args_kwargs_seq[split_batch_size:]):
_shatter_job_rows_with_subdivision(template_type, sub_batch, top_level=False)
_shatter_job_rows_with_subdivision(template_type, sub_batch, message_group_id, top_level=False)

else:
if not top_level:
Expand All @@ -156,16 +159,17 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level
)


@notify_celery.task(name="shatter-job-rows")
@notify_celery.task(name="shatter-job-rows", bind=True)
def shatter_job_rows(
self,
template_type: str,
args_kwargs_seq: Sequence,
):
for task_args_kwargs in args_kwargs_seq:
process_job_row(template_type, task_args_kwargs)
process_job_row(template_type, task_args_kwargs, getattr(self, "message_group_id", None))


def process_job_row(template_type, task_args_kwargs):
def process_job_row(template_type, task_args_kwargs, message_group_id):
send_fn = {
SMS_TYPE: save_sms,
EMAIL_TYPE: save_email,
Expand All @@ -175,6 +179,7 @@ def process_job_row(template_type, task_args_kwargs):
send_fn.apply_async(
*task_args_kwargs,
queue=QueueNames.DATABASE,
MessageGroupId=message_group_id,
)


Expand Down Expand Up @@ -353,6 +358,7 @@ def save_sms(
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND_SMS,
MessageGroupId=getattr(self, "message_group_id", None),
)
else:
extra = {
Expand Down Expand Up @@ -437,6 +443,7 @@ def save_email(self, service_id, notification_id, encoded_notification, sender_i
provider_tasks.deliver_email.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND_EMAIL,
MessageGroupId=getattr(self, "message_group_id", None),
)

extra = {
Expand Down Expand Up @@ -494,7 +501,9 @@ def save_letter(
)

letters_pdf_tasks.get_pdf_for_templated_letter.apply_async(
[str(saved_notification.id)], queue=QueueNames.CREATE_LETTERS_PDF
[str(saved_notification.id)],
queue=QueueNames.CREATE_LETTERS_PDF,
MessageGroupId=getattr(self, "message_group_id", None),
)

extra = {
Expand Down Expand Up @@ -580,7 +589,11 @@ def process_incomplete_job(job_id, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_B
get_id_task_args_kwargs_for_job_row(row, template, job, job.service, sender_id=sender_id)[1]
for row in shatter_batch
]
_shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs)
_shatter_job_rows_with_subdivision(
template.template_type,
batch_args_kwargs,
"#".join((str(job.service_id), job.template.template_type, "normal", "")),
)

job_complete(job, resumed=True)

Expand Down Expand Up @@ -625,7 +638,11 @@ def _check_and_queue_returned_letter_callback_task(notification_id, service_id):
# queue callback task only if the service_callback_api exists
if service_callback_api := get_returned_letter_callback_api_for_service(service_id=service_id):
returned_letter_data = create_returned_letter_callback_data(notification_id, service_id, service_callback_api)
send_returned_letter_to_service.apply_async([returned_letter_data], queue=QueueNames.CALLBACKS)
send_returned_letter_to_service.apply_async(
[returned_letter_data],
queue=QueueNames.CALLBACKS,
MessageGroupId="#".join((str(service_id), LETTER_TYPE, "normal", "")),
)


@notify_celery.task(bind=True, name="process-report-request")
Expand Down
7 changes: 6 additions & 1 deletion app/job/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,12 @@ def create_job(service_id):
sender_id = data.get("sender_id")

if job.job_status == JOB_STATUS_PENDING:
process_job.apply_async([str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS)
process_job.apply_async(
[str(job.id)],
{"sender_id": sender_id},
queue=QueueNames.JOBS,
MessageGroupId="#".join((str(job.service_id), template.template_type, "normal", "dashboard")),
)

job_json = job_schema.dump(job)
job_json["statistics"] = []
Expand Down
14 changes: 12 additions & 2 deletions app/notifications/notifications_ses_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ def check_and_queue_callback_task(notification):
if service_callback_api:
notification_data = create_delivery_status_callback_data(notification, service_callback_api)
send_delivery_status_to_service.apply_async(
[str(notification.id), notification_data], queue=QueueNames.CALLBACKS
[str(notification.id), notification_data],
queue=QueueNames.CALLBACKS,
MessageGroupId="#".join(
(str(notification.service_id), notification.notification_type, notification.key_type, "")
),
)


Expand All @@ -83,4 +87,10 @@ def _check_and_queue_complaint_callback_task(complaint, notification, recipient)
service_callback_api = get_complaint_callback_api_for_service(service_id=notification.service_id)
if service_callback_api:
complaint_data = create_complaint_callback_data(complaint, notification, service_callback_api, recipient)
send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS)
send_complaint_to_service.apply_async(
[complaint_data],
queue=QueueNames.CALLBACKS,
MessageGroupId="#".join(
(str(notification.service_id), notification.notification_type, notification.key_type, "")
),
)
14 changes: 10 additions & 4 deletions app/notifications/process_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def increment_daily_limit_cache(service_id, notification_type):
redis_store.incr(cache_key)


def send_notification_to_queue_detached(key_type, notification_type, notification_id, queue=None):
def send_notification_to_queue_detached(key_type, notification_type, notification_id, message_group_id, queue=None):
if key_type == KEY_TYPE_TEST:
queue = QueueNames.RESEARCH_MODE

Expand All @@ -236,14 +236,20 @@ def send_notification_to_queue_detached(key_type, notification_type, notificatio
deliver_task = get_pdf_for_templated_letter

try:
deliver_task.apply_async([str(notification_id)], queue=queue)
deliver_task.apply_async([str(notification_id)], queue=queue, MessageGroupId=message_group_id)
except Exception:
dao_delete_notifications_by_id(notification_id)
raise


def send_notification_to_queue(notification, queue=None):
send_notification_to_queue_detached(notification.key_type, notification.notification_type, notification.id, queue)
def send_notification_to_queue(notification, queue=None, origin="dashboard"):
send_notification_to_queue_detached(
notification.key_type,
notification.notification_type,
notification.id,
"#".join((str(notification.service_id), notification.notification_type, notification.key_type, origin)),
queue,
)


def simulated_recipient(to_address, notification_type):
Expand Down
8 changes: 6 additions & 2 deletions app/notifications/receive_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ def receive_mmg_sms():
)

service_callback_tasks.send_inbound_sms_to_service.apply_async(
[str(inbound.id), str(service.id)], queue=QueueNames.CALLBACKS
[str(inbound.id), str(service.id)],
queue=QueueNames.CALLBACKS,
MessageGroupId="#".join((str(service.id), INBOUND_SMS_TYPE, "normal", "")),
)

current_app.logger.info(
Expand Down Expand Up @@ -116,7 +118,9 @@ def receive_firetext_sms():
INBOUND_SMS_COUNTER.labels("firetext").inc()

service_callback_tasks.send_inbound_sms_to_service.apply_async(
[str(inbound.id), str(service.id)], queue=QueueNames.CALLBACKS
[str(inbound.id), str(service.id)],
queue=QueueNames.CALLBACKS,
MessageGroupId="#".join((str(service.id), INBOUND_SMS_TYPE, "normal", "")),
)
current_app.logger.info(
"%s received inbound SMS with reference %s from Firetext",
Expand Down
20 changes: 16 additions & 4 deletions app/v2/notifications/post_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def process_sms_or_email_notification(
key_type=api_user.key_type,
notification_type=notification_type,
notification_id=notification_id,
message_group_id="#".join((str(service.id), notification_type, api_user.key_type, "api")),
)
else:
current_app.logger.info(
Expand Down Expand Up @@ -297,9 +298,16 @@ def process_letter_notification(
if service.restricted and api_key.key_type != KEY_TYPE_TEST:
raise BadRequestError(message="Cannot send letters when service is in trial mode", status_code=403)

message_group_id = "#".join((str(service.id), LETTER_TYPE, api_key.key_type, "api"))

if precompiled:
return process_precompiled_letter_notifications(
letter_data=letter_data, api_key=api_key, service=service, template=template, reply_to_text=reply_to_text
letter_data=letter_data,
api_key=api_key,
service=service,
template=template,
reply_to_text=reply_to_text,
message_group_id=message_group_id,
)

postage = validate_address(service, letter_data["personalisation"])
Expand Down Expand Up @@ -330,12 +338,13 @@ def process_letter_notification(
postage=postage,
)

get_pdf_for_templated_letter.apply_async([str(notification.id)], queue=queue)
get_pdf_for_templated_letter.apply_async([str(notification.id)], queue=queue, MessageGroupId=message_group_id)

if test_key and current_app.config["TEST_LETTERS_FAKE_DELIVERY"]:
create_fake_letter_callback.apply_async(
[notification.id, notification.billable_units, notification.postage],
queue=queue,
MessageGroupId=message_group_id,
)

resp = create_response_for_post_notification(
Expand All @@ -351,7 +360,9 @@ def process_letter_notification(
return resp


def process_precompiled_letter_notifications(*, letter_data, api_key, service, template, reply_to_text):
def process_precompiled_letter_notifications(
*, letter_data, api_key, service, template, reply_to_text, message_group_id
):
try:
status = NOTIFICATION_PENDING_VIRUS_CHECK
letter_content = base64.b64decode(letter_data["content"])
Expand Down Expand Up @@ -385,10 +396,11 @@ def process_precompiled_letter_notifications(*, letter_data, api_key, service, t
name=TaskNames.SCAN_FILE,
kwargs={"filename": filename},
queue=QueueNames.ANTIVIRUS,
MessageGroupId=message_group_id,
)
else:
# stub out antivirus in dev
sanitise_letter.apply_async([filename], queue=QueueNames.LETTERS)
sanitise_letter.apply_async([filename], queue=QueueNames.LETTERS, MessageGroupId=message_group_id)

return resp

Expand Down
Loading