Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/solid_queue/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def post(execution)

Concurrent::Promises.future_on(executor, execution) do |thread_execution|
wrap_in_app_executor do
# Worker Lifecycle 8 - Perform the job in the thread pool
thread_execution.perform
ensure
available_threads.increment
Expand Down
2 changes: 2 additions & 0 deletions lib/solid_queue/processes/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def metadata

private
def run
# Worker Lifecycle 5 - Start loop
start_loop
end

Expand All @@ -26,6 +27,7 @@ def start_loop
break if shutting_down?

delay = wrap_in_app_executor do
# Worker Lifecycle 6 - Start polling
poll
end

Expand Down
2 changes: 2 additions & 0 deletions lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ module Runnable
attr_writer :mode

def start
# Worker Lifecycle 3 - We start the worker lifecycle by booting
boot

if running_async?
# Worker Lifecycle 4 - Start running
@thread = create_thread { run }
else
run
Expand Down
1 change: 1 addition & 0 deletions lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def start_process(configured_process)
end

pid = fork do
# Worker Lifecycle 1 - For each process the supervisor will start the worker lifecycle
process_instance.start
end

Expand Down
2 changes: 2 additions & 0 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def initialize(**options)
# Ensure that the queues array is deep frozen to prevent accidental modification
@queues = Array(options[:queues]).map(&:freeze).freeze

# Worker Lifecycle 2 - Setup up pool based on the number of threads set.
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })

super(**options)
Expand All @@ -27,6 +28,7 @@ def metadata

private
def poll
# Worker Lifecycle 7 - Claim executions from the queues and post them to the thread pool
claim_executions.then do |executions|
executions.each do |execution|
pool.post(execution)
Expand Down