From 94ff46eb76d6c4e8d9a073d698e8cfff363d6af0 Mon Sep 17 00:00:00 2001 From: Justin Hart Date: Mon, 19 Aug 2024 15:20:55 -0700 Subject: [PATCH 1/2] add flag for batch enqueue delayed --- .github/workflows/ruby.yml | 1 + .gitignore | 3 +- .rubocop.yml | 2 +- .rubocop_todo.yml | 2 +- README.md | 44 ++++--- lib/resque/scheduler.rb | 25 +++- lib/resque/scheduler/cli.rb | 22 +++- lib/resque/scheduler/configuration.rb | 9 ++ lib/resque/scheduler/env.rb | 10 +- test/delayed_queue_test.rb | 182 +++++++++++++++++--------- test/scheduler_task_test.rb | 4 +- 11 files changed, 214 insertions(+), 90 deletions(-) diff --git a/.github/workflows/ruby.yml b/.github/workflows/ruby.yml index 326e3fad..5c1a88f4 100644 --- a/.github/workflows/ruby.yml +++ b/.github/workflows/ruby.yml @@ -38,6 +38,7 @@ jobs: - "3.5" - "3.6" redis-version: + - "~> 3.x" - "~> 4.x" - "~> 5.x" exclude: diff --git a/.gitignore b/.gitignore index 6cf49881..6cd1c439 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,5 @@ nbproject .env .env.* /nul -vendor/ \ No newline at end of file +vendor/ +.vscode diff --git a/.rubocop.yml b/.rubocop.yml index d7063c9f..d0b6d9fa 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -15,4 +15,4 @@ Style/DoubleNegation: Metrics/PerceivedComplexity: Enabled: false Metrics/ClassLength: - Max: 110 + Max: 130 diff --git a/.rubocop_todo.yml b/.rubocop_todo.yml index 97902050..c879cbbc 100644 --- a/.rubocop_todo.yml +++ b/.rubocop_todo.yml @@ -39,7 +39,7 @@ Metrics/MethodLength: # Offense count: 2 # Configuration parameters: CountComments. Metrics/ModuleLength: - Max: 364 + Max: 373 # Offense count: 1 Style/CaseEquality: diff --git a/README.md b/README.md index 319394c2..a490cac5 100644 --- a/README.md +++ b/README.md @@ -129,24 +129,18 @@ Both the Rake task and standalone executable support the following environment variables: * `APP_NAME` - Application name used in procline (`$0`) (default empty) -* `BACKGROUND` - [Run in the background](#running-in-the-background) if -non-empty (via `Process.daemon`, if supported) (default `false`) -* `DYNAMIC_SCHEDULE` - Enables [dynamic scheduling](#dynamic-schedules) -if non-empty (default `false`) -* `RAILS_ENV` - Environment to use in procline (`$0`) (default empty) -* `INITIALIZER_PATH` - Path to a Ruby file that will be loaded *before* -requiring `resque` and `resque/scheduler` (default empty). -* `RESQUE_SCHEDULER_INTERVAL` - Interval in seconds for checking if a -scheduled job must run (coerced with `Kernel#Float()`) (default `5`) +* `BACKGROUND` - [Run in the background](#running-in-the-background) if non-empty (via `Process.daemon`, if supported) (default `false`) +* `DELAYED_REQUEUE_BATCH_SIZE` - Set the delayed job batch size if enabled (default `100`). If `<= 1`, this disables batching. +* `DISABLE_DELAYED_REQUEUE_BATCH` - Disable batched delayed job queuing (default `false`) - [See section below on consequences](#batched-delayed-job-and-resque-enqueue-hooks) +* `DYNAMIC_SCHEDULE` - Enables [dynamic scheduling](#dynamic-schedules) if non-empty (default `false`) +* `INITIALIZER_PATH` - Path to a Ruby file that will be loaded *before* requiring `resque` and `resque/scheduler` (default empty). * `LOGFILE` - Log file name (default empty, meaning `$stdout`) -* `LOGFORMAT` - Log output format to use (either `'text'`, `'json'` or `'logfmt'`, -default `'text'`) +* `LOGFORMAT` - Log output format to use (either `'text'`, `'json'` or `'logfmt'`, default `'text'`) * `PIDFILE` - If non-empty, write process PID to file (default empty) -* `QUIET` - Silence most output if non-empty (equivalent to a level of -`MonoLogger::FATAL`, default `false`) -* `VERBOSE` - Maximize log verbosity if non-empty (equivalent to a level -of `MonoLogger::DEBUG`, default `false`) - +* `QUIET` - Silence most output if non-empty (equivalent to a level of `MonoLogger::FATAL`, default `false`) +* `RAILS_ENV` - Environment to use in procline (`$0`) (default empty) +* `RESQUE_SCHEDULER_INTERVAL` - Interval in seconds for checking if a scheduled job must run (coerced with `Kernel#Float()`) (default `5`) +* `VERBOSE` - Maximize log verbosity if non-empty (equivalent to a level of `MonoLogger::DEBUG`, default `false`) ### Resque Pool integration @@ -755,6 +749,24 @@ This table explains the version requirements for rufus-scheduler | `~> 4.0` | `~> 3.0` | | `< 4.0` | `~> 2.0` | +##### Batched delayed job and resque enqueue hooks + +Batching delayed job queuing can speed up when per-second job counts grows, +avoiding situations that may cause delayed enqueues to fall behind. This +batching wraps enqueues in a `multi` pipeline, making far fewer roundtrips to +the server. + +However, in `redis` gem `>= 4.0`, any operations to redis within the `multi` +block must use the multi handle so that the actions are captured. Resque's hooks +do not currently have a way to pass this around, and so compatibility with other +resque plugins or hooks which access redis at enqueue time is impacted with +batch mode. In these cases, you should consider disabling the batching by setting +the `DISABLE_DELAYED_REQUEUE_BATCH` environment variable to `true`. + +Detecting when this occurs can be tricky, you must watch for logs +emitted by your `resque-scheduler` process such as `Redis::CommandError: ERR +MULTI calls can not be nested` or `NoMethodError: undefined method nil? for + 1 + end + # Enqueues all delayed jobs for a timestamp def enqueue_delayed_items_for_timestamp(timestamp) count = 0 - batch_size = delayed_requeue_batch_size - actual_batch_size = nil + batch_size = batch_delayed_items? ? delayed_requeue_batch_size : 1 - log "Processing delayed items for timestamp #{timestamp}, in batches of #{batch_size}" + message = "Processing delayed items for timestamp #{timestamp}" + message += ", in batches of #{batch_size}" if batch_delayed_items? + log message loop do + actual_batch_size = 0 + handle_shutdown do # Continually check that it is still the master if am_master - actual_batch_size = enqueue_items_in_batch_for_timestamp(timestamp, - batch_size) + if batch_delayed_items? + actual_batch_size = enqueue_items_in_batch_for_timestamp(timestamp, batch_size) + log "queued batch of #{actual_batch_size} jobs" if actual_batch_size != -1 + else + item = enqueue_next_item(timestamp) + actual_batch_size = item.nil? ? 0 : 1 + end end end count += actual_batch_size - log "queued #{count} jobs" if actual_batch_size != -1 # continue processing until there are no more items in this # timestamp. If we don't have a full batch, this is the last one. @@ -231,7 +242,7 @@ def enqueue_delayed_items_for_timestamp(timestamp) break if actual_batch_size < batch_size end - log "finished queueing #{count} total jobs for timestamp #{timestamp}" if count != -1 + log "finished queueing #{count} total jobs for timestamp #{timestamp}" end def timestamp_key(timestamp) diff --git a/lib/resque/scheduler/cli.rb b/lib/resque/scheduler/cli.rb index f766ff80..1f37f733 100644 --- a/lib/resque/scheduler/cli.rb +++ b/lib/resque/scheduler/cli.rb @@ -16,7 +16,9 @@ module Scheduler pidfile: 'PIDFILE', poll_sleep_amount: 'RESQUE_SCHEDULER_INTERVAL', verbose: 'VERBOSE', - lock_timeout: 'LOCK_TIMEOUT' + lock_timeout: 'LOCK_TIMEOUT', + delayed_requeue_batch_size: 'DELAYED_REQUEUE_BATCH_SIZE', + disable_delayed_requeue_batches: 'DISABLE_DELAYED_REQUEUE_BATCHES' }.freeze class Cli @@ -74,6 +76,24 @@ class Cli { args: ['-v', '--verbose', 'Run with verbose output [VERBOSE]'], callback: ->(options) { ->(v) { options[:verbose] = v } } + }, + { + args: ['--lock-timeout [LOCK_TIMEOUT]', 'Lock timeout'], + callback: ->(options) { ->(t) { options[:lock_timeout] = t } } + }, + { + args: [ + '--delayed-requeue-batch-size [DELAYED_REQUEUE_BATCH_SIZE]', + 'Delayed requeue batch size' + ], + callback: ->(options) { ->(d) { options[:delayed_requeue_batch_size] = d } } + }, + { + args: [ + '--disable-delayed-requeue-batches [DISABLE_DELAYED_REQUEUE_BATCHES]', + 'Disable delayed requeue batches' + ], + callback: ->(options) { ->(d) { options[:disable_delayed_requeue_batches] = d } } } ].freeze diff --git a/lib/resque/scheduler/configuration.rb b/lib/resque/scheduler/configuration.rb index 8ab61569..be91aef4 100644 --- a/lib/resque/scheduler/configuration.rb +++ b/lib/resque/scheduler/configuration.rb @@ -65,12 +65,21 @@ def app_name @app_name ||= environment['APP_NAME'] end + attr_writer :delayed_requeue_batch_size + def delayed_requeue_batch_size @delayed_requeue_batch_size ||= \ ENV['DELAYED_REQUEUE_BATCH_SIZE'].to_i if environment['DELAYED_REQUEUE_BATCH_SIZE'] @delayed_requeue_batch_size ||= 100 end + attr_writer :disable_delayed_requeue_batches + + def disable_delayed_requeue_batches + @disable_delayed_requeue_batches ||= \ + to_bool(environment['DISABLE_DELAYED_REQUEUE_BATCH']) + end + # Amount of time in seconds to sleep between polls of the delayed # queue. Defaults to 5 attr_writer :poll_sleep_amount diff --git a/lib/resque/scheduler/env.rb b/lib/resque/scheduler/env.rb index 25c20115..6f95a2fc 100644 --- a/lib/resque/scheduler/env.rb +++ b/lib/resque/scheduler/env.rb @@ -67,13 +67,21 @@ def setup_scheduler_configuration c.logformat = options[:logformat] if options.key?(:logformat) - c.lock_timeout = options[:lock_timeout] if options.key?(:lock_timeout) + c.lock_timeout = options[:lock_timeout].to_i if options.key?(:lock_timeout) if (psleep = options[:poll_sleep_amount]) && !psleep.nil? c.poll_sleep_amount = Float(psleep) end c.verbose = !!options[:verbose] if options.key?(:verbose) + + if options.key?(:delayed_requeue_batch_size) + c.delayed_requeue_batch_size = options[:delayed_requeue_batch_size].to_i + end + + if options.key?(:disable_delayed_requeue_batches) + c.disable_delayed_requeue_batches = !!options[:disable_delayed_requeue_batches] + end end end # rubocop:enable Metrics/AbcSize diff --git a/test/delayed_queue_test.rb b/test/delayed_queue_test.rb index abd4c06d..0fe88df3 100644 --- a/test/delayed_queue_test.rb +++ b/test/delayed_queue_test.rb @@ -1,6 +1,14 @@ # vim:fileencoding=utf-8 require_relative 'test_helper' +def assert_resque_key_exists?(key) + if Gem::Requirement.create('< 4').satisfied_by?(Gem::Version.create(Redis::VERSION)) + assert(!Resque.redis.exists(key)) + else + assert(!Resque.redis.exists?(key)) + end +end + context 'DelayedQueue' do setup do Resque::Scheduler.quiet = true @@ -41,7 +49,7 @@ 'Should have the same arguments that we queued') # And now confirm the keys are gone - assert(!Resque.redis.exists?("delayed:#{timestamp.to_i}")) + assert_resque_key_exists?("delayed:#{timestamp.to_i}") assert_equal(0, Resque.redis.zcard(:delayed_queue_schedule), 'delayed queue should be empty') assert_equal(0, Resque.redis.scard("timestamps:#{encoded_job}"), @@ -84,7 +92,7 @@ 'Should have the queue that we asked for') # And now confirm the keys are gone - assert(!Resque.redis.exists?("delayed:#{timestamp.to_i}")) + assert_resque_key_exists?("delayed:#{timestamp.to_i}") assert_equal(0, Resque.redis.zcard(:delayed_queue_schedule), 'delayed queue should be empty') assert_equal(0, Resque.redis.scard("timestamps:#{encoded_job}"), @@ -383,62 +391,6 @@ assert_equal(1, Resque.delayed_timestamp_peek(t, 0, 3).length) end - test 'enqueue_delayed_items_for_timestamp enqueues jobs in 2 batches' do - t = Time.now + 60 - - # create 120 jobs - 120.times { Resque.enqueue_at(t, SomeIvarJob) } - assert_equal(120, Resque.delayed_timestamp_size(t)) - - Resque::Scheduler.enqueue_delayed_items_for_timestamp(t) - assert_equal(0, Resque.delayed_timestamp_size(t)) - - # assert that the active queue is now 120 - assert_equal(120, Resque.size(Resque.queue_from_class(SomeIvarJob))) - end - - test 'enqueue_delayed_items_for_timestamp enqueues jobs in one batch for the timestamp' do - t = Time.now + 60 - - # create 90 jobs - 90.times { Resque.enqueue_at(t, SomeIvarJob) } - assert_equal(90, Resque.delayed_timestamp_size(t)) - - Resque::Scheduler.enqueue_delayed_items_for_timestamp(t) - assert_equal(0, Resque.delayed_timestamp_size(t)) - - # assert that the active queue is now 90 - assert_equal(90, Resque.size(Resque.queue_from_class(SomeIvarJob))) - end - - # test to make sure the timestamp is cleaned up - - test 'enqueue_delayed_items_for_timestamp handles a watch failure' do - t = Time.now + 60 - - # create 100 jobs - 100.times { Resque.enqueue_at(t, SomeIvarJob) } - assert_equal(100, Resque.delayed_timestamp_size(t)) - - Resque.redis.stubs(:watch).returns(nil) - - Resque.expects(:clean_up_timestamp).never - - Resque::Scheduler.enqueue_delayed_items_for_timestamp(t) - end - - test 'enqueue_delayed_items_for_timestamp cleans up a timestamp' do - t = Time.now + 60 - - # create 100 jobs - 100.times { Resque.enqueue_at(t, SomeIvarJob) } - assert_equal(100, Resque.delayed_timestamp_size(t)) - - Resque.expects(:clean_up_timestamp).once - - Resque::Scheduler.enqueue_delayed_items_for_timestamp(t) - end - test 'enqueue_delayed_items_for_timestamp creates jobs ' \ 'and empties the delayed queue' do t = Time.now + 60 @@ -1053,7 +1005,7 @@ assert_equal( 1, Resque.remove_delayed_job_from_timestamp(t, SomeIvarJob, 'foo') ) - assert !Resque.redis.exists?("delayed:#{t.to_i}") + assert_resque_key_exists?("delayed:#{t.to_i}") assert Resque.delayed_queue_peek(0, 100).empty? end @@ -1094,7 +1046,7 @@ Resque.enqueue_at(timestamp, SomeIvarJob, 'foo', 'bar') assert_equal 0, Resque.count_all_scheduled_jobs - assert !Resque.redis.exists?("delayed:#{timestamp.to_i}") + assert_resque_key_exists?("delayed:#{timestamp.to_i}") ensure Resque.inline = false end @@ -1110,3 +1062,113 @@ assert !Resque.delayed?(SomeJob) end end + +context 'DelayedQueue non-batch delayed item queue' do + batch_disabled = Resque::Scheduler.disable_delayed_requeue_batches + batch_size = Resque::Scheduler.delayed_requeue_batch_size + setup do + Resque::Scheduler.quiet = true + Resque.data_store.redis.flushall + Resque::Scheduler.disable_delayed_requeue_batches = true + Resque::Scheduler.delayed_requeue_batch_size = 1 + end + + teardown do + Resque::Scheduler.disable_delayed_requeue_batches = batch_disabled + Resque::Scheduler.delayed_requeue_batch_size = batch_size + end + + test 'enqueue_delayed_items_for_timestamp enqueues jobs for the timestamp' do + t = Time.now + 60 + + # create 90 jobs at t + 90.times { Resque.enqueue_at(t, SomeIvarJob) } + assert_equal(90, Resque.delayed_timestamp_size(t)) + assert_equal(0, Resque.size(Resque.queue_from_class(SomeIvarJob))) + + Resque::Scheduler.expects(:enqueue_items_in_batch_for_timestamp).never + + Resque::Scheduler.enqueue_delayed_items_for_timestamp(t) + + # assert that the active queue is now 90 + assert_equal(0, Resque.delayed_timestamp_size(t)) + assert_equal(90, Resque.size(Resque.queue_from_class(SomeIvarJob))) + end +end + +context 'DelayedQueue batch delayed item queue' do + batch_disabled = Resque::Scheduler.disable_delayed_requeue_batches + batch_size = Resque::Scheduler.delayed_requeue_batch_size + setup do + Resque::Scheduler.quiet = true + Resque.data_store.redis.flushall + Resque::Scheduler.disable_delayed_requeue_batches = false + Resque::Scheduler.delayed_requeue_batch_size = 100 + end + + teardown do + Resque::Scheduler.disable_delayed_requeue_batches = batch_disabled + Resque::Scheduler.delayed_requeue_batch_size = batch_size + end + + test 'enqueue_delayed_items_for_timestamp enqueues jobs in 2 batches' do + t = Time.now + 60 + + # create 120 jobs + 120.times { Resque.enqueue_at(t, SomeIvarJob) } + assert_equal(120, Resque.delayed_timestamp_size(t)) + assert_equal(0, Resque.size(Resque.queue_from_class(SomeIvarJob))) + + Resque::Scheduler.expects(:enqueue_next_item).never + + Resque::Scheduler.enqueue_delayed_items_for_timestamp(t) + + # assert that the active queue is now 120 + assert_equal(0, Resque.delayed_timestamp_size(t)) + assert_equal(120, Resque.size(Resque.queue_from_class(SomeIvarJob))) + end + + test 'enqueue_delayed_items_for_timestamp enqueues jobs in one batch for the timestamp' do + t = Time.now + 60 + + # create 90 jobs + 90.times { Resque.enqueue_at(t, SomeIvarJob) } + assert_equal(90, Resque.delayed_timestamp_size(t)) + assert_equal(0, Resque.size(Resque.queue_from_class(SomeIvarJob))) + + Resque::Scheduler.expects(:enqueue_next_item).never + + Resque::Scheduler.enqueue_delayed_items_for_timestamp(t) + + # assert that the active queue is now 90 + assert_equal(0, Resque.delayed_timestamp_size(t)) + assert_equal(90, Resque.size(Resque.queue_from_class(SomeIvarJob))) + end + + # test to make sure the timestamp is cleaned up + test 'enqueue_delayed_items_for_timestamp handles a watch failure' do + t = Time.now + 60 + + # create 100 jobs + 100.times { Resque.enqueue_at(t, SomeIvarJob) } + assert_equal(100, Resque.delayed_timestamp_size(t)) + + Resque.redis.stubs(:watch).returns(nil) + + Resque.expects(:clean_up_timestamp).never + + Resque::Scheduler.enqueue_delayed_items_for_timestamp(t) + end + + test 'enqueue_delayed_items_for_timestamp cleans up a timestamp' do + t = Time.now + 60 + + # create 100 jobs + 100.times { Resque.enqueue_at(t, SomeIvarJob) } + assert_equal(100, Resque.delayed_timestamp_size(t)) + + Resque.expects(:clean_up_timestamp).once + + Resque::Scheduler.enqueue_delayed_items_for_timestamp(t) + end +end diff --git a/test/scheduler_task_test.rb b/test/scheduler_task_test.rb index d409945b..106577a6 100644 --- a/test/scheduler_task_test.rb +++ b/test/scheduler_task_test.rb @@ -88,7 +88,7 @@ @pid = Process.pid Thread.new do - sleep(0.1) + sleep(0.3) Process.kill(:TERM, @pid) end @@ -104,7 +104,7 @@ @pid = Process.pid Thread.new do - sleep(0.1) + sleep(0.3) Process.kill(:TERM, @pid) end From da2bab468b8a5cf69ac3623a9e85b985b03b6618 Mon Sep 17 00:00:00 2001 From: Justin Hart Date: Wed, 19 Nov 2025 16:31:46 -0800 Subject: [PATCH 2/2] README format --- README.md | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index a490cac5..aa496976 100644 --- a/README.md +++ b/README.md @@ -756,17 +756,24 @@ avoiding situations that may cause delayed enqueues to fall behind. This batching wraps enqueues in a `multi` pipeline, making far fewer roundtrips to the server. -However, in `redis` gem `>= 4.0`, any operations to redis within the `multi` +However, in `redis` gem `>= 4.0`, any operations to Redis within the `multi` block must use the multi handle so that the actions are captured. Resque's hooks do not currently have a way to pass this around, and so compatibility with other -resque plugins or hooks which access redis at enqueue time is impacted with +Resque plugins or hooks which access Redis at enqueue time is impacted with batch mode. In these cases, you should consider disabling the batching by setting the `DISABLE_DELAYED_REQUEUE_BATCH` environment variable to `true`. Detecting when this occurs can be tricky, you must watch for logs -emitted by your `resque-scheduler` process such as `Redis::CommandError: ERR -MULTI calls can not be nested` or `NoMethodError: undefined method nil? for -