Skip to content

Commit 676383f

Browse files
committed
Implement basic recurring task parsing and loading in a schedule
Using concurrent-ruby's scheduled tasks. Each task schedules the next one, like GoodJob does. Add a simple test and allow dispatcher to be initialized without having to pass instantiated recurring tasks.
1 parent f5e63a1 commit 676383f

File tree

8 files changed

+149
-33
lines changed

8 files changed

+149
-33
lines changed

lib/solid_queue/configuration.rb

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def workers
3535
if mode.in? %i[ work all]
3636
workers_options.flat_map do |worker_options|
3737
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
38-
processes.times.collect { SolidQueue::Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
38+
processes.times.map { Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
3939
end
4040
else
4141
[]
@@ -44,8 +44,10 @@ def workers
4444

4545
def dispatchers
4646
if mode.in? %i[ dispatch all]
47-
dispatchers_options.flat_map do |dispatcher_options|
48-
SolidQueue::Dispatcher.new(**dispatcher_options.with_defaults(DISPATCHER_DEFAULTS))
47+
dispatchers_options.map do |dispatcher_options|
48+
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]
49+
50+
Dispatcher.new **dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
4951
end
5052
end
5153
end
@@ -75,6 +77,11 @@ def dispatchers_options
7577
.map { |options| options.dup.symbolize_keys }
7678
end
7779

80+
def parse_recurring_tasks(tasks)
81+
Array(tasks).map do |id, options|
82+
Dispatcher::RecurringTask.from_configuration(id, **options)
83+
end.select(&:valid?)
84+
end
7885

7986
def load_config_from(file_or_hash)
8087
case file_or_hash

lib/solid_queue/dispatcher.rb

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ module SolidQueue
44
class Dispatcher < Processes::Base
55
include Processes::Runnable, Processes::Poller
66

7-
attr_accessor :batch_size, :concurrency_maintenance, :recurring_tasks
7+
attr_accessor :batch_size, :concurrency_maintenance, :recurring_schedule
88

9-
after_boot :start_concurrency_maintenance, :schedule_recurring_tasks
10-
before_shutdown :stop_concurrency_maintenance, :unschedule_recurring_tasks
9+
after_boot :start_concurrency_maintenance, :load_recurring_schedule
10+
before_shutdown :stop_concurrency_maintenance, :unload_recurring_schedule
1111

1212
def initialize(**options)
1313
options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS)
@@ -16,7 +16,7 @@ def initialize(**options)
1616
@polling_interval = options[:polling_interval]
1717

1818
@concurrency_maintenance = ConcurrencyMaintenance.new(options[:concurrency_maintenance_interval], options[:batch_size]) if options[:concurrency_maintenance]
19-
@recurring_tasks = RecurringTasks.new(options[:recurring_tasks])
19+
@recurring_schedule = RecurringSchedule.new(options[:recurring_tasks])
2020
end
2121

2222
private
@@ -31,28 +31,28 @@ def run
3131

3232
def dispatch_next_batch
3333
with_polling_volume do
34-
SolidQueue::ScheduledExecution.dispatch_next_batch(batch_size)
34+
ScheduledExecution.dispatch_next_batch(batch_size)
3535
end
3636
end
3737

3838
def start_concurrency_maintenance
3939
concurrency_maintenance&.start
4040
end
4141

42-
def schedule_recurring_tasks
43-
recurring_tasks.schedule
42+
def load_recurring_schedule
43+
recurring_schedule.load_tasks
4444
end
4545

4646
def stop_concurrency_maintenance
4747
concurrency_maintenance&.stop
4848
end
4949

50-
def unschedule_recurring_tasks
51-
recurring_tasks.unschedule
50+
def unload_recurring_schedule
51+
recurring_schedule.unload_tasks
5252
end
5353

5454
def metadata
55-
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval)
55+
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence)
5656
end
5757
end
5858
end
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class Dispatcher::RecurringSchedule
5+
include AppExecutor
6+
7+
attr_reader :configured_tasks, :scheduled_tasks
8+
9+
def initialize(tasks)
10+
@configured_tasks = Array(tasks).map { |task| Dispatcher::RecurringTask.wrap(task) }
11+
@scheduled_tasks = Concurrent::Hash.new
12+
end
13+
14+
def load_tasks
15+
configured_tasks.each do |task|
16+
load_task(task)
17+
end
18+
end
19+
20+
def load_task(task)
21+
scheduled_tasks[task.id] = schedule(task)
22+
end
23+
24+
def unload_tasks
25+
scheduled_tasks.values.each(&:cancel)
26+
scheduled_tasks.clear
27+
end
28+
29+
def tasks
30+
configured_tasks.map(&:to_s)
31+
end
32+
33+
def inspect
34+
tasks.map(&:to_s).join(" | ")
35+
end
36+
37+
private
38+
def schedule(task)
39+
scheduled_task = Concurrent::ScheduledTask.new(task.delay_from_now, args: [ self, task ]) do |thread_schedule, thread_task|
40+
thread_schedule.load_task(task)
41+
42+
wrap_in_app_executor do
43+
thread_task.enqueue
44+
end
45+
end
46+
47+
scheduled_task.add_observer do |_, _, error|
48+
# Don't notify on task cancellation before execution, as this will happen normally
49+
# as part of unloading tasks
50+
handle_thread_error(error) if error && !error.is_a?(Concurrent::CancelledOperationError)
51+
end
52+
53+
scheduled_task.tap(&:execute)
54+
end
55+
end
56+
end
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
require "fugit"
2+
3+
module SolidQueue
4+
class Dispatcher::RecurringTask
5+
class << self
6+
def wrap(args)
7+
args.first.is_a?(self) ? args.first : from_configuration(args.first, **args.second)
8+
end
9+
10+
def from_configuration(id, **options)
11+
new(id, class_name: options[:class], schedule: options[:schedule], arguments: options[:args])
12+
end
13+
end
14+
15+
attr_reader :id, :schedule, :class_name, :arguments
16+
17+
def initialize(id, class_name:, schedule:, arguments: nil)
18+
@id = id
19+
@class_name = class_name
20+
@schedule = Fugit.parse(schedule)
21+
@arguments = Array(arguments)
22+
end
23+
24+
def delay_from_now
25+
[ (next_time - Time.current).to_f, 0 ].max
26+
end
27+
28+
def next_time
29+
schedule.next_time.utc
30+
end
31+
32+
def enqueue
33+
SolidQueue.logger.info("[SolidQueue] Dispatching recurring task #{self}")
34+
job_class.perform_later(*arguments)
35+
end
36+
37+
def valid?
38+
schedule.instance_of?(Fugit::Cron)
39+
end
40+
41+
def to_s
42+
"#{class_name}.perform_later(#{arguments.map(&:inspect).join(",")}) with schedule #{schedule.original}"
43+
end
44+
45+
private
46+
def job_class
47+
@job_class ||= class_name.safe_constantize
48+
end
49+
end
50+
end

lib/solid_queue/dispatcher/recurring_tasks.rb

Lines changed: 0 additions & 19 deletions
This file was deleted.

test/dummy/config/environments/test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
# config.action_view.annotate_rendered_view_with_filenames = true
5050

5151
logger = ActiveSupport::Logger.new(STDOUT)
52-
config.solid_queue.on_thread_error = ->(exception) { logger.error("#{exception.class.name}: #{exception.message}\n#{exception.backtrace.join("\n")}") }
52+
config.solid_queue.on_thread_error = ->(exception) { logger.error("#{exception.class.name}: #{exception.message}\n#{(exception.backtrace || caller)&.join("\n")}") }
5353
config.solid_queue.logger = ActiveSupport::Logger.new(nil)
5454

5555
config.solid_queue.shutdown_timeout = 2.seconds

test/dummy/config/solid_queue.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ default: &default
77
dispatchers:
88
- polling_interval: 1
99
batch_size: 500
10+
recurring_tasks:
11+
periodic-add-to-buffer:
12+
class: AddToBufferJob
13+
args: 42
14+
schedule: every second
1015

1116
development:
1217
<<: *default

test/unit/dispatcher_test.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,23 @@ class DispatcherTest < ActiveSupport::TestCase
3232
process = SolidQueue::Process.first
3333
assert_equal "Dispatcher", process.kind
3434
assert_equal({ "polling_interval" => 0.1, "batch_size" => 10 }, process.metadata)
35+
36+
no_concurrency_maintenance_dispatcher.stop
37+
end
38+
39+
test "recurring schedule" do
40+
recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: 42 } }
41+
with_recurring_schedule = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task)
42+
43+
with_recurring_schedule.start
44+
45+
wait_for_registered_processes(1, timeout: 1.second)
46+
47+
process = SolidQueue::Process.first
48+
assert_equal "Dispatcher", process.kind
49+
assert_equal [ "AddToBufferJob.perform_later(42) with schedule 0 * * * *" ], process.metadata["recurring_schedule"]
50+
51+
with_recurring_schedule.stop
3552
end
3653

3754
test "polling queries are logged" do

0 commit comments

Comments
 (0)