Skip to content

Commit 5343bfd

Browse files
committed
Idempotent updates with pessismistic locks
1 parent e059d22 commit 5343bfd

File tree

4 files changed

+31
-22
lines changed

4 files changed

+31
-22
lines changed

app/models/solid_queue/batch_record.rb

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,26 @@ def on_finish=(value)
4141

4242
def job_finished!(job)
4343
return if finished?
44-
45-
transaction do
46-
if job.failed_execution.present?
47-
self.class.where(id: id).update_all(
48-
"failed_jobs = failed_jobs + 1, pending_jobs = pending_jobs - 1"
49-
)
50-
else
51-
self.class.where(id: id).update_all(
52-
"completed_jobs = completed_jobs + 1, pending_jobs = pending_jobs - 1"
53-
)
44+
return if job.batch_processed_at?
45+
46+
job.with_lock do
47+
if job.batch_processed_at.blank?
48+
job.update!(batch_processed_at: Time.current)
49+
50+
if job.failed_execution.present?
51+
self.class.where(id: id).update_all(
52+
"failed_jobs = failed_jobs + 1, pending_jobs = pending_jobs - 1"
53+
)
54+
else
55+
self.class.where(id: id).update_all(
56+
"completed_jobs = completed_jobs + 1, pending_jobs = pending_jobs - 1"
57+
)
58+
end
5459
end
55-
56-
reload
57-
check_completion!
5860
end
61+
62+
reload
63+
check_completion!
5964
end
6065

6166
def check_completion!
@@ -68,15 +73,17 @@ def check_completion!
6873
return unless child.finished?
6974
end
7075

71-
if pending_jobs <= 0
72-
if failed_jobs > 0
73-
mark_as_failed!
74-
else
75-
mark_as_completed!
76+
with_lock do
77+
if pending_jobs <= 0
78+
if failed_jobs > 0
79+
mark_as_failed!
80+
else
81+
mark_as_completed!
82+
end
83+
clear_unpreserved_jobs
84+
elsif status == "pending"
85+
update!(status: "processing")
7686
end
77-
clear_unpreserved_jobs
78-
elsif status == "pending"
79-
update!(status: "processing")
8087
end
8188
end
8289

lib/generators/solid_queue/install/templates/db/queue_schema.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
t.datetime "created_at", null: false
5959
t.datetime "updated_at", null: false
6060
t.string "batch_id"
61+
t.datetime "batch_processed_at"
6162
t.index [ "active_job_id" ], name: "index_solid_queue_jobs_on_active_job_id"
6263
t.index [ "batch_id" ], name: "index_solid_queue_jobs_on_batch_id"
6364
t.index [ "class_name" ], name: "index_solid_queue_jobs_on_class_name"

lib/solid_queue/batch.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class Batch
88
include GlobalID::Identification
99

1010
delegate :completed_jobs, :failed_jobs, :pending_jobs, :total_jobs, :progress_percentage,
11-
:finished?, :processing?, :pending?, :batch_id,
11+
:finished?, :processing?, :pending?, :status, :batch_id,
1212
:metadata, :metadata=,
1313
:on_success, :on_success=,
1414
:on_failure, :on_failure=,

test/dummy/db/queue_schema.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
t.datetime "created_at", null: false
7272
t.datetime "updated_at", null: false
7373
t.string "batch_id"
74+
t.datetime "batch_processed_at"
7475
t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id"
7576
t.index ["batch_id"], name: "index_solid_queue_jobs_on_batch_id"
7677
t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name"

0 commit comments

Comments
 (0)