Skip to content

Commit 65b7a73

Browse files
committed
delayed_jobs lock_with_read_ahead for postgres
- Monkey patch for delayed_jobs to allow a configurable job lock method
- Configured via jobs.read_ahead (number of jobs to read ahead)
- postgres default = 0 (use lock_with_for_update)
- mysql default = 5 (use lock_with_read_ahead)
- jobs.read_ahead = 0: lock_with_for_update = lock jobs using SELECT FOR UPDATE
- jobs.read_ahead > 0: lock_with_read_ahead = optimistic locking of jobs
- Add rake task generate_load to create jobs

Using lock_with_for_update showed severe performance problems on postgres when there was a high number of jobs in the queue (>50k): job processing starved due to row locks, and temp IO became high. lock_with_read_ahead performs much better under such load. Performance under low/normal job load is comparable.
1 parent 9cf8c45 commit 65b7a73

File tree

8 files changed

+118
-0
lines changed

8 files changed

+118
-0
lines changed

lib/cloud_controller/config_schemas/base/api_schema.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ class ApiSchema < VCAP::Config
352352
queues: {
353353
optional(:cc_generic) => { timeout_in_seconds: Integer }
354354
},
355+
optional(:read_ahead) => Integer,
355356
optional(:enable_dynamic_job_priorities) => bool,
356357
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
357358
optional(:blobstore_delete) => { timeout_in_seconds: Integer },

lib/cloud_controller/config_schemas/base/clock_schema.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ class ClockSchema < VCAP::Config
174174
allow_app_ssh_access: bool,
175175
jobs: {
176176
global: { timeout_in_seconds: Integer },
177+
optional(:read_ahead) => Integer,
177178
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
178179
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
179180
optional(:diego_sync) => { timeout_in_seconds: Integer },

lib/cloud_controller/config_schemas/base/worker_schema.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ class WorkerSchema < VCAP::Config
175175
queues: {
176176
optional(:cc_generic) => { timeout_in_seconds: Integer }
177177
},
178+
optional(:read_ahead) => Integer,
178179
optional(:enable_dynamic_job_priorities) => bool,
179180
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
180181
optional(:blobstore_delete) => { timeout_in_seconds: Integer },

lib/delayed_job/delayed_worker.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ def start_working
2525
setup_metrics(config) if @publish_metrics
2626
BackgroundJobEnvironment.new(config).setup_environment(readiness_port)
2727

28+
# load monkey patch for sequel backend to support configurable job lock method (postgres only)
29+
require 'delayed_job/sequel_patch'
2830
logger = Steno.logger('cc-worker')
2931
logger.info("Starting job with options #{@queue_options}")
3032
setup_app_log_emitter(config, logger)
@@ -56,6 +58,16 @@ def get_initialized_delayed_worker(config, logger)
5658
Delayed::Worker.max_run_time = config.get(:jobs, :global, :timeout_in_seconds) + 1
5759
Delayed::Worker.sleep_delay = config.get(:jobs, :global, :worker_sleep_delay_in_seconds)
5860
Delayed::Worker.logger = logger
61+
if ::Sequel::Model.db.database_type == :mysql
62+
read_ahead = config.get(:jobs, :read_ahead) || Delayed::Worker::DEFAULT_READ_AHEAD
63+
# lock for update is not configurable for mysql
64+
read_ahead = Delayed::Worker::DEFAULT_READ_AHEAD if read_ahead <= 0
65+
else
66+
# read_ahead 0 = lock for update (default for postgres)
67+
read_ahead = config.get(:jobs, :read_ahead) || 0
68+
read_ahead = 0 if read_ahead < 0
69+
end
70+
Delayed::Worker.read_ahead = read_ahead
5971

6072
unless @queue_options[:num_threads].nil?
6173
# Dynamically alias Delayed::Worker to ThreadedWorker to ensure plugins etc are working correctly

lib/delayed_job/sequel_patch.rb

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
require 'delayed/backend/sequel'
2+
3+
module Delayed
4+
module Backend
5+
module Sequel
6+
class Job
7+
# Monkey patch of Job.reserve so that the job lock method is selected explicitly
# by Worker.read_ahead.
# Builds the dataset of jobs ready to run for the given worker, narrows it by the
# configured priority bounds and queues, and passes it to one of the two locking
# helpers. Returns whatever the chosen helper returns.
8+
def self.reserve(worker, max_run_time=Worker.max_run_time)
9+
ds = ready_to_run(worker.name, max_run_time)
10+
11+
ds = ds.filter(::Sequel.lit('priority >= ?', Worker.min_priority)) if Worker.min_priority # lower bound only when configured
12+
ds = ds.filter(::Sequel.lit('priority <= ?', Worker.max_priority)) if Worker.max_priority # upper bound only when configured
13+
ds = ds.filter(queue: Worker.queues) if Worker.queues.any? # no queue filter when no queues are configured
14+
ds = ds.by_priority
15+
16+
if Worker.read_ahead > 0 # positive read_ahead selects the read-ahead lock method
17+
lock_with_read_ahead(ds, worker)
18+
else # read_ahead of 0 (or less) selects the FOR UPDATE lock method
19+
lock_with_for_update(ds, worker)
20+
end
21+
end
22+
end
23+
end
24+
end
25+
end

lib/tasks/jobs.rake

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,24 @@ namespace :jobs do
7575
thread_grace_period_seconds: args.thread_grace_period_seconds,
7676
publish_metrics: publish_metrics).start_working
7777
end
78+
79+
# Load-generation helper: enqueues `num` BlobstoreDelete jobs on the generic queue,
# sleeping `delay` seconds after each one. Both arguments default to '1'.
# The blobstore key passed to the job is an all-zero placeholder.
desc 'Generate dummy jobs for tests. Usage: rake jobs:generate_load\'[num,delay]\''
80+
task :generate_load, %i[num delay] => :environment do |_t, args|
81+
args.with_defaults(num: '1')
82+
args.with_defaults(delay: '1')
83+
num = args[:num].to_i # to_i yields 0 for non-numeric input, which enqueues nothing
84+
delay = args[:delay].to_f # to_f yields 0.0 for non-numeric input, i.e. no pause
85+
puts "Generating #{num} dummy job(s) with delay of #{delay} seconds"
86+
87+
ENV['PROCESS_TYPE'] = 'cc-worker' # presumably so the task is configured like a worker process; verify
88+
RakeConfig.context = :worker
89+
VCAP::CloudController::DB.load_models(RakeConfig.config.get(:db), @logger) # NOTE(review): @logger is not assigned in this task; confirm nil is acceptable here
90+
RakeConfig.config.configure_components
91+
92+
num.times do
93+
dummy_job = VCAP::CloudController::Jobs::Runtime::BlobstoreDelete.new('00000000-0000-0000-0000-000000000000/0000000000000000000000000000000000000000', :droplet_blobstore)
94+
VCAP::CloudController::Jobs::Enqueuer.new(dummy_job, queue: VCAP::CloudController::Jobs::Queues.generic).enqueue
95+
sleep delay # pause after every enqueue, including the last one
96+
end
97+
end
7898
end

spec/unit/lib/delayed_job/delayed_worker_spec.rb

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,56 @@
141141
end
142142
end
143143

144+
# Verifies how start_working sets Delayed::Worker.read_ahead when the database
# reports itself as MySQL.
# NOTE(review): these examples assign TestConfig.config[:jobs][:read_ahead] and
# Delayed::Worker.read_ahead; confirm both are reset between examples.
context 'when DB type is mysql' do
145+
before do
146+
db = instance_double(Sequel::Database)
147+
allow(db).to receive(:database_type).and_return(:mysql) # make the worker see a MySQL database
148+
allow(Sequel::Model).to receive(:db).and_return(db)
149+
end
150+
151+
it 'read_ahead defaults to DEFAULT_READ_AHEAD' do
152+
cc_delayed_worker.start_working
153+
expect(Delayed::Worker.read_ahead).to eq(Delayed::Worker::DEFAULT_READ_AHEAD)
154+
end
155+
156+
it 'read_ahead can be configured' do
157+
TestConfig.config[:jobs][:read_ahead] = 3
158+
cc_delayed_worker.start_working
159+
expect(Delayed::Worker.read_ahead).to eq(3)
160+
end
161+
162+
it 'read_ahead cant be set to 0' do
163+
TestConfig.config[:jobs][:read_ahead] = 0 # 0 is expected to be replaced by the default on MySQL
164+
cc_delayed_worker.start_working
165+
expect(Delayed::Worker.read_ahead).to eq(Delayed::Worker::DEFAULT_READ_AHEAD)
166+
end
167+
end
168+
169+
# Verifies how start_working sets Delayed::Worker.read_ahead when the database
# reports itself as Postgres.
# NOTE(review): these examples assign TestConfig.config[:jobs][:read_ahead] and
# Delayed::Worker.read_ahead; confirm both are reset between examples.
context 'when DB type is postgres' do
170+
before do
171+
db = instance_double(Sequel::Database)
172+
allow(db).to receive(:database_type).and_return(:postgres) # make the worker see a Postgres database
173+
allow(Sequel::Model).to receive(:db).and_return(db)
174+
end
175+
176+
it 'read_ahead defaults to 0' do
177+
cc_delayed_worker.start_working
178+
expect(Delayed::Worker.read_ahead).to eq(0)
179+
end
180+
181+
it 'read_ahead can be configured' do
182+
TestConfig.config[:jobs][:read_ahead] = 3
183+
cc_delayed_worker.start_working
184+
expect(Delayed::Worker.read_ahead).to eq(3)
185+
end
186+
187+
it 'read_ahead cant be set to negative values' do
188+
TestConfig.config[:jobs][:read_ahead] = -1 # negative values are expected to be clamped to 0
189+
cc_delayed_worker.start_working
190+
expect(Delayed::Worker.read_ahead).to eq(0)
191+
end
192+
end
193+
144194
describe 'publish metrics' do
145195
before do
146196
allow(Prometheus::Client::DataStores::DirectFileStore).to receive(:new)
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Tripwire spec: pins the loaded talentbox-delayed_job_sequel gem version so that a
# gem upgrade fails this example and prompts a review of the monkey patch named in
# the failure message.
RSpec.describe 'sequel_patch' do
2+
describe 'version' do
3+
it 'is not updated' do
4+
expect(Gem.loaded_specs['talentbox-delayed_job_sequel'].version).to eq('4.3.0'), # compares the loaded gem version against the string '4.3.0'
5+
'revisit monkey patch in lib/delayed_job/sequel_patch.rb'
6+
end
7+
end
8+
end

0 commit comments

Comments
 (0)