Skip to content

Commit f7d62ec

Browse files
committed
Implement retry_all by selecting, locking, dispatching and deleting
This is very similar to the work we have to do when dispatching scheduled executions, so this extracts that common functionality into its own concern.
1 parent 3c1c5d7 commit f7d62ec

File tree

6 files changed

+93
-35
lines changed

6 files changed

+93
-35
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class Execution
5+
module Dispatching
6+
extend ActiveSupport::Concern
7+
8+
class_methods do
9+
def dispatch_batch(job_ids)
10+
jobs = Job.where(id: job_ids)
11+
12+
Job.dispatch_all(jobs).map(&:id).tap do |dispatched_job_ids|
13+
where(job_id: dispatched_job_ids).delete_all
14+
SolidQueue.logger.info("[SolidQueue] Dispatched scheduled batch with #{dispatched_job_ids.size} jobs")
15+
end
16+
end
17+
end
18+
end
19+
end
20+
end
Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,40 @@
11
# frozen_string_literal: true
22

3-
class SolidQueue::FailedExecution < SolidQueue::Execution
4-
serialize :error, coder: JSON
3+
module SolidQueue
4+
class FailedExecution < Execution
5+
include Dispatching
56

6-
before_create :expand_error_details_from_exception
7+
serialize :error, coder: JSON
78

8-
attr_accessor :exception
9+
before_create :expand_error_details_from_exception
910

10-
def retry
11-
transaction do
12-
job.prepare_for_execution
13-
destroy!
11+
attr_accessor :exception
12+
13+
class << self
14+
def retry_all(jobs)
15+
transaction do
16+
retriable_job_ids = where(job_id: jobs.map(&:id)).order(:job_id).lock.pluck(:job_id)
17+
dispatch_batch(retriable_job_ids)
18+
end
19+
end
20+
end
21+
22+
def retry
23+
with_lock do
24+
job.prepare_for_execution
25+
destroy!
26+
end
1427
end
15-
end
1628

17-
%i[ exception_class message backtrace ].each do |attribute|
18-
define_method(attribute) { error.with_indifferent_access[attribute] }
19-
end
29+
%i[ exception_class message backtrace ].each do |attribute|
30+
define_method(attribute) { error.with_indifferent_access[attribute] }
31+
end
2032

21-
private
22-
def expand_error_details_from_exception
23-
if exception
24-
self.error = { exception_class: exception.class.name, message: exception.message, backtrace: exception.backtrace }
33+
private
34+
def expand_error_details_from_exception
35+
if exception
36+
self.error = { exception_class: exception.class.name, message: exception.message, backtrace: exception.backtrace }
37+
end
2538
end
2639
end
2740
end

app/models/solid_queue/ready_execution.rb

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,5 @@ def lock_candidates(job_ids, process_id)
4141
end
4242
end
4343
end
44-
45-
def discard
46-
with_lock do
47-
job.destroy
48-
destroy
49-
end
50-
end
5144
end
5245
end

app/models/solid_queue/scheduled_execution.rb

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
module SolidQueue
44
class ScheduledExecution < Execution
5+
include Dispatching
6+
57
scope :due, -> { where(scheduled_at: ..Time.current) }
68
scope :ordered, -> { order(scheduled_at: :asc, priority: :asc) }
79
scope :next_batch, ->(batch_size) { due.ordered.limit(batch_size) }
@@ -18,16 +20,6 @@ def dispatch_next_batch(batch_size)
1820
end
1921
end
2022
end
21-
22-
private
23-
def dispatch_batch(job_ids)
24-
jobs = Job.where(id: job_ids)
25-
26-
Job.dispatch_all(jobs).map(&:id).tap do |dispatched_job_ids|
27-
where(job_id: dispatched_job_ids).delete_all
28-
SolidQueue.logger.info("[SolidQueue] Dispatched scheduled batch with #{dispatched_job_ids.size} jobs")
29-
end
30-
end
3123
end
3224
end
3325
end

test/integration/jobs_lifecycle_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
class JobsLifecycleTest < ActiveSupport::TestCase
55
setup do
6-
@worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.5)
7-
@dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 1)
6+
@worker = SolidQueue::Worker.new(queues: "background", threads: 3)
7+
@dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2)
88
end
99

1010
teardown do
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
require "test_helper"
2+
3+
class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase
4+
setup do
5+
@worker = SolidQueue::Worker.new(queues: "background")
6+
@worker.mode = :inline
7+
end
8+
9+
test "run job that fails" do
10+
RaisingJob.perform_later(RuntimeError, "A")
11+
@worker.start
12+
13+
assert_equal 1, SolidQueue::FailedExecution.count
14+
assert SolidQueue::Job.last.failed?
15+
end
16+
17+
test "retry failed job" do
18+
RaisingJob.perform_later(RuntimeError, "A")
19+
@worker.start
20+
21+
assert_difference -> { SolidQueue::FailedExecution.count }, -1 do
22+
assert_difference -> { SolidQueue::ReadyExecution.count }, +1 do
23+
SolidQueue::FailedExecution.last.retry
24+
end
25+
end
26+
end
27+
28+
test "retry failed jobs in bulk" do
29+
1.upto(5) { |i| RaisingJob.perform_later(RuntimeError, i) }
30+
1.upto(3) { |i| AddToBufferJob.perform_later(i) }
31+
32+
@worker.start
33+
34+
assert_difference -> { SolidQueue::FailedExecution.count }, -5 do
35+
assert_difference -> { SolidQueue::ReadyExecution.count }, +5 do
36+
SolidQueue::FailedExecution.retry_all(SolidQueue::Job.all)
37+
end
38+
end
39+
end
40+
end

0 commit comments

Comments
 (0)