Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions backend/notifications/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def send_email(
if not recipient and not recipient_email:
raise ValueError("Either recipient or recipient_email must be provided")

from notifications.tasks import send_pending_emails
from notifications.tasks import send_pending_email

recipient_email = recipient_email or recipient.email

Expand All @@ -207,7 +207,7 @@ def send_email(
or settings.DEFAULT_FROM_EMAIL
)

SentEmail.objects.create(
sent_email = SentEmail.objects.create(
email_template=self,
conference=self.conference,
from_email=from_email,
Expand All @@ -223,7 +223,7 @@ def send_email(
bcc_addresses=self.bcc_addresses,
)

transaction.on_commit(lambda: send_pending_emails.delay())
transaction.on_commit(lambda: send_pending_email.delay(sent_email.id))

@property
def is_custom(self):
Expand Down
80 changes: 42 additions & 38 deletions backend/notifications/tasks.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,63 @@
from django.db import transaction

import logging
from uuid import uuid4
from pycon.celery_utils import OnlyOneAtTimeTask
from notifications.models import SentEmail
from django.db import transaction
from pycon.celery import app
from django.core.mail import EmailMultiAlternatives
from django.core.mail import get_connection

logger = logging.getLogger(__name__)


@app.task(base=OnlyOneAtTimeTask)
def send_pending_emails():
pending_emails = (
SentEmail.objects.pending().order_by("created").values_list("id", flat=True)
def send_pending_email_failed(self, exc, task_id, args, kwargs, einfo):
sent_email_id = args[0]

sent_email = SentEmail.objects.get(id=sent_email_id)
sent_email.mark_as_failed()
logger.error(
"Failed to send email sent_email_id=%s",
sent_email.id,
)
total_pending_emails = pending_emails.count()

if total_pending_emails == 0:
return

logger.info("Found %s pending emails", pending_emails.count())
@app.task(
bind=True,
autoretry_for=(Exception,),
retry_backoff=True,
max_retries=5,
default_retry_delay=2,
on_failure=send_pending_email_failed,
)
@transaction.atomic()
def send_pending_email(self, sent_email_id: int):
logger.info(
"Sending sent_email=%s (attempt=%s of %s)",
sent_email_id,
self.request.retries,
self.max_retries,
)

email_backend_connection = get_connection()
sent_email = (
SentEmail.objects.select_for_update(skip_locked=True)
.pending()
.filter(id=sent_email_id)
.first()
)

for email_id in pending_emails.iterator():
with transaction.atomic():
sent_email = (
SentEmail.objects.select_for_update(skip_locked=True)
.filter(
id=email_id,
)
.first()
)
if not sent_email:
return

if not sent_email or not sent_email.is_pending:
return
email_backend_connection = get_connection()

try:
message_id = send_email(sent_email, email_backend_connection)
sent_email.mark_as_sent(message_id)
message_id = send_email(sent_email, email_backend_connection)
sent_email.mark_as_sent(message_id)

logger.info(
"Email sent_email_id=%s sent with message_id=%s",
sent_email.id,
message_id,
)
except Exception as e:
sent_email.mark_as_failed()
logger.exception(
"Failed to send email sent_email_id=%s error=%s",
email_id,
e,
exc_info=e,
)
logger.info(
"Email sent_email_id=%s sent with message_id=%s",
sent_email.id,
message_id,
)


def send_email(sent_email, email_backend_connection):
Expand Down
8 changes: 4 additions & 4 deletions backend/notifications/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def test_send_email_template_to_recipient_email(
reply_to="[email protected]",
)

mock_send_pending_emails = mocker.patch(
"notifications.tasks.send_pending_emails.delay"
mock_send_pending_email = mocker.patch(
"notifications.tasks.send_pending_email.delay"
)

with django_capture_on_commit_callbacks(execute=True):
Expand All @@ -64,12 +64,12 @@ def test_send_email_template_to_recipient_email(
},
)

mock_send_pending_emails.assert_called_once()

sent_email = SentEmail.objects.get(
email_template=email_template,
)

mock_send_pending_email.assert_called_once_with(sent_email.id)

assert sent_email.recipient is None
assert sent_email.recipient_email == "[email protected]"

Expand Down
105 changes: 31 additions & 74 deletions backend/notifications/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
import smtplib
from unittest.mock import patch
from uuid import uuid4
import time_machine
from django.core import mail
from notifications.tasks import send_pending_emails
from notifications.tasks import send_pending_email, send_pending_email_failed
from notifications.models import SentEmail
from notifications.tests.factories import SentEmailFactory


def test_send_pending_emails_does_nothing_with_no_pending_emails():
SentEmailFactory(status=SentEmail.Status.sent, sent_at="2020-01-01 12:00Z")
SentEmailFactory(status=SentEmail.Status.sent, sent_at="2020-01-01 12:00Z")
SentEmailFactory(status=SentEmail.Status.sent, sent_at="2020-01-01 12:00Z")
SentEmailFactory(status=SentEmail.Status.sent, sent_at="2020-01-01 12:00Z")
def test_send_pending_email_does_nothing_with_non_pending_email():
sent_email = SentEmailFactory(
status=SentEmail.Status.sent, sent_at="2020-01-01 12:00Z"
)

send_pending_emails()
send_pending_email(sent_email.id)

assert len(mail.outbox) == 0


def test_send_pending_emails_task_sends_data_correctly():
def test_send_pending_email_task_sends_data_correctly():
pending_email_1 = SentEmailFactory(
status=SentEmail.Status.pending,
reply_to="[email protected]",
Expand All @@ -29,7 +30,8 @@ def test_send_pending_emails_task_sends_data_correctly():
subject="subject",
)

send_pending_emails()
with time_machine.travel("2021-01-01 12:00Z", tick=False):
send_pending_email(pending_email_1.id)

assert len(mail.outbox) == 1

Expand All @@ -42,8 +44,16 @@ def test_send_pending_emails_task_sends_data_correctly():
assert mail.outbox[0].alternatives == [("html body", "text/html")]
assert mail.outbox[0].subject == "subject"

pending_email_1.refresh_from_db()

assert len(mail.outbox) == 1

assert pending_email_1.status == SentEmail.Status.sent
assert pending_email_1.message_id.startswith("local-")
assert pending_email_1.sent_at.isoformat() == "2021-01-01T12:00:00+00:00"


def test_send_pending_emails_task_doesnt_double_send():
def test_send_pending_email_task_doesnt_double_send():
pending_email_1 = SentEmailFactory(status=SentEmail.Status.pending)
original_qs = SentEmail.objects.select_for_update(skip_locked=True).filter(
id=pending_email_1.id
Expand All @@ -57,81 +67,28 @@ def side_effect(*args, **kwargs):
"notifications.tasks.SentEmail.objects.select_for_update",
side_effect=side_effect,
):
send_pending_emails()
send_pending_email(pending_email_1.id)

pending_email_1.refresh_from_db()

assert len(mail.outbox) == 0


def test_send_pending_emails_task():
pending_email_1 = SentEmailFactory(status=SentEmail.Status.pending)
pending_email_2 = SentEmailFactory(status=SentEmail.Status.pending)
sent_email_1 = SentEmailFactory(
status=SentEmail.Status.sent, message_id="abc-abc", sent_at="2020-01-01 12:00Z"
)

with time_machine.travel("2021-01-01 12:00Z", tick=False):
send_pending_emails()

pending_email_1.refresh_from_db()
pending_email_2.refresh_from_db()
sent_email_1.refresh_from_db()

assert len(mail.outbox) == 2

assert mail.outbox[0].to == [pending_email_1.recipient_email]
assert mail.outbox[1].to == [pending_email_2.recipient_email]

assert pending_email_1.status == SentEmail.Status.sent
assert pending_email_1.message_id.startswith("local-")
assert pending_email_1.sent_at.isoformat() == "2021-01-01T12:00:00+00:00"

assert pending_email_2.status == SentEmail.Status.sent
assert pending_email_2.message_id.startswith("local-")
assert pending_email_2.sent_at.isoformat() == "2021-01-01T12:00:00+00:00"

assert sent_email_1.status == SentEmail.Status.sent
assert sent_email_1.message_id == "abc-abc"
assert sent_email_1.sent_at.isoformat() == "2020-01-01T12:00:00+00:00"


def test_send_pending_emails_handles_failures(mocker):
def test_send_pending_email_failure():
pending_email_1 = SentEmailFactory(
status=SentEmail.Status.pending, created="2020-01-01 12:00Z"
)
pending_email_2 = SentEmailFactory(
status=SentEmail.Status.pending, created="2020-01-02 12:00Z"
)

original_method = SentEmail.mark_as_sent

def _side_effect(*args, **kwargs):
if _side_effect.counter == 0:
_side_effect.counter = 1
raise ValueError("test")

return original_method(pending_email_2, *args, **kwargs)

_side_effect.counter = 0

mocker.patch("notifications.tasks.SentEmail.mark_as_sent", side_effect=_side_effect)

with time_machine.travel("2021-01-01 12:00Z", tick=False):
send_pending_emails()
send_pending_email_failed(
None,
smtplib.SMTPException("test"),
uuid4().hex,
(pending_email_1.id,),
{},
None,
)

pending_email_1.refresh_from_db()
pending_email_2.refresh_from_db()

assert len(mail.outbox) == 2

assert mail.outbox[0].to == [pending_email_1.recipient_email]
assert mail.outbox[1].to == [pending_email_2.recipient_email]

assert len(mail.outbox) == 0
assert pending_email_1.status == SentEmail.Status.failed
assert pending_email_1.message_id == ""
assert pending_email_1.sent_at is None

assert pending_email_2.status == SentEmail.Status.sent
assert pending_email_2.message_id.startswith("local-")
assert pending_email_2.sent_at.isoformat() == "2021-01-01T12:00:00+00:00"
6 changes: 0 additions & 6 deletions backend/pycon/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def setup_periodic_tasks(sender, **kwargs):
from schedule.tasks import process_schedule_items_videos_to_upload
from files_upload.tasks import delete_unused_files
from pycon.tasks import check_for_idle_heavy_processing_workers
from notifications.tasks import send_pending_emails

add = sender.add_periodic_task

Expand All @@ -48,8 +47,3 @@ def setup_periodic_tasks(sender, **kwargs):
check_for_idle_heavy_processing_workers,
name="Check for idle heavy processing workers",
)
add(
timedelta(minutes=1),
send_pending_emails,
name="Send pending emails",
)