Skip to content

Commit 030cc49

Browse files
committed
Run non-standalone supervisor in a separate thread
Otherwise `supervise` will block and we won't be able to stop it externally (from the Puma plugin).
1 parent 064d30c commit 030cc49

File tree

4 files changed

+43
-32
lines changed

4 files changed

+43
-32
lines changed

lib/solid_queue/app_executor.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,15 @@ def handle_thread_error(error)
1717
SolidQueue.on_thread_error.call(error)
1818
end
1919
end
20+
21+
def create_thread(&block)
22+
Thread.new do
23+
Thread.current.name = name
24+
block.call
25+
rescue Exception => exception
26+
handle_thread_error(exception)
27+
raise
28+
end
29+
end
2030
end
2131
end

lib/solid_queue/async_supervisor.rb

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,47 @@
22

33
module SolidQueue
44
class AsyncSupervisor < Supervisor
5+
def stop
6+
super
7+
@thread&.join
8+
end
9+
510
private
11+
def supervise
12+
if standalone? then super
13+
else
14+
@thread = create_thread { super }
15+
end
16+
end
617

7-
def check_and_replace_terminated_processes
8-
terminated_threads = process_instances.select { |thread_id, instance| !instance.alive? }
9-
terminated_threads.each { |thread_id, instance| replace_thread(thread_id, instance) }
10-
end
18+
def check_and_replace_terminated_processes
19+
terminated_threads = process_instances.select { |thread_id, instance| !instance.alive? }
20+
terminated_threads.each { |thread_id, instance| replace_thread(thread_id, instance) }
21+
end
1122

12-
def replace_thread(thread_id, instance)
13-
SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload|
14-
payload[:thread] = instance
23+
def replace_thread(thread_id, instance)
24+
SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload|
25+
payload[:thread] = instance
1526

16-
error = Processes::ThreadTerminatedError.new(terminated_instance.name)
17-
release_claimed_jobs_by(terminated_instance, with_error: error)
27+
error = Processes::ThreadTerminatedError.new(terminated_instance.name)
28+
release_claimed_jobs_by(terminated_instance, with_error: error)
1829

19-
start_process(configured_processes.delete(thread_id))
30+
start_process(configured_processes.delete(thread_id))
31+
end
2032
end
21-
end
2233

23-
def perform_graceful_termination
24-
process_instances.values.each(&:stop)
34+
def perform_graceful_termination
35+
process_instances.values.each(&:stop)
2536

26-
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? })
27-
end
37+
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? })
38+
end
2839

29-
def perform_immediate_termination
30-
exit!
31-
end
40+
def perform_immediate_termination
41+
exit!
42+
end
3243

33-
def all_processes_terminated?
34-
process_instances.values.none?(&:alive?)
35-
end
44+
def all_processes_terminated?
45+
process_instances.values.none?(&:alive?)
46+
end
3647
end
3748
end

lib/solid_queue/dispatcher.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
module SolidQueue
44
class Dispatcher < Processes::Poller
55
include LifecycleHooks
6+
67
attr_reader :batch_size
78

89
after_boot :run_start_hooks

lib/solid_queue/processes/runnable.rb

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,5 @@ def running_async?
9292
def running_as_fork?
9393
mode.fork?
9494
end
95-
96-
97-
def create_thread(&block)
98-
Thread.new do
99-
Thread.current.name = name
100-
block.call
101-
rescue Exception => exception
102-
handle_thread_error(exception)
103-
raise
104-
end
105-
end
10695
end
10796
end

0 commit comments

Comments
 (0)