From c8fd491f5aea9dbee1a1b9b15992f343bb76a879 Mon Sep 17 00:00:00 2001 From: Albert Jankowski Date: Sat, 19 Jul 2025 16:24:30 -0500 Subject: [PATCH] Job Lifecycle comments added --- app/models/solid_queue/job.rb | 1 + app/models/solid_queue/job/concurrency_controls.rb | 3 +++ app/models/solid_queue/job/executable.rb | 4 ++++ app/models/solid_queue/semaphore.rb | 4 ++++ 4 files changed, 12 insertions(+) diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index 234e5eb4..fbfb2f58 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -4,6 +4,7 @@ module SolidQueue class Job < Record class EnqueueError < StandardError; end + # Job Lifecycle 1: When MyJob.perform_later(args) is called, the job is executed. include Executable, Clearable, Recurrable serialize :arguments, coder: JSON diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index b7410b08..264e1cbe 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -39,8 +39,11 @@ def concurrency_on_conflict end def acquire_concurrency_lock + # Job Lifecycle 5 - If our job had something like: limits_concurrency to: 2, key: ->(contact) { contact.account }, duration: 5.minutes + # then we would use the key to acquire a lock. return true unless concurrency_limited? + # Job Lifecycle 6 - The method delegates to SolidQueue::Semaphore.wait(self), passing the job instance Semaphore.wait(self) end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index b0a4cb93..dc670a30 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -11,6 +11,7 @@ module Executable has_one :ready_execution has_one :claimed_execution + # Job Lifecycle 2: After the job is created, it is prepared for execution. after_create :prepare_for_execution scope :finished, -> { where.not(finished_at: nil) } @@ -57,6 +58,7 @@ def dispatched_and_blocked(jobs) define_method("#{status}?") { public_send("#{status}_execution").present? } end + # Job Lifecycle 3: After the job is created, it is prepared for execution. def prepare_for_execution if due? then dispatch else @@ -65,6 +67,7 @@ def prepare_for_execution end def dispatch + # Job Lifecycle 4: If we are able to acquire a concurrency lock, we proceed to ready the job for execution. if acquire_concurrency_lock then ready else handle_concurrency_conflict @@ -101,6 +104,7 @@ def discard private def ready + # Job Lifecycle 11 - Create a ReadyExecution record for the job. ReadyExecution.create_or_find_by!(job_id: id) end diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 62eeb035..eb090657 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -40,7 +40,9 @@ def initialize(job) end def wait + # Job Lifecycle 7 - Check if a semaphore exists for the job's concurrency key. if semaphore = Semaphore.find_by(key: key) + # Job Lifecycle 8 - Check if the semaphore's value is greater than 0 and attempt to decrement it. semaphore.value > 0 && attempt_decrement else attempt_creation @@ -67,10 +69,12 @@ def check_limit_or_decrement end def attempt_decrement + # Job Lifecycle 9 - Decement the semaphore's value and update its expiration time. Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0 end def attempt_increment + # Job Lifecycle 10 - Increment the semaphore's value and update its expiration time. Semaphore.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0 end