Skip to content

Commit ae2d84d

Browse files
committed
Run non-standalone supervisor in a separate thread
Otherwise `supervise` will block and we won't be able to stop it externally (from the Puma plugin).
1 parent 064d30c commit ae2d84d

File tree

7 files changed

+82
-72
lines changed

7 files changed

+82
-72
lines changed

lib/solid_queue/app_executor.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,15 @@ def handle_thread_error(error)
1717
SolidQueue.on_thread_error.call(error)
1818
end
1919
end
20+
21+
def create_thread(&block)
22+
Thread.new do
23+
Thread.current.name = name
24+
block.call
25+
rescue Exception => exception
26+
handle_thread_error(exception)
27+
raise
28+
end
29+
end
2030
end
2131
end

lib/solid_queue/async_supervisor.rb

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,47 @@
22

33
module SolidQueue
44
class AsyncSupervisor < Supervisor
5+
def stop
6+
super
7+
@thread&.join
8+
end
9+
510
private
11+
def supervise
12+
if standalone? then super
13+
else
14+
@thread = create_thread { super }
15+
end
16+
end
617

7-
def check_and_replace_terminated_processes
8-
terminated_threads = process_instances.select { |thread_id, instance| !instance.alive? }
9-
terminated_threads.each { |thread_id, instance| replace_thread(thread_id, instance) }
10-
end
18+
def check_and_replace_terminated_processes
19+
terminated_threads = process_instances.select { |thread_id, instance| !instance.alive? }
20+
terminated_threads.each { |thread_id, instance| replace_thread(thread_id, instance) }
21+
end
1122

12-
def replace_thread(thread_id, instance)
13-
SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload|
14-
payload[:thread] = instance
23+
def replace_thread(thread_id, instance)
24+
SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload|
25+
payload[:thread] = instance
1526

16-
error = Processes::ThreadTerminatedError.new(terminated_instance.name)
17-
release_claimed_jobs_by(terminated_instance, with_error: error)
27+
error = Processes::ThreadTerminatedError.new(terminated_instance.name)
28+
release_claimed_jobs_by(terminated_instance, with_error: error)
1829

19-
start_process(configured_processes.delete(thread_id))
30+
start_process(configured_processes.delete(thread_id))
31+
end
2032
end
21-
end
2233

23-
def perform_graceful_termination
24-
process_instances.values.each(&:stop)
34+
def perform_graceful_termination
35+
process_instances.values.each(&:stop)
2536

26-
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? })
27-
end
37+
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? })
38+
end
2839

29-
def perform_immediate_termination
30-
exit!
31-
end
40+
def perform_immediate_termination
41+
exit!
42+
end
3243

33-
def all_processes_terminated?
34-
process_instances.values.none?(&:alive?)
35-
end
44+
def all_processes_terminated?
45+
process_instances.values.none?(&:alive?)
46+
end
3647
end
3748
end

lib/solid_queue/dispatcher.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
module SolidQueue
44
class Dispatcher < Processes::Poller
55
include LifecycleHooks
6+
67
attr_reader :batch_size
78

89
after_boot :run_start_hooks

lib/solid_queue/processes/runnable.rb

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,5 @@ def running_async?
9292
def running_as_fork?
9393
mode.fork?
9494
end
95-
96-
97-
def create_thread(&block)
98-
Thread.new do
99-
Thread.current.name = name
100-
block.call
101-
rescue Exception => exception
102-
handle_thread_error(exception)
103-
raise
104-
end
105-
end
10695
end
10796
end

test/integration/instrumentation_test.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ class InstrumentationTest < ActiveSupport::TestCase
171171
wait_for_registered_processes(1, timeout: 1.second)
172172

173173
worker.stop
174+
wait_for_registered_processes(0, timeout: 1.second)
174175
end
175176
end
176177

test/unit/async_supervisor_test.rb

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@ class AsyncSupervisorTest < ActiveSupport::TestCase
44
self.use_transactional_tests = false
55

66
test "start as non-standalone" do
7-
supervisor, thread = run_supervisor_as_thread
7+
supervisor = run_supervisor_as_thread
88
wait_for_registered_processes(4)
99

1010
assert_registered_processes(kind: "Supervisor(async)")
1111
assert_registered_processes(kind: "Worker", supervisor_id: supervisor.process_id, count: 2)
1212
assert_registered_processes(kind: "Dispatcher", supervisor_id: supervisor.process_id)
1313

1414
supervisor.stop
15-
thread.join
1615

1716
assert_no_registered_processes
1817
end
@@ -30,15 +29,14 @@ class AsyncSupervisorTest < ActiveSupport::TestCase
3029
end
3130

3231
test "start as non-standalone with provided configuration" do
33-
supervisor, thread = run_supervisor_as_thread(workers: [], dispatchers: [ { batch_size: 100 } ])
32+
supervisor = run_supervisor_as_thread(workers: [], dispatchers: [ { batch_size: 100 } ])
3433
wait_for_registered_processes(2) # supervisor + dispatcher
3534

3635
assert_registered_processes(kind: "Supervisor(async)")
3736
assert_registered_processes(kind: "Worker", count: 0)
3837
assert_registered_processes(kind: "Dispatcher", supervisor_id: supervisor.process_id)
3938

4039
supervisor.stop
41-
thread.join
4240

4341
assert_no_registered_processes
4442
end
@@ -51,14 +49,13 @@ class AsyncSupervisorTest < ActiveSupport::TestCase
5149
dispatchers: []
5250
}
5351

54-
supervisor, thread = run_supervisor_as_thread(**config)
52+
supervisor = run_supervisor_as_thread(**config)
5553
wait_for_registered_processes(2) # supervisor + 1 worker
5654
assert_registered_processes(kind: "Supervisor(async)")
5755

5856
wait_while_with_timeout(1.second) { SolidQueue::ClaimedExecution.count > 0 }
5957

6058
supervisor.stop
61-
thread.join
6259

6360
skip_active_record_query_cache do
6461
assert_equal 0, SolidQueue::ClaimedExecution.count
@@ -89,13 +86,8 @@ class AsyncSupervisorTest < ActiveSupport::TestCase
8986
end
9087

9188
private
92-
def run_supervisor_as_thread(**kwargs)
93-
configuration = SolidQueue::Configuration.new(mode: :async, standalone: false, **kwargs)
94-
supervisor = SolidQueue::AsyncSupervisor.new(configuration)
95-
96-
thread = Thread.new { supervisor.start }
97-
98-
[ supervisor, thread ]
89+
def run_supervisor_as_thread(**options)
90+
SolidQueue::Supervisor.start(mode: :async, standalone: false, **options)
9991
end
10092

10193
def simulate_orphaned_executions(count)

test/unit/dispatcher_test.rb

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ class DispatcherTest < ActiveSupport::TestCase
1212

1313
teardown do
1414
@dispatcher.stop
15-
wait_for_registered_processes(0, timeout: 2.seconds)
1615
end
1716

1817
test "dispatcher is registered as process" do
@@ -76,52 +75,59 @@ class DispatcherTest < ActiveSupport::TestCase
7675
assert_no_registered_processes
7776
end
7877

79-
test "dispatch scheduled executions" do
80-
skip_active_record_query_cache do
81-
15.times do
82-
AddToBufferJob.set(wait: 0.5.second).perform_later("I'm scheduled")
83-
end
84-
sleep 0.5.second
85-
assert_equal 15, SolidQueue::ScheduledExecution.count
78+
test "run more than one instance of the dispatcher" do
79+
15.times do
80+
AddToBufferJob.set(wait: 0.5.second).perform_later("I'm scheduled")
81+
end
82+
sleep 0.5.second
83+
assert_equal 15, SolidQueue::ScheduledExecution.count
8684

87-
@dispatcher.start
88-
wait_for_registered_processes(1, timeout: 2.seconds)
85+
another_dispatcher = SolidQueue::Dispatcher.new(polling_interval: 0.1, batch_size: 10)
8986

90-
wait_while_with_timeout(3.seconds) { SolidQueue::ScheduledExecution.any? }
87+
@dispatcher.start
88+
another_dispatcher.start
89+
90+
wait_while_with_timeout(1.second) { SolidQueue::ScheduledExecution.any? }
9191

92+
skip_active_record_query_cache do
9293
assert_equal 0, SolidQueue::ScheduledExecution.count
9394
assert_equal 15, SolidQueue::ReadyExecution.count
9495
end
96+
ensure
97+
another_dispatcher&.stop
9598
end
9699

97100
test "sleeps `0.seconds` between polls if there are ready to dispatch jobs" do
98-
skip_active_record_query_cache do
99-
@dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1)
100-
@dispatcher.expects(:interruptible_sleep).with(0.seconds).at_least(3)
101-
@dispatcher.expects(:interruptible_sleep).with(@dispatcher.polling_interval).at_least_once
102-
@dispatcher.expects(:handle_thread_error).never
101+
dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1)
102+
dispatcher.expects(:interruptible_sleep).with(0.seconds).at_least(3)
103+
dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once
104+
dispatcher.expects(:handle_thread_error).never
103105

104-
3.times { AddToBufferJob.set(wait: 0.5.second).perform_later("I'm scheduled") }
105-
sleep 0.5.second
106-
assert_equal 3, SolidQueue::ScheduledExecution.count
106+
3.times { AddToBufferJob.set(wait: 0.5.second).perform_later("I'm scheduled") }
107+
sleep 0.5.second
108+
assert_equal 3, SolidQueue::ScheduledExecution.count
107109

108-
@dispatcher.start
109-
wait_for_registered_processes(1, timeout: 2.seconds)
110-
wait_while_with_timeout(3.seconds) { SolidQueue::ScheduledExecution.any? }
110+
dispatcher.start
111+
wait_while_with_timeout(1.second) { SolidQueue::ScheduledExecution.any? }
111112

113+
skip_active_record_query_cache do
112114
assert_equal 0, SolidQueue::ScheduledExecution.count
113115
assert_equal 3, SolidQueue::ReadyExecution.count
114116
end
117+
ensure
118+
dispatcher.stop
115119
end
116120

117121
test "sleeps `polling_interval` between polls if there are no un-dispatched jobs" do
118-
@dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1)
119-
@dispatcher.expects(:interruptible_sleep).with(0.seconds).never
120-
@dispatcher.expects(:interruptible_sleep).with(@dispatcher.polling_interval).at_least_once
121-
@dispatcher.expects(:handle_thread_error).never
122+
dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1)
123+
dispatcher.expects(:interruptible_sleep).with(0.seconds).never
124+
dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once
125+
dispatcher.expects(:handle_thread_error).never
122126

123-
@dispatcher.start
127+
dispatcher.start
124128
wait_while_with_timeout(1.second) { !SolidQueue::ScheduledExecution.exists? }
129+
ensure
130+
dispatcher.stop
125131
end
126132

127133
private

0 commit comments

Comments
 (0)