Skip to content

Commit 7f12354

Browse files
committed
Stop runnable processes when registered process record disappears
If, for some reason, the process failed a heartbeat and the supervisor pruned it, we shouldn't continue running. Just stop as if we had received a signal. This could be used in the future from Mission Control to stop a worker.
1 parent 93ae90a commit 7f12354

File tree

3 files changed

+32
-4
lines changed

3 files changed

+32
-4
lines changed

lib/solid_queue/processes/registrable.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ def register
2929
end
3030

3131
def deregister
32-
process.deregister if registered?
32+
process&.deregister
3333
end
3434

3535
def registered?
36-
process&.persisted?
36+
process.present?
3737
end
3838

3939
def launch_heartbeat
@@ -53,7 +53,10 @@ def stop_heartbeat
5353
end
5454

5555
def heartbeat
56-
process.heartbeat
56+
process.reload.heartbeat
57+
rescue ActiveRecord::RecordNotFound
58+
self.process = nil
59+
wake_up
5760
end
5861
end
5962
end

lib/solid_queue/processes/runnable.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,12 @@ def boot
4141
end
4242
end
4343

44+
def run
45+
raise NotImplementedError
46+
end
47+
4448
def shutting_down?
45-
stopped? || (running_as_fork? && supervisor_went_away?) || finished?
49+
stopped? || (running_as_fork? && supervisor_went_away?) || finished? || !registered?
4650
end
4751

4852
def run

test/unit/worker_test.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,27 @@ class WorkerTest < ActiveSupport::TestCase
133133
assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :immediate).count
134134
end
135135

136+
test "terminate on heartbeat when unregistered" do
137+
old_heartbeat_interval, SolidQueue.process_heartbeat_interval = SolidQueue.process_heartbeat_interval, 1.second
138+
139+
@worker.start
140+
wait_for_registered_processes(1, timeout: 1.second)
141+
142+
assert_not @worker.pool.shutdown?
143+
144+
process = SolidQueue::Process.first
145+
assert_equal "Worker", process.kind
146+
147+
process.deregister
148+
149+
# And now just wait until the worker tries to heartbeat and realises
150+
# it needs to stop
151+
wait_while_with_timeout(2) { !@worker.pool.shutdown? }
152+
assert @worker.pool.shutdown?
153+
ensure
154+
SolidQueue.process_heartbeat_interval = old_heartbeat_interval
155+
end
156+
136157
private
137158
def with_polling(silence:)
138159
old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence

0 commit comments

Comments
 (0)