Skip to content

Commit 601266e

Browse files
committed
Extract rest of forking behaviour from Supervisor to another subclass
1 parent faefbba commit 601266e

File tree

5 files changed

+103
-81
lines changed

5 files changed

+103
-81
lines changed

lib/solid_queue/supervisor.rb

Lines changed: 13 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
module SolidQueue
44
class Supervisor < Processes::Base
5-
include Signals, Maintenance
5+
include Maintenance
6+
7+
class GracefulTerminationRequested < Interrupt; end
8+
class ImmediateTerminationRequested < Interrupt; end
69

710
class << self
811
def start(mode: :fork, load_configuration_from: nil)
@@ -15,7 +18,6 @@ def start(mode: :fork, load_configuration_from: nil)
1518

1619
def initialize(configuration)
1720
@configuration = configuration
18-
@processes = {}
1921
end
2022

2123
def start
@@ -28,80 +30,42 @@ def start
2830
rescue GracefulTerminationRequested
2931
terminate_gracefully
3032
rescue ImmediateTerminationRequested
31-
immediate_termination
33+
terminate_immediately
3234
ensure
3335
run_callbacks(:shutdown) { shutdown }
3436
end
3537

3638
private
37-
attr_reader :configuration, :processes
39+
attr_reader :configuration
3840

3941
def boot
4042
sync_std_streams
41-
setup_pidfile
42-
register_signal_handlers
4343
end
4444

4545
def start_processes
4646
configuration.processes.each { |configured_process| start_process(configured_process) }
4747
end
4848

4949
def supervise
50-
loop do
51-
procline "supervising #{processes.keys.join(", ")}"
52-
53-
process_signal_queue
54-
reap_and_replace_terminated_processes
55-
interruptible_sleep(1.second)
56-
end
57-
end
58-
59-
60-
def sync_std_streams
61-
STDOUT.sync = STDERR.sync = true
62-
end
63-
64-
def setup_pidfile
65-
if path = SolidQueue.supervisor_pidfile
66-
@pidfile = Pidfile.new(path).tap(&:setup)
67-
end
50+
raise NotImplementedError
6851
end
6952

70-
7153
def start_process(configured_process)
7254
raise NotImplementedError
7355
end
7456

75-
76-
def shutdown
77-
stop_maintenance_task
78-
restore_default_signal_handlers
79-
delete_pidfile
57+
def terminate_gracefully
8058
end
8159

82-
def terminate_gracefully
83-
SolidQueue.instrument(:graceful_termination, supervisor_pid: ::Process.pid, supervised_processes: processes.keys) do |payload|
84-
term_processes
85-
86-
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? }) do
87-
reap_terminated_processes
88-
end
89-
90-
unless all_processes_terminated?
91-
payload[:shutdown_timeout_exceeded] = true
92-
immediate_termination
93-
end
94-
end
60+
def terminate_immediately
9561
end
9662

97-
def immediate_termination
98-
SolidQueue.instrument(:immediate_termination, supervisor_pid: ::Process.pid, supervised_processes: processes.keys) do
99-
quit_processes
100-
end
63+
def shutdown
64+
stop_maintenance_task
10165
end
10266

103-
def delete_pidfile
104-
@pidfile&.delete
67+
def sync_std_streams
68+
STDOUT.sync = STDERR.sync = true
10569
end
10670
end
10771
end

lib/solid_queue/supervisor/forks.rb

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,33 @@
22

33
module SolidQueue
44
class Supervisor::Forks < Supervisor
5+
include Signals
6+
7+
before_boot :setup_pidfile
8+
after_shutdown :delete_pidfile
9+
10+
def initialize(*)
11+
super
12+
@forks = {}
13+
end
14+
515
def kind
616
"Supervisor(fork)"
717
end
818

919
private
20+
attr_reader :forks
21+
22+
def supervise
23+
loop do
24+
procline "supervising #{forks.keys.join(", ")}"
25+
26+
process_signal_queue
27+
reap_and_replace_terminated_forks
28+
interruptible_sleep(1.second)
29+
end
30+
end
31+
1032
def start_process(configured_process)
1133
configured_process.supervised_by process
1234
configured_process.mode = :fork
@@ -15,18 +37,39 @@ def start_process(configured_process)
1537
configured_process.start
1638
end
1739

18-
processes[pid] = configured_process
40+
forks[pid] = configured_process
1941
end
2042

21-
def term_processes
22-
signal_processes(processes.keys, :TERM)
43+
def terminate_gracefully
44+
SolidQueue.instrument(:graceful_termination, supervisor_pid: ::Process.pid, supervised_processes: forks.keys) do |payload|
45+
term_forks
46+
47+
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do
48+
reap_terminated_forks
49+
end
50+
51+
unless all_forks_terminated?
52+
payload[:shutdown_timeout_exceeded] = true
53+
terminate_immediately
54+
end
55+
end
56+
end
57+
58+
def terminate_immediately
59+
SolidQueue.instrument(:immediate_termination, supervisor_pid: ::Process.pid, supervised_processes: forks.keys) do
60+
quit_forks
61+
end
62+
end
63+
64+
def term_forks
65+
signal_processes(forks.keys, :TERM)
2366
end
2467

25-
def quit_processes
26-
signal_processes(processes.keys, :QUIT)
68+
def quit_forks
69+
signal_processes(forks.keys, :QUIT)
2770
end
2871

29-
def reap_and_replace_terminated_processes
72+
def reap_and_replace_terminated_forks
3073
loop do
3174
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
3275
break unless pid
@@ -35,28 +78,38 @@ def reap_and_replace_terminated_processes
3578
end
3679
end
3780

38-
def reap_terminated_processes
81+
def reap_terminated_forks
3982
loop do
4083
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
4184
break unless pid
4285

43-
processes.delete(pid)
86+
forks.delete(pid)
4487
end
4588
rescue SystemCallError
4689
# All children already reaped
4790
end
4891

4992
def replace_fork(pid, status)
5093
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
51-
if supervised_fork = processes.delete(pid)
94+
if supervised_fork = forks.delete(pid)
5295
payload[:fork] = supervised_fork
5396
start_process(supervised_fork)
5497
end
5598
end
5699
end
57100

58-
def all_processes_terminated?
59-
processes.empty?
101+
def setup_pidfile
102+
if path = SolidQueue.supervisor_pidfile
103+
@pidfile = Pidfile.new(path).tap(&:setup)
104+
end
105+
end
106+
107+
def delete_pidfile
108+
@pidfile&.delete
109+
end
110+
111+
def all_forks_terminated?
112+
forks.empty?
60113
end
61114
end
62115
end

lib/solid_queue/supervisor/maintenance.rb

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,27 @@ module SolidQueue
22
module Supervisor::Maintenance
33
extend ActiveSupport::Concern
44

5-
private
5+
included do
6+
after_boot :release_orphaned_executions
7+
end
68

7-
def launch_maintenance_task
8-
@maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do
9-
prune_dead_processes
10-
release_orphaned_executions
9+
private
10+
def launch_maintenance_task
11+
@maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do
12+
prune_dead_processes
13+
end.tap(&:execute)
1114
end
12-
@maintenance_task.execute
13-
end
1415

15-
def stop_maintenance_task
16-
@maintenance_task&.shutdown
17-
end
16+
def stop_maintenance_task
17+
@maintenance_task&.shutdown
18+
end
1819

19-
def prune_dead_processes
20-
wrap_in_app_executor { SolidQueue::Process.prune }
21-
end
20+
def prune_dead_processes
21+
wrap_in_app_executor { SolidQueue::Process.prune }
22+
end
2223

23-
def release_orphaned_executions
24-
wrap_in_app_executor { SolidQueue::ClaimedExecution.orphaned.release_all }
25-
end
24+
def release_orphaned_executions
25+
wrap_in_app_executor { SolidQueue::ClaimedExecution.orphaned.release_all }
26+
end
2627
end
2728
end

lib/solid_queue/supervisor/signals.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@
22

33
module SolidQueue
44
class Supervisor
5-
class GracefulTerminationRequested < Interrupt; end
6-
class ImmediateTerminationRequested < Interrupt; end
7-
85
module Signals
6+
extend ActiveSupport::Concern
7+
8+
included do
9+
before_boot :register_signal_handlers
10+
after_shutdown :restore_default_signal_handlers
11+
end
12+
913
private
1014
SIGNALS = %i[ QUIT INT TERM ]
1115

test/integration/concurrency_controls_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,15 +169,15 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
169169
end
170170

171171
test "don't block claimed executions that get released" do
172-
SequentialUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 3.seconds)
172+
SequentialUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds)
173173
job = SolidQueue::Job.last
174174

175175
sleep(0.2)
176176
assert job.claimed?
177177

178178
# This won't leave time to the job to finish
179179
signal_process(@pid, :TERM, wait: 0.1.second)
180-
sleep(SolidQueue.shutdown_timeout + 0.2.seconds)
180+
sleep(SolidQueue.shutdown_timeout + 0.6.seconds)
181181

182182
assert_not job.reload.finished?
183183
assert job.reload.ready?

0 commit comments

Comments
 (0)