Skip to content

Commit 5abcde8

Browse files
committed
Start discard from execution, not job
To avoid a race condition when we check whether we have a ready execution and the job gets claimed right before, and after we've already checked if it was claimed. Rely always on executions to delete a job that's not finished. When it's finished, it can be cleared in bulk, without having to worry about the callbacks.
1 parent 8398293 commit 5abcde8

File tree

7 files changed

+46
-21
lines changed

7 files changed

+46
-21
lines changed

app/models/solid_queue/claimed_execution.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ def release
4444
end
4545
end
4646

47+
def discard
48+
raise UndiscardableError, "Can't discard a job in progress"
49+
end
50+
4751
private
4852
def execute
4953
ActiveJob::Base.execute(job.arguments)

app/models/solid_queue/execution.rb

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
module SolidQueue
44
class Execution < Record
5+
class UndiscardableError < StandardError; end
6+
57
include JobAttributes
68

79
self.abstract_class = true
@@ -10,8 +12,6 @@ class Execution < Record
1012

1113
belongs_to :job
1214

13-
alias_method :discard, :destroy
14-
1515
class << self
1616
def create_all_from_jobs(jobs)
1717
insert_all execution_data_from_jobs(jobs)
@@ -23,5 +23,12 @@ def execution_data_from_jobs(jobs)
2323
end
2424
end
2525
end
26+
27+
def discard
28+
with_lock do
29+
job.destroy
30+
destroy
31+
end
32+
end
2633
end
2734
end

app/models/solid_queue/job/concurrency_controls.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ module ConcurrencyControls
66
extend ActiveSupport::Concern
77

88
included do
9-
has_one :blocked_execution, dependent: :destroy
9+
has_one :blocked_execution
1010

1111
delegate :concurrency_limit, :concurrency_duration, to: :job_class
12+
13+
before_destroy :unblock_next_blocked_job, if: :ready?
1214
end
1315

1416
def unblock_next_blocked_job
@@ -49,6 +51,10 @@ def release_next_blocked_job
4951
def job_class
5052
@job_class ||= class_name.safe_constantize
5153
end
54+
55+
def execution
56+
super || blocked_execution
57+
end
5258
end
5359
end
5460
end

app/models/solid_queue/job/executable.rb

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ module Executable
88
included do
99
include Clearable, ConcurrencyControls, Schedulable
1010

11-
has_one :ready_execution, dependent: :destroy
12-
has_one :claimed_execution, dependent: :destroy
13-
has_one :failed_execution, dependent: :destroy
11+
has_one :ready_execution
12+
has_one :claimed_execution
13+
has_one :failed_execution
1414

1515
after_create :prepare_for_execution
1616

@@ -89,16 +89,14 @@ def finished?
8989
finished_at.present?
9090
end
9191

92-
def discard
93-
unless claimed?
94-
try_to_discard_while_ready || destroy
95-
end
96-
end
97-
9892
def retry
9993
failed_execution&.retry
10094
end
10195

96+
def discard
97+
execution.discard
98+
end
99+
102100
def failed_with(exception)
103101
FailedExecution.create_or_find_by!(job_id: id, exception: exception)
104102
end
@@ -108,15 +106,10 @@ def ready
108106
ReadyExecution.create_or_find_by!(job_id: id)
109107
end
110108

111-
def try_to_discard_while_ready
112-
# Prevent the job from being polled while being discarded
113-
ready_execution&.with_lock do
114-
unblock_next_blocked_job
115-
destroy
116-
end
109+
def execution
110+
%w[ ready claimed failed ].reduce(nil) { |acc, status| acc || public_send("#{status}_execution") }
117111
end
118112

119-
120113
def preserve_finished_jobs?
121114
SolidQueue.preserve_finished_jobs
122115
end

app/models/solid_queue/job/schedulable.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ module Schedulable
66
extend ActiveSupport::Concern
77

88
included do
9-
has_one :scheduled_execution, dependent: :destroy
9+
has_one :scheduled_execution
1010

1111
scope :scheduled, -> { where.not(finished_at: nil) }
1212
end
@@ -35,6 +35,10 @@ def due?
3535
def schedule
3636
ScheduledExecution.create_or_find_by!(job_id: id)
3737
end
38+
39+
def execution
40+
super || scheduled_execution
41+
end
3842
end
3943
end
4044
end

app/models/solid_queue/ready_execution.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,12 @@ def lock_candidates(job_ids, process_id)
3737
end
3838
end
3939
end
40+
41+
def discard
42+
with_lock do
43+
job.destroy
44+
destroy
45+
end
46+
end
4047
end
4148
end

test/models/solid_queue/job_test.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,9 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
170170
sleep(0.2)
171171

172172
assert_no_difference -> { SolidQueue::Job.count }, -> { SolidQueue::ClaimedExecution.count } do
173-
job.discard
173+
assert_raises SolidQueue::Execution::UndiscardableError do
174+
job.discard
175+
end
174176
end
175177

176178
worker.stop
@@ -225,6 +227,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
225227
private
226228
def assert_ready(&block)
227229
assert_job_counts(ready: 1, &block)
230+
assert SolidQueue::Job.last.ready?
228231
end
229232

230233
def assert_scheduled(&block)
@@ -233,6 +236,7 @@ def assert_scheduled(&block)
233236

234237
def assert_blocked(&block)
235238
assert_job_counts(blocked: 1, &block)
239+
assert SolidQueue::Job.last.blocked?
236240
end
237241

238242
def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, &block)

0 commit comments

Comments
 (0)