-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathnotifications_ses_callback.py
More file actions
101 lines (83 loc) · 4.03 KB
/
notifications_ses_callback.py
File metadata and controls
101 lines (83 loc) · 4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from flask import current_app
from app.celery.queue_utils import get_message_group_id_for_queue
from app.celery.service_callback_tasks import (
create_complaint_callback_data,
create_delivery_status_callback_data,
send_complaint_to_service,
send_delivery_status_to_service,
)
from app.config import QueueNames
from app.dao.complaint_dao import save_complaint
from app.dao.notifications_dao import (
dao_get_notification_or_history_by_reference,
)
from app.dao.service_callback_api_dao import (
get_complaint_callback_api_for_service,
get_delivery_status_callback_api_for_service,
)
from app.models import Complaint
def determine_notification_bounce_type(notification_type, ses_message):
    """Classify an SES bounce message as permanent or temporary.

    The message is scrubbed of mail headers and recipient addresses in place
    before it is classified.

    Args:
        notification_type: Not read by this function; the value is replaced
            by the bounce classification.
        ses_message: SES bounce message dict; must contain
            ``["bounce"]["bounceType"]``.

    Returns:
        A ``(bounce_type, ses_message)`` tuple, where ``bounce_type`` is the
        message's own ``bounceType`` when that equals ``"Permanent"`` and
        ``"Temporary"`` for anything else.
    """
    remove_emails_from_bounce(ses_message)

    bounce_type = ses_message["bounce"]["bounceType"]
    if bounce_type != "Permanent":
        # Every non-permanent bounce type is reported as temporary.
        bounce_type = "Temporary"

    return bounce_type, ses_message
def handle_complaint(ses_message):
    """Save an SES complaint against the notification it refers to.

    The message is stripped of mail headers and recipient addresses (in
    place) before it is logged.

    Args:
        ses_message: SES complaint message dict.

    Returns:
        A ``(complaint, notification, recipient_email)`` tuple once the
        complaint has been saved, or ``None`` when the message has no
        ``["mail"]["messageId"]`` to look the notification up by.
    """
    # Scrub addresses before logging; the first destination is handed back
    # to the caller.
    destinations = remove_emails_from_complaint(ses_message)
    recipient_email = destinations[0]
    current_app.logger.info("Complaint from SES: \n%s", ses_message)

    try:
        reference = ses_message["mail"]["messageId"]
    except KeyError:
        current_app.logger.exception("Complaint from SES failed to get reference from message")
        return

    notification = dao_get_notification_or_history_by_reference(reference)

    # A missing or empty "complaint" section leaves every SES field as None.
    details = ses_message.get("complaint") or {}
    complaint = Complaint(
        notification_id=notification.id,
        service_id=notification.service_id,
        ses_feedback_id=details.get("feedbackId"),
        complaint_type=details.get("complaintFeedbackType"),
        complaint_date=details.get("timestamp"),
    )
    save_complaint(complaint)

    return complaint, notification, recipient_email
def remove_mail_headers(dict_to_edit):
    """Drop the header sections from the ``mail`` part of an SES message.

    Mutates ``dict_to_edit`` in place and returns ``None``. A key is removed
    only when it is present with a truthy value; an empty ``headers`` or
    ``commonHeaders`` entry is left where it is.

    Raises:
        KeyError: if ``dict_to_edit`` has no ``"mail"`` key.
    """
    mail = dict_to_edit["mail"]
    for header_key in ("headers", "commonHeaders"):
        if mail.get(header_key):
            del mail[header_key]
def remove_emails_from_bounce(bounce_dict):
    """Strip mail headers and recipient addresses from an SES bounce message.

    Mutates ``bounce_dict`` in place and returns ``None``.

    Raises:
        KeyError: if ``mail.destination`` or ``bounce.bouncedRecipients`` is
            missing from the message.
    """
    remove_mail_headers(bounce_dict)
    del bounce_dict["mail"]["destination"]
    del bounce_dict["bounce"]["bouncedRecipients"]
def remove_emails_from_complaint(complaint_dict):
    """Strip mail headers and recipient addresses from an SES complaint message.

    Mutates ``complaint_dict`` in place.

    Returns:
        The value removed from ``mail.destination``.

    Raises:
        KeyError: if ``complaint.complainedRecipients`` or
            ``mail.destination`` is missing from the message.
    """
    remove_mail_headers(complaint_dict)
    del complaint_dict["complaint"]["complainedRecipients"]
    destination = complaint_dict["mail"].pop("destination")
    return destination
def check_and_queue_callback_task(notification):
    """Queue a delivery-status callback for the notification's service.

    Nothing is queued when the service has no delivery-status callback API.

    Args:
        notification: Object exposing ``id``, ``service_id`` and
            ``notification_type``.
    """
    callback_api = get_delivery_status_callback_api_for_service(service_id=notification.service_id)
    if not callback_api:
        return

    payload = create_delivery_status_callback_data(notification, callback_api)
    target_queue = QueueNames.CALLBACKS
    group_kwargs = get_message_group_id_for_queue(
        queue_name=target_queue,
        service_id=str(notification.service_id),
        notification_type=notification.notification_type,
    )
    send_delivery_status_to_service.apply_async(
        [str(notification.id), payload],
        queue=target_queue,
        **group_kwargs,
    )
def _check_and_queue_complaint_callback_task(complaint, notification, recipient):
    """Queue a complaint callback for the notification's service.

    Nothing is queued when the service has no complaint callback API.

    Args:
        complaint: Complaint passed through to the callback payload builder.
        notification: Object exposing ``service_id`` and
            ``notification_type``.
        recipient: Recipient value passed through to the callback payload
            builder.
    """
    callback_api = get_complaint_callback_api_for_service(service_id=notification.service_id)
    if not callback_api:
        return

    payload = create_complaint_callback_data(complaint, notification, callback_api, recipient)
    target_queue = QueueNames.CALLBACKS
    group_kwargs = get_message_group_id_for_queue(
        queue_name=target_queue,
        service_id=str(notification.service_id),
        notification_type=notification.notification_type,
    )
    send_complaint_to_service.apply_async([payload], queue=target_queue, **group_kwargs)