Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ gem "aws-sdk-cloudwatch"
gem "aws-sdk-codepipeline", "~> 1.97"
gem "aws-sdk-s3"
gem "aws-sdk-sesv2"
gem "aws-sdk-sqs"
gem "aws-sdk-sts"

# TODO: Install gem from RubyGems when events support is released
# https://github.com/aws/aws-activejob-sqs-ruby/pull/22
gem "aws-activejob-sqs"
gem "aws-sdk-rails"

# For sending submissions as CSV
gem "csv"
Expand Down
14 changes: 9 additions & 5 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ GEM
addressable (2.8.6)
public_suffix (>= 2.0.2, < 6.0)
ast (2.4.3)
aws-activejob-sqs (1.0.2)
activejob (>= 7.1.0)
aws-sdk-sqs (~> 1, >= 1.56.0)
concurrent-ruby (~> 1)
aws-eventstream (1.3.2)
aws-partitions (1.1100.0)
aws-sdk-cloudwatch (1.113.0)
Expand All @@ -121,6 +125,9 @@ GEM
aws-sdk-kms (1.100.0)
aws-sdk-core (~> 3, >= 3.216.0)
aws-sigv4 (~> 1.5)
aws-sdk-rails (5.1.0)
aws-sdk-core (~> 3)
railties (>= 7.1.0)
aws-sdk-s3 (1.185.0)
aws-sdk-core (~> 3, >= 3.216.0)
aws-sdk-kms (~> 1)
Expand All @@ -131,9 +138,6 @@ GEM
aws-sdk-sqs (1.94.0)
aws-sdk-core (~> 3, >= 3.216.0)
aws-sigv4 (~> 1.5)
aws-sdk-sts (1.11.0)
aws-sdk-core (~> 3, >= 3.110.0)
aws-sigv4 (~> 1.1)
aws-sigv4 (1.11.0)
aws-eventstream (~> 1, >= 1.0.2)
axe-core-api (4.10.3)
Expand Down Expand Up @@ -510,12 +514,12 @@ PLATFORMS

DEPENDENCIES
activeresource
aws-activejob-sqs
aws-sdk-cloudwatch
aws-sdk-codepipeline (~> 1.97)
aws-sdk-rails
aws-sdk-s3
aws-sdk-sesv2
aws-sdk-sqs
aws-sdk-sts
axe-core-rspec
bootsnap
brakeman (~> 7.0)
Expand Down
98 changes: 20 additions & 78 deletions app/jobs/receive_submission_bounces_and_complaints_job.rb
Original file line number Diff line number Diff line change
@@ -1,91 +1,33 @@
class ReceiveSubmissionBouncesAndComplaintsJob < ApplicationJob
require "aws-sdk-sqs"
require "aws-sdk-sts"
self.queue_adapter = :sqs

queue_as :background
def perform(message)
sqs_message_id = message["message_id"]
CurrentJobLoggingAttributes.sqs_message_id = sqs_message_id

REGION = "eu-west-2".freeze
SQS_QUEUE_NAME = "submission_email_ses_bounces_and_complaints_queue".freeze
MAX_NUMBER_OF_MESSAGES = 10
POLLING_PERIOD = 20

def perform
CloudWatchService.log_job_started(self.class.name)
CurrentJobLoggingAttributes.job_id = job_id

sts_client = Aws::STS::Client.new(region: REGION)
queue_url = "https://sqs.#{REGION}.amazonaws.com/#{sts_client.get_caller_identity.account}/#{SQS_QUEUE_NAME}"
sqs_client = Aws::SQS::Client.new(region: REGION)

loop do
messages = receive_messages(sqs_client, queue_url)

break if messages.empty?

process_messages(sqs_client, queue_url, messages)
end
process_message(message["body"])
end

private

def receive_messages(sqs_client, queue_url)
response = sqs_client.receive_message(
queue_url: queue_url,
max_number_of_messages: MAX_NUMBER_OF_MESSAGES,
wait_time_seconds: POLLING_PERIOD, # Any value > 0 enables long polling
)

Rails.logger.info("Received #{response.messages.length} messages - #{self.class.name}")

response.messages
end

def process_messages(sqs_client, queue_url, messages)
messages.each do |message|
sqs_message_id = message.message_id
CurrentJobLoggingAttributes.sqs_message_id = sqs_message_id

receipt_handle = message.receipt_handle
sns_message = JSON.parse(message.body)
ses_message = JSON.parse(sns_message["Message"])
ses_message_id = ses_message["mail"]["messageId"]
CurrentJobLoggingAttributes.mail_message_id = ses_message_id

ses_event_type = ses_message["eventType"]
def process_message(ses_message)
ses_message_id = ses_message["Message"]["mail"]["messageId"]
CurrentJobLoggingAttributes.mail_message_id = ses_message_id

raise "Unexpected event type:#{ses_event_type}" unless %w[Bounce Complaint].include?(ses_event_type)
ses_event_type = ses_message["Message"]["eventType"]

submission = Submission.find_by!(mail_message_id: ses_message_id)

process_bounce(submission) if ses_event_type == "Bounce"
process_complaint(submission) if ses_event_type == "Complaint"

sqs_client.delete_message(queue_url: queue_url, receipt_handle: receipt_handle)
rescue StandardError => e
Rails.logger.warn("Error processing message - #{e.class.name}: #{e.message}")
Sentry.capture_exception(e)
ensure
CurrentJobLoggingAttributes.reset
end
end

def process_bounce(submission)
set_submission_logging_attributes(submission)

submission.bounced!

EventLogger.log_form_event("submission_bounced")

Sentry.capture_message("Submission email bounced - #{self.class.name}:", extra: {
form_id: submission.form_id,
submission_reference: submission.reference,
job_id:,
})
end

def process_complaint(submission)
submission = Submission.find_by!(mail_message_id: ses_message_id)
set_submission_logging_attributes(submission)

EventLogger.log_form_event("submission_complaint")
case ses_event_type
when "Bounce"
submission.bounced!
EventLogger.log_form_event("submission_bounced")
when "Complaint"
submission.complained!
EventLogger.log_form_event("submission_complained")
else
raise "Unexpected event type:#{ses_event_type}"
end
end
end
75 changes: 12 additions & 63 deletions app/jobs/receive_submission_deliveries_job.rb
Original file line number Diff line number Diff line change
@@ -1,77 +1,26 @@
class ReceiveSubmissionDeliveriesJob < ApplicationJob
require "aws-sdk-sqs"
require "aws-sdk-sts"
self.queue_adapter = :sqs

queue_as :background
def perform(message)
sqs_message_id = message["message_id"]
CurrentJobLoggingAttributes.sqs_message_id = sqs_message_id

REGION = "eu-west-2".freeze
SQS_QUEUE_NAME = "submission_email_ses_successful_deliveries_queue".freeze
MAX_NUMBER_OF_MESSAGES = 10
POLLING_PERIOD = 20

def perform
CloudWatchService.log_job_started(self.class.name)
CurrentJobLoggingAttributes.job_id = job_id

sts_client = Aws::STS::Client.new(region: REGION)
queue_url = "https://sqs.#{REGION}.amazonaws.com/#{sts_client.get_caller_identity.account}/#{SQS_QUEUE_NAME}"
sqs_client = Aws::SQS::Client.new(region: REGION)

loop do
messages = receive_messages(sqs_client, queue_url)

break if messages.empty?

process_messages(sqs_client, queue_url, messages)
end
process_message(message["body"])
end

private

def receive_messages(sqs_client, queue_url)
response = sqs_client.receive_message(
queue_url: queue_url,
max_number_of_messages: MAX_NUMBER_OF_MESSAGES,
wait_time_seconds: POLLING_PERIOD, # Any value > 0 enables long polling
)

Rails.logger.info("Received #{response.messages.length} messages - #{self.class.name}")

response.messages
end

def process_messages(sqs_client, queue_url, messages)
messages.each do |message|
sqs_message_id = message.message_id
CurrentJobLoggingAttributes.sqs_message_id = sqs_message_id
def process_message(ses_message)
ses_message_id = ses_message["Message"]["mail"]["messageId"]
CurrentJobLoggingAttributes.mail_message_id = ses_message_id

receipt_handle = message.receipt_handle
sns_message = JSON.parse(message.body)
ses_message = JSON.parse(sns_message["Message"])
ses_message_id = ses_message["mail"]["messageId"]
CurrentJobLoggingAttributes.mail_message_id = ses_message_id
ses_event_type = ses_message["Message"]["eventType"]
raise "Unexpected event type:#{ses_event_type}" unless ses_event_type == "Delivery"

ses_event_type = ses_message["eventType"]

raise "Unexpected event type:#{ses_event_type}" unless ses_event_type == "Delivery"

submission = Submission.find_by!(mail_message_id: ses_message_id)

process_delivery(submission)

sqs_client.delete_message(queue_url: queue_url, receipt_handle: receipt_handle)
rescue StandardError => e
Rails.logger.warn("Error processing message - #{e.class.name}: #{e.message}")
Sentry.capture_exception(e)
ensure
CurrentJobLoggingAttributes.reset
end
end

def process_delivery(submission)
submission = Submission.find_by!(mail_message_id: ses_message_id)
set_submission_logging_attributes(submission)

submission.delivered!

EventLogger.log_form_event("submission_delivered")
end
end
9 changes: 9 additions & 0 deletions config/aws_active_job_sqs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
backpressure: 5 # configure global options for poller
max_messages: 3
queues:
deliveries:
event_message_class: "ReceiveSubmissionDeliveriesJob"
# Set the sqs url via the AWS_ACTIVE_JOB_SQS_DELIVERIES_URL env var
bounces_and_complaints: # You can name this queue as you see fit
event_message_class: "ReceiveSubmissionBouncesAndComplaintsJob"
# Set the sqs url via the AWS_ACTIVE_JOB_SQS_BOUNCES_AND_COMPLAINTS_URL env var
5 changes: 5 additions & 0 deletions config/puma.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,8 @@
persistent_timeout ENV.fetch("RAILS_PERSISTENT_TIMEOUT", 75)

plugin :solid_queue if Rails.env.production?
plugin :aws_activejob_sqs if Rails.env.production?

Puma::Plugins::AwsActiveJobSQS.sqs_poller_options = {
queues: %i[deliveries bounces_and_complaints],
}
99 changes: 99 additions & 0 deletions lib/puma/plugin/aws_activejob_sqs.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# TODO: contribute this back to the aws-sdk-rails gem
require "puma/plugin"

module Puma
module Plugin
module AwsActivejobSqs
module PluginInstanceMethods
class << self
attr_writer :sqs_poller_options

def sqs_poller_options
@sqs_poller_options ||= {}
end
end

attr_reader :puma_pid, :sqs_poller_pid, :log_writer

def start(launcher)
@log_writer = launcher.log_writer
@puma_pid = $$

in_background do
monitor_sqs_poller
end

launcher.events.on_booted do
# TODO: there should be separate process per queue
@sqs_poller_pid = fork do
Thread.new { monitor_puma }
Aws::ActiveJob::SQS::Poller.new(self.class.sqs_poller_options.to_h.compact).run
end
end

launcher.events.on_stopped { stop_sqs_poller }
launcher.events.on_restart { stop_sqs_poller }
end

private

def stop_sqs_poller
return unless sqs_poller_pid

begin
Process.waitpid(sqs_poller_pid, Process::WNOHANG)
log "Stopping SQS Poller..."
Process.kill(:INT, sqs_poller_pid)
Process.wait(sqs_poller_pid)
rescue Errno::ECHILD, Errno::ESRCH
log "SQS Poller process not found or already stopped."
end
end

def monitor_puma
monitor(:puma_dead?, "Detected Puma has gone away, stopping SQS Poller...")
end

def monitor_sqs_poller
monitor(:sqs_poller_dead?, "Detected SQS Poller has gone away, stopping Puma...")
end

def monitor(process_dead, message)
loop do
if send(process_dead)
log message
Process.kill(:INT, $$)
break
end
sleep 2
end
end

def sqs_poller_dead?
if sqs_poller_started?
Process.waitpid(sqs_poller_pid, Process::WNOHANG)
end
false
rescue Errno::ECHILD, Errno::ESRCH
true
end

def sqs_poller_started?
sqs_poller_pid.present?
end

def puma_dead?
Process.ppid != puma_pid
end

def log(...)
log_writer.log(...)
end
end
end
end
end

Puma::Plugin.create do
include Puma::Plugin::AwsActivejobSqs::PluginInstanceMethods
end