Commit cf656a9

Merge pull request #338 from rails/improve-recurring-tasks-config
Revamp recurring tasks configuration and management
2 parents 46745d1 + 2e53a64 commit cf656a9

35 files changed: 498 additions, 220 deletions

README.md

Lines changed: 53 additions & 29 deletions
@@ -2,9 +2,9 @@
 
 Solid Queue is a DB-based queuing backend for [Active Job](https://edgeguides.rubyonrails.org/active_job_basics.html), designed with simplicity and performance in mind.
 
-Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`).
+Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, recurring jobs, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`).
 
-Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, and it leverages the `FOR UPDATE SKIP LOCKED` clause, if available, to avoid blocking and waiting on locks when polling jobs. It relies on Active Job for retries, discarding, error handling, serialization, or delays, and it's compatible with Ruby on Rails multi-threading.
+Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, and it leverages the `FOR UPDATE SKIP LOCKED` clause, if available, to avoid blocking and waiting on locks when polling jobs. It relies on Active Job for retries, discarding, error handling, serialization, or delays, and it's compatible with Ruby on Rails's multi-threading.
 
 ## Installation

@@ -13,9 +13,9 @@ Solid Queue is configured by default in new Rails 8 applications. But if you're
 1. `bundle add solid_queue`
 2. `bin/rails solid_queue:install`
 
-This will configure Solid Queue as the production Active Job backend, create `config/solid_queue.yml`, and create the `db/queue_schema.rb`.
+This will configure Solid Queue as the production Active Job backend, create the configuration files `config/solid_queue.yml` and `config/recurring.yml`, and create the `db/queue_schema.rb`. It'll also create a `bin/jobs` executable wrapper that you can use to start Solid Queue.
 
-You will then have to add the configuration for the queue database in `config/database.yml`. If you're using sqlite, it'll look like this:
+Once you've done that, you will then have to add the configuration for the queue database in `config/database.yml`. If you're using sqlite, it'll look like this:
 
 ```yaml
 production:
@@ -55,7 +55,7 @@ For small projects, you can run Solid Queue on the same machine as your webserve
 
 It's also possible to use one single database for both production data and Solid Queue:
 
-1. Shovel `db/queue_schema.rb` into a normal migration and delete `db/queue_schema.rb`
+1. Copy the contents of `db/queue_schema.rb` into a normal migration and delete `db/queue_schema.rb`
 2. Remove `config.solid_queue.connects_to` from `production.rb`
 3. Migrate your database. You are ready to run `bin/jobs`

@@ -73,22 +73,31 @@ class MyJob < ApplicationJob
   # ...
 end
 ```
+
 ## High performance requirements
 
 Solid Queue was designed for the highest throughput when used with MySQL 8+ or PostgreSQL 9.5+, as they support `FOR UPDATE SKIP LOCKED`. You can use it with older versions, but in that case, you might run into lock waits if you run multiple workers for the same queue. You can also use it with SQLite on smaller applications.
 
 ## Configuration
 
-### Workers and dispatchers
+### Workers, dispatchers and scheduler
 
-We have three types of actors in Solid Queue:
+We have several types of actors in Solid Queue:
 - _Workers_ are in charge of picking jobs ready to run from queues and processing them. They work off the `solid_queue_ready_executions` table.
-- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. They're also in charge of managing [recurring tasks](#recurring-tasks), dispatching jobs to process them according to their schedule. On top of that, they do some maintenance work related to [concurrency controls](#concurrency-controls).
+- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. On top of that, they do some maintenance work related to [concurrency controls](#concurrency-controls).
+- The _scheduler_ manages [recurring tasks](#recurring-tasks), enqueuing jobs for them when they're due.
 - The _supervisor_ runs workers and dispatchers according to the configuration, controls their heartbeats, and stops and starts them when needed.
 
-Solid Queue's supervisor will fork a separate process for each supervised worker/dispatcher.
+Solid Queue's supervisor will fork a separate process for each supervised worker/dispatcher/scheduler.
+
+By default, Solid Queue will try to find your configuration under `config/solid_queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG` or by using the `-c/--config_file` option with `bin/jobs`, like this:
+
+```
+bin/jobs -c config/calendar.yml
+```
+
 
-By default, Solid Queue will try to find your configuration under `config/solid_queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG`. This is what this configuration looks like:
+This is what this configuration looks like:
 
 ```yml
 production:
@@ -117,6 +126,7 @@ production:
 ```
 the supervisor will run 1 dispatcher and no workers.
 
+
 Here's an overview of the different options:
 
 - `polling_interval`: the time interval in seconds that workers and dispatchers will wait before checking for more jobs. This time defaults to `1` second for dispatchers and `0.1` seconds for workers.
@@ -139,7 +149,7 @@ Here's an overview of the different options:
 - `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch this number of jobs from their queue(s), at most and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting.
 - `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting.
 - `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
-- `recurring_tasks`: a list of recurring tasks the dispatcher will manage. Read more details about this one in the [Recurring tasks](#recurring-tasks) section.
+
 
 ### Queue order and priorities

@@ -305,27 +315,42 @@ to your `puma.rb` configuration.
 
 ## Recurring tasks
 
-Solid Queue supports defining recurring tasks that run at specific times in the future, on a regular basis like cron jobs. These are managed by dispatcher processes and as such, they can be defined in the dispatcher's configuration like this:
+Solid Queue supports defining recurring tasks that run at specific times in the future, on a regular basis like cron jobs. These are managed by the scheduler process and are defined in their own configuration file. By default, the file is located in `config/recurring.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_RECURRING_SCHEDULE` or by using the `--recurring_schedule_file` option with `bin/jobs`, like this:
+
+```
+bin/jobs --recurring_schedule_file=config/schedule.yml
+```
+
+The configuration itself looks like this:
+
 ```yml
-dispatchers:
-  - polling_interval: 1
-    batch_size: 500
-    recurring_tasks:
-      my_periodic_job:
-        class: MyJob
-        args: [ 42, { status: "custom_status" } ]
-        schedule: every second
+a_periodic_job:
+  class: MyJob
+  args: [ 42, { status: "custom_status" } ]
+  schedule: every second
+a_cleanup_task:
+  command: "DeletedStuff.clear_all"
+  schedule: every day at 9am
 ```
-`recurring_tasks` is a hash/dictionary, and the key will be the task key internally. Each task needs to have a class, which will be the job class to enqueue, and a schedule. The schedule is parsed using [Fugit](https://github.com/floraison/fugit), so it accepts anything [that Fugit accepts as a cron](https://github.com/floraison/fugit?tab=readme-ov-file#fugitcron). You can also provide arguments to be passed to the job, as a single argument, a hash, or an array of arguments that can also include kwargs as the last element in the array.
+
+Tasks are specified as a hash/dictionary, where the key will be the task's key internally. Each task needs to either have a `class`, which will be the job class to enqueue, or a `command`, which will be eval'ed in the context of a job (`SolidQueue::RecurringJob`) that will be enqueued according to its schedule, in the `solid_queue_recurring` queue.
+
+Each task also needs to have a schedule, which is parsed using [Fugit](https://github.com/floraison/fugit), so it accepts anything [that Fugit accepts as a cron](https://github.com/floraison/fugit?tab=readme-ov-file#fugitcron). You can optionally supply the following for each task:
+- `args`: the arguments to be passed to the job, as a single argument, a hash, or an array of arguments that can also include kwargs as the last element in the array.
 
 The job in the example configuration above will be enqueued every second as:
 ```ruby
 MyJob.perform_later(42, status: "custom_status")
 ```
 
-Tasks are enqueued at their corresponding times by the dispatcher that owns them, and each task schedules the next one. This is pretty much [inspired by what GoodJob does](https://github.com/bensheldon/good_job/blob/994ecff5323bf0337e10464841128fda100750e6/lib/good_job/cron_manager.rb).
+- `queue`: a different queue to be used when enqueuing the job. If none, the queue set up for the job class.
 
-It's possible to run multiple dispatchers with the same `recurring_tasks` configuration. To avoid enqueuing duplicate tasks at the same time, an entry in a new `solid_queue_recurring_executions` table is created in the same transaction as the job is enqueued. This table has a unique index on `task_key` and `run_at`, ensuring only one entry per task per time will be created. This only works if you have `preserve_finished_jobs` set to `true` (the default), and the guarantee applies as long as you keep the jobs around.
+- `priority`: a numeric priority value to be used when enqueuing the job.
+
+
+Tasks are enqueued at their corresponding times by the scheduler, and each task schedules the next one. This is pretty much [inspired by what GoodJob does](https://github.com/bensheldon/good_job/blob/994ecff5323bf0337e10464841128fda100750e6/lib/good_job/cron_manager.rb).
+
+It's possible to run multiple schedulers with the same `recurring_tasks` configuration, for example, if you have multiple servers for redundancy, and you run the `scheduler` in more than one of them. To avoid enqueuing duplicate tasks at the same time, an entry in a new `solid_queue_recurring_executions` table is created in the same transaction as the job is enqueued. This table has a unique index on `task_key` and `run_at`, ensuring only one entry per task per time will be created. This only works if you have `preserve_finished_jobs` set to `true` (the default), and the guarantee applies as long as you keep the jobs around.
 
 Finally, it's possible to configure jobs that aren't handled by Solid Queue. That is, you can have a job like this in your app:
 ```ruby
@@ -340,13 +365,12 @@ end
 
 You can still configure this in Solid Queue:
 ```yml
-dispatchers:
-  - recurring_tasks:
-      my_periodic_resque_job:
-        class: MyResqueJob
-        args: 22
-        schedule: "*/5 * * * *"
+my_periodic_resque_job:
+  class: MyResqueJob
+  args: 22
+  schedule: "*/5 * * * *"
 ```
+
 and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time.
 
 ## Inspiration
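
The README changes above say that duplicate enqueues across schedulers are prevented by a unique index on `task_key` and `run_at`. The following is a minimal in-memory sketch of that idea, not the actual implementation: `ExecutionLog` is a hypothetical class standing in for the `solid_queue_recurring_executions` table, a `Set` plays the role of the unique index, and transaction rollback is not modeled.

```ruby
require "set"

# Hypothetical in-memory stand-in for the solid_queue_recurring_executions
# table. The Set plays the role of the unique index on (task_key, run_at).
class ExecutionLog
  AlreadyRecorded = Class.new(StandardError)

  def initialize
    @entries = Set.new
    @lock = Mutex.new
  end

  # Records the execution and runs the block (the enqueue) only if no other
  # scheduler has already claimed this task for this run time.
  def record(task_key, run_at)
    @lock.synchronize do
      raise AlreadyRecorded, "#{task_key} at #{run_at}" unless @entries.add?([task_key, run_at])
      yield
    end
  end
end

log = ExecutionLog.new
enqueued = []
run_at = Time.utc(2024, 1, 1, 9, 0, 0)

# Two schedulers on different servers try to enqueue the same task
# for the same run time; only the first one succeeds.
%w[scheduler_a scheduler_b].each do |scheduler|
  begin
    log.record("a_cleanup_task", run_at) { enqueued << scheduler }
  rescue ExecutionLog::AlreadyRecorded
    # Another scheduler got there first, so this one skips the enqueue.
  end
end
```

After the loop, `enqueued` holds only the first scheduler's name, which is the behavior the unique index is meant to guarantee for each task and run time.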

Rakefile

Lines changed: 13 additions & 0 deletions
@@ -1,3 +1,5 @@
+# frozen_string_literal: true
+
 require "bundler/setup"
 
 APP_RAKEFILE = File.expand_path("test/dummy/Rakefile", __dir__)
@@ -6,3 +8,14 @@ load "rails/tasks/engine.rake"
 load "rails/tasks/statistics.rake"
 
 require "bundler/gem_tasks"
+
+def databases
+  %w[ mysql postgres sqlite ]
+end
+
+task :test do
+  databases.each do |database|
+    sh("TARGET_DB=#{database} bin/setup")
+    sh("TARGET_DB=#{database} bin/rails test")
+  end
+end

UPGRADING.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
# Upgrading to version 0.9.x
2+
This version changes how recurring tasks are configured. Before, they would be defined as part of the _dispatcher_ configuration. Now they've been upgraded to their own configuration file, and a dedicated process (the _scheduler_) to manage them. Check the _Recurring tasks_ section in the `README` to learn how to configure them in detail.
3+
4+
In short, they live now in `config/recurring.yml` (by default) and follow the same format as before when they lived under `dispatchers > recurring_tasks`.
5+
16
# Upgrading to version 0.8.x
27
*IMPORTANT*: This version collapsed all migrations into a single `db/queue_schema.rb`, that will use a separate `queue` database. If you're upgrading from a version < 0.6.0, you need to upgrade to 0.6.0 first, ensure all migrations are up-to-date, and then upgrade further.
38
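
Since the task format is unchanged, moving to the new layout is mostly a matter of lifting the `recurring_tasks` hashes out of the dispatcher entries. Here is a rough sketch of that step using Ruby's standard YAML library. The helper `extract_recurring_tasks` is hypothetical, and the sketch assumes an environment-keyed `config/solid_queue.yml` like the README example and a flat recurring file like the one shown in this commit.

```ruby
require "yaml"

# Hypothetical helper: collects every `recurring_tasks` hash found under the
# dispatchers of one environment and strips it from the dispatcher settings.
def extract_recurring_tasks(config, environment)
  dispatchers = config.fetch(environment, {}).fetch("dispatchers", [])

  dispatchers.each_with_object({}) do |dispatcher, tasks|
    tasks.merge!(dispatcher.delete("recurring_tasks") || {})
  end
end

# Old-style configuration, with tasks nested under a dispatcher.
old_config = YAML.safe_load(<<~YML)
  production:
    dispatchers:
      - polling_interval: 1
        batch_size: 500
        recurring_tasks:
          my_periodic_job:
            class: MyJob
            args: [ 42, { status: "custom_status" } ]
            schedule: every second
YML

recurring = extract_recurring_tasks(old_config, "production")

# These two strings would become config/recurring.yml and config/solid_queue.yml.
recurring_yaml = recurring.to_yaml
queue_yaml = old_config.to_yaml
```

If several dispatchers define a task under the same key, the later one wins in this sketch, so it is worth checking for clashes before writing the files.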

app/jobs/solid_queue/recurring_job.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# frozen_string_literal: true
2+
3+
class SolidQueue::RecurringJob < ActiveJob::Base
4+
queue_as :solid_queue_recurring
5+
6+
def perform(command)
7+
eval(command)
8+
end
9+
end
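
A task defined with `command` instead of `class` ends up in this job: the changes to `recurring_task.rb` in the same commit fall back to a default job class and pass the command string as the only argument. The sketch below illustrates that flow in plain Ruby, without Active Job. `CommandJob`, `EchoJob` and `Task` are hypothetical stand-ins, and the job runs inline rather than being enqueued.

```ruby
# Stand-in for SolidQueue::RecurringJob, without the Active Job machinery.
class CommandJob
  def perform(command)
    eval(command)
  end
end

# A hypothetical job with a regular class, for comparison.
class EchoJob
  def perform(value)
    value
  end
end

# Hypothetical, simplified task: picks the job class and its arguments the
# way the diff describes, then runs the job inline instead of enqueuing it.
Task = Struct.new(:class_name, :command, :arguments, keyword_init: true) do
  def job_class
    class_name ? Object.const_get(class_name) : CommandJob
  end

  def job_arguments
    class_name ? Array(arguments) : [command]
  end

  def run_inline
    job_class.new.perform(*job_arguments)
  end
end

command_result = Task.new(command: "[1, 2, 3].sum").run_inline
class_result = Task.new(class_name: "EchoJob", arguments: [42]).run_inline
```

Because the command is passed to `eval`, it runs with whatever access the job process has, which is why it is expected to come from the application's own configuration file.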

app/models/solid_queue/recurring_task.rb

Lines changed: 35 additions & 8 deletions
@@ -7,17 +7,30 @@ class RecurringTask < Record
     serialize :arguments, coder: Arguments, default: []
 
     validate :supported_schedule
+    validate :ensure_command_or_class_present
     validate :existing_job_class
 
     scope :static, -> { where(static: true) }
 
+    mattr_accessor :default_job_class
+    self.default_job_class = RecurringJob
+
     class << self
       def wrap(args)
         args.is_a?(self) ? args : from_configuration(args.first, **args.second)
       end
 
       def from_configuration(key, **options)
-        new(key: key, class_name: options[:class], schedule: options[:schedule], arguments: options[:args])
+        new \
+          key: key,
+          class_name: options[:class],
+          command: options[:command],
+          arguments: options[:args],
+          schedule: options[:schedule],
+          queue_name: options[:queue].presence,
+          priority: options[:priority].presence,
+          description: options[:description],
+          static: true
       end
 
       def create_or_update_all(tasks)
@@ -47,7 +60,7 @@ def enqueue(at:)
         else
           payload[:other_adapter] = true
 
-          perform_later do |job|
+          perform_later.tap do |job|
             unless job.successfully_enqueued?
               payload[:enqueue_error] = job.enqueue_error&.message
             end
@@ -77,8 +90,14 @@ def supported_schedule
         end
       end
 
+      def ensure_command_or_class_present
+        unless command.present? || class_name.present?
+          errors.add :base, :command_and_class_blank, message: "either command or class_name must be present"
+        end
+      end
+
       def existing_job_class
-        unless job_class.present?
+        if class_name.present? && job_class.nil?
           errors.add :class_name, :undefined, message: "doesn't correspond to an existing class"
         end
       end
@@ -89,7 +108,7 @@ def using_solid_queue_adapter?
 
       def enqueue_and_record(run_at:)
         RecurringExecution.record(key, run_at) do
-          job_class.new(*arguments_with_kwargs).tap do |active_job|
+          job_class.new(*arguments_with_kwargs).set(enqueue_options).tap do |active_job|
             active_job.run_callbacks(:enqueue) do
               Job.enqueue(active_job)
             end
@@ -98,12 +117,16 @@ def enqueue_and_record(run_at:)
         end
       end
 
-      def perform_later(&block)
-        job_class.perform_later(*arguments_with_kwargs, &block)
+      def perform_later
+        job_class.new(*arguments_with_kwargs).tap do |active_job|
+          active_job.enqueue(enqueue_options)
+        end
       end
 
       def arguments_with_kwargs
-        if arguments.last.is_a?(Hash)
+        if class_name.nil?
+          command
+        elsif arguments.last.is_a?(Hash)
           arguments[0...-1] + [ Hash.ruby2_keywords_hash(arguments.last) ]
         else
           arguments
@@ -116,7 +139,11 @@ def parsed_schedule
       end
 
       def job_class
-        @job_class ||= class_name&.safe_constantize
+        @job_class ||= class_name.present? ? class_name.safe_constantize : self.class.default_job_class
+      end
+
+      def enqueue_options
+        { queue: queue_name, priority: priority }.compact
       end
   end
 end
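
The `arguments_with_kwargs` method in this diff relies on `Hash.ruby2_keywords_hash`, which flags a hash so that splatting the array passes it as keyword arguments. As a small illustration, assuming Ruby 3.0 or later: `StatusJob` and `with_kwargs` below are hypothetical names, and the hash uses symbol keys.

```ruby
# Hypothetical job-like object whose perform takes a keyword argument.
class StatusJob
  def perform(id, status: "default")
    [id, status]
  end
end

# Mirrors arguments_with_kwargs from the diff: flag a trailing Hash so that
# splatting the array passes it as keywords rather than as a positional Hash.
def with_kwargs(arguments)
  if arguments.last.is_a?(Hash)
    arguments[0...-1] + [Hash.ruby2_keywords_hash(arguments.last)]
  else
    arguments
  end
end

arguments = [42, { status: "custom_status" }]
result = StatusJob.new.perform(*with_kwargs(arguments))
```

Without the flag, Ruby 3 treats the trailing hash as a second positional argument, and the call to `perform` raises an `ArgumentError`.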

lib/solid_queue/cli.rb

Lines changed: 12 additions & 2 deletions
@@ -4,7 +4,17 @@
 
 module SolidQueue
   class Cli < Thor
-    class_option :config_file, type: :string, aliases: "-c", default: Configuration::DEFAULT_CONFIG_FILE_PATH, desc: "Path to config file"
+    class_option :config_file, type: :string, aliases: "-c",
+      default: Configuration::DEFAULT_CONFIG_FILE_PATH,
+      desc: "Path to config file",
+      banner: "SOLID_QUEUE_CONFIG"
+
+    class_option :recurring_schedule_file, type: :string,
+      default: Configuration::DEFAULT_RECURRING_SCHEDULE_FILE_PATH,
+      desc: "Path to recurring schedule definition",
+      banner: "SOLID_QUEUE_RECURRING_SCHEDULE"
+
+    class_option :skip_recurring, type: :boolean, default: false
 
     def self.exit_on_failure?
       true
@@ -14,7 +24,7 @@ def self.exit_on_failure?
     default_command :start
 
     def start
-      SolidQueue::Supervisor.start(**options.symbolize_keys)
+      SolidQueue::Supervisor.start(**options.symbolize_keys)
     end
   end
 end
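
The `start` command now forwards every CLI option to the supervisor as a keyword argument. The sketch below shows that pattern in plain Ruby. `start_supervisor` is a hypothetical stand-in, since the real signature of `SolidQueue::Supervisor.start` is not part of this diff, and `transform_keys(&:to_sym)` is used as a plain-Ruby equivalent of Active Support's `symbolize_keys`.

```ruby
# Hypothetical stand-in for SolidQueue::Supervisor.start; the keyword names
# match the CLI option names in the diff, but the real signature is not shown.
def start_supervisor(config_file:, recurring_schedule_file:, skip_recurring: false)
  { config: config_file, schedule: recurring_schedule_file, recurring: !skip_recurring }
end

# Parsed CLI options, keyed by option name as strings.
options = {
  "config_file" => "config/solid_queue.yml",
  "recurring_schedule_file" => "config/schedule.yml",
  "skip_recurring" => false
}

# Plain-Ruby equivalent of options.symbolize_keys (which is Active Support).
settings = start_supervisor(**options.transform_keys(&:to_sym))
```

One consequence of this design is that any new `class_option` must have a matching keyword on the receiving side, or the call fails with an unknown keyword error.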
