Skip to content

Commit ac71ae3

Browse files
committed
Extract more termination logic to Supervisor class
And rename sidecar to standalone, inverting the meaning.
1 parent 0799944 commit ac71ae3

File tree

6 files changed

+81
-68
lines changed

6 files changed

+81
-68
lines changed

lib/puma/plugin/solid_queue.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def start_forked(launcher)
4040
end
4141

4242
def start_async(launcher)
43-
launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, sidecar: true) }
43+
launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false) }
4444
launcher.events.on_stopped { solid_queue_supervisor.stop }
4545
launcher.events.on_restart { solid_queue_supervisor.stop; solid_queue_supervisor.start }
4646
end

lib/solid_queue/dispatcher.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def all_work_completed?
5555
end
5656

5757
def set_procline
58-
procline "waiting"
58+
procline "dispatching every #{polling_interval.seconds} seconds"
5959
end
6060
end
6161
end

lib/solid_queue/supervisor.rb

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,45 @@ def stopped?
5656
@stopped
5757
end
5858

59-
def supervise
59+
def set_procline
60+
procline "supervising #{supervised_processes.join(", ")}"
6061
end
6162

6263
def start_process(configured_process)
6364
raise NotImplementedError
6465
end
6566

66-
def instrument_termination(type, &block)
67-
SolidQueue.instrument("#{type}_termination".to_sym, process_id: process_id, supervisor_pid: ::Process.pid, &block)
67+
def terminate_gracefully
68+
SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload|
69+
perform_graceful_termination
70+
71+
unless all_processes_terminated?
72+
payload[:shutdown_timeout_exceeded] = true
73+
terminate_immediately
74+
end
75+
end
76+
end
77+
78+
def terminate_immediately
79+
SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do
80+
perform_immediate_termination
81+
end
82+
end
83+
84+
def perform_graceful_termination
85+
raise NotImplementedError
86+
end
87+
88+
def perform_immediate_termination
89+
raise NotImplementedError
90+
end
91+
92+
def supervised_processes
93+
raise NotImplementedError
94+
end
95+
96+
def all_processes_terminated?
97+
raise NotImplementedError
6898
end
6999

70100
def shutdown

lib/solid_queue/supervisor/async_supervisor.rb

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22

33
module SolidQueue
44
class Supervisor::AsyncSupervisor < Supervisor
5-
skip_callback :boot, :before, :register_signal_handlers, if: :sidecar?
5+
skip_callback :boot, :before, :register_signal_handlers, unless: :standalone?
66

7-
def initialize(*, sidecar: false)
7+
def initialize(*, standalone: true)
88
super
99

10-
@sidecar = sidecar
10+
@standalone = standalone
1111
@threads = Concurrent::Map.new
1212
end
1313

@@ -26,8 +26,8 @@ def stop
2626
private
2727
attr_reader :threads
2828

29-
def sidecar?
30-
@sidecar
29+
def standalone?
30+
@standalone
3131
end
3232

3333
def start_process(configured_process)
@@ -36,16 +36,19 @@ def start_process(configured_process)
3636
end
3737

3838
process_instance.start
39-
4039
threads[process_instance.name] = process_instance
4140
end
4241

42+
def supervised_processes
43+
threads.keys
44+
end
45+
4346
def supervise
44-
unless sidecar?
47+
if standalone?
4548
loop do
4649
break if stopped?
4750

48-
procline "supervising #{threads.keys.join(", ")}"
51+
set_procline
4952
process_signal_queue
5053

5154
interruptible_sleep(10.second) unless stopped?
@@ -61,26 +64,15 @@ def stop_threads
6164
stop_threads.each { |thr| thr.join(SolidQueue.shutdown_timeout) }
6265
end
6366

64-
def terminate_gracefully
65-
instrument_termination(:graceful) do |payload|
66-
payload[:supervised_processes] = threads.keys
67-
68-
unless all_threads_terminated?
69-
payload[:shutdown_timeout_exceeded] = true
70-
terminate_immediately
71-
end
72-
end
67+
def perform_graceful_termination
68+
# All done when stopping
7369
end
7470

75-
def terminate_immediately
76-
instrument_termination(:immediate) do |payload|
77-
payload[:supervised_processes] = threads.keys
78-
79-
exit!
80-
end
71+
def perform_immediate_termination
72+
exit!
8173
end
8274

83-
def all_threads_terminated?
75+
def all_processes_terminated?
8476
threads.values.none?(&:alive?)
8577
end
8678
end

lib/solid_queue/supervisor/fork_supervisor.rb

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,6 @@ def kind
1616
private
1717
attr_reader :forks, :configured_processes
1818

19-
def supervise
20-
loop do
21-
break if stopped?
22-
23-
procline "supervising #{forks.keys.join(", ")}"
24-
process_signal_queue
25-
26-
unless stopped?
27-
reap_and_replace_terminated_forks
28-
interruptible_sleep(1.second)
29-
end
30-
end
31-
ensure
32-
shutdown
33-
end
34-
3519
def start_process(configured_process)
3620
process_instance = configured_process.instantiate.tap do |instance|
3721
instance.supervised_by process
@@ -46,31 +30,38 @@ def start_process(configured_process)
4630
forks[pid] = process_instance
4731
end
4832

49-
def terminate_gracefully
50-
instrument_termination(:graceful) do |payload|
51-
payload[:supervised_processes] = forks.keys
33+
def supervised_processes
34+
forks.keys
35+
end
5236

53-
term_forks
37+
def supervise
38+
loop do
39+
break if stopped?
5440

55-
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do
56-
reap_terminated_forks
57-
end
41+
set_procline
42+
process_signal_queue
5843

59-
unless all_forks_terminated?
60-
payload[:shutdown_timeout_exceeded] = true
61-
terminate_immediately
44+
unless stopped?
45+
reap_and_replace_terminated_forks
46+
interruptible_sleep(1.second)
6247
end
6348
end
49+
ensure
50+
shutdown
6451
end
6552

66-
def terminate_immediately
67-
instrument_termination(:immediate) do |payload|
68-
payload[:supervised_processes] = forks.keys
53+
def perform_graceful_termination
54+
term_forks
6955

70-
quit_forks
56+
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? }) do
57+
reap_terminated_forks
7158
end
7259
end
7360

61+
def perform_immediate_termination
62+
quit_forks
63+
end
64+
7465
def term_forks
7566
signal_processes(forks.keys, :TERM)
7667
end
@@ -121,7 +112,7 @@ def handle_claimed_jobs_by(terminated_fork, status)
121112
end
122113
end
123114

124-
def all_forks_terminated?
115+
def all_processes_terminated?
125116
forks.empty?
126117
end
127118
end

test/unit/async_supervisor_test.rb

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
class AsyncSupervisorTest < ActiveSupport::TestCase
44
self.use_transactional_tests = false
55

6-
test "start as sidecar" do
7-
supervisor = run_supervisor_as_sidecar
6+
test "start as non-standalone" do
7+
supervisor = run_non_standalone_supervisor
88
wait_for_registered_processes(4)
99

1010
assert_registered_processes(kind: "Supervisor(async)")
@@ -16,7 +16,7 @@ class AsyncSupervisorTest < ActiveSupport::TestCase
1616
assert_no_registered_processes
1717
end
1818

19-
test "start as separate process" do
19+
test "start standalone" do
2020
pid = run_supervisor_as_fork(mode: :async)
2121
wait_for_registered_processes(4)
2222

@@ -28,9 +28,9 @@ class AsyncSupervisorTest < ActiveSupport::TestCase
2828
assert_no_registered_processes
2929
end
3030

31-
test "start as sidecar with provided configuration" do
31+
test "start as non-standalone with provided configuration" do
3232
config_as_hash = { workers: [], dispatchers: [ { batch_size: 100 } ] }
33-
supervisor = run_supervisor_as_sidecar(load_configuration_from: config_as_hash)
33+
supervisor = run_non_standalone_supervisor(load_configuration_from: config_as_hash)
3434
wait_for_registered_processes(2) # supervisor + dispatcher
3535

3636
assert_registered_processes(kind: "Supervisor(async)")
@@ -61,7 +61,7 @@ class AsyncSupervisorTest < ActiveSupport::TestCase
6161
dispatchers: []
6262
}
6363

64-
supervisor = run_supervisor_as_sidecar(load_configuration_from: config_as_hash)
64+
supervisor = run_non_standalone_supervisor(load_configuration_from: config_as_hash)
6565
wait_for_registered_processes(3)
6666
assert_registered_processes(kind: "Supervisor(async)")
6767

@@ -72,7 +72,7 @@ class AsyncSupervisorTest < ActiveSupport::TestCase
7272
end
7373

7474
private
75-
def run_supervisor_as_sidecar(**kwargs)
76-
SolidQueue::Supervisor.start(mode: :async, sidecar: true, **kwargs)
75+
def run_non_standalone_supervisor(**kwargs)
76+
SolidQueue::Supervisor.start(mode: :async, standalone: false, **kwargs)
7777
end
7878
end

0 commit comments

Comments
 (0)