Skip to content

Commit 4b3483a

Browse files
committed
Remove the new async mode
It's been short-lived ^_^U
1 parent eded15b commit 4b3483a

22 files changed

+230
-823
lines changed

README.md

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ We have three types of actors in Solid Queue:
8080
- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. They're also in charge of managing [recurring tasks](#recurring-tasks), dispatching jobs to process them according to their schedule. On top of that, they do some maintenance work related to [concurrency controls](#concurrency-controls).
8181
- The _supervisor_ runs workers and dispatchers according to the configuration, controls their heartbeats, and stops and starts them when needed.
8282

83-
By default, Solid Queue runs in `fork` mode. This means the supervisor will fork a separate process for each supervised worker/dispatcher. There's also an `async` mode where each worker and dispatcher will be run as a thread of the supervisor process. This can be used with [the provided Puma plugin](#puma-plugin)
83+
Solid Queue's supervisor will fork a separate process for each supervised worker/dispatcher.
8484

8585
By default, Solid Queue will try to find your configuration under `config/solid_queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG`. This is what this configuration looks like:
8686

@@ -131,7 +131,7 @@ Here's an overview of the different options:
131131

132132
Finally, you can combine prefixes with exact names, like `[ staging*, background ]`, and the behaviour with respect to order will be the same as with only exact names.
133133
- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch this number of jobs from their queue(s), at most and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting.
134-
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. **Note**: this option will be ignored if [running in `async` mode](#running-as-a-fork-or-asynchronously).
134+
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting.
135135
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
136136
- `recurring_tasks`: a list of recurring tasks the dispatcher will manage. Read more details about this one in the [Recurring tasks](#recurring-tasks) section.
137137

@@ -159,16 +159,6 @@ When receiving a `QUIT` signal, if workers still have jobs in-flight, these will
159159
If processes have no chance of cleaning up before exiting (e.g. if someone pulls a cable somewhere), in-flight jobs might remain claimed by the processes executing them. Processes send heartbeats, and the supervisor checks and prunes processes with expired heartbeats, which will release any claimed jobs back to their queues. You can configure both the frequency of heartbeats and the threshold to consider a process dead. See the section below for this.
160160

161161

162-
### Fork vs. async mode
163-
164-
By default, the supervisor will fork additional processes for each worker and dispatcher so that they run in different processes. This provides the best isolation and performance, but can have additional memory usage. Alternatively, you can run all workers and dispatchers in the same process as the supervisor by passing `--mode async` when starting it:
165-
```
166-
bin/jobs --mode async
167-
```
168-
169-
If you use the `async` mode, the `processes` option in the configuration will be ignored.
170-
171-
172162
### Dedicated database configuration
173163

174164
Solid Queue can be configured to run on a different database than the main application.
@@ -315,18 +305,6 @@ plugin :solid_queue
315305
```
316306
to your `puma.rb` configuration.
317307

318-
### Running as a fork or asynchronously
319-
320-
By default, the Puma plugin will fork additional processes for each worker and dispatcher so that they run in different processes. This provides the best isolation and performance, but can have additional memory usage.
321-
322-
Alternatively, workers and dispatchers can be run within the same Puma process(s). To do so just configure the plugin as:
323-
324-
```ruby
325-
plugin :solid_queue
326-
solid_queue_mode :async
327-
```
328-
329-
Note that in this case, the `processes` configuration option will be ignored.
330308

331309
## Jobs and transactional integrity
332310
:warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another active job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you.

UPGRADING.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
# Upgrading to version 0.7.x
2+
3+
This version removed the new async mode introduced in version 0.4.0 and introduced a new binstub that can be used to start Solid Queue's supervisor. To install it, you can just run
4+
```
5+
bin/rails generate solid_queue:install
6+
```
7+
18
# Upgrading to version 0.6.x
29

310
## New migration in 3 steps
@@ -44,7 +51,7 @@ And then run the migrations.
4451

4552

4653
# Upgrading to version 0.4.x
47-
This version introduced an _async_ mode to run the supervisor and have all workers and dispatchers run as part of the same process as the supervisor, instead of separate, forked, processes. Together with this, we introduced some changes in how the supervisor is started. Prior this change, you could choose whether you wanted to run workers, dispatchers or both, by starting Solid Queue as `solid_queue:work` or `solid_queue:dispatch`. From version 0.4.0, the only option available is:
54+
This version introduced an _async_ mode (this mode has been removed in version 0.7.0) to run the supervisor and have all workers and dispatchers run as part of the same process as the supervisor, instead of separate, forked, processes. Together with this, we introduced some changes in how the supervisor is started. Prior this change, you could choose whether you wanted to run workers, dispatchers or both, by starting Solid Queue as `solid_queue:work` or `solid_queue:dispatch`. From version 0.4.0, the only option available is:
4855

4956
```
5057
$ bundle exec rake solid_queue:start

lib/puma/plugin/solid_queue.rb

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,28 @@
11
require "puma/plugin"
22

3-
module Puma
4-
class DSL
5-
def solid_queue_mode(mode = :fork)
6-
@options[:solid_queue_mode] = mode.to_sym
7-
end
8-
end
9-
end
10-
113
Puma::Plugin.create do
124
attr_reader :puma_pid, :solid_queue_pid, :log_writer, :solid_queue_supervisor
135

146
def start(launcher)
157
@log_writer = launcher.log_writer
168
@puma_pid = $$
179

18-
if launcher.options[:solid_queue_mode] == :async
19-
start_async(launcher)
20-
else
21-
start_forked(launcher)
10+
in_background do
11+
monitor_solid_queue
2212
end
23-
end
2413

25-
private
26-
def start_forked(launcher)
27-
in_background do
28-
monitor_solid_queue
14+
launcher.events.on_booted do
15+
@solid_queue_pid = fork do
16+
Thread.new { monitor_puma }
17+
SolidQueue::Supervisor.start
2918
end
30-
31-
launcher.events.on_booted do
32-
@solid_queue_pid = fork do
33-
Thread.new { monitor_puma }
34-
SolidQueue::Supervisor.start(mode: :fork)
35-
end
36-
end
37-
38-
launcher.events.on_stopped { stop_solid_queue }
39-
launcher.events.on_restart { stop_solid_queue }
4019
end
4120

42-
def start_async(launcher)
43-
launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false) }
44-
launcher.events.on_stopped { solid_queue_supervisor.stop }
45-
launcher.events.on_restart { solid_queue_supervisor.stop; solid_queue_supervisor.start }
46-
end
21+
launcher.events.on_stopped { stop_solid_queue }
22+
launcher.events.on_restart { stop_solid_queue }
23+
end
4724

25+
private
4826
def stop_solid_queue
4927
Process.waitpid(solid_queue_pid, Process::WNOHANG)
5028
log "Stopping Solid Queue..."

lib/solid_queue/cli.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
module SolidQueue
66
class Cli < Thor
77
class_option :config_file, type: :string, aliases: "-c", default: Configuration::DEFAULT_CONFIG_FILE_PATH, desc: "Path to config file"
8-
class_option :mode, type: :string, default: "fork", enum: %w[ fork async ], desc: "Whether to fork processes for workers and dispatchers (fork) or to run these in the same process as the supervisor (async)"
98

109
def self.exit_on_failure?
1110
true
@@ -15,7 +14,7 @@ def self.exit_on_failure?
1514
default_command :start
1615

1716
def start
18-
SolidQueue::Supervisor.start(mode: options["mode"], load_configuration_from: options["config_file"])
17+
SolidQueue::Supervisor.start(load_configuration_from: options["config_file"])
1918
end
2019
end
2120
end

lib/solid_queue/configuration.rb

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ def instantiate
2828
dispatchers: [ DISPATCHER_DEFAULTS ]
2929
}
3030

31-
def initialize(mode: :fork, load_from: nil)
32-
@mode = mode.to_s.inquiry
31+
def initialize(load_from: nil)
3332
@raw_config = config_from(load_from)
3433
end
3534

@@ -43,17 +42,13 @@ def max_number_of_threads
4342
end
4443

4544
private
46-
attr_reader :raw_config, :mode
45+
attr_reader :raw_config
4746

4847
DEFAULT_CONFIG_FILE_PATH = "config/solid_queue.yml"
4948

5049
def workers
5150
workers_options.flat_map do |worker_options|
52-
processes = if mode.fork?
53-
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
54-
else
55-
WORKER_DEFAULTS[:processes]
56-
end
51+
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
5752
processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) }
5853
end
5954
end

lib/solid_queue/supervisor.rb

Lines changed: 100 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,23 @@ class Supervisor < Processes::Base
55
include Maintenance, Signals, Pidfiled
66

77
class << self
8-
def start(mode: "fork", load_configuration_from: nil, **options)
8+
def start(load_configuration_from: nil)
99
SolidQueue.supervisor = true
10-
configuration = Configuration.new(mode: mode, load_from: load_configuration_from)
10+
configuration = Configuration.new(load_from: load_configuration_from)
1111

1212
if configuration.configured_processes.any?
13-
klass = mode.to_s.inquiry.fork? ? ForkSupervisor : AsyncSupervisor
14-
klass.new(configuration, **options).tap(&:start)
13+
new(configuration).tap(&:start)
1514
else
1615
abort "No workers or processed configured. Exiting..."
1716
end
1817
end
1918
end
2019

21-
def initialize(configuration, **options)
20+
def initialize(configuration)
2221
@configuration = configuration
22+
@forks = {}
23+
@configured_processes = {}
24+
2325
super
2426
end
2527

@@ -37,7 +39,7 @@ def stop
3739
end
3840

3941
private
40-
attr_reader :configuration
42+
attr_reader :configuration, :forks, :configured_processes
4143

4244
def boot
4345
SolidQueue.instrument(:start_process, process: self) do
@@ -52,6 +54,36 @@ def start_processes
5254
configuration.configured_processes.each { |configured_process| start_process(configured_process) }
5355
end
5456

57+
def supervise
58+
loop do
59+
break if stopped?
60+
61+
set_procline
62+
process_signal_queue
63+
64+
unless stopped?
65+
reap_and_replace_terminated_forks
66+
interruptible_sleep(1.second)
67+
end
68+
end
69+
ensure
70+
shutdown
71+
end
72+
73+
def start_process(configured_process)
74+
process_instance = configured_process.instantiate.tap do |instance|
75+
instance.supervised_by process
76+
instance.mode = :fork
77+
end
78+
79+
pid = fork do
80+
process_instance.start
81+
end
82+
83+
configured_processes[pid] = configured_process
84+
forks[pid] = process_instance
85+
end
86+
5587
def stopped?
5688
@stopped
5789
end
@@ -60,15 +92,15 @@ def set_procline
6092
procline "supervising #{supervised_processes.join(", ")}"
6193
end
6294

63-
def start_process(configured_process)
64-
raise NotImplementedError
65-
end
66-
6795
def terminate_gracefully
6896
SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload|
69-
perform_graceful_termination
97+
term_forks
98+
99+
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do
100+
reap_terminated_forks
101+
end
70102

71-
unless all_processes_terminated?
103+
unless all_forks_terminated?
72104
payload[:shutdown_timeout_exceeded] = true
73105
terminate_immediately
74106
end
@@ -77,36 +109,78 @@ def terminate_gracefully
77109

78110
def terminate_immediately
79111
SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do
80-
perform_immediate_termination
112+
quit_forks
81113
end
82114
end
83115

84-
def perform_graceful_termination
85-
raise NotImplementedError
116+
def shutdown
117+
SolidQueue.instrument(:shutdown_process, process: self) do
118+
run_callbacks(:shutdown) do
119+
stop_maintenance_task
120+
end
121+
end
86122
end
87123

88-
def perform_immediate_termination
89-
raise NotImplementedError
124+
def sync_std_streams
125+
STDOUT.sync = STDERR.sync = true
90126
end
91127

92128
def supervised_processes
93-
raise NotImplementedError
129+
forks.keys
94130
end
95131

96-
def all_processes_terminated?
97-
raise NotImplementedError
132+
def term_forks
133+
signal_processes(forks.keys, :TERM)
98134
end
99135

100-
def shutdown
101-
SolidQueue.instrument(:shutdown_process, process: self) do
102-
run_callbacks(:shutdown) do
103-
stop_maintenance_task
136+
def quit_forks
137+
signal_processes(forks.keys, :QUIT)
138+
end
139+
140+
def reap_and_replace_terminated_forks
141+
loop do
142+
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
143+
break unless pid
144+
145+
replace_fork(pid, status)
146+
end
147+
end
148+
149+
def reap_terminated_forks
150+
loop do
151+
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
152+
break unless pid
153+
154+
if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0)
155+
handle_claimed_jobs_by(terminated_fork, status)
104156
end
157+
158+
configured_processes.delete(pid)
105159
end
160+
rescue SystemCallError
161+
# All children already reaped
106162
end
107163

108-
def sync_std_streams
109-
STDOUT.sync = STDERR.sync = true
164+
def replace_fork(pid, status)
165+
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
166+
if terminated_fork = forks.delete(pid)
167+
payload[:fork] = terminated_fork
168+
handle_claimed_jobs_by(terminated_fork, status)
169+
170+
start_process(configured_processes.delete(pid))
171+
end
172+
end
173+
end
174+
175+
def handle_claimed_jobs_by(terminated_fork, status)
176+
if registered_process = process.supervisees.find_by(name: terminated_fork.name)
177+
error = Processes::ProcessExitError.new(status)
178+
registered_process.fail_all_claimed_executions_with(error)
179+
end
180+
end
181+
182+
def all_forks_terminated?
183+
forks.empty?
110184
end
111185
end
112186
end

0 commit comments

Comments
 (0)