Skip to content

Commit 3e78fb2

Browse files
committed
Create always new Worker and Dispatcher instances before starting
We were reusing the instances of Worker and Dispatcher from the initial configuration all the time, which could bring some problems with stopped pools. Now that we need a name to be generated and be unique per process instance, we really need to instantiate new processes every time they're started.
1 parent 84cb6e4 commit 3e78fb2

17 files changed

+110
-74
lines changed

lib/solid_queue/configuration.rb

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
module SolidQueue
44
class Configuration
5+
class Process < Struct.new(:kind, :attributes)
6+
def instantiate
7+
"SolidQueue::#{kind.to_s.titleize}".safe_constantize.new(**attributes)
8+
end
9+
end
10+
511
WORKER_DEFAULTS = {
612
queues: "*",
713
threads: 3,
@@ -22,28 +28,10 @@ def initialize(mode: :fork, load_from: nil)
2228
@raw_config = config_from(load_from)
2329
end
2430

25-
def processes
31+
def configured_processes
2632
dispatchers + workers
2733
end
2834

29-
def workers
30-
workers_options.flat_map do |worker_options|
31-
processes = if mode.fork?
32-
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
33-
else
34-
WORKER_DEFAULTS[:processes]
35-
end
36-
processes.times.map { Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
37-
end
38-
end
39-
40-
def dispatchers
41-
dispatchers_options.map do |dispatcher_options|
42-
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]
43-
Dispatcher.new **dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
44-
end
45-
end
46-
4735
def max_number_of_threads
4836
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
4937
workers_options.map { |options| options[:threads] }.max + 2
@@ -54,6 +42,24 @@ def max_number_of_threads
5442

5543
DEFAULT_CONFIG_FILE_PATH = "config/solid_queue.yml"
5644

45+
def workers
46+
workers_options.flat_map do |worker_options|
47+
processes = if mode.fork?
48+
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
49+
else
50+
WORKER_DEFAULTS[:processes]
51+
end
52+
processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) }
53+
end
54+
end
55+
56+
def dispatchers
57+
dispatchers_options.map do |dispatcher_options|
58+
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]
59+
Process.new :dispatcher, dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
60+
end
61+
end
62+
5763
def config_from(file_or_hash, env: Rails.env)
5864
config = load_config_from(file_or_hash)
5965
config[env.to_sym] ? config[env.to_sym] : config

lib/solid_queue/processes/base.rb

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ class Base
66
include Callbacks # Defines callbacks needed by other concerns
77
include AppExecutor, Registrable, Interruptible, Procline
88

9+
attr_reader :name
10+
11+
def initialize(*)
12+
@name = generate_name
13+
end
14+
915
def kind
1016
self.class.name.demodulize
1117
end
@@ -19,8 +25,13 @@ def pid
1925
end
2026

2127
def metadata
22-
{}
28+
{ name: name }
2329
end
30+
31+
private
32+
def generate_name
33+
[ kind.downcase, SecureRandom.hex(10) ].join("-")
34+
end
2435
end
2536
end
2637
end

lib/solid_queue/processes/poller.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ class Poller < Base
88

99
def initialize(polling_interval:, **options)
1010
@polling_interval = polling_interval
11+
12+
super(**options)
1113
end
1214

1315
def metadata

lib/solid_queue/processes/registrable.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def process_id
2121
def register
2222
@process = SolidQueue::Process.register \
2323
kind: kind,
24+
name: name,
2425
pid: pid,
2526
hostname: hostname,
2627
supervisor: try(:supervisor),

lib/solid_queue/processes/runnable.rb

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,6 @@ def stop
2525
@thread&.join
2626
end
2727

28-
def name
29-
@name ||= [ kind.downcase, SecureRandom.hex(6) ].join("-")
30-
end
31-
3228
def alive?
3329
!running_async? || @thread.alive?
3430
end

lib/solid_queue/supervisor.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ def start(mode: :fork, load_configuration_from: nil)
1616

1717
def initialize(configuration)
1818
@configuration = configuration
19+
super
1920
end
2021

2122
def start
@@ -44,7 +45,7 @@ def boot
4445
end
4546

4647
def start_processes
47-
configuration.processes.each { |configured_process| start_process(configured_process) }
48+
configuration.configured_processes.each { |configured_process| start_process(configured_process) }
4849
end
4950

5051
def stopped?

lib/solid_queue/supervisor/async_supervisor.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@ def stop
2323
attr_reader :threads
2424

2525
def start_process(configured_process)
26-
configured_process.supervised_by process
27-
configured_process.start
26+
process_instance = configured_process.instantiate.tap do |instance|
27+
instance.supervised_by process
28+
end
29+
30+
process_instance.start
2831

29-
threads[configured_process.name] = configured_process
32+
threads[process_instance.name] = process_instance
3033
end
3134

3235
def stop_threads

lib/solid_queue/supervisor/fork_supervisor.rb

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,17 @@ class Supervisor::ForkSupervisor < Supervisor
66

77
def initialize(*)
88
super
9+
910
@forks = {}
11+
@configured_processes = {}
1012
end
1113

1214
def kind
1315
"Supervisor(fork)"
1416
end
1517

1618
private
17-
attr_reader :forks
19+
attr_reader :forks, :configured_processes
1820

1921
def supervise
2022
loop do
@@ -33,14 +35,17 @@ def supervise
3335
end
3436

3537
def start_process(configured_process)
36-
configured_process.supervised_by process
37-
configured_process.mode = :fork
38+
process_instance = configured_process.instantiate.tap do |instance|
39+
instance.supervised_by process
40+
instance.mode = :fork
41+
end
3842

3943
pid = fork do
40-
configured_process.start
44+
process_instance.start
4145
end
4246

43-
forks[pid] = configured_process
47+
configured_processes[pid] = configured_process
48+
forks[pid] = process_instance
4449
end
4550

4651
def terminate_gracefully
@@ -87,6 +92,7 @@ def reap_terminated_forks
8792
break unless pid
8893

8994
forks.delete(pid)
95+
configured_processes.delete(pid)
9096
end
9197
rescue SystemCallError
9298
# All children already reaped
@@ -96,7 +102,8 @@ def replace_fork(pid, status)
96102
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
97103
if supervised_fork = forks.delete(pid)
98104
payload[:fork] = supervised_fork
99-
start_process(supervised_fork)
105+
106+
start_process(configured_processes.delete(pid))
100107
end
101108
end
102109
end

test/integration/instrumentation_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class InstrumentationTest < ActiveSupport::TestCase
113113

114114
test "pruning processes emit prune_processes and deregister_process events" do
115115
time = Time.now
116-
processes = 3.times.collect { |i| SolidQueue::Process.create!(kind: "Worker", supervisor_id: 42, pid: 10 + i, hostname: "localhost", last_heartbeat_at: time) }
116+
processes = 3.times.collect { |i| SolidQueue::Process.create!(kind: "Worker", supervisor_id: 42, pid: 10 + i, hostname: "localhost", last_heartbeat_at: time, name: "worker-123#{i}") }
117117

118118
# Heartbeats will expire
119119
travel_to 3.days.from_now

test/models/solid_queue/claimed_execution_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
44
setup do
5-
@process = SolidQueue::Process.register(kind: "Worker", pid: 42, metadata: { queue: "background" })
5+
@process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123", metadata: { queue: "background" })
66
end
77

88
test "perform job successfully" do

0 commit comments

Comments
 (0)