Skip to content

Commit 0fccf8e

Browse files
committed
Convert sumbission ActiveJobs to use SQS adapter
1 parent ff3804e commit 0fccf8e

File tree

3 files changed

+41
-141
lines changed

3 files changed

+41
-141
lines changed
Lines changed: 20 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,91 +1,33 @@
11
class ReceiveSubmissionBouncesAndComplaintsJob < ApplicationJob
2-
require "aws-sdk-sqs"
3-
require "aws-sdk-sts"
2+
self.queue_adapter = :sqs
43

5-
queue_as :background
4+
def perform(message)
5+
sqs_message_id = message["message_id"]
6+
CurrentJobLoggingAttributes.sqs_message_id = sqs_message_id
67

7-
REGION = "eu-west-2".freeze
8-
SQS_QUEUE_NAME = "submission_email_ses_bounces_and_complaints_queue".freeze
9-
MAX_NUMBER_OF_MESSAGES = 10
10-
POLLING_PERIOD = 20
11-
12-
def perform
13-
CloudWatchService.log_job_started(self.class.name)
14-
CurrentJobLoggingAttributes.job_id = job_id
15-
16-
sts_client = Aws::STS::Client.new(region: REGION)
17-
queue_url = "https://sqs.#{REGION}.amazonaws.com/#{sts_client.get_caller_identity.account}/#{SQS_QUEUE_NAME}"
18-
sqs_client = Aws::SQS::Client.new(region: REGION)
19-
20-
loop do
21-
messages = receive_messages(sqs_client, queue_url)
22-
23-
break if messages.empty?
24-
25-
process_messages(sqs_client, queue_url, messages)
26-
end
8+
process_message(message["body"])
279
end
2810

2911
private
3012

31-
def receive_messages(sqs_client, queue_url)
32-
response = sqs_client.receive_message(
33-
queue_url: queue_url,
34-
max_number_of_messages: MAX_NUMBER_OF_MESSAGES,
35-
wait_time_seconds: POLLING_PERIOD, # Any value > 0 enables long polling
36-
)
37-
38-
Rails.logger.info("Received #{response.messages.length} messages - #{self.class.name}")
39-
40-
response.messages
41-
end
42-
43-
def process_messages(sqs_client, queue_url, messages)
44-
messages.each do |message|
45-
sqs_message_id = message.message_id
46-
CurrentJobLoggingAttributes.sqs_message_id = sqs_message_id
47-
48-
receipt_handle = message.receipt_handle
49-
sns_message = JSON.parse(message.body)
50-
ses_message = JSON.parse(sns_message["Message"])
51-
ses_message_id = ses_message["mail"]["messageId"]
52-
CurrentJobLoggingAttributes.mail_message_id = ses_message_id
53-
54-
ses_event_type = ses_message["eventType"]
13+
def process_message(ses_message)
14+
ses_message_id = ses_message["Message"]["mail"]["messageId"]
15+
CurrentJobLoggingAttributes.mail_message_id = ses_message_id
5516

56-
raise "Unexpected event type:#{ses_event_type}" unless %w[Bounce Complaint].include?(ses_event_type)
17+
ses_event_type = ses_message["Message"]["eventType"]
5718

58-
submission = Submission.find_by!(mail_message_id: ses_message_id)
59-
60-
process_bounce(submission) if ses_event_type == "Bounce"
61-
process_complaint(submission) if ses_event_type == "Complaint"
62-
63-
sqs_client.delete_message(queue_url: queue_url, receipt_handle: receipt_handle)
64-
rescue StandardError => e
65-
Rails.logger.warn("Error processing message - #{e.class.name}: #{e.message}")
66-
Sentry.capture_exception(e)
67-
ensure
68-
CurrentJobLoggingAttributes.reset
69-
end
70-
end
71-
72-
def process_bounce(submission)
73-
set_submission_logging_attributes(submission)
74-
75-
submission.bounced!
76-
77-
EventLogger.log_form_event("submission_bounced")
78-
79-
Sentry.capture_message("Submission email bounced - #{self.class.name}:", extra: {
80-
form_id: submission.form_id,
81-
submission_reference: submission.reference,
82-
job_id:,
83-
})
84-
end
85-
86-
def process_complaint(submission)
19+
submission = Submission.find_by!(mail_message_id: ses_message_id)
8720
set_submission_logging_attributes(submission)
8821

89-
EventLogger.log_form_event("submission_complaint")
22+
case ses_event_type
23+
when "Bounce"
24+
submission.bounced!
25+
EventLogger.log_form_event("submission_bounced")
26+
when "Complaint"
27+
submission.complained!
28+
EventLogger.log_form_event("submission_complained")
29+
else
30+
raise "Unexpected event type:#{ses_event_type}"
31+
end
9032
end
9133
end
Lines changed: 12 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,77 +1,26 @@
11
class ReceiveSubmissionDeliveriesJob < ApplicationJob
2-
require "aws-sdk-sqs"
3-
require "aws-sdk-sts"
2+
self.queue_adapter = :sqs
43

5-
queue_as :background
4+
def perform(message)
5+
sqs_message_id = message["message_id"]
6+
CurrentJobLoggingAttributes.sqs_message_id = sqs_message_id
67

7-
REGION = "eu-west-2".freeze
8-
SQS_QUEUE_NAME = "submission_email_ses_successful_deliveries_queue".freeze
9-
MAX_NUMBER_OF_MESSAGES = 10
10-
POLLING_PERIOD = 20
11-
12-
def perform
13-
CloudWatchService.log_job_started(self.class.name)
14-
CurrentJobLoggingAttributes.job_id = job_id
15-
16-
sts_client = Aws::STS::Client.new(region: REGION)
17-
queue_url = "https://sqs.#{REGION}.amazonaws.com/#{sts_client.get_caller_identity.account}/#{SQS_QUEUE_NAME}"
18-
sqs_client = Aws::SQS::Client.new(region: REGION)
19-
20-
loop do
21-
messages = receive_messages(sqs_client, queue_url)
22-
23-
break if messages.empty?
24-
25-
process_messages(sqs_client, queue_url, messages)
26-
end
8+
process_message(message["body"])
279
end
2810

2911
private
3012

31-
def receive_messages(sqs_client, queue_url)
32-
response = sqs_client.receive_message(
33-
queue_url: queue_url,
34-
max_number_of_messages: MAX_NUMBER_OF_MESSAGES,
35-
wait_time_seconds: POLLING_PERIOD, # Any value > 0 enables long polling
36-
)
37-
38-
Rails.logger.info("Received #{response.messages.length} messages - #{self.class.name}")
39-
40-
response.messages
41-
end
42-
43-
def process_messages(sqs_client, queue_url, messages)
44-
messages.each do |message|
45-
sqs_message_id = message.message_id
46-
CurrentJobLoggingAttributes.sqs_message_id = sqs_message_id
13+
def process_message(ses_message)
14+
ses_message_id = ses_message["Message"]["mail"]["messageId"]
15+
CurrentJobLoggingAttributes.mail_message_id = ses_message_id
4716

48-
receipt_handle = message.receipt_handle
49-
sns_message = JSON.parse(message.body)
50-
ses_message = JSON.parse(sns_message["Message"])
51-
ses_message_id = ses_message["mail"]["messageId"]
52-
CurrentJobLoggingAttributes.mail_message_id = ses_message_id
17+
ses_event_type = ses_message["Message"]["eventType"]
18+
raise "Unexpected event type:#{ses_event_type}" unless ses_event_type == "Delivery"
5319

54-
ses_event_type = ses_message["eventType"]
55-
56-
raise "Unexpected event type:#{ses_event_type}" unless ses_event_type == "Delivery"
57-
58-
submission = Submission.find_by!(mail_message_id: ses_message_id)
59-
60-
process_delivery(submission)
61-
62-
sqs_client.delete_message(queue_url: queue_url, receipt_handle: receipt_handle)
63-
rescue StandardError => e
64-
Rails.logger.warn("Error processing message - #{e.class.name}: #{e.message}")
65-
Sentry.capture_exception(e)
66-
ensure
67-
CurrentJobLoggingAttributes.reset
68-
end
69-
end
70-
71-
def process_delivery(submission)
20+
submission = Submission.find_by!(mail_message_id: ses_message_id)
7221
set_submission_logging_attributes(submission)
73-
7422
submission.delivered!
23+
7524
EventLogger.log_form_event("submission_delivered")
7625
end
7726
end

config/aws_active_job_sqs.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
backpressure: 5 # configure global options for poller
2+
max_messages: 3
3+
queues:
4+
deliveries:
5+
event_message_class: "ReceiveSubmissionDeliveriesJob"
6+
# Set the sqs url via the AWS_ACTIVE_JOB_SQS_DELIVERIES_URL env var
7+
bounces_and_complaints: # You can name this queue as you see fit
8+
event_message_class: "ReceiveSubmissionBouncesAndComplaintsJob"
9+
# Set the sqs url via the AWS_ACTIVE_JOB_SQS_BOUNCES_AND_COMPLAINTS_URL env var

0 commit comments

Comments
 (0)