Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/models/solid_queue/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module SolidQueue
class Job < Record
class EnqueueError < StandardError; end

# Job Lifecycle 1: When MyJob.perform_later(args) is called, the job is executed.
include Executable, Clearable, Recurrable

serialize :arguments, coder: JSON
Expand Down
3 changes: 3 additions & 0 deletions app/models/solid_queue/job/concurrency_controls.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ def concurrency_on_conflict
end

def acquire_concurrency_lock
# Job Lifecycle 5 - If our job had something like: limits_concurrency to: 2, key: ->(contact) { contact.account }, duration: 5.minutes
# then we would use the key to acquire a lock.
return true unless concurrency_limited?

# Job Lifecycle 6 - The method delegates to SolidQueue::Semaphore.wait(self), passing the job instance
Semaphore.wait(self)
end

Expand Down
4 changes: 4 additions & 0 deletions app/models/solid_queue/job/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Executable
has_one :ready_execution
has_one :claimed_execution

# Job Lifecycle 2: After the job is created, it is prepared for execution.
after_create :prepare_for_execution

scope :finished, -> { where.not(finished_at: nil) }
Expand Down Expand Up @@ -57,6 +58,7 @@ def dispatched_and_blocked(jobs)
define_method("#{status}?") { public_send("#{status}_execution").present? }
end

# Job Lifecycle 3: After the job is created, it is prepared for execution.
def prepare_for_execution
if due? then dispatch
else
Expand All @@ -65,6 +67,7 @@ def prepare_for_execution
end

def dispatch
# Job Lifecycle 4: If we are able to acquire a concurrency lock, we proceed to ready the job for execution.
if acquire_concurrency_lock then ready
else
handle_concurrency_conflict
Expand Down Expand Up @@ -101,6 +104,7 @@ def discard

private
def ready
# Job Lifecycle 11 - Create a ReadyExecution record for the job.
ReadyExecution.create_or_find_by!(job_id: id)
end

Expand Down
4 changes: 4 additions & 0 deletions app/models/solid_queue/semaphore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ def initialize(job)
end

def wait
# Job Lifecycle 7 - Check if a semaphore exists for the job's concurrency key.
if semaphore = Semaphore.find_by(key: key)
# Job Lifecycle 8 - Check if the semaphore's value is greater than 0 and attempt to decrement it.
semaphore.value > 0 && attempt_decrement
else
attempt_creation
Expand All @@ -67,10 +69,12 @@ def check_limit_or_decrement
end

def attempt_decrement
# Job Lifecycle 9 - Decement the semaphore's value and update its expiration time.
Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0
end

def attempt_increment
# Job Lifecycle 10 - Increment the semaphore's value and update its expiration time.
Semaphore.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0
end

Expand Down