Skip to content

Commit 9b49ce0

Browse files
committed
Implement a way to run the async supervisor as a separate process
The supervisor still runs all of its supervised processes as threads, but it can now be run as its own process, separately from Puma.
1 parent 353ace9 commit 9b49ce0

File tree

10 files changed

+324
-20
lines changed

10 files changed

+324
-20
lines changed

lib/puma/plugin/solid_queue.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def start_forked(launcher)
4040
end
4141

4242
def start_async(launcher)
43-
launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
43+
launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, sidecar: true) }
4444
launcher.events.on_stopped { solid_queue_supervisor.stop }
4545
launcher.events.on_restart { solid_queue_supervisor.stop; solid_queue_supervisor.start }
4646
end

lib/solid_queue/supervisor.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,23 @@
22

33
module SolidQueue
44
class Supervisor < Processes::Base
5-
include Maintenance
5+
include Maintenance, Signals, Pidfiled
66

77
class << self
8-
def start(mode: :fork, load_configuration_from: nil)
8+
def start(mode: "fork", load_configuration_from: nil, **options)
99
SolidQueue.supervisor = true
1010
configuration = Configuration.new(mode: mode, load_from: load_configuration_from)
1111

1212
if configuration.configured_processes.any?
13-
klass = mode == :fork ? ForkSupervisor : AsyncSupervisor
14-
klass.new(configuration).tap(&:start)
13+
klass = mode.to_s.inquiry.fork? ? ForkSupervisor : AsyncSupervisor
14+
klass.new(configuration, **options).tap(&:start)
1515
else
1616
abort "No workers or processed configured. Exiting..."
1717
end
1818
end
1919
end
2020

21-
def initialize(configuration)
21+
def initialize(configuration, **options)
2222
@configuration = configuration
2323
super
2424
end

lib/solid_queue/supervisor/async_supervisor.rb

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@
22

33
module SolidQueue
44
class Supervisor::AsyncSupervisor < Supervisor
5-
def initialize(*)
5+
skip_callback :boot, :before, :register_signal_handlers, if: :sidecar?
6+
7+
def initialize(*, sidecar: false)
68
super
9+
10+
@sidecar = sidecar
711
@threads = Concurrent::Map.new
812
end
913

@@ -22,6 +26,10 @@ def stop
2226
private
2327
attr_reader :threads
2428

29+
def sidecar?
30+
@sidecar
31+
end
32+
2533
def start_process(configured_process)
2634
process_instance = configured_process.instantiate.tap do |instance|
2735
instance.supervised_by process
@@ -32,6 +40,19 @@ def start_process(configured_process)
3240
threads[process_instance.name] = process_instance
3341
end
3442

43+
def supervise
44+
unless sidecar?
45+
loop do
46+
break if stopped?
47+
48+
procline "supervising #{threads.keys.join(", ")}"
49+
process_signal_queue
50+
51+
interruptible_sleep(10.second) unless stopped?
52+
end
53+
end
54+
end
55+
3556
def stop_threads
3657
stop_threads = threads.values.map do |thr|
3758
Thread.new { thr.stop }
@@ -40,6 +61,21 @@ def stop_threads
4061
stop_threads.each { |thr| thr.join(SolidQueue.shutdown_timeout) }
4162
end
4263

64+
def terminate_gracefully
65+
SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: threads.keys) do |payload|
66+
unless all_threads_terminated?
67+
payload[:shutdown_timeout_exceeded] = true
68+
terminate_immediately
69+
end
70+
end
71+
end
72+
73+
def terminate_immediately
74+
SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: threads.keys) do
75+
exit!
76+
end
77+
end
78+
4379
def all_threads_terminated?
4480
threads.values.none?(&:alive?)
4581
end

lib/solid_queue/supervisor/fork_supervisor.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
module SolidQueue
44
class Supervisor::ForkSupervisor < Supervisor
5-
include Signals, Pidfiled
6-
75
def initialize(*)
86
super
97

test/dummy/app/jobs/store_result_job.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
class StoreResultJob < ApplicationJob
22
queue_as :background
33

4-
def perform(value, status: :completed, pause: nil, exception: nil, exit: nil)
4+
def perform(value, status: :completed, pause: nil, exception: nil, exit_value: nil)
55
result = JobResult.create!(queue_name: queue_name, status: "started", value: value)
66

77
sleep(pause) if pause
88
raise exception.new if exception
9-
exit! if exit
9+
exit!(exit_value) if exit_value
1010

1111
result.update!(status: status)
1212
end
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
# frozen_string_literal: true
2+
3+
require "test_helper"
4+
5+
class AsyncProcessesLifecycleTest < ActiveSupport::TestCase
6+
self.use_transactional_tests = false
7+
8+
setup do
9+
config_as_hash = { workers: [ { queues: :background }, { queues: :default, threads: 5 } ], dispatchers: [] }
10+
@pid = run_supervisor_as_fork(load_configuration_from: config_as_hash, mode: :async)
11+
12+
wait_for_registered_processes(3, timeout: 3.second)
13+
assert_registered_workers_for(:background, :default, supervisor_pid: @pid)
14+
end
15+
16+
teardown do
17+
terminate_process(@pid) if process_exists?(@pid)
18+
end
19+
20+
test "enqueue jobs in multiple queues" do
21+
6.times { |i| enqueue_store_result_job("job_#{i}") }
22+
6.times { |i| enqueue_store_result_job("job_#{i}", :default) }
23+
24+
wait_for_jobs_to_finish_for(2.seconds)
25+
26+
assert_equal 12, JobResult.count
27+
6.times { |i| assert_completed_job_results("job_#{i}", :background) }
28+
6.times { |i| assert_completed_job_results("job_#{i}", :default) }
29+
30+
terminate_process(@pid)
31+
assert_clean_termination
32+
end
33+
34+
test "kill supervisor while there are jobs in-flight" do
35+
no_pause = enqueue_store_result_job("no pause")
36+
pause = enqueue_store_result_job("pause", pause: 0.2.seconds)
37+
38+
signal_process(@pid, :KILL, wait: 0.15.seconds)
39+
wait_for_jobs_to_finish_for(2.seconds)
40+
wait_for_registered_processes(1, timeout: 3.second)
41+
42+
assert_not process_exists?(@pid)
43+
44+
assert_completed_job_results("no pause")
45+
assert_job_status(no_pause, :finished)
46+
47+
# Nothing had the chance to finish in an orderly way
48+
assert_registered_supervisor
49+
assert_registered_workers_for(:background, :default, supervisor_pid: @pid)
50+
assert_started_job_result("pause")
51+
assert_claimed_jobs
52+
end
53+
54+
test "term supervisor multiple times" do
55+
5.times do
56+
signal_process(@pid, :TERM, wait: 0.1.second)
57+
end
58+
59+
sleep(1.second)
60+
assert_clean_termination
61+
end
62+
63+
test "quit supervisor while there are jobs in-flight" do
64+
no_pause = enqueue_store_result_job("no pause")
65+
pause = enqueue_store_result_job("pause", pause: 1.second)
66+
67+
signal_process(@pid, :QUIT, wait: 0.4.second)
68+
wait_for_jobs_to_finish_for(2.seconds)
69+
70+
wait_while_with_timeout(2.seconds) { process_exists?(@pid) }
71+
assert_not process_exists?(@pid)
72+
73+
assert_no_unfinished_jobs
74+
assert_clean_termination
75+
end
76+
77+
test "term supervisor while there are jobs in-flight" do
78+
no_pause = enqueue_store_result_job("no pause")
79+
pause = enqueue_store_result_job("pause", pause: 0.2.seconds)
80+
81+
signal_process(@pid, :TERM, wait: 0.3.second)
82+
wait_for_jobs_to_finish_for(3.seconds)
83+
84+
assert_completed_job_results("no pause")
85+
assert_completed_job_results("pause")
86+
87+
assert_job_status(no_pause, :finished)
88+
assert_job_status(pause, :finished)
89+
90+
wait_for_process_termination_with_timeout(@pid, timeout: 1.second)
91+
assert_clean_termination
92+
end
93+
94+
test "int supervisor while there are jobs in-flight" do
95+
no_pause = enqueue_store_result_job("no pause")
96+
pause = enqueue_store_result_job("pause", pause: 0.2.seconds)
97+
98+
signal_process(@pid, :INT, wait: 0.3.second)
99+
wait_for_jobs_to_finish_for(2.second)
100+
101+
assert_completed_job_results("no pause")
102+
assert_completed_job_results("pause")
103+
104+
assert_job_status(no_pause, :finished)
105+
assert_job_status(pause, :finished)
106+
107+
wait_for_process_termination_with_timeout(@pid, timeout: 1.second)
108+
assert_clean_termination
109+
end
110+
111+
test "term supervisor exceeding timeout while there are jobs in-flight" do
112+
no_pause = enqueue_store_result_job("no pause")
113+
pause = enqueue_store_result_job("pause", pause: SolidQueue.shutdown_timeout + 10.second)
114+
115+
signal_process(@pid, :TERM, wait: 0.5.second)
116+
117+
sleep(SolidQueue.shutdown_timeout + 0.5.second)
118+
119+
assert_completed_job_results("no pause")
120+
assert_job_status(no_pause, :finished)
121+
122+
# This job was left claimed as the worker was shut down without
123+
# a chance to terminate orderly
124+
assert_started_job_result("pause")
125+
assert_job_status(pause, :claimed)
126+
127+
# Now wait until the supervisor finishes for real, which will complete the cleanup
128+
wait_for_process_termination_with_timeout(@pid, timeout: 1.second)
129+
assert_clean_termination
130+
end
131+
132+
test "process some jobs that raise errors" do
133+
2.times { enqueue_store_result_job("no error", :background) }
134+
2.times { enqueue_store_result_job("no error", :default) }
135+
error1 = enqueue_store_result_job("error", :background, exception: RuntimeError)
136+
enqueue_store_result_job("no error", :background, pause: 0.03)
137+
error2 = enqueue_store_result_job("error", :background, exception: RuntimeError, pause: 0.05)
138+
2.times { enqueue_store_result_job("no error", :default, pause: 0.01) }
139+
error3 = enqueue_store_result_job("error", :default, exception: RuntimeError)
140+
141+
wait_for_jobs_to_finish_for(2.second, except: [ error1, error2, error3 ])
142+
143+
assert_completed_job_results("no error", :background, 3)
144+
assert_completed_job_results("no error", :default, 4)
145+
146+
wait_while_with_timeout(1.second) { SolidQueue::FailedExecution.count < 3 }
147+
[ error1, error2, error3 ].each do |job|
148+
assert_job_status(job, :failed)
149+
end
150+
151+
terminate_process(@pid)
152+
assert_clean_termination
153+
end
154+
155+
test "process a job that exits" do
156+
2.times do
157+
enqueue_store_result_job("no exit", :background)
158+
enqueue_store_result_job("no exit", :default)
159+
end
160+
paused_no_exit = enqueue_store_result_job("paused no exit", :default, pause: 0.5)
161+
exit_job = enqueue_store_result_job("exit", :background, exit_value: 9, pause: 0.2)
162+
pause_job = enqueue_store_result_job("exit", :background, pause: 0.3)
163+
164+
2.times { enqueue_store_result_job("no exit", :background) }
165+
166+
wait_for_jobs_to_finish_for(3.seconds, except: [ exit_job, pause_job, paused_no_exit ])
167+
168+
assert_completed_job_results("no exit", :default, 2)
169+
assert_completed_job_results("no exit", :background, 4)
170+
171+
# Everything exited because of the exit job, leaving all jobs that ran
172+
# after it claimed
173+
[ exit_job, pause_job, paused_no_exit ].each do |job|
174+
assert_job_status(job, :claimed)
175+
end
176+
177+
wait_for_process_termination_with_timeout(@pid, exitstatus: 9)
178+
assert_not process_exists?(@pid)
179+
180+
# Starting a supervisor again will clean things up
181+
# Claimed jobs will be marked as failed and processes will be pruned
182+
# Simulate time passing to expire heartbeats
183+
SolidQueue::Process.update_all(last_heartbeat_at: 1.hour.ago)
184+
@pid = run_supervisor_as_fork(mode: :async)
185+
sleep(10)
186+
187+
[ exit_job, pause_job, paused_no_exit ].each do |job|
188+
assert_job_status(job, :failed)
189+
end
190+
191+
terminate_process(@pid)
192+
assert_clean_termination
193+
end
194+
195+
196+
private
197+
def assert_clean_termination
198+
wait_for_registered_processes 0, timeout: 0.2.second
199+
assert_no_registered_processes
200+
assert_no_claimed_jobs
201+
assert_not process_exists?(@pid)
202+
end
203+
204+
def assert_registered_workers_for(*queues, supervisor_pid: nil)
205+
workers = find_processes_registered_as("Worker")
206+
registered_queues = workers.map { |process| process.metadata["queues"] }.compact
207+
assert_equal queues.map(&:to_s).sort, registered_queues.sort
208+
if supervisor_pid
209+
assert_equal [ supervisor_pid ], workers.map { |process| process.supervisor.pid }.uniq
210+
end
211+
end
212+
213+
def assert_registered_supervisor
214+
processes = find_processes_registered_as("Supervisor(async)")
215+
assert_equal 1, processes.count
216+
assert_equal @pid, processes.first.pid
217+
end
218+
219+
def assert_no_registered_workers
220+
assert_empty find_processes_registered_as("Worker").to_a
221+
end
222+
223+
def enqueue_store_result_job(value, queue_name = :background, **options)
224+
StoreResultJob.set(queue: queue_name).perform_later(value, **options)
225+
end
226+
227+
def assert_completed_job_results(value, queue_name = :background, count = 1)
228+
skip_active_record_query_cache do
229+
assert_equal count, JobResult.where(queue_name: queue_name, status: "completed", value: value).count
230+
end
231+
end
232+
233+
def assert_started_job_result(value, queue_name = :background, count = 1)
234+
skip_active_record_query_cache do
235+
assert_equal count, JobResult.where(queue_name: queue_name, status: "started", value: value).count
236+
end
237+
end
238+
239+
def assert_job_status(active_job, status)
240+
# Make sure we skip AR query cache. Otherwise the queries done here
241+
# might be cached and since we haven't done any non-SELECT queries
242+
# after they were cached on the connection used in the test, the cache
243+
# will still apply, even though the data returned by the cached queries
244+
# might have been deleted in the forked processes.
245+
skip_active_record_query_cache do
246+
job = SolidQueue::Job.find_by(active_job_id: active_job.job_id)
247+
assert job.public_send("#{status}?")
248+
end
249+
end
250+
end

test/integration/forked_processes_lifecycle_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
170170
enqueue_store_result_job("no exit", :default)
171171
end
172172
enqueue_store_result_job("paused no exit", :default, pause: 0.5)
173-
exit_job = enqueue_store_result_job("exit", :background, exit: true, pause: 0.2)
173+
exit_job = enqueue_store_result_job("exit", :background, exit_value: 1, pause: 0.2)
174174
pause_job = enqueue_store_result_job("exit", :background, pause: 0.3)
175175

176176
2.times { enqueue_store_result_job("no exit", :background) }

test/test_helpers/jobs_test_helper.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,10 @@ def assert_no_claimed_jobs
2828
assert SolidQueue::ClaimedExecution.none?
2929
end
3030
end
31+
32+
def assert_claimed_jobs(count = 1)
33+
skip_active_record_query_cache do
34+
assert_equal count, SolidQueue::ClaimedExecution.count
35+
end
36+
end
3137
end

0 commit comments

Comments
 (0)