Skip to content

Commit 960694e

Browse files
committed
Extract Supervised concern from Runnable
And set the mode explicitly, not in #start
1 parent a86a6be commit 960694e

File tree

7 files changed

+112
-102
lines changed

7 files changed

+112
-102
lines changed

lib/solid_queue.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
require "active_job/concurrency_controls"
66

77
require "solid_queue/app_executor"
8+
require "solid_queue/processes/supervised"
89
require "solid_queue/processes/registrable"
910
require "solid_queue/interruptible"
1011
require "solid_queue/pidfile"

lib/solid_queue/processes/registrable.rb

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,22 @@ module Registrable
1010

1111
set_callback :shutdown, :before, :stop_heartbeat
1212
set_callback :shutdown, :after, :deregister
13-
14-
attr_reader :supervisor
1513
end
1614

1715
def inspect
1816
"#{kind}(pid=#{process_pid}, hostname=#{hostname}, metadata=#{metadata})"
1917
end
2018
alias to_s inspect
2119

22-
def supervised_by(process)
23-
@supervisor = process
24-
end
25-
2620
private
2721
attr_accessor :process
2822

2923
def register
3024
@process = SolidQueue::Process.register \
3125
kind: self.class.name.demodulize,
3226
pid: process_pid,
33-
supervisor: supervisor,
3427
hostname: hostname,
28+
supervisor: try(:supervisor),
3529
metadata: metadata
3630
end
3731

lib/solid_queue/processes/runnable.rb

Lines changed: 59 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,107 +1,84 @@
11
# frozen_string_literal: true
22

3-
module SolidQueue
4-
module Processes
5-
module Runnable
6-
def start(mode: :supervised)
7-
@mode = mode.to_s.inquiry
8-
@stopping = false
3+
module SolidQueue::Processes
4+
module Runnable
5+
include Supervised
96

10-
observe_initial_delay
11-
run_callbacks(:boot) { boot }
7+
def start
8+
@stopping = false
129

13-
start_loop
14-
end
15-
16-
def stop
17-
@stopping = true
18-
@thread&.join
19-
end
20-
21-
private
22-
attr_reader :mode
10+
observe_initial_delay
11+
run_callbacks(:boot) { boot }
2312

24-
def boot
25-
register_signal_handlers
26-
SolidQueue.logger.info("[SolidQueue] Starting #{self}")
27-
end
28-
29-
def register_signal_handlers
30-
%w[ INT TERM ].each do |signal|
31-
trap(signal) do
32-
stop
33-
interrupt
34-
end
35-
end
36-
37-
trap(:QUIT) do
38-
exit!
39-
end
40-
end
13+
start_loop
14+
end
4115

42-
def start_loop
43-
if mode.async?
44-
@thread = Thread.new { do_start_loop }
45-
else
46-
do_start_loop
47-
end
48-
end
16+
def stop
17+
@stopping = true
18+
@thread&.join
19+
end
4920

50-
def do_start_loop
51-
procline "started"
21+
private
22+
attr_writer :mode
5223

53-
loop do
54-
break if shutting_down?
24+
DEFAULT_MODE = :async
5525

56-
run
57-
end
58-
ensure
59-
run_callbacks(:shutdown) { shutdown }
60-
end
26+
def mode
27+
(@mode || DEFAULT_MODE).to_s.inquiry
28+
end
6129

62-
def shutting_down?
63-
stopping? || supervisor_went_away? || finished?
64-
end
30+
def boot
31+
register_signal_handlers if supervised?
32+
SolidQueue.logger.info("[SolidQueue] Starting #{self}")
33+
end
6534

66-
def run
67-
raise NotImplementedError
35+
def start_loop
36+
if mode.async?
37+
@thread = Thread.new { do_start_loop }
38+
else
39+
do_start_loop
6840
end
41+
end
6942

70-
def shutdown
71-
procline "shutting down"
43+
def do_start_loop
44+
loop do
45+
break if shutting_down?
7246

47+
run
7348
end
49+
ensure
50+
run_callbacks(:shutdown) { shutdown }
51+
end
7452

75-
def stopping?
76-
@stopping
77-
end
53+
def shutting_down?
54+
stopping? || supervisor_went_away? || finished?
55+
end
7856

79-
def finished?
80-
running_inline? && all_work_completed?
81-
end
57+
def run
58+
raise NotImplementedError
59+
end
8260

83-
def supervisor_went_away?
84-
supervised? && supervisor&.pid != ::Process.ppid
85-
end
61+
def stopping?
62+
@stopping
63+
end
8664

87-
def supervised?
88-
mode.supervised?
89-
end
65+
def finished?
66+
running_inline? && all_work_completed?
67+
end
9068

91-
def all_work_completed?
92-
false
93-
end
69+
def all_work_completed?
70+
false
71+
end
9472

95-
def running_inline?
96-
mode.inline?
97-
end
73+
def running_inline?
74+
mode.inline?
75+
end
9876

99-
def with_polling_volume
100-
if SolidQueue.silence_polling?
101-
ActiveRecord::Base.logger.silence { yield }
102-
else
103-
yield
104-
end
77+
def with_polling_volume
78+
if SolidQueue.silence_polling?
79+
ActiveRecord::Base.logger.silence { yield }
80+
else
81+
yield
10582
end
10683
end
10784
end
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue::Processes
4+
module Supervised
5+
extend ActiveSupport::Concern
6+
7+
included do
8+
attr_reader :supervisor
9+
end
10+
11+
def supervised_by(process)
12+
self.mode = :supervised
13+
@supervisor = process
14+
end
15+
16+
private
17+
def supervisor_went_away?
18+
supervised? && supervisor&.pid != ::Process.ppid
19+
end
20+
21+
def supervised?
22+
mode.supervised?
23+
end
24+
25+
def register_signal_handlers
26+
%w[ INT TERM ].each do |signal|
27+
trap(signal) do
28+
stop
29+
interrupt
30+
end
31+
end
32+
33+
trap(:QUIT) do
34+
exit!
35+
end
36+
end
37+
end
38+
end

test/integration/jobs_lifecycle_test.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ class JobsLifecycleTest < ActiveSupport::TestCase
1818
AddToBufferJob.perform_later "hey"
1919
AddToBufferJob.perform_later "ho"
2020

21-
@dispatcher.start(mode: :async)
22-
@worker.start(mode: :async)
21+
@dispatcher.start
22+
@worker.start
2323

2424
wait_for_jobs_to_finish_for(2.seconds)
2525

@@ -31,8 +31,8 @@ class JobsLifecycleTest < ActiveSupport::TestCase
3131
AddToBufferJob.set(wait: 1.day).perform_later("I'm scheduled")
3232
AddToBufferJob.set(wait: 3.days).perform_later("I'm scheduled later")
3333

34-
@dispatcher.start(mode: :async)
35-
@worker.start(mode: :async)
34+
@dispatcher.start
35+
@worker.start
3636

3737
assert_equal 2, SolidQueue::ScheduledExecution.count
3838

@@ -56,7 +56,7 @@ class JobsLifecycleTest < ActiveSupport::TestCase
5656
test "delete finished jobs after they run" do
5757
deleting_finished_jobs do
5858
AddToBufferJob.perform_later "hey"
59-
@worker.start(mode: :async)
59+
@worker.start
6060

6161
wait_for_jobs_to_finish_for(2.seconds)
6262
end

test/unit/dispatcher_test.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class DispatcherTest < ActiveSupport::TestCase
1717
old_logger, ActiveRecord::Base.logger = ActiveRecord::Base.logger, ActiveSupport::Logger.new(log)
1818
old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, false
1919

20-
@dispatcher.start(mode: :async)
20+
@dispatcher.start
2121
sleep 0.5
2222

2323
assert_match /SELECT .* FROM .solid_queue_scheduled_executions. WHERE/, log.string
@@ -31,7 +31,7 @@ class DispatcherTest < ActiveSupport::TestCase
3131
old_logger, ActiveRecord::Base.logger = ActiveRecord::Base.logger, ActiveSupport::Logger.new(log)
3232
old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, true
3333

34-
@dispatcher.start(mode: :async)
34+
@dispatcher.start
3535
sleep 0.5
3636

3737
assert_no_match /SELECT .* FROM .solid_queue_scheduled_executions. WHERE/, log.string
@@ -47,8 +47,8 @@ class DispatcherTest < ActiveSupport::TestCase
4747
assert_equal 15, SolidQueue::ScheduledExecution.count
4848

4949
another_dispatcher = SolidQueue::Dispatcher.new(polling_interval: 0.1, batch_size: 10)
50-
@dispatcher.start(mode: :async)
51-
another_dispatcher.start(mode: :async)
50+
@dispatcher.start
51+
another_dispatcher.start
5252

5353
sleep 0.5
5454

test/unit/worker_test.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class WorkerTest < ActiveSupport::TestCase
2323

2424
AddToBufferJob.perform_later "hey!"
2525

26-
@worker.start(mode: :async)
26+
@worker.start
2727

2828
wait_for_jobs_to_finish_for(1.second)
2929
@worker.wake_up
@@ -44,7 +44,7 @@ class WorkerTest < ActiveSupport::TestCase
4444
StoreResultJob.perform_later(:immediate)
4545
end
4646

47-
@worker.start(mode: :async)
47+
@worker.start
4848

4949
wait_for_jobs_to_finish_for(1.second)
5050
@worker.wake_up
@@ -58,7 +58,7 @@ class WorkerTest < ActiveSupport::TestCase
5858
old_logger, ActiveRecord::Base.logger = ActiveRecord::Base.logger, ActiveSupport::Logger.new(log)
5959
old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, false
6060

61-
@worker.start(mode: :async)
61+
@worker.start
6262
sleep 0.2
6363

6464
assert_match /SELECT .* FROM .solid_queue_ready_executions. WHERE .solid_queue_ready_executions...queue_name./, log.string
@@ -72,7 +72,7 @@ class WorkerTest < ActiveSupport::TestCase
7272
old_logger, ActiveRecord::Base.logger = ActiveRecord::Base.logger, ActiveSupport::Logger.new(log)
7373
old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, true
7474

75-
@worker.start(mode: :async)
75+
@worker.start
7676
sleep 0.2
7777

7878
assert_no_match /SELECT .* FROM .solid_queue_ready_executions. WHERE .solid_queue_ready_executions...queue_name./, log.string

0 commit comments

Comments
 (0)