Skip to content

Commit 167122d

Browse files
committed
Add puma plugin to run SQS Poller
1 parent 0fccf8e commit 167122d

File tree

2 files changed

+94
-0
lines changed

2 files changed

+94
-0
lines changed

config/puma.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,8 @@
4242
persistent_timeout ENV.fetch("RAILS_PERSISTENT_TIMEOUT", 75)
4343

4444
plugin :solid_queue if Rails.env.production?
45+
plugin :aws_activejob_sqs if Rails.env.production?
46+
47+
Puma::Plugins::AwsActiveJobSQS.sqs_poller_options = {
48+
queues: %i[deliveries bounces_and_complaints],
49+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# TODO: contribute this back to the aws-sdk-rails gem
2+
require "puma/plugin"
3+
4+
Puma::Plugin.create do
5+
class << self
6+
attr_writer :sqs_poller_options
7+
8+
def sqs_poller_options
9+
@sqs_poller_options ||= {}
10+
end
11+
end
12+
13+
attr_reader :puma_pid, :sqs_poller_pid, :log_writer
14+
15+
def start(launcher)
16+
@log_writer = launcher.log_writer
17+
@puma_pid = $$
18+
19+
in_background do
20+
monitor_sqs_poller
21+
end
22+
23+
launcher.events.on_booted do
24+
# TODO: there should be separate process per queue
25+
@sqs_poller_pid = fork do
26+
Thread.new { monitor_puma }
27+
Aws::ActiveJob::SQS::Poller.new(self.class.sqs_poller_options.to_h.compact).run
28+
end
29+
end
30+
31+
launcher.events.on_stopped { stop_sqs_poller }
32+
launcher.events.on_restart { stop_sqs_poller }
33+
end
34+
35+
private
36+
37+
def stop_sqs_poller
38+
return unless sqs_poller_pid
39+
40+
begin
41+
Process.waitpid(sqs_poller_pid, Process::WNOHANG)
42+
log "Stopping SQS Poller..."
43+
Process.kill(:INT, sqs_poller_pid)
44+
Process.wait(sqs_poller_pid)
45+
rescue Errno::ECHILD, Errno::ESRCH
46+
log "SQS Poller process not found or already stopped."
47+
end
48+
end
49+
50+
def monitor_puma
51+
monitor(:puma_dead?, "Detected Puma has gone away, stopping SQS Poller...")
52+
end
53+
54+
def monitor_sqs_poller
55+
monitor(:sqs_poller_dead?, "Detected SQS Poller has gone away, stopping Puma...")
56+
end
57+
58+
def monitor(process_dead, message)
59+
loop do
60+
if send(process_dead)
61+
log message
62+
Process.kill(:INT, $$)
63+
break
64+
end
65+
sleep 2
66+
end
67+
end
68+
69+
def sqs_poller_dead?
70+
if sqs_poller_started?
71+
Process.waitpid(sqs_poller_pid, Process::WNOHANG)
72+
end
73+
false
74+
rescue Errno::ECHILD, Errno::ESRCH
75+
true
76+
end
77+
78+
def sqs_poller_started?
79+
sqs_poller_pid.present?
80+
end
81+
82+
def puma_dead?
83+
Process.ppid != puma_pid
84+
end
85+
86+
def log(...)
87+
log_writer.log(...)
88+
end
89+
end

0 commit comments

Comments
 (0)