Skip to content

Commit 2ed6abe

Browse files
committed
Start extracting forking behaviour from Supervisor
1 parent b10d838 commit 2ed6abe

File tree

8 files changed

+122
-101
lines changed

8 files changed

+122
-101
lines changed

lib/solid_queue/log_subscriber.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def thread_error(event)
118118
end
119119

120120
def graceful_termination(event)
121-
attributes = event.payload.slice(:supervisor_pid, :supervised_pids)
121+
attributes = event.payload.slice(:supervisor_pid, :supervised_processes)
122122

123123
if event.payload[:shutdown_timeout_exceeded]
124124
warn formatted_event(event, action: "Supervisor wasn't terminated gracefully - shutdown timeout exceeded", **attributes)
@@ -128,7 +128,7 @@ def graceful_termination(event)
128128
end
129129

130130
def immediate_termination(event)
131-
info formatted_event(event, action: "Supervisor terminated immediately", **event.payload.slice(:supervisor_pid, :supervised_pids))
131+
info formatted_event(event, action: "Supervisor terminated immediately", **event.payload.slice(:supervisor_pid, :supervised_processes))
132132
end
133133

134134
def unhandled_signal_error(event)

lib/solid_queue/processes/registrable.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ module Registrable
1616

1717
def register
1818
@process = SolidQueue::Process.register \
19-
kind: self.class.name.demodulize,
19+
kind: kind,
2020
pid: pid,
2121
hostname: hostname,
2222
supervisor: try(:supervisor),

lib/solid_queue/supervisor.rb

Lines changed: 27 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,26 @@
22

33
module SolidQueue
44
class Supervisor < Processes::Base
5-
include Signals
5+
include Signals, Maintenance
66

77
class << self
88
def start(mode: :fork, load_configuration_from: nil)
99
SolidQueue.supervisor = true
1010
configuration = Configuration.new(load_from: load_configuration_from)
1111

12-
new(*configuration.processes).start
12+
Forks.new(*configuration.processes).start
1313
end
1414
end
1515

16-
def initialize(*configured_processes)
17-
@configured_processes = Array(configured_processes)
18-
@forks = {}
16+
def initialize(*configuration)
17+
@configuration = Array(configuration)
18+
@processes = {}
1919
end
2020

2121
def start
2222
run_callbacks(:boot) { boot }
2323

24-
start_forks
24+
start_processes
2525
launch_maintenance_task
2626

2727
supervise
@@ -34,141 +34,76 @@ def start
3434
end
3535

3636
private
37-
attr_reader :configured_processes, :forks
37+
attr_reader :configuration, :processes
3838

3939
def boot
4040
sync_std_streams
4141
setup_pidfile
4242
register_signal_handlers
4343
end
4444

45+
def start_processes
46+
configuration.each { |configured_process| start_process(configured_process) }
47+
end
48+
4549
def supervise
4650
loop do
47-
procline "supervising #{forks.keys.join(", ")}"
51+
procline "supervising #{processes.keys.join(", ")}"
4852

4953
process_signal_queue
50-
reap_and_replace_terminated_forks
54+
reap_and_replace_terminated_processes
5155
interruptible_sleep(1.second)
5256
end
5357
end
5458

59+
5560
def sync_std_streams
5661
STDOUT.sync = STDERR.sync = true
5762
end
5863

5964
def setup_pidfile
60-
@pidfile = if SolidQueue.supervisor_pidfile
61-
Processes::Pidfile.new(SolidQueue.supervisor_pidfile).tap(&:setup)
65+
if path = SolidQueue.supervisor_pidfile
66+
@pidfile = Pidfile.new(path).tap(&:setup)
6267
end
6368
end
6469

65-
def start_forks
66-
configured_processes.each { |configured_process| start_fork(configured_process) }
67-
end
6870

69-
def launch_maintenance_task
70-
@maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do
71-
prune_dead_processes
72-
release_orphaned_executions
73-
end
74-
@maintenance_task.execute
71+
def start_process(configured_process)
72+
raise NotImplementedError
7573
end
7674

75+
7776
def shutdown
78-
stop_process_prune
77+
stop_maintenance_task
7978
restore_default_signal_handlers
8079
delete_pidfile
8180
end
8281

8382
def graceful_termination
84-
SolidQueue.instrument(:graceful_termination, supervisor_pid: ::Process.pid, supervised_pids: forks.keys) do |payload|
85-
term_forks
83+
SolidQueue.instrument(:graceful_termination, supervisor_pid: ::Process.pid, supervised_processes: processes.keys) do |payload|
84+
term_processes
8685

87-
wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do
88-
reap_terminated_forks
86+
wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? }) do
87+
reap_terminated_processes
8988
end
9089

91-
unless all_forks_terminated?
90+
unless all_processes_terminated?
9291
payload[:shutdown_timeout_exceeded] = true
9392
immediate_termination
9493
end
9594
end
9695
end
9796

9897
def immediate_termination
99-
SolidQueue.instrument(:immediate_termination, supervisor_pid: ::Process.pid, supervised_pids: forks.keys) do
100-
quit_forks
98+
SolidQueue.instrument(:immediate_termination, supervisor_pid: ::Process.pid, supervised_processes: processes.keys) do
99+
quit_processes
101100
end
102101
end
103102

104-
def term_forks
105-
signal_processes(forks.keys, :TERM)
106-
end
107-
108-
def quit_forks
109-
signal_processes(forks.keys, :QUIT)
110-
end
111-
112-
def stop_process_prune
113-
@maintenance_task&.shutdown
114-
end
115-
116103
def delete_pidfile
117104
@pidfile&.delete
118105
end
119106

120-
def prune_dead_processes
121-
wrap_in_app_executor { SolidQueue::Process.prune }
122-
end
123-
124-
def release_orphaned_executions
125-
wrap_in_app_executor { SolidQueue::ClaimedExecution.orphaned.release_all }
126-
end
127-
128-
def start_fork(configured_process)
129-
configured_process.supervised_by process
130-
configured_process.mode = :fork
131-
132-
pid = fork do
133-
configured_process.start
134-
end
135-
136-
forks[pid] = configured_process
137-
end
138-
139-
def reap_and_replace_terminated_forks
140-
loop do
141-
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
142-
break unless pid
143-
144-
replace_fork(pid, status)
145-
end
146-
end
147-
148-
def reap_terminated_forks
149-
loop do
150-
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
151-
break unless pid
152-
153-
forks.delete(pid)
154-
end
155-
rescue SystemCallError
156-
# All children already reaped
157-
end
158-
159-
def replace_fork(pid, status)
160-
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
161-
if supervised_fork = forks.delete(pid)
162-
payload[:fork] = supervised_fork
163-
start_fork(supervised_fork)
164-
end
165-
end
166-
end
167-
168-
def all_forks_terminated?
169-
forks.empty?
170-
end
171-
172107
def wait_until(timeout, condition, &block)
173108
if timeout > 0
174109
deadline = monotonic_time_now + timeout

lib/solid_queue/supervisor/forks.rb

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class Supervisor::Forks < Supervisor
5+
def kind
6+
"Supervisor(forks)"
7+
end
8+
9+
private
10+
def start_process(configured_process)
11+
configured_process.supervised_by process
12+
configured_process.mode = :fork
13+
14+
pid = fork do
15+
configured_process.start
16+
end
17+
18+
processes[pid] = configured_process
19+
end
20+
21+
def term_processes
22+
signal_processes(processes.keys, :TERM)
23+
end
24+
25+
def quit_processes
26+
signal_processes(processes.keys, :QUIT)
27+
end
28+
29+
def reap_and_replace_terminated_processes
30+
loop do
31+
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
32+
break unless pid
33+
34+
replace_fork(pid, status)
35+
end
36+
end
37+
38+
def reap_terminated_processes
39+
loop do
40+
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
41+
break unless pid
42+
43+
processes.delete(pid)
44+
end
45+
rescue SystemCallError
46+
# All children already reaped
47+
end
48+
49+
def replace_fork(pid, status)
50+
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
51+
if supervised_fork = processes.delete(pid)
52+
payload[:fork] = supervised_fork
53+
start_process(supervised_fork)
54+
end
55+
end
56+
end
57+
58+
def all_processes_terminated?
59+
processes.empty?
60+
end
61+
end
62+
end
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
module SolidQueue
2+
module Supervisor::Maintenance
3+
extend ActiveSupport::Concern
4+
5+
private
6+
def launch_maintenance_task
7+
@maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do
8+
prune_dead_processes
9+
release_orphaned_executions
10+
end
11+
@maintenance_task.execute
12+
end
13+
14+
def stop_maintenance_task
15+
@maintenance_task&.shutdown
16+
end
17+
18+
def prune_dead_processes
19+
wrap_in_app_executor { SolidQueue::Process.prune }
20+
end
21+
22+
def release_orphaned_executions
23+
wrap_in_app_executor { SolidQueue::ClaimedExecution.orphaned.release_all }
24+
end
25+
end
26+
end

lib/solid_queue/processes/pidfile.rb renamed to lib/solid_queue/supervisor/pidfile.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# frozen_string_literal: true
22

3-
module SolidQueue::Processes
4-
class Pidfile
3+
module SolidQueue
4+
class Supervisor::Pidfile
55
def initialize(path)
66
@path = path
77
@pid = ::Process.pid

lib/solid_queue/supervisor/signals.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ class GracefulTerminationRequested < Interrupt; end
66
class ImmediateTerminationRequested < Interrupt; end
77

88
module Signals
9-
extend ActiveSupport::Concern
10-
119
private
1210
SIGNALS = %i[ QUIT INT TERM ]
1311

test/integration/processes_lifecycle_test.rb renamed to test/integration/forked_processes_lifecycle_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
require "test_helper"
44

5-
class ProcessLifecycleTest < ActiveSupport::TestCase
5+
class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
66
self.use_transactional_tests = false
77

88
setup do
@@ -211,7 +211,7 @@ def assert_registered_workers_for(*queues)
211211
end
212212

213213
def assert_registered_supervisor
214-
processes = find_processes_registered_as("Supervisor")
214+
processes = find_processes_registered_as("Supervisor(forks)")
215215
assert_equal 1, processes.count
216216
assert_equal @pid, processes.first.pid
217217
end

0 commit comments

Comments
 (0)