Skip to content

Commit 9df4f89

Browse files
committed
Add dynamic job priorities
- DB migration to add user_guid field to jobs table - When job is enqueued, get currently active jobs of that user and add +1 to job priority for each active job Use original priority for reoccuring jobs - Pass on priority of current delayed job to next delayed job in reoccurring job - Don't overwrite priority, which was passed into Enqueuer, with base priority Add unit tests Use concurrently for index operations. Small refactoring. Add config parameter for enabling dynamic job priorities Add migration tests and fix migration Add test to enqueuer_spec Use index name when dropping an index
1 parent ac514a2 commit 9df4f89

File tree

12 files changed

+306
-5
lines changed

12 files changed

+306
-5
lines changed

app/jobs/enqueuer.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def enqueue_job(job)
4343
request_id = ::VCAP::Request.current_id
4444
timeout_job = TimeoutJob.new(job, job_timeout)
4545
logging_context_job = LoggingContextJob.new(timeout_job, request_id)
46-
@opts[:priority] = job_priority unless job_priority.nil?
46+
@opts[:priority] = job_priority unless @opts[:priority] || job_priority.nil?
4747
Delayed::Job.enqueue(logging_context_job, @opts)
4848
end
4949

app/jobs/pollable_job_wrapper.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,19 @@ def before_enqueue(job)
2424
resource_type: @handler.resource_type
2525
)
2626
else
27+
user_guid = VCAP::CloudController::UserAuditInfo.from_context(VCAP::CloudController::SecurityContext).user_guid
28+
29+
if VCAP::CloudController::Config.config.get(:jobs, :enable_dynamic_job_priorities) && user_guid
30+
job.values[:priority] += PollableJobModel.number_of_active_jobs_by_user(user_guid)
31+
end
32+
2733
PollableJobModel.create(
2834
delayed_job_guid: job.guid,
2935
state: PollableJobModel::PROCESSING_STATE,
3036
operation: @handler.display_name,
3137
resource_guid: @handler.resource_guid,
32-
resource_type: @handler.resource_type
38+
resource_type: @handler.resource_type,
39+
user_guid: user_guid
3340
)
3441
end
3542
end

app/jobs/reoccurring_job.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def success(current_delayed_job)
1313
elsif next_enqueue_would_exceed_maximum_duration?
1414
expire!
1515
else
16-
enqueue_next_job(pollable_job)
16+
enqueue_next_job(pollable_job, current_delayed_job.priority)
1717
end
1818
end
1919

@@ -75,10 +75,11 @@ def expire!
7575
raise CloudController::Errors::ApiError.new_from_details('JobTimeout')
7676
end
7777

78-
def enqueue_next_job(pollable_job)
78+
def enqueue_next_job(pollable_job, priority)
7979
opts = {
8080
queue: Jobs::Queues.generic,
81-
run_at: Delayed::Job.db_time_now + next_execution_in
81+
run_at: Delayed::Job.db_time_now + next_execution_in,
82+
priority: priority
8283
}
8384

8485
@retry_number += 1

app/models/runtime/pollable_job_model.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,9 @@ def self.find_by_delayed_job_guid(delayed_job_guid)
4646

4747
pollable_job
4848
end
49+
50+
def self.number_of_active_jobs_by_user(user_guid)
51+
PollableJobModel.where(state: %w[PROCESSING POLLING], user_guid: user_guid).count
52+
end
4953
end
5054
end
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
Sequel.migration do
2+
# adding an index concurrently cannot be done within a transaction
3+
no_transaction
4+
5+
up do
6+
if database_type == :postgres
7+
alter_table :jobs do
8+
add_column :user_guid, String, size: 255, if_not_exists: true
9+
add_index :user_guid, name: :jobs_user_guid_index, if_not_exists: true, concurrently: true
10+
end
11+
12+
elsif database_type == :mysql
13+
alter_table :jobs do
14+
add_column :user_guid, String, size: 255 unless @db.schema(:jobs).map(&:first).include?(:user_guid)
15+
# rubocop:disable Sequel/ConcurrentIndex
16+
add_index :user_guid, name: :jobs_user_guid_index unless @db.indexes(:jobs).include?(:jobs_user_guid_index)
17+
# rubocop:enable Sequel/ConcurrentIndex
18+
end
19+
end
20+
end
21+
22+
down do
23+
if database_type == :postgres
24+
alter_table :jobs do
25+
drop_index :user_guid, name: :jobs_user_guid_index, if_exists: true, concurrently: true
26+
drop_column :user_guid, if_exists: true
27+
end
28+
end
29+
30+
if database_type == :mysql
31+
alter_table :jobs do
32+
# rubocop:disable Sequel/ConcurrentIndex
33+
drop_index :user_guid, name: :jobs_user_guid_index if @db.indexes(:jobs).include?(:jobs_user_guid_index)
34+
# rubocop:enable Sequel/ConcurrentIndex
35+
drop_column :user_guid if @db.schema(:jobs).map(&:first).include?(:user_guid)
36+
end
37+
end
38+
end
39+
end

lib/cloud_controller/config_schemas/base/api_schema.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ class ApiSchema < VCAP::Config
335335

336336
jobs: {
337337
global: { timeout_in_seconds: Integer },
338+
optional(:enable_dynamic_job_priorities) => bool,
338339
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
339340
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
340341
optional(:diego_sync) => { timeout_in_seconds: Integer },

lib/cloud_controller/config_schemas/base/worker_schema.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ class WorkerSchema < VCAP::Config
167167

168168
jobs: {
169169
global: { timeout_in_seconds: Integer },
170+
optional(:enable_dynamic_job_priorities) => bool,
170171
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
171172
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
172173
optional(:diego_sync) => { timeout_in_seconds: Integer },
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
require 'spec_helper'
2+
require 'migrations/helpers/migration_shared_context'
3+
4+
RSpec.describe 'migration to add user_guid column to jobs table and add an index for that column', isolation: :truncation do
5+
include_context 'migration' do
6+
let(:migration_filename) { '20240314131908_add_user_guid_to_jobs_table.rb' }
7+
end
8+
9+
describe 'jobs table' do
10+
it 'adds a column `user_guid`' do
11+
expect(db[:jobs].columns).not_to include(:user_guid)
12+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
13+
expect(db[:jobs].columns).to include(:user_guid)
14+
end
15+
16+
it 'adds an index on the user_guid column' do
17+
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
18+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
19+
expect(db.indexes(:jobs)).to include(:jobs_user_guid_index)
20+
end
21+
22+
describe 'idempotency of up' do
23+
context '`user_guid` column already exists' do
24+
before do
25+
db.add_column :jobs, :user_guid, String, size: 255
26+
end
27+
28+
it 'does not fail' do
29+
expect(db[:jobs].columns).to include(:user_guid)
30+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
31+
end
32+
33+
it 'continues to create the index' do
34+
expect(db[:jobs].columns).to include(:user_guid)
35+
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
36+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
37+
expect(db.indexes(:jobs)).to include(:jobs_user_guid_index)
38+
end
39+
end
40+
41+
context 'index already exists' do
42+
before do
43+
db.add_column :jobs, :user_guid, String, size: 255
44+
db.add_index :jobs, :user_guid, name: :jobs_user_guid_index
45+
end
46+
47+
it 'does not fail' do
48+
expect(db.indexes(:jobs)).to include(:jobs_user_guid_index)
49+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
50+
end
51+
end
52+
end
53+
54+
describe 'idempotency of down' do
55+
context 'index does not exist' do
56+
before do
57+
Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true)
58+
db.drop_index :jobs, :user_guid, name: :jobs_user_guid_index
59+
end
60+
61+
it 'does not fail' do
62+
expect(db[:jobs].columns).to include(:user_guid)
63+
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
64+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true, target: 0) }.not_to raise_error
65+
end
66+
67+
it 'continues to remove the `user_guid` column' do
68+
expect(db[:jobs].columns).to include(:user_guid)
69+
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
70+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true, target: 0) }.not_to raise_error
71+
expect(db[:jobs].columns).not_to include(:user_guid)
72+
end
73+
end
74+
75+
context 'index and column do not exist' do
76+
before do
77+
Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true)
78+
db.drop_index :jobs, :user_guid, name: :jobs_user_guid_index
79+
db.drop_column :jobs, :user_guid
80+
end
81+
82+
it 'does not fail' do
83+
expect(db[:jobs].columns).not_to include(:user_guid)
84+
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
85+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true, target: 0) }.not_to raise_error
86+
end
87+
end
88+
end
89+
end
90+
end

spec/unit/jobs/enqueuer_spec.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
require 'spec_helper'
12
require 'db_spec_helper'
23
require 'jobs/enqueuer'
34
require 'jobs/delete_action_job'
@@ -133,6 +134,15 @@ module VCAP::CloudController::Jobs
133134
end
134135
end
135136

137+
it 'uses the default priority' do
138+
original_enqueue = Delayed::Job.method(:enqueue)
139+
expect(Delayed::Job).to receive(:enqueue) do |enqueued_job, opts|
140+
expect(opts).not_to include(:priority)
141+
original_enqueue.call(enqueued_job, opts)
142+
end
143+
Enqueuer.new(wrapped_job, opts).enqueue_pollable
144+
end
145+
136146
context 'priority from config' do
137147
let(:priorities) { { priorities: { wrapped_job.display_name.to_sym => 1899 } } }
138148

@@ -144,6 +154,18 @@ module VCAP::CloudController::Jobs
144154
end
145155
Enqueuer.new(wrapped_job, opts).enqueue_pollable
146156
end
157+
158+
context 'and priority from Enqueuer (e.g. from reoccurring jobs)' do
159+
it 'uses the priority passed into the Enqueuer' do
160+
original_enqueue = Delayed::Job.method(:enqueue)
161+
expect(Delayed::Job).to receive(:enqueue) do |enqueued_job, opts|
162+
expect(opts).to include({ priority: 2000 })
163+
original_enqueue.call(enqueued_job, opts)
164+
end
165+
opts[:priority] = 2000
166+
Enqueuer.new(wrapped_job, opts).enqueue_pollable
167+
end
168+
end
147169
end
148170
end
149171

spec/unit/jobs/pollable_job_wrapper_spec.rb

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,76 @@ class BigException < StandardError
3838
expect(job_record.reload.state).to eq('COMPLETE')
3939
end
4040

41+
context 'when dynamic job priorities are enabled' do
42+
before do
43+
TestConfig.config[:jobs][:enable_dynamic_job_priorities] = true
44+
end
45+
46+
context 'when there are several active jobs for the current user' do
47+
let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({ guid: 'user-guid' }) }) }
48+
let(:pollable_job) { PollableJobWrapper.new(job) }
49+
50+
before do
51+
stub_const('VCAP::CloudController::SecurityContext', security_context)
52+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
53+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
54+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
55+
end
56+
57+
it 'adds +1 to base priority for each active job' do
58+
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }
59+
60+
enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
61+
expect(enqueued_job.priority).to eq(23)
62+
end
63+
64+
context 'when SecurityContext does not have a user guid' do
65+
let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({}) }) }
66+
67+
before do
68+
stub_const('VCAP::CloudController::SecurityContext', security_context)
69+
end
70+
71+
it 'does not change its delayed job\'s base priority' do
72+
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }
73+
74+
enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
75+
expect(enqueued_job.priority).to eq(20)
76+
end
77+
78+
it 'uses nil for the user_guid' do
79+
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }
80+
81+
enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
82+
83+
job_record = VCAP::CloudController::PollableJobModel.find(delayed_job_guid: enqueued_job.guid)
84+
expect(job_record.user_guid).to be_nil
85+
end
86+
end
87+
end
88+
end
89+
90+
context 'when dynamic job priorities are disabled' do
91+
context 'when there are several active jobs for the current user' do
92+
let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({ guid: 'user-guid' }) }) }
93+
let(:pollable_job) { PollableJobWrapper.new(job) }
94+
95+
before do
96+
stub_const('VCAP::CloudController::SecurityContext', security_context)
97+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
98+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
99+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
100+
end
101+
102+
it 'does not change its delayed job\'s base priority' do
103+
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }
104+
105+
enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
106+
expect(enqueued_job.priority).to eq(20)
107+
end
108+
end
109+
end
110+
41111
context 'reusing a pollable job' do
42112
let!(:existing) { VCAP::CloudController::PollableJobModel.make }
43113
let(:pollable_job) { PollableJobWrapper.new(job, existing_guid: existing.guid) }
@@ -60,6 +130,30 @@ class BigException < StandardError
60130
expect(job_record.cf_api_error).to be_nil
61131
end
62132

133+
context 'when dynamic job priorities are enabled' do
134+
before do
135+
TestConfig.config[:jobs][:enable_dynamic_job_priorities] = true
136+
end
137+
138+
context 'when there are several active jobs for the current user' do
139+
let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({ guid: 'user-guid' }) }) }
140+
141+
before do
142+
stub_const('VCAP::CloudController::SecurityContext', security_context)
143+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
144+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
145+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
146+
end
147+
148+
it 'does not change its delayed job\'s base priority' do
149+
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }
150+
151+
enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
152+
expect(enqueued_job.priority).to eq(20)
153+
end
154+
end
155+
end
156+
63157
context 'when the job defines its state' do
64158
before do
65159
job.define_singleton_method(:pollable_job_state) do

0 commit comments

Comments
 (0)