Skip to content

Commit fad2da1

Browse files
authored
delayed_jobs lock_with_read_ahead for postgres (#4231)
1 parent 78d93d4 commit fad2da1

File tree

9 files changed

+119
-0
lines changed

9 files changed

+119
-0
lines changed

lib/cloud_controller/config_schemas/base/api_schema.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ class ApiSchema < VCAP::Config
352352
queues: {
353353
optional(:cc_generic) => { timeout_in_seconds: Integer }
354354
},
355+
optional(:read_ahead) => Integer,
355356
optional(:enable_dynamic_job_priorities) => bool,
356357
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
357358
optional(:blobstore_delete) => { timeout_in_seconds: Integer },

lib/cloud_controller/config_schemas/base/clock_schema.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ class ClockSchema < VCAP::Config
174174
allow_app_ssh_access: bool,
175175
jobs: {
176176
global: { timeout_in_seconds: Integer },
177+
optional(:read_ahead) => Integer,
177178
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
178179
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
179180
optional(:diego_sync) => { timeout_in_seconds: Integer },

lib/cloud_controller/config_schemas/base/worker_schema.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ class WorkerSchema < VCAP::Config
175175
queues: {
176176
optional(:cc_generic) => { timeout_in_seconds: Integer }
177177
},
178+
optional(:read_ahead) => Integer,
178179
optional(:enable_dynamic_job_priorities) => bool,
179180
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
180181
optional(:blobstore_delete) => { timeout_in_seconds: Integer },

lib/cloud_controller/db.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,17 @@ def self.load_models(db_config, logger)
8585

8686
require 'models'
8787
require 'delayed_job_sequel'
88+
# load monkey patch for sequel backend to support configurable job lock method (postgres only)
89+
require 'delayed_job/sequel_patch'
8890
end
8991

9092
def self.load_models_without_migrations_check(db_config, logger)
9193
connect(db_config, logger)
9294

9395
require 'models'
9496
require 'delayed_job_sequel'
97+
# load monkey patch for sequel backend to support configurable job lock method (postgres only)
98+
require 'delayed_job/sequel_patch'
9599
end
96100
end
97101
end

lib/delayed_job/delayed_worker.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
require 'prometheus/middleware/exporter'
55

66
class CloudController::DelayedWorker
7+
DEFAULT_READ_AHEAD_POSTGRES = 0
8+
DEFAULT_READ_AHEAD_MYSQL = Delayed::Worker::DEFAULT_READ_AHEAD
9+
710
def initialize(options)
811
@queue_options = {
912
min_priority: ENV.fetch('MIN_PRIORITY', nil),
@@ -56,6 +59,16 @@ def get_initialized_delayed_worker(config, logger)
5659
Delayed::Worker.max_run_time = config.get(:jobs, :global, :timeout_in_seconds) + 1
5760
Delayed::Worker.sleep_delay = config.get(:jobs, :global, :worker_sleep_delay_in_seconds)
5861
Delayed::Worker.logger = logger
62+
if ::Sequel::Model.db.database_type == :mysql
63+
read_ahead = config.get(:jobs, :read_ahead) || DEFAULT_READ_AHEAD_MYSQL
64+
# lock for update is not configurable for mysql
65+
read_ahead = DEFAULT_READ_AHEAD_MYSQL if read_ahead <= 0
66+
else
67+
# read_ahead 0 = lock for update (default for postgres)
68+
read_ahead = config.get(:jobs, :read_ahead) || DEFAULT_READ_AHEAD_POSTGRES
69+
read_ahead = DEFAULT_READ_AHEAD_POSTGRES if read_ahead < 0
70+
end
71+
Delayed::Worker.read_ahead = read_ahead
5972

6073
unless @queue_options[:num_threads].nil?
6174
# Dynamically alias Delayed::Worker to ThreadedWorker to ensure plugins etc are working correctly

lib/delayed_job/sequel_patch.rb

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
require 'delayed/backend/sequel'
2+
3+
module Delayed
4+
module Backend
5+
module Sequel
6+
class Job
7+
# monkey patch to allow explicit configuration of job lock method
8+
def self.reserve(worker, max_run_time=Worker.max_run_time)
9+
ds = ready_to_run(worker.name, max_run_time)
10+
11+
ds = ds.filter(::Sequel.lit('priority >= ?', Worker.min_priority)) if Worker.min_priority
12+
ds = ds.filter(::Sequel.lit('priority <= ?', Worker.max_priority)) if Worker.max_priority
13+
ds = ds.filter(queue: Worker.queues) if Worker.queues.any?
14+
ds = ds.by_priority
15+
16+
if Worker.read_ahead > 0
17+
lock_with_read_ahead(ds, worker)
18+
else
19+
lock_with_for_update(ds, worker)
20+
end
21+
end
22+
end
23+
end
24+
end
25+
end

scripts/generate_jobs.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# usage: pipe this script into bin/console on a cc-worker vm
2+
3+
begin
4+
NUM_JOBS = 1
5+
DELAY = 1
6+
7+
puts "Generating #{NUM_JOBS} dummy job(s) with delay of #{DELAY} seconds"
8+
enqueuer = VCAP::CloudController::Jobs::Enqueuer.new(queue: VCAP::CloudController::Jobs::Queues.generic)
9+
start_time = Time.now
10+
NUM_JOBS.times do
11+
dummy_job = VCAP::CloudController::Jobs::Runtime::BlobstoreDelete.new('00000000-0000-0000-0000-000000000000/0000000000000000000000000000000000000000', :droplet_blobstore)
12+
enqueuer.enqueue(dummy_job)
13+
sleep DELAY
14+
end
15+
puts "Generated #{NUM_JOBS} dummy job(s) in #{Time.now - start_time} seconds"
16+
end

spec/unit/lib/delayed_job/delayed_worker_spec.rb

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,56 @@
141141
end
142142
end
143143

144+
context 'when DB type is mysql' do
145+
before do
146+
db = instance_double(Sequel::Database)
147+
allow(db).to receive(:database_type).and_return(:mysql)
148+
allow(Sequel::Model).to receive(:db).and_return(db)
149+
end
150+
151+
it 'read_ahead defaults to DEFAULT_READ_AHEAD_MYSQL' do
152+
cc_delayed_worker.start_working
153+
expect(Delayed::Worker.read_ahead).to eq(CloudController::DelayedWorker::DEFAULT_READ_AHEAD_MYSQL)
154+
end
155+
156+
it 'read_ahead can be configured' do
157+
TestConfig.config[:jobs][:read_ahead] = 3
158+
cc_delayed_worker.start_working
159+
expect(Delayed::Worker.read_ahead).to eq(3)
160+
end
161+
162+
it 'read_ahead cant be set to 0' do
163+
TestConfig.config[:jobs][:read_ahead] = 0
164+
cc_delayed_worker.start_working
165+
expect(Delayed::Worker.read_ahead).to eq(CloudController::DelayedWorker::DEFAULT_READ_AHEAD_MYSQL)
166+
end
167+
end
168+
169+
context 'when DB type is postgres' do
170+
before do
171+
db = instance_double(Sequel::Database)
172+
allow(db).to receive(:database_type).and_return(:postgres)
173+
allow(Sequel::Model).to receive(:db).and_return(db)
174+
end
175+
176+
it 'read_ahead defaults to DEFAULT_READ_AHEAD_POSTGRES' do
177+
cc_delayed_worker.start_working
178+
expect(Delayed::Worker.read_ahead).to eq(CloudController::DelayedWorker::DEFAULT_READ_AHEAD_POSTGRES)
179+
end
180+
181+
it 'read_ahead can be configured' do
182+
TestConfig.config[:jobs][:read_ahead] = 3
183+
cc_delayed_worker.start_working
184+
expect(Delayed::Worker.read_ahead).to eq(3)
185+
end
186+
187+
it 'read_ahead cant be set to negative values' do
188+
TestConfig.config[:jobs][:read_ahead] = -1
189+
cc_delayed_worker.start_working
190+
expect(Delayed::Worker.read_ahead).to eq(CloudController::DelayedWorker::DEFAULT_READ_AHEAD_POSTGRES)
191+
end
192+
end
193+
144194
describe 'publish metrics' do
145195
before do
146196
allow(Prometheus::Client::DataStores::DirectFileStore).to receive(:new)
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
RSpec.describe 'sequel_patch' do
2+
describe 'version' do
3+
it 'is not updated' do
4+
expect(Gem.loaded_specs['talentbox-delayed_job_sequel'].version).to eq('4.3.0'),
5+
'revisit monkey patch in lib/delayed_job/sequel_patch.rb'
6+
end
7+
end
8+
end

0 commit comments

Comments
 (0)