diff --git a/Gemfile b/Gemfile index 2fc586e86..141d0f85c 100644 --- a/Gemfile +++ b/Gemfile @@ -65,8 +65,11 @@ gem "aws-sdk-cloudwatch" gem "aws-sdk-codepipeline", "~> 1.97" gem "aws-sdk-s3" gem "aws-sdk-sesv2" -gem "aws-sdk-sqs" -gem "aws-sdk-sts" + +# TODO: Install gem from RubyGems when events support is released +# https://github.com/aws/aws-activejob-sqs-ruby/pull/22 +gem "aws-activejob-sqs" +gem "aws-sdk-rails" # For sending submissions as CSV gem "csv" diff --git a/Gemfile.lock b/Gemfile.lock index 92558b355..3c859f351 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -103,6 +103,10 @@ GEM addressable (2.8.6) public_suffix (>= 2.0.2, < 6.0) ast (2.4.3) + aws-activejob-sqs (1.0.2) + activejob (>= 7.1.0) + aws-sdk-sqs (~> 1, >= 1.56.0) + concurrent-ruby (~> 1) aws-eventstream (1.3.2) aws-partitions (1.1100.0) aws-sdk-cloudwatch (1.113.0) @@ -121,6 +125,9 @@ GEM aws-sdk-kms (1.100.0) aws-sdk-core (~> 3, >= 3.216.0) aws-sigv4 (~> 1.5) + aws-sdk-rails (5.1.0) + aws-sdk-core (~> 3) + railties (>= 7.1.0) aws-sdk-s3 (1.185.0) aws-sdk-core (~> 3, >= 3.216.0) aws-sdk-kms (~> 1) @@ -131,9 +138,6 @@ GEM aws-sdk-sqs (1.94.0) aws-sdk-core (~> 3, >= 3.216.0) aws-sigv4 (~> 1.5) - aws-sdk-sts (1.11.0) - aws-sdk-core (~> 3, >= 3.110.0) - aws-sigv4 (~> 1.1) aws-sigv4 (1.11.0) aws-eventstream (~> 1, >= 1.0.2) axe-core-api (4.10.3) @@ -510,12 +514,12 @@ PLATFORMS DEPENDENCIES activeresource + aws-activejob-sqs aws-sdk-cloudwatch aws-sdk-codepipeline (~> 1.97) + aws-sdk-rails aws-sdk-s3 aws-sdk-sesv2 - aws-sdk-sqs - aws-sdk-sts axe-core-rspec bootsnap brakeman (~> 7.0) diff --git a/app/jobs/receive_submission_bounces_and_complaints_job.rb b/app/jobs/receive_submission_bounces_and_complaints_job.rb index 8779bf4b3..489f00bc1 100644 --- a/app/jobs/receive_submission_bounces_and_complaints_job.rb +++ b/app/jobs/receive_submission_bounces_and_complaints_job.rb @@ -1,91 +1,33 @@ class ReceiveSubmissionBouncesAndComplaintsJob < ApplicationJob - require "aws-sdk-sqs" - require "aws-sdk-sts" + self.queue_adapter = :sqs - queue_as :background + def perform(message) + sqs_message_id = message["message_id"] + CurrentJobLoggingAttributes.sqs_message_id = sqs_message_id - REGION = "eu-west-2".freeze - SQS_QUEUE_NAME = "submission_email_ses_bounces_and_complaints_queue".freeze - MAX_NUMBER_OF_MESSAGES = 10 - POLLING_PERIOD = 20 - - def perform - CloudWatchService.log_job_started(self.class.name) - CurrentJobLoggingAttributes.job_id = job_id - - sts_client = Aws::STS::Client.new(region: REGION) - queue_url = "https://sqs.#{REGION}.amazonaws.com/#{sts_client.get_caller_identity.account}/#{SQS_QUEUE_NAME}" - sqs_client = Aws::SQS::Client.new(region: REGION) - - loop do - messages = receive_messages(sqs_client, queue_url) - - break if messages.empty? - - process_messages(sqs_client, queue_url, messages) - end + process_message(message["body"]) end private - def receive_messages(sqs_client, queue_url) - response = sqs_client.receive_message( - queue_url: queue_url, - max_number_of_messages: MAX_NUMBER_OF_MESSAGES, - wait_time_seconds: POLLING_PERIOD, # Any value > 0 enables long polling - ) - - Rails.logger.info("Received #{response.messages.length} messages - #{self.class.name}") - - response.messages - end - - def process_messages(sqs_client, queue_url, messages) - messages.each do |message| - sqs_message_id = message.message_id - CurrentJobLoggingAttributes.sqs_message_id = sqs_message_id - - receipt_handle = message.receipt_handle - sns_message = JSON.parse(message.body) - ses_message = JSON.parse(sns_message["Message"]) - ses_message_id = ses_message["mail"]["messageId"] - CurrentJobLoggingAttributes.mail_message_id = ses_message_id - - ses_event_type = ses_message["eventType"] + def process_message(ses_message) + ses_message_id = ses_message["Message"]["mail"]["messageId"] + CurrentJobLoggingAttributes.mail_message_id = ses_message_id - raise "Unexpected event type:#{ses_event_type}" unless %w[Bounce Complaint].include?(ses_event_type) + ses_event_type = ses_message["Message"]["eventType"] - submission = Submission.find_by!(mail_message_id: ses_message_id) - - process_bounce(submission) if ses_event_type == "Bounce" - process_complaint(submission) if ses_event_type == "Complaint" - - sqs_client.delete_message(queue_url: queue_url, receipt_handle: receipt_handle) - rescue StandardError => e - Rails.logger.warn("Error processing message - #{e.class.name}: #{e.message}") - Sentry.capture_exception(e) - ensure - CurrentJobLoggingAttributes.reset - end - end - - def process_bounce(submission) - set_submission_logging_attributes(submission) - - submission.bounced! - - EventLogger.log_form_event("submission_bounced") - - Sentry.capture_message("Submission email bounced - #{self.class.name}:", extra: { - form_id: submission.form_id, - submission_reference: submission.reference, - job_id:, - }) - end - - def process_complaint(submission) + submission = Submission.find_by!(mail_message_id: ses_message_id) set_submission_logging_attributes(submission) - EventLogger.log_form_event("submission_complaint") + case ses_event_type + when "Bounce" + submission.bounced! + EventLogger.log_form_event("submission_bounced") + when "Complaint" + submission.complained! + EventLogger.log_form_event("submission_complained") + else + raise "Unexpected event type:#{ses_event_type}" + end end end diff --git a/app/jobs/receive_submission_deliveries_job.rb b/app/jobs/receive_submission_deliveries_job.rb index d5dd470cf..c5db23e3a 100644 --- a/app/jobs/receive_submission_deliveries_job.rb +++ b/app/jobs/receive_submission_deliveries_job.rb @@ -1,77 +1,26 @@ class ReceiveSubmissionDeliveriesJob < ApplicationJob - require "aws-sdk-sqs" - require "aws-sdk-sts" + self.queue_adapter = :sqs - queue_as :background + def perform(message) + sqs_message_id = message["message_id"] + CurrentJobLoggingAttributes.sqs_message_id = sqs_message_id - REGION = "eu-west-2".freeze - SQS_QUEUE_NAME = "submission_email_ses_successful_deliveries_queue".freeze - MAX_NUMBER_OF_MESSAGES = 10 - POLLING_PERIOD = 20 - - def perform - CloudWatchService.log_job_started(self.class.name) - CurrentJobLoggingAttributes.job_id = job_id - - sts_client = Aws::STS::Client.new(region: REGION) - queue_url = "https://sqs.#{REGION}.amazonaws.com/#{sts_client.get_caller_identity.account}/#{SQS_QUEUE_NAME}" - sqs_client = Aws::SQS::Client.new(region: REGION) - - loop do - messages = receive_messages(sqs_client, queue_url) - - break if messages.empty? - - process_messages(sqs_client, queue_url, messages) - end + process_message(message["body"]) end private - def receive_messages(sqs_client, queue_url) - response = sqs_client.receive_message( - queue_url: queue_url, - max_number_of_messages: MAX_NUMBER_OF_MESSAGES, - wait_time_seconds: POLLING_PERIOD, # Any value > 0 enables long polling - ) - - Rails.logger.info("Received #{response.messages.length} messages - #{self.class.name}") - - response.messages - end - - def process_messages(sqs_client, queue_url, messages) - messages.each do |message| - sqs_message_id = message.message_id - CurrentJobLoggingAttributes.sqs_message_id = sqs_message_id + def process_message(ses_message) + ses_message_id = ses_message["Message"]["mail"]["messageId"] + CurrentJobLoggingAttributes.mail_message_id = ses_message_id - receipt_handle = message.receipt_handle - sns_message = JSON.parse(message.body) - ses_message = JSON.parse(sns_message["Message"]) - ses_message_id = ses_message["mail"]["messageId"] - CurrentJobLoggingAttributes.mail_message_id = ses_message_id + ses_event_type = ses_message["Message"]["eventType"] + raise "Unexpected event type:#{ses_event_type}" unless ses_event_type == "Delivery" - ses_event_type = ses_message["eventType"] - - raise "Unexpected event type:#{ses_event_type}" unless ses_event_type == "Delivery" - - submission = Submission.find_by!(mail_message_id: ses_message_id) - - process_delivery(submission) - - sqs_client.delete_message(queue_url: queue_url, receipt_handle: receipt_handle) - rescue StandardError => e - Rails.logger.warn("Error processing message - #{e.class.name}: #{e.message}") - Sentry.capture_exception(e) - ensure - CurrentJobLoggingAttributes.reset - end - end - - def process_delivery(submission) + submission = Submission.find_by!(mail_message_id: ses_message_id) set_submission_logging_attributes(submission) - submission.delivered! + EventLogger.log_form_event("submission_delivered") end end diff --git a/config/aws_active_job_sqs.yml b/config/aws_active_job_sqs.yml new file mode 100644 index 000000000..e59304ecc --- /dev/null +++ b/config/aws_active_job_sqs.yml @@ -0,0 +1,9 @@ +backpressure: 5 # configure global options for poller +max_messages: 3 +queues: + deliveries: + event_message_class: "ReceiveSubmissionDeliveriesJob" + # Set the sqs url via the AWS_ACTIVE_JOB_SQS_DELIVERIES_URL env var + bounces_and_complaints: # You can name this queue as you see fit + event_message_class: "ReceiveSubmissionBouncesAndComplaintsJob" + # Set the sqs url via the AWS_ACTIVE_JOB_SQS_BOUNCES_AND_COMPLAINTS_URL env var diff --git a/config/puma.rb b/config/puma.rb index 361216c8f..c1e6165a8 100644 --- a/config/puma.rb +++ b/config/puma.rb @@ -42,3 +42,8 @@ persistent_timeout ENV.fetch("RAILS_PERSISTENT_TIMEOUT", 75) plugin :solid_queue if Rails.env.production? +plugin :aws_activejob_sqs if Rails.env.production? + +Puma::Plugins::AwsActiveJobSQS.sqs_poller_options = { + queues: %i[deliveries bounces_and_complaints], +} diff --git a/lib/puma/plugin/aws_activejob_sqs.rb b/lib/puma/plugin/aws_activejob_sqs.rb new file mode 100644 index 000000000..deb179c55 --- /dev/null +++ b/lib/puma/plugin/aws_activejob_sqs.rb @@ -0,0 +1,99 @@ +# TODO: contribute this back to the aws-sdk-rails gem +require "puma/plugin" + +module Puma + module Plugin + module AwsActivejobSqs + module PluginInstanceMethods + class << self + attr_writer :sqs_poller_options + + def sqs_poller_options + @sqs_poller_options ||= {} + end + end + + attr_reader :puma_pid, :sqs_poller_pid, :log_writer + + def start(launcher) + @log_writer = launcher.log_writer + @puma_pid = $$ + + in_background do + monitor_sqs_poller + end + + launcher.events.on_booted do + # TODO: there should be separate process per queue + @sqs_poller_pid = fork do + Thread.new { monitor_puma } + Aws::ActiveJob::SQS::Poller.new(self.class.sqs_poller_options.to_h.compact).run + end + end + + launcher.events.on_stopped { stop_sqs_poller } + launcher.events.on_restart { stop_sqs_poller } + end + + private + + def stop_sqs_poller + return unless sqs_poller_pid + + begin + Process.waitpid(sqs_poller_pid, Process::WNOHANG) + log "Stopping SQS Poller..." + Process.kill(:INT, sqs_poller_pid) + Process.wait(sqs_poller_pid) + rescue Errno::ECHILD, Errno::ESRCH + log "SQS Poller process not found or already stopped." + end + end + + def monitor_puma + monitor(:puma_dead?, "Detected Puma has gone away, stopping SQS Poller...") + end + + def monitor_sqs_poller + monitor(:sqs_poller_dead?, "Detected SQS Poller has gone away, stopping Puma...") + end + + def monitor(process_dead, message) + loop do + if send(process_dead) + log message + Process.kill(:INT, $$) + break + end + sleep 2 + end + end + + def sqs_poller_dead? + if sqs_poller_started? + Process.waitpid(sqs_poller_pid, Process::WNOHANG) + end + false + rescue Errno::ECHILD, Errno::ESRCH + true + end + + def sqs_poller_started? + sqs_poller_pid.present? + end + + def puma_dead? + Process.ppid != puma_pid + end + + def log(...) + log_writer.log(...) + end + end + end + end +end + +Puma::Plugin.create do + include Puma::Plugin::AwsActivejobSqs::PluginInstanceMethods +end