Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions app/celery/letters_pdf_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ def get_pdf_for_templated_letter(self, notification_id):
encoded_data = signing.encode(letter_data)

notify_celery.send_task(
name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER, args=(encoded_data,), queue=QueueNames.SANITISE_LETTERS
name=TaskNames.CREATE_PDF_FOR_TEMPLATED_LETTER,
args=(encoded_data,),
queue=QueueNames.SANITISE_LETTERS,
MessageGroupId=self.message_group_id if self.message_group_id is not None else str(notification.service_id),
)
except Exception as e:
try:
Expand Down Expand Up @@ -294,6 +297,7 @@ def sanitise_letter(self, filename):
"allow_international_letters": notification.service.has_permission(INTERNATIONAL_LETTERS),
},
queue=QueueNames.SANITISE_LETTERS,
MessageGroupId=self.message_group_id,
)
except Exception:
try:
Expand Down Expand Up @@ -541,8 +545,8 @@ def replay_letters_in_error(filename=None):
sanitise_letter.apply_async([filename], queue=QueueNames.LETTERS)


@notify_celery.task(name="resanitise-pdf")
def resanitise_pdf(notification_id):
@notify_celery.task(bind=True, name="resanitise-pdf")
def resanitise_pdf(self, notification_id):
"""
`notification_id` is the notification id for a PDF letter which was either uploaded or sent using the API.

Expand Down Expand Up @@ -570,11 +574,12 @@ def resanitise_pdf(notification_id):
"allow_international_letters": notification.service.has_permission(INTERNATIONAL_LETTERS),
},
queue=QueueNames.SANITISE_LETTERS,
MessageGroupId=self.message_group_id if self.message_group_id is not None else str(notification.service_id),
)


@notify_celery.task(name="resanitise-letter-attachment")
def resanitise_letter_attachment(service_id, attachment_id, original_filename):
@notify_celery.task(bind=True, name="resanitise-letter-attachment")
def resanitise_letter_attachment(self, service_id, attachment_id, original_filename):
"""
`service_id` is the service id for a PDF letter attachment/template.
`attachment_id` is the attachment id for a PDF letter attachment which was uploaded for a template.
Expand All @@ -593,4 +598,5 @@ def resanitise_letter_attachment(service_id, attachment_id, original_filename):
"original_filename": original_filename,
},
queue=QueueNames.SANITISE_LETTERS,
MessageGroupId=self.message_group_id if self.message_group_id is not None else str(service_id),
)
21 changes: 17 additions & 4 deletions app/celery/nightly_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,16 @@ def _remove_csv_files(job_types):
@notify_celery.task(name="archive-unsubscribe-requests")
def archive_unsubscribe_requests():
for service_id in get_service_ids_with_unsubscribe_requests():
archive_batched_unsubscribe_requests.apply_async(queue=QueueNames.REPORTING, args=[service_id])
archive_old_unsubscribe_requests.apply_async(queue=QueueNames.REPORTING, args=[service_id])
archive_batched_unsubscribe_requests.apply_async(
queue=QueueNames.REPORTING,
args=[service_id],
MessageGroupId=str(service_id),
)
archive_old_unsubscribe_requests.apply_async(
queue=QueueNames.REPORTING,
args=[service_id],
MessageGroupId=str(service_id),
)


@notify_celery.task(name="archive-batched-unsubscribe-requests")
Expand Down Expand Up @@ -163,6 +171,7 @@ def _delete_notifications_older_than_retention_by_type(
"datetime_to_delete_before": day_to_delete_backwards_from,
},
countdown=(i / len(flexible_data_retention)) * stagger_total_period.seconds,
MessageGroupId=str(f.service_id),
)

seven_days_ago = get_london_midnight_in_utc(convert_utc_to_bst(datetime.utcnow()).date() - timedelta(days=7))
Expand All @@ -185,6 +194,7 @@ def _delete_notifications_older_than_retention_by_type(
"datetime_to_delete_before": seven_days_ago,
},
countdown=(i / len(service_ids_to_purge)) * stagger_total_period.seconds,
MessageGroupId=str(service_id),
)

extra = {
Expand All @@ -204,8 +214,8 @@ def _delete_notifications_older_than_retention_by_type(
)


@notify_celery.task(name="delete-notifications-for-service-and-type")
def delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before):
@notify_celery.task(bind=True, name="delete-notifications-for-service-and-type")
def delete_notifications_for_service_and_type(self, service_id, notification_type, datetime_to_delete_before):
start = datetime.utcnow()
num_deleted = move_notifications_to_notification_history(
notification_type,
Expand Down Expand Up @@ -237,12 +247,14 @@ def delete_notifications_for_service_and_type(service_id, notification_type, dat
delete_notifications_for_service_and_type.apply_async(
args=(service_id, notification_type, datetime_to_delete_before),
queue=QueueNames.REPORTING,
MessageGroupId=self.message_group_id,
)
else:
# now we've deleted all the real notifications, clean up the test notifications
delete_test_notifications_for_service_and_type.apply_async(
args=(service_id, notification_type, datetime_to_delete_before),
queue=QueueNames.REPORTING,
MessageGroupId=self.message_group_id,
)


Expand All @@ -254,6 +266,7 @@ def delete_test_notifications_for_service_and_type(service_id, notification_type
delete_test_notifications_for_service_and_type.apply_async(
args=(service_id, notification_type, datetime_to_delete_before),
queue=QueueNames.REPORTING,
MessageGroupId=str(service_id),
)


Expand Down
1 change: 1 addition & 0 deletions app/celery/reporting_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def create_nightly_notification_status():
"service_id": service_id,
},
queue=QueueNames.REPORTING,
MessageGroupId=str(service_id),
)


Expand Down
8 changes: 6 additions & 2 deletions app/celery/research_mode_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,19 @@ def send_sms_response(provider, reference, to):
make_request(SMS_TYPE, provider, body, headers)


def send_email_response(reference, to):
def send_email_response(reference, to, service_id):
if to == perm_fail_email:
body = ses_hard_bounce_callback(reference)
elif to == temp_fail_email:
body = ses_soft_bounce_callback(reference)
else:
body = ses_notification_callback(reference)

process_ses_results.apply_async([body], queue=QueueNames.RESEARCH_MODE)
process_ses_results.apply_async(
[body],
queue=QueueNames.RESEARCH_MODE,
MessageGroupId=str(service_id),
)


def send_letter_response(notification_id: uuid.UUID, billable_units: int, postage: str):
Expand Down
15 changes: 12 additions & 3 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@
def run_scheduled_jobs():
try:
for job in dao_set_scheduled_jobs_to_pending():
process_job.apply_async([str(job.id)], queue=QueueNames.JOBS)
process_job.apply_async(
[str(job.id)],
queue=QueueNames.JOBS,
MessageGroupId=str(job.service_id),
)
current_app.logger.info("Job ID %s added to process job queue", job.id, extra={"job_id": job.id})
except SQLAlchemyError:
current_app.logger.exception("Failed to run scheduled jobs")
Expand Down Expand Up @@ -335,7 +339,11 @@ def replay_created_notifications() -> None:
letter.id,
extra={"notification_id": letter.id},
)
get_pdf_for_templated_letter.apply_async([str(letter.id)], queue=QueueNames.CREATE_LETTERS_PDF)
get_pdf_for_templated_letter.apply_async(
[str(letter.id)],
queue=QueueNames.CREATE_LETTERS_PDF,
MessageGroupId=str(letter.service_id),
)


@notify_celery.task(name="check-if-letters-still-pending-virus-check")
Expand All @@ -359,6 +367,7 @@ def check_if_letters_still_pending_virus_check(max_minutes_ago_to_check: int = 3
name=TaskNames.SCAN_FILE,
kwargs={"filename": filename},
queue=QueueNames.ANTIVIRUS,
MessageGroupId=str(letter.service_id),
)
else:
current_app.logger.warning(
Expand Down Expand Up @@ -438,7 +447,7 @@ def check_for_missing_rows_in_completed_jobs():

extra = {"job_row_number": row_to_process.missing_row, "job_id": job.id}
current_app.logger.info("Processing missing row %(job_row_number)s for job %(job_id)s", extra, extra=extra)
process_job_row(template.template_type, task_args_kwargs)
process_job_row(template.template_type, task_args_kwargs, str(job.service_id))


@notify_celery.task(name="update-status-of-fully-processed-jobs")
Expand Down
40 changes: 28 additions & 12 deletions app/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class ProcessReportRequestException(Exception):
pass


@notify_celery.task(name="process-job")
def process_job(job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_BATCH_SIZE):
@notify_celery.task(bind=True, name="process-job")
def process_job(self, job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_BATCH_SIZE):
start = datetime.utcnow()
job = dao_get_job_by_id(job_id)
current_app.logger.info(
Expand Down Expand Up @@ -116,19 +116,20 @@ def process_job(job_id, sender_id=None, shatter_batch_size=DEFAULT_SHATTER_JOB_R
get_id_task_args_kwargs_for_job_row(row, template, job, service, sender_id=sender_id)[1]
for row in shatter_batch
]
_shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs)
_shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs, self.message_group_id)

job_complete(job, start=start)


def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level=True):
def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, message_group_id, top_level=True):
try:
shatter_job_rows.apply_async(
(
template_type,
args_kwargs_seq,
),
queue=QueueNames.JOBS,
MessageGroupId=message_group_id,
)
except BotoClientError as e:
# this information is helpfully not preserved outside the message string of the exception, so
Expand All @@ -146,7 +147,7 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level
raise UnprocessableJobRow from e

for sub_batch in (args_kwargs_seq[:split_batch_size], args_kwargs_seq[split_batch_size:]):
_shatter_job_rows_with_subdivision(template_type, sub_batch, top_level=False)
_shatter_job_rows_with_subdivision(template_type, sub_batch, message_group_id, top_level=False)

else:
if not top_level:
Expand All @@ -156,16 +157,17 @@ def _shatter_job_rows_with_subdivision(template_type, args_kwargs_seq, top_level
)


@notify_celery.task(name="shatter-job-rows")
@notify_celery.task(bind=True, name="shatter-job-rows")
def shatter_job_rows(
self,
template_type: str,
args_kwargs_seq: Sequence,
):
for task_args_kwargs in args_kwargs_seq:
process_job_row(template_type, task_args_kwargs)
process_job_row(template_type, task_args_kwargs, self.message_group_id)


def process_job_row(template_type, task_args_kwargs):
def process_job_row(template_type, task_args_kwargs, message_group_id=None):
send_fn = {
SMS_TYPE: save_sms,
EMAIL_TYPE: save_email,
Expand All @@ -175,6 +177,7 @@ def process_job_row(template_type, task_args_kwargs):
send_fn.apply_async(
*task_args_kwargs,
queue=QueueNames.DATABASE,
MessageGroupId=message_group_id,
)


Expand Down Expand Up @@ -353,6 +356,7 @@ def save_sms(
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND_SMS,
MessageGroupId=self.message_group_id,
)
else:
extra = {
Expand Down Expand Up @@ -437,6 +441,7 @@ def save_email(self, service_id, notification_id, encoded_notification, sender_i
provider_tasks.deliver_email.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND_EMAIL,
MessageGroupId=self.message_group_id,
)

extra = {
Expand Down Expand Up @@ -494,7 +499,9 @@ def save_letter(
)

letters_pdf_tasks.get_pdf_for_templated_letter.apply_async(
[str(saved_notification.id)], queue=QueueNames.CREATE_LETTERS_PDF
[str(saved_notification.id)],
queue=QueueNames.CREATE_LETTERS_PDF,
MessageGroupId=self.message_group_id,
)

extra = {
Expand Down Expand Up @@ -529,7 +536,12 @@ def handle_exception(task, notification, notification_id, exc):
# send to the retry queue.
current_app.logger.exception("Retry: " + base_msg, extra, extra=extra) # noqa
try:
task.retry(queue=QueueNames.RETRY, exc=exc)
retry_kwargs = {
"queue": QueueNames.RETRY,
"exc": exc,
"MessageGroupId": task.message_group_id,
}
task.retry(**retry_kwargs)
except task.MaxRetriesExceededError:
current_app.logger.error("Max retry failed: " + base_msg, extra, extra=extra) # noqa

Expand Down Expand Up @@ -580,7 +592,7 @@ def process_incomplete_job(job_id, shatter_batch_size=DEFAULT_SHATTER_JOB_ROWS_B
get_id_task_args_kwargs_for_job_row(row, template, job, job.service, sender_id=sender_id)[1]
for row in shatter_batch
]
_shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs)
_shatter_job_rows_with_subdivision(template.template_type, batch_args_kwargs, str(job.service_id))

job_complete(job, resumed=True)

Expand Down Expand Up @@ -625,7 +637,11 @@ def _check_and_queue_returned_letter_callback_task(notification_id, service_id):
# queue callback task only if the service_callback_api exists
if service_callback_api := get_returned_letter_callback_api_for_service(service_id=service_id):
returned_letter_data = create_returned_letter_callback_data(notification_id, service_id, service_callback_api)
send_returned_letter_to_service.apply_async([returned_letter_data], queue=QueueNames.CALLBACKS)
send_returned_letter_to_service.apply_async(
[returned_letter_data],
queue=QueueNames.CALLBACKS,
MessageGroupId=str(service_id),
)


@notify_celery.task(bind=True, name="process-report-request")
Expand Down
4 changes: 4 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ class Config:
# URL of redis instance
REDIS_URL = os.getenv("REDIS_URL")
REDIS_ENABLED = False if os.environ.get("REDIS_ENABLED") == "0" else True

ENABLE_SQS_MESSAGE_GROUP_IDS = os.environ.get("ENABLE_SQS_MESSAGE_GROUP_IDS", "1") == "1"

EXPIRE_CACHE_TEN_MINUTES = 600
EXPIRE_CACHE_EIGHT_DAYS = 8 * 24 * 60 * 60

Expand Down Expand Up @@ -587,6 +590,7 @@ class Development(Config):
SERVER_NAME = os.getenv("SERVER_NAME")

REDIS_ENABLED = os.getenv("REDIS_ENABLED") == "1"
ENABLE_SQS_MESSAGE_GROUP_IDS = os.environ.get("ENABLE_SQS_MESSAGE_GROUP_IDS", "1") == "1"

S3_BUCKET_CSV_UPLOAD = "development-notifications-csv-upload"
S3_BUCKET_CONTACT_LIST = "development-contact-list"
Expand Down
2 changes: 1 addition & 1 deletion app/delivery/send_to_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def send_email_to_provider(notification):
if notification.key_type == KEY_TYPE_TEST:
notification.reference = str(create_uuid())
update_notification_to_sending(notification, provider)
send_email_response(notification.reference, notification.to)
send_email_response(notification.reference, notification.to, notification.service_id)
else:
email_sender_name = service.custom_email_sender_name or service.name
from_address = (
Expand Down
7 changes: 6 additions & 1 deletion app/job/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,12 @@ def create_job(service_id):
sender_id = data.get("sender_id")

if job.job_status == JOB_STATUS_PENDING:
process_job.apply_async([str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS)
process_job.apply_async(
[str(job.id)],
{"sender_id": sender_id},
queue=QueueNames.JOBS,
MessageGroupId=str(job.service_id),
)

job_json = job_schema.dump(job)
job_json["statistics"] = []
Expand Down
10 changes: 8 additions & 2 deletions app/notifications/notifications_ses_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ def check_and_queue_callback_task(notification):
if service_callback_api:
notification_data = create_delivery_status_callback_data(notification, service_callback_api)
send_delivery_status_to_service.apply_async(
[str(notification.id), notification_data], queue=QueueNames.CALLBACKS
[str(notification.id), notification_data],
queue=QueueNames.CALLBACKS,
MessageGroupId=str(notification.service_id),
)


Expand All @@ -83,4 +85,8 @@ def _check_and_queue_complaint_callback_task(complaint, notification, recipient)
service_callback_api = get_complaint_callback_api_for_service(service_id=notification.service_id)
if service_callback_api:
complaint_data = create_complaint_callback_data(complaint, notification, service_callback_api, recipient)
send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS)
send_complaint_to_service.apply_async(
[complaint_data],
queue=QueueNames.CALLBACKS,
MessageGroupId=str(notification.service_id),
)
Loading
Loading