Skip to content

Commit cf4e62e

Browse files
committed
Fixes #224. I believe this is a Postgres only issue
Postgres, unlike mysql and sqlite, invalidates the current transaction on a database contraint violation such as an attempt to insert a duplicate key into a unique index. SolidQueue uses a pretty standard design pattern of application level conflict detection and resolution via error handling as part of it's concurrency management in Semaphore::Proxy#attempt create (via create! and rescue). For Postgres based implementations when: * enqueing a job inside a transaction * under load such that the race condition triggerng the simultanious conflicted insert the transaction would become unrecoverable at the application level. There are two simple paths to address this issue in Postgres: * Nested transactions * Insert using 'on conflict do nothing' Nested transaction are the easiest to implement and are portable across all 3 of the Rails standard databases. Unfortunately, nested transactions have a known and very signifant performance problem, especially for databases under load and/or replicated databases with long running queries. Each of the big three database inplements the ANSI standard Insert on conflict do nothing in a very different way. Because of this, this PR only replaces the current implementation of application level conflict handling for Postgres. It takes advantage of the Rails standard ActiveRecord::Persistence::Insert method supporting on conflict semantics.
1 parent b7d4e1c commit cf4e62e

File tree

3 files changed

+59
-5
lines changed

3 files changed

+59
-5
lines changed

Gemfile.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ GEM
171171

172172
PLATFORMS
173173
arm64-darwin-22
174+
arm64-darwin-23
174175
x86_64-darwin-21
175176
x86_64-darwin-23
176177
x86_64-linux

app/models/solid_queue/semaphore.rb

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,30 @@ def signal
4141
end
4242

4343
private
44+
4445
attr_accessor :job
4546

46-
def attempt_creation
47+
def attempt_creation_with_insert_on_conflict
48+
results = Semaphore.insert({ key: key, value: limit - 1, expires_at: expires_at }, unique_by: :key)
49+
50+
if results.length.zero?
51+
limit == 1 ? false : attempt_decrement
52+
else
53+
true
54+
end
55+
end
56+
57+
def attempt_creation_with_create_and_exception_handling
4758
Semaphore.create!(key: key, value: limit - 1, expires_at: expires_at)
4859
true
4960
rescue ActiveRecord::RecordNotUnique
50-
if limit == 1 then false
51-
else
52-
attempt_decrement
53-
end
61+
limit == 1 ? false : attempt_decrement
62+
end
63+
64+
if ActiveRecord::Base.connection.adapter_name == 'PostgreSQL'
65+
alias attempt_creation attempt_creation_with_insert_on_conflict
66+
else
67+
alias attempt_creation attempt_creation_with_create_and_exception_handling
5468
end
5569

5670
def attempt_decrement

test/integration/concurrency_controls_test.rb

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,46 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
183183
assert job.reload.ready?
184184
end
185185

186+
if ActiveRecord::Base.connection.adapter_name == "PostgreSQL"
187+
test "insert_with_unique_by_has_same_database_results_as_create!_with_exception_handling" do
188+
key = "key", limit = 1, expires_at = 1.minute.from_now
189+
190+
assert SolidQueue::Semaphore.count == 0
191+
192+
SolidQueue::Semaphore.insert({ key: key, value: limit - 1, expires_at: expires_at }, unique_by: :key)
193+
assert SolidQueue::Semaphore.count == 1
194+
195+
SolidQueue::Semaphore.insert({ key: key, value: limit - 1, expires_at: expires_at }, unique_by: :key)
196+
assert SolidQueue::Semaphore.count == 1
197+
198+
SolidQueue::Semaphore.delete_all
199+
assert SolidQueue::Semaphore.count == 0
200+
201+
SolidQueue::Semaphore.create!(key: key, value: limit - 1, expires_at: expires_at)
202+
assert SolidQueue::Semaphore.count == 1
203+
204+
assert_raises ActiveRecord::RecordNotUnique do
205+
SolidQueue::Semaphore.create!(key: key, value: limit - 1, expires_at: expires_at)
206+
end
207+
end
208+
end
209+
210+
test "confirm correct version of attempt_creation by database adaptor" do
211+
proxy = SolidQueue::Semaphore::Proxy.new(true)
212+
213+
aliased_method = proxy.method(:attempt_creation)
214+
215+
if ActiveRecord::Base.connection.adapter_name == "PostgreSQL"
216+
original_method = proxy.method(:attempt_creation_with_insert_on_conflict)
217+
else
218+
original_method = proxy.method(:attempt_creation_with_create_and_exception_handling)
219+
end
220+
221+
assert_equal original_method.name, aliased_method.original_name, "The alias maps to the correct original method"
222+
end
223+
186224
private
225+
187226
def assert_stored_sequence(result, *sequences)
188227
expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join }
189228
skip_active_record_query_cache do

0 commit comments

Comments
 (0)