1 change: 1 addition & 0 deletions .github/workflows/ruby.yml
@@ -38,6 +38,7 @@ jobs:
- "3.5"
- "3.6"
redis-version:
- "~> 3.x"
Author comment: I added this older gem version so that CI validates that both work (especially with regard to batching).

- "~> 4.x"
- "~> 5.x"
exclude:
3 changes: 2 additions & 1 deletion .gitignore
@@ -16,4 +16,5 @@ nbproject
.env
.env.*
/nul
vendor/
vendor/
.vscode
2 changes: 1 addition & 1 deletion .rubocop.yml
@@ -15,4 +15,4 @@ Style/DoubleNegation:
Metrics/PerceivedComplexity:
Enabled: false
Metrics/ClassLength:
Max: 110
Max: 130
2 changes: 1 addition & 1 deletion .rubocop_todo.yml
@@ -39,7 +39,7 @@ Metrics/MethodLength:
# Offense count: 2
# Configuration parameters: CountComments.
Metrics/ModuleLength:
Max: 364
Max: 373

# Offense count: 1
Style/CaseEquality:
51 changes: 35 additions & 16 deletions README.md
@@ -129,24 +129,18 @@ Both the Rake task and standalone executable support the following
environment variables:

* `APP_NAME` - Application name used in procline (`$0`) (default empty)
* `BACKGROUND` - [Run in the background](#running-in-the-background) if
non-empty (via `Process.daemon`, if supported) (default `false`)
* `DYNAMIC_SCHEDULE` - Enables [dynamic scheduling](#dynamic-schedules)
if non-empty (default `false`)
* `RAILS_ENV` - Environment to use in procline (`$0`) (default empty)
* `INITIALIZER_PATH` - Path to a Ruby file that will be loaded *before*
requiring `resque` and `resque/scheduler` (default empty).
* `RESQUE_SCHEDULER_INTERVAL` - Interval in seconds for checking if a
scheduled job must run (coerced with `Kernel#Float()`) (default `5`)
* `BACKGROUND` - [Run in the background](#running-in-the-background) if non-empty (via `Process.daemon`, if supported) (default `false`)
* `DELAYED_REQUEUE_BATCH_SIZE` - Set the delayed job batch size if enabled (default `100`). If `<= 1`, this disables batching.
* `DISABLE_DELAYED_REQUEUE_BATCH` - Disable batched delayed job queuing (default `false`) - [See section below on consequences](#batched-delayed-job-and-resque-enqueue-hooks)
* `DYNAMIC_SCHEDULE` - Enables [dynamic scheduling](#dynamic-schedules) if non-empty (default `false`)
* `INITIALIZER_PATH` - Path to a Ruby file that will be loaded *before* requiring `resque` and `resque/scheduler` (default empty).
* `LOGFILE` - Log file name (default empty, meaning `$stdout`)
* `LOGFORMAT` - Log output format to use (either `'text'`, `'json'` or `'logfmt'`,
default `'text'`)
* `LOGFORMAT` - Log output format to use (either `'text'`, `'json'` or `'logfmt'`, default `'text'`)
* `PIDFILE` - If non-empty, write process PID to file (default empty)
* `QUIET` - Silence most output if non-empty (equivalent to a level of
`MonoLogger::FATAL`, default `false`)
* `VERBOSE` - Maximize log verbosity if non-empty (equivalent to a level
of `MonoLogger::DEBUG`, default `false`)

* `QUIET` - Silence most output if non-empty (equivalent to a level of `MonoLogger::FATAL`, default `false`)
* `RAILS_ENV` - Environment to use in procline (`$0`) (default empty)
* `RESQUE_SCHEDULER_INTERVAL` - Interval in seconds for checking if a scheduled job must run (coerced with `Kernel#Float()`) (default `5`)
* `VERBOSE` - Maximize log verbosity if non-empty (equivalent to a level of `MonoLogger::DEBUG`, default `false`)

### Resque Pool integration

@@ -755,6 +749,31 @@ This table explains the version requirements for rufus-scheduler
| `~> 4.0` | `~> 3.0` |
| `< 4.0` | `~> 2.0` |

##### Batched delayed job and resque enqueue hooks

Batching delayed job enqueues can speed up processing as per-second job counts
grow, avoiding situations that may cause delayed enqueues to fall behind.
Batching wraps enqueues in a `multi` pipeline, making far fewer round trips to
the server.

However, with `redis` gem `>= 4.0`, any Redis operation inside the `multi`
block must go through the multi handle so that it is captured. Resque's hooks
currently have no way to pass this handle around, so batch mode can break other
Resque plugins or hooks that access Redis at enqueue time. In these cases,
consider disabling batching by setting the `DISABLE_DELAYED_REQUEUE_BATCH`
environment variable to `true`.

Detecting when this occurs can be tricky. Watch the logs emitted by your
`resque-scheduler` process for errors such as:

```text
Redis::CommandError: ERR MULTI calls can not be nested
NoMethodError: undefined method nil? for <Redis::Future
NoMethodError: undefined method `to_i' for <Redis::Future
```

Delayed jobs that you expected to run will also be missing from their queues, because they are dropped.
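
To make the failure mode concrete, here is a toy model of why a hook that reads a Redis reply can break inside a `multi` block. `ToyRedis` and `ToyFuture` are hypothetical stand-ins written for this illustration; they are not the `redis` gem's classes and only mimic the idea that queued commands return a placeholder instead of a value.

```ruby
# Toy model only: ToyFuture and ToyRedis are hypothetical stand-ins,
# not the redis gem's classes.
class ToyFuture < BasicObject
  def initialize(command)
    @command = command
  end
end

class ToyRedis
  def initialize
    @data = Hash.new(0)
    @queued = nil
  end

  # Outside multi: runs immediately and returns the new counter value.
  # Inside multi: queues the command and returns a placeholder.
  def incr(key)
    if @queued
      @queued << [:incr, key]
      ToyFuture.new([:incr, key])
    else
      @data[key] += 1
    end
  end

  # Runs the block while queueing commands, then applies them together.
  def multi
    @queued = []
    yield
    commands = @queued
    @queued = nil
    commands.map { |_name, key| @data[key] += 1 }
  ensure
    @queued = nil
  end
end

redis = ToyRedis.new

# A hypothetical enqueue hook that expects a real integer back.
hook = -> { redis.incr('enqueued_count').to_i }

outside = hook.call

inside_error = nil
results = redis.multi do
  begin
    hook.call
  rescue NoMethodError => e
    inside_error = e
  end
end
```

In this sketch the hook works outside the block, but inside it the placeholder has no `to_i`, so the hook raises `NoMethodError`, which resembles the log lines shown above.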

### Contributing

25 changes: 18 additions & 7 deletions lib/resque/scheduler.rb
@@ -204,25 +204,36 @@ def enqueue_next_item(timestamp)
item
end

def batch_delayed_items?
!disable_delayed_requeue_batches && delayed_requeue_batch_size > 1
end

# Enqueues all delayed jobs for a timestamp
def enqueue_delayed_items_for_timestamp(timestamp)
count = 0
batch_size = delayed_requeue_batch_size
actual_batch_size = nil
batch_size = batch_delayed_items? ? delayed_requeue_batch_size : 1

log "Processing delayed items for timestamp #{timestamp}, in batches of #{batch_size}"
message = "Processing delayed items for timestamp #{timestamp}"
message += ", in batches of #{batch_size}" if batch_delayed_items?
log message
Author comment: I tried to keep the log messages consistent between batch and non-batch modes.

loop do
actual_batch_size = 0

handle_shutdown do
# Continually check that it is still the master
if am_master
actual_batch_size = enqueue_items_in_batch_for_timestamp(timestamp,
batch_size)
if batch_delayed_items?
actual_batch_size = enqueue_items_in_batch_for_timestamp(timestamp, batch_size)
log "queued batch of #{actual_batch_size} jobs" if actual_batch_size != -1
else
item = enqueue_next_item(timestamp)
actual_batch_size = item.nil? ? 0 : 1
end
end
end

count += actual_batch_size
log "queued #{count} jobs" if actual_batch_size != -1

# continue processing until there are no more items in this
# timestamp. If we don't have a full batch, this is the last one.
@@ -231,7 +242,7 @@ def enqueue_delayed_items_for_timestamp(timestamp)
break if actual_batch_size < batch_size
end

log "finished queueing #{count} total jobs for timestamp #{timestamp}" if count != -1
log "finished queueing #{count} total jobs for timestamp #{timestamp}"
end

def timestamp_key(timestamp)
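
The loop's termination rule (stop once a batch comes back smaller than the requested size) can be sketched in isolation. This is a minimal illustration under stated assumptions: `ToyDelayedQueue` and `drain` are hypothetical names, and an in-memory array stands in for Redis.

```ruby
# Hypothetical in-memory stand-in for the delayed items of one timestamp.
class ToyDelayedQueue
  attr_reader :enqueued, :round_trips

  def initialize(items)
    @items = items.dup
    @enqueued = []
    @round_trips = 0
  end

  # Moves up to batch_size items in one simulated round trip and
  # returns how many were moved.
  def enqueue_batch(batch_size)
    @round_trips += 1
    batch = @items.shift(batch_size)
    @enqueued.concat(batch)
    batch.size
  end
end

# Drains the queue, falling back to a batch size of 1 when batching is
# disabled or the configured size is <= 1.
def drain(queue, batch_size:, disabled: false)
  batching = !disabled && batch_size > 1
  effective_size = batching ? batch_size : 1
  count = 0

  loop do
    moved = queue.enqueue_batch(effective_size)
    count += moved
    # A short batch means nothing is left for this timestamp.
    break if moved < effective_size
  end

  count
end

batched = ToyDelayedQueue.new((1..250).to_a)
single = ToyDelayedQueue.new((1..250).to_a)

batched_count = drain(batched, batch_size: 100)
single_count = drain(single, batch_size: 100, disabled: true)
```

Under these assumptions, 250 items drain in 3 simulated round trips with a batch size of 100, versus 251 with batching disabled (250 single moves plus one empty check).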
22 changes: 21 additions & 1 deletion lib/resque/scheduler/cli.rb
@@ -16,7 +16,9 @@ module Scheduler
pidfile: 'PIDFILE',
poll_sleep_amount: 'RESQUE_SCHEDULER_INTERVAL',
verbose: 'VERBOSE',
lock_timeout: 'LOCK_TIMEOUT'
lock_timeout: 'LOCK_TIMEOUT',
delayed_requeue_batch_size: 'DELAYED_REQUEUE_BATCH_SIZE',
disable_delayed_requeue_batches: 'DISABLE_DELAYED_REQUEUE_BATCHES'
}.freeze

class Cli
@@ -74,6 +76,24 @@ class Cli
{
args: ['-v', '--verbose', 'Run with verbose output [VERBOSE]'],
callback: ->(options) { ->(v) { options[:verbose] = v } }
},
{
args: ['--lock-timeout [LOCK_TIMEOUT]', 'Lock timeout'],
callback: ->(options) { ->(t) { options[:lock_timeout] = t } }
},
{
args: [
'--delayed-requeue-batch-size [DELAYED_REQUEUE_BATCH_SIZE]',
'Delayed requeue batch size'
],
callback: ->(options) { ->(d) { options[:delayed_requeue_batch_size] = d } }
},
{
args: [
'--disable-delayed-requeue-batches [DISABLE_DELAYED_REQUEUE_BATCHES]',
'Disable delayed requeue batches'
],
callback: ->(options) { ->(d) { options[:disable_delayed_requeue_batches] = d } }
}
].freeze
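
The option table pairs `OptionParser` arguments with a callback that takes the options hash and returns the block that records the value. A minimal sketch of how such a table can be consumed, assuming a hypothetical `TOY_OPTIONS` constant and `parse_toy_options` helper (the gem's real wiring may differ):

```ruby
require 'optparse'

# Hypothetical two-entry table in the same shape as the one above.
TOY_OPTIONS = [
  {
    args: [
      '--delayed-requeue-batch-size [DELAYED_REQUEUE_BATCH_SIZE]',
      'Delayed requeue batch size'
    ],
    callback: ->(options) { ->(d) { options[:delayed_requeue_batch_size] = d } }
  },
  {
    args: ['-v', '--verbose', 'Run with verbose output [VERBOSE]'],
    callback: ->(options) { ->(v) { options[:verbose] = v } }
  }
].freeze

# Builds a parser from the table; each callback closes over `options`.
def parse_toy_options(argv)
  options = {}
  parser = OptionParser.new do |opts|
    TOY_OPTIONS.each do |opt|
      opts.on(*opt[:args], &opt[:callback].call(options))
    end
  end
  parser.parse(argv)
  options
end

parsed = parse_toy_options(['--delayed-requeue-batch-size=250', '--verbose'])
```

The batch size arrives as the string `'250'`, which is why the value needs an explicit integer conversion before it is used as a number.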

9 changes: 9 additions & 0 deletions lib/resque/scheduler/configuration.rb
@@ -65,12 +65,21 @@ def app_name
@app_name ||= environment['APP_NAME']
end

attr_writer :delayed_requeue_batch_size

def delayed_requeue_batch_size
@delayed_requeue_batch_size ||= \
environment['DELAYED_REQUEUE_BATCH_SIZE'].to_i if environment['DELAYED_REQUEUE_BATCH_SIZE']
@delayed_requeue_batch_size ||= 100
end

attr_writer :disable_delayed_requeue_batches

def disable_delayed_requeue_batches
@disable_delayed_requeue_batches ||= \
to_bool(environment['DISABLE_DELAYED_REQUEUE_BATCH'])
end

# Amount of time in seconds to sleep between polls of the delayed
# queue. Defaults to 5
attr_writer :poll_sleep_amount
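
How the two settings combine can be sketched with a small stand-alone class. `ToyConfig` is hypothetical, and its `to_bool` is an assumed truthiness rule for illustration only; the gem's own helper may accept different values.

```ruby
# Sketch only: ToyConfig and its to_bool are hypothetical, not the gem's code.
class ToyConfig
  attr_writer :delayed_requeue_batch_size, :disable_delayed_requeue_batches

  # `environment` is any Hash-like object, for example ENV.
  def initialize(environment)
    @environment = environment
  end

  # Environment value if present, otherwise the default of 100.
  def delayed_requeue_batch_size
    @delayed_requeue_batch_size ||= begin
      raw = @environment['DELAYED_REQUEUE_BATCH_SIZE']
      raw ? raw.to_i : 100
    end
  end

  def disable_delayed_requeue_batches
    @disable_delayed_requeue_batches ||=
      to_bool(@environment['DISABLE_DELAYED_REQUEUE_BATCH'])
  end

  # Batching is on unless explicitly disabled or the size is <= 1.
  def batch_delayed_items?
    !disable_delayed_requeue_batches && delayed_requeue_batch_size > 1
  end

  private

  # Assumed rule: only these strings count as true.
  def to_bool(value)
    %w[1 true yes on].include?(value.to_s.strip.downcase)
  end
end

defaults = ToyConfig.new({})
size_one = ToyConfig.new({ 'DELAYED_REQUEUE_BATCH_SIZE' => '1' })
disabled = ToyConfig.new({ 'DISABLE_DELAYED_REQUEUE_BATCH' => 'true' })
```

With an empty environment the sketch batches in groups of 100; a size of `1` or a truthy disable flag turns batching off.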
10 changes: 9 additions & 1 deletion lib/resque/scheduler/env.rb
@@ -67,13 +67,21 @@ def setup_scheduler_configuration

c.logformat = options[:logformat] if options.key?(:logformat)

c.lock_timeout = options[:lock_timeout] if options.key?(:lock_timeout)
c.lock_timeout = options[:lock_timeout].to_i if options.key?(:lock_timeout)

if (psleep = options[:poll_sleep_amount]) && !psleep.nil?
c.poll_sleep_amount = Float(psleep)
end

c.verbose = !!options[:verbose] if options.key?(:verbose)

if options.key?(:delayed_requeue_batch_size)
c.delayed_requeue_batch_size = options[:delayed_requeue_batch_size].to_i
end

if options.key?(:disable_delayed_requeue_batches)
c.disable_delayed_requeue_batches = !!options[:disable_delayed_requeue_batches]
end
end
end
# rubocop:enable Metrics/AbcSize
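
Because command-line values arrive as strings, the coercions above behave differently from one another. The following sketch uses a hypothetical `options` hash to show plain Ruby semantics for `to_i`, `Float()`, and `!!`; it does not call into the gem.

```ruby
# Hypothetical raw option values, as they might arrive from the command line.
options = {
  lock_timeout: '30',
  delayed_requeue_batch_size: '250',
  poll_sleep_amount: '2.5',
  disable_delayed_requeue_batches: 'false'
}

lock_timeout = options[:lock_timeout].to_i
batch_size = options[:delayed_requeue_batch_size].to_i
poll_sleep = Float(options[:poll_sleep_amount])

# String#to_i is lenient: non-numeric input silently becomes 0.
lenient = 'abc'.to_i

# Kernel#Float is strict: non-numeric input raises ArgumentError.
strict_error =
  begin
    Float('abc')
    nil
  rescue ArgumentError => e
    e
  end

# Double negation only distinguishes nil/false from everything else,
# so the string 'false' still counts as true.
disabled = !!options[:disable_delayed_requeue_batches]
absent = !!nil
```

One consequence worth keeping in mind: a non-numeric batch size becomes `0` (which turns batching off, since `0 <= 1`), and any string passed for the disable flag, including `'false'`, is treated as true by `!!`.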