Skip to content

Commit ec267bb

Browse files
authored
Merge pull request #58 from basecamp/refactor-processes
Refactor processes: supervisor, dispatcher, worker, and all related concerns
2 parents 9da8e15 + 86412be commit ec267bb

22 files changed

+349
-296
lines changed

lib/solid_queue.rb

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,17 @@
55
require "active_job/concurrency_controls"
66

77
require "solid_queue/app_executor"
8-
require "solid_queue/interruptible"
9-
require "solid_queue/pidfile"
10-
require "solid_queue/procline"
11-
require "solid_queue/signals"
8+
require "solid_queue/processes/supervised"
9+
require "solid_queue/processes/registrable"
10+
require "solid_queue/processes/interruptible"
11+
require "solid_queue/processes/pidfile"
12+
require "solid_queue/processes/procline"
13+
require "solid_queue/processes/poller"
14+
require "solid_queue/processes/base"
15+
require "solid_queue/processes/runnable"
16+
require "solid_queue/processes/signals"
1217
require "solid_queue/configuration"
1318
require "solid_queue/pool"
14-
require "solid_queue/queue_selector"
15-
require "solid_queue/runner"
16-
require "solid_queue/process_registration"
1719
require "solid_queue/worker"
1820
require "solid_queue/dispatcher"
1921
require "solid_queue/supervisor"

lib/solid_queue/app_executor.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ def wrap_in_app_executor(&block)
1111
end
1212

1313
def handle_thread_error(error)
14+
SolidQueue.logger.error("[SolidQueue] #{error}")
15+
1416
if SolidQueue.on_thread_error
15-
SolidQueue.logger.error("[SolidQueue] #{error}")
1617
SolidQueue.on_thread_error.call(error)
1718
end
1819
end

lib/solid_queue/configuration.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def initialize(mode: :work, load_from: nil)
1919
@raw_config = config_from(load_from)
2020
end
2121

22-
def runners
22+
def processes
2323
case mode
2424
when :dispatch then dispatcher
2525
when :work then workers

lib/solid_queue/dispatcher.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
# frozen_string_literal: true
22

33
module SolidQueue
4-
class Dispatcher
5-
include Runner
4+
class Dispatcher < Processes::Base
5+
include Processes::Runnable, Processes::Poller
66

7-
attr_accessor :batch_size, :polling_interval, :concurrency_maintenance_interval
7+
attr_accessor :batch_size, :concurrency_maintenance_interval
88

9-
set_callback :start, :before, :launch_concurrency_maintenance
9+
set_callback :boot, :after, :launch_concurrency_maintenance
1010
set_callback :shutdown, :before, :stop_concurrency_maintenance
1111

1212
def initialize(**options)
@@ -67,7 +67,7 @@ def initial_jitter
6767
end
6868

6969
def metadata
70-
super.merge(batch_size: batch_size, polling_interval: polling_interval)
70+
super.merge(batch_size: batch_size)
7171
end
7272
end
7373
end

lib/solid_queue/interruptible.rb

Lines changed: 0 additions & 35 deletions
This file was deleted.

lib/solid_queue/processes/base.rb

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
module Processes
5+
class Base
6+
include ActiveSupport::Callbacks
7+
define_callbacks :boot, :shutdown
8+
9+
include AppExecutor, Registrable, Interruptible, Procline
10+
11+
private
12+
def observe_initial_delay
13+
interruptible_sleep(initial_jitter)
14+
end
15+
16+
def boot
17+
end
18+
19+
def shutdown
20+
end
21+
22+
def initial_jitter
23+
0
24+
end
25+
end
26+
end
27+
end
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue::Processes
4+
module Interruptible
5+
def wake_up
6+
interrupt
7+
end
8+
9+
private
10+
SELF_PIPE_BLOCK_SIZE = 11
11+
12+
def interrupt
13+
self_pipe[:writer].write_nonblock( "." )
14+
rescue Errno::EAGAIN, Errno::EINTR
15+
# Ignore writes that would block and retry
16+
# if another signal arrived while writing
17+
retry
18+
end
19+
20+
def interruptible_sleep(time)
21+
if time > 0 && self_pipe[:reader].wait_readable(time)
22+
loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) }
23+
end
24+
rescue Errno::EAGAIN, Errno::EINTR
25+
end
26+
27+
# Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html)
28+
def self_pipe
29+
@self_pipe ||= create_self_pipe
30+
end
31+
32+
def create_self_pipe
33+
reader, writer = IO.pipe
34+
{ reader: reader, writer: writer }
35+
end
36+
end
37+
end

lib/solid_queue/pidfile.rb renamed to lib/solid_queue/processes/pidfile.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# frozen_string_literal: true
22

3-
module SolidQueue
3+
module SolidQueue::Processes
44
class Pidfile
55
def initialize(path)
66
@path = path

lib/solid_queue/processes/poller.rb

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue::Processes
4+
module Poller
5+
extend ActiveSupport::Concern
6+
7+
included do
8+
attr_accessor :polling_interval
9+
end
10+
11+
private
12+
def with_polling_volume
13+
if SolidQueue.silence_polling?
14+
ActiveRecord::Base.logger.silence { yield }
15+
else
16+
yield
17+
end
18+
end
19+
20+
def metadata
21+
super.merge(polling_interval: polling_interval)
22+
end
23+
end
24+
end

0 commit comments

Comments
 (0)