Skip to content

Commit fa32d56

Browse files
committed
Add puma plugin to run SQS Poller
1 parent 0fccf8e commit fa32d56

File tree

2 files changed

+104
-0
lines changed

2 files changed

+104
-0
lines changed

config/puma.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,8 @@
4242
persistent_timeout ENV.fetch("RAILS_PERSISTENT_TIMEOUT", 75)
4343

4444
plugin :solid_queue if Rails.env.production?
45+
plugin :aws_activejob_sqs if Rails.env.production?
46+
47+
Puma::Plugins::AwsActiveJobSQS.sqs_poller_options = {
48+
queues: %i[deliveries bounces_and_complaints],
49+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# TODO: contribute this back to the aws-sdk-rails gem
2+
require "puma/plugin"
3+
4+
module Puma
5+
module Plugin
6+
module AwsActivejobSqs
7+
module PluginInstanceMethods
8+
class << self
9+
attr_writer :sqs_poller_options
10+
11+
def sqs_poller_options
12+
@sqs_poller_options ||= {}
13+
end
14+
end
15+
16+
attr_reader :puma_pid, :sqs_poller_pid, :log_writer
17+
18+
def start(launcher)
19+
@log_writer = launcher.log_writer
20+
@puma_pid = $$
21+
22+
in_background do
23+
monitor_sqs_poller
24+
end
25+
26+
launcher.events.on_booted do
27+
# TODO: there should be separate process per queue
28+
@sqs_poller_pid = fork do
29+
Thread.new { monitor_puma }
30+
Aws::ActiveJob::SQS::Poller.new(self.class.sqs_poller_options.to_h.compact).run
31+
end
32+
end
33+
34+
launcher.events.on_stopped { stop_sqs_poller }
35+
launcher.events.on_restart { stop_sqs_poller }
36+
end
37+
38+
private
39+
40+
def stop_sqs_poller
41+
return unless sqs_poller_pid
42+
43+
begin
44+
Process.waitpid(sqs_poller_pid, Process::WNOHANG)
45+
log "Stopping SQS Poller..."
46+
Process.kill(:INT, sqs_poller_pid)
47+
Process.wait(sqs_poller_pid)
48+
rescue Errno::ECHILD, Errno::ESRCH
49+
log "SQS Poller process not found or already stopped."
50+
end
51+
end
52+
53+
def monitor_puma
54+
monitor(:puma_dead?, "Detected Puma has gone away, stopping SQS Poller...")
55+
end
56+
57+
def monitor_sqs_poller
58+
monitor(:sqs_poller_dead?, "Detected SQS Poller has gone away, stopping Puma...")
59+
end
60+
61+
def monitor(process_dead, message)
62+
loop do
63+
if send(process_dead)
64+
log message
65+
Process.kill(:INT, $$)
66+
break
67+
end
68+
sleep 2
69+
end
70+
end
71+
72+
def sqs_poller_dead?
73+
if sqs_poller_started?
74+
Process.waitpid(sqs_poller_pid, Process::WNOHANG)
75+
end
76+
false
77+
rescue Errno::ECHILD, Errno::ESRCH
78+
true
79+
end
80+
81+
def sqs_poller_started?
82+
sqs_poller_pid.present?
83+
end
84+
85+
def puma_dead?
86+
Process.ppid != puma_pid
87+
end
88+
89+
def log(...)
90+
log_writer.log(...)
91+
end
92+
end
93+
end
94+
end
95+
end
96+
97+
Puma::Plugin.create do
98+
include Puma::Plugin::AwsActivejobSqs::PluginInstanceMethods
99+
end

0 commit comments

Comments
 (0)