Skip to content

Commit 3eaad82

Browse files
authored
Merge pull request #3703 from sap-contributions/add-dynamic-job-prios
Add dynamic job priorities
2 parents f27e6b5 + 9df4f89 commit 3eaad82

File tree

12 files changed

+306
-5
lines changed

12 files changed

+306
-5
lines changed

app/jobs/enqueuer.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def enqueue_job(job)
4343
request_id = ::VCAP::Request.current_id
4444
timeout_job = TimeoutJob.new(job, job_timeout)
4545
logging_context_job = LoggingContextJob.new(timeout_job, request_id)
46-
@opts[:priority] = job_priority unless job_priority.nil?
46+
@opts[:priority] = job_priority unless @opts[:priority] || job_priority.nil?
4747
Delayed::Job.enqueue(logging_context_job, @opts)
4848
end
4949

app/jobs/pollable_job_wrapper.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,19 @@ def before_enqueue(job)
2424
resource_type: @handler.resource_type
2525
)
2626
else
27+
user_guid = VCAP::CloudController::UserAuditInfo.from_context(VCAP::CloudController::SecurityContext).user_guid
28+
29+
if VCAP::CloudController::Config.config.get(:jobs, :enable_dynamic_job_priorities) && user_guid
30+
job.values[:priority] += PollableJobModel.number_of_active_jobs_by_user(user_guid)
31+
end
32+
2733
PollableJobModel.create(
2834
delayed_job_guid: job.guid,
2935
state: PollableJobModel::PROCESSING_STATE,
3036
operation: @handler.display_name,
3137
resource_guid: @handler.resource_guid,
32-
resource_type: @handler.resource_type
38+
resource_type: @handler.resource_type,
39+
user_guid: user_guid
3340
)
3441
end
3542
end

app/jobs/reoccurring_job.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def success(current_delayed_job)
1313
elsif next_enqueue_would_exceed_maximum_duration?
1414
expire!
1515
else
16-
enqueue_next_job(pollable_job)
16+
enqueue_next_job(pollable_job, current_delayed_job.priority)
1717
end
1818
end
1919

@@ -75,10 +75,11 @@ def expire!
7575
raise CloudController::Errors::ApiError.new_from_details('JobTimeout')
7676
end
7777

78-
def enqueue_next_job(pollable_job)
78+
def enqueue_next_job(pollable_job, priority)
7979
opts = {
8080
queue: Jobs::Queues.generic,
81-
run_at: Delayed::Job.db_time_now + next_execution_in
81+
run_at: Delayed::Job.db_time_now + next_execution_in,
82+
priority: priority
8283
}
8384

8485
@retry_number += 1

app/models/runtime/pollable_job_model.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,9 @@ def self.find_by_delayed_job_guid(delayed_job_guid)
4646

4747
pollable_job
4848
end
49+
50+
def self.number_of_active_jobs_by_user(user_guid)
51+
PollableJobModel.where(state: %w[PROCESSING POLLING], user_guid: user_guid).count
52+
end
4953
end
5054
end
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
Sequel.migration do
2+
# adding an index concurrently cannot be done within a transaction
3+
no_transaction
4+
5+
up do
6+
if database_type == :postgres
7+
alter_table :jobs do
8+
add_column :user_guid, String, size: 255, if_not_exists: true
9+
add_index :user_guid, name: :jobs_user_guid_index, if_not_exists: true, concurrently: true
10+
end
11+
12+
elsif database_type == :mysql
13+
alter_table :jobs do
14+
add_column :user_guid, String, size: 255 unless @db.schema(:jobs).map(&:first).include?(:user_guid)
15+
# rubocop:disable Sequel/ConcurrentIndex
16+
add_index :user_guid, name: :jobs_user_guid_index unless @db.indexes(:jobs).include?(:jobs_user_guid_index)
17+
# rubocop:enable Sequel/ConcurrentIndex
18+
end
19+
end
20+
end
21+
22+
down do
23+
if database_type == :postgres
24+
alter_table :jobs do
25+
drop_index :user_guid, name: :jobs_user_guid_index, if_exists: true, concurrently: true
26+
drop_column :user_guid, if_exists: true
27+
end
28+
end
29+
30+
if database_type == :mysql
31+
alter_table :jobs do
32+
# rubocop:disable Sequel/ConcurrentIndex
33+
drop_index :user_guid, name: :jobs_user_guid_index if @db.indexes(:jobs).include?(:jobs_user_guid_index)
34+
# rubocop:enable Sequel/ConcurrentIndex
35+
drop_column :user_guid if @db.schema(:jobs).map(&:first).include?(:user_guid)
36+
end
37+
end
38+
end
39+
end

lib/cloud_controller/config_schemas/base/api_schema.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ class ApiSchema < VCAP::Config
339339

340340
jobs: {
341341
global: { timeout_in_seconds: Integer },
342+
optional(:enable_dynamic_job_priorities) => bool,
342343
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
343344
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
344345
optional(:diego_sync) => { timeout_in_seconds: Integer },

lib/cloud_controller/config_schemas/base/worker_schema.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ class WorkerSchema < VCAP::Config
167167

168168
jobs: {
169169
global: { timeout_in_seconds: Integer },
170+
optional(:enable_dynamic_job_priorities) => bool,
170171
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
171172
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
172173
optional(:diego_sync) => { timeout_in_seconds: Integer },
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
require 'spec_helper'
2+
require 'migrations/helpers/migration_shared_context'
3+
4+
RSpec.describe 'migration to add user_guid column to jobs table and add an index for that column', isolation: :truncation do
5+
include_context 'migration' do
6+
let(:migration_filename) { '20240314131908_add_user_guid_to_jobs_table.rb' }
7+
end
8+
9+
describe 'jobs table' do
10+
it 'adds a column `user_guid`' do
11+
expect(db[:jobs].columns).not_to include(:user_guid)
12+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
13+
expect(db[:jobs].columns).to include(:user_guid)
14+
end
15+
16+
it 'adds an index on the user_guid column' do
17+
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
18+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
19+
expect(db.indexes(:jobs)).to include(:jobs_user_guid_index)
20+
end
21+
22+
describe 'idempotency of up' do
23+
context '`user_guid` column already exists' do
24+
before do
25+
db.add_column :jobs, :user_guid, String, size: 255
26+
end
27+
28+
it 'does not fail' do
29+
expect(db[:jobs].columns).to include(:user_guid)
30+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
31+
end
32+
33+
it 'continues to create the index' do
34+
expect(db[:jobs].columns).to include(:user_guid)
35+
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
36+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
37+
expect(db.indexes(:jobs)).to include(:jobs_user_guid_index)
38+
end
39+
end
40+
41+
context 'index already exists' do
42+
before do
43+
db.add_column :jobs, :user_guid, String, size: 255
44+
db.add_index :jobs, :user_guid, name: :jobs_user_guid_index
45+
end
46+
47+
it 'does not fail' do
48+
expect(db.indexes(:jobs)).to include(:jobs_user_guid_index)
49+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
50+
end
51+
end
52+
end
53+
54+
describe 'idempotency of down' do
55+
context 'index does not exist' do
56+
before do
57+
Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true)
58+
db.drop_index :jobs, :user_guid, name: :jobs_user_guid_index
59+
end
60+
61+
it 'does not fail' do
62+
expect(db[:jobs].columns).to include(:user_guid)
63+
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
64+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true, target: 0) }.not_to raise_error
65+
end
66+
67+
it 'continues to remove the `user_guid` column' do
68+
expect(db[:jobs].columns).to include(:user_guid)
69+
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
70+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true, target: 0) }.not_to raise_error
71+
expect(db[:jobs].columns).not_to include(:user_guid)
72+
end
73+
end
74+
75+
context 'index and column do not exist' do
76+
before do
77+
Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true)
78+
db.drop_index :jobs, :user_guid, name: :jobs_user_guid_index
79+
db.drop_column :jobs, :user_guid
80+
end
81+
82+
it 'does not fail' do
83+
expect(db[:jobs].columns).not_to include(:user_guid)
84+
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
85+
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true, target: 0) }.not_to raise_error
86+
end
87+
end
88+
end
89+
end
90+
end

spec/unit/jobs/enqueuer_spec.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
require 'spec_helper'
12
require 'db_spec_helper'
23
require 'jobs/enqueuer'
34
require 'jobs/delete_action_job'
@@ -133,6 +134,15 @@ module VCAP::CloudController::Jobs
133134
end
134135
end
135136

137+
it 'uses the default priority' do
138+
original_enqueue = Delayed::Job.method(:enqueue)
139+
expect(Delayed::Job).to receive(:enqueue) do |enqueued_job, opts|
140+
expect(opts).not_to include(:priority)
141+
original_enqueue.call(enqueued_job, opts)
142+
end
143+
Enqueuer.new(wrapped_job, opts).enqueue_pollable
144+
end
145+
136146
context 'priority from config' do
137147
let(:priorities) { { priorities: { wrapped_job.display_name.to_sym => 1899 } } }
138148

@@ -144,6 +154,18 @@ module VCAP::CloudController::Jobs
144154
end
145155
Enqueuer.new(wrapped_job, opts).enqueue_pollable
146156
end
157+
158+
context 'and priority from Enqueuer (e.g. from reoccurring jobs)' do
159+
it 'uses the priority passed into the Enqueuer' do
160+
original_enqueue = Delayed::Job.method(:enqueue)
161+
expect(Delayed::Job).to receive(:enqueue) do |enqueued_job, opts|
162+
expect(opts).to include({ priority: 2000 })
163+
original_enqueue.call(enqueued_job, opts)
164+
end
165+
opts[:priority] = 2000
166+
Enqueuer.new(wrapped_job, opts).enqueue_pollable
167+
end
168+
end
147169
end
148170
end
149171

spec/unit/jobs/pollable_job_wrapper_spec.rb

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,76 @@ class BigException < StandardError
3838
expect(job_record.reload.state).to eq('COMPLETE')
3939
end
4040

41+
context 'when dynamic job priorities are enabled' do
42+
before do
43+
TestConfig.config[:jobs][:enable_dynamic_job_priorities] = true
44+
end
45+
46+
context 'when there are several active jobs for the current user' do
47+
let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({ guid: 'user-guid' }) }) }
48+
let(:pollable_job) { PollableJobWrapper.new(job) }
49+
50+
before do
51+
stub_const('VCAP::CloudController::SecurityContext', security_context)
52+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
53+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
54+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
55+
end
56+
57+
it 'adds +1 to base priority for each active job' do
58+
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }
59+
60+
enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
61+
expect(enqueued_job.priority).to eq(23)
62+
end
63+
64+
context 'when SecurityContext does not have a user guid' do
65+
let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({}) }) }
66+
67+
before do
68+
stub_const('VCAP::CloudController::SecurityContext', security_context)
69+
end
70+
71+
it 'does not change its delayed job\'s base priority' do
72+
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }
73+
74+
enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
75+
expect(enqueued_job.priority).to eq(20)
76+
end
77+
78+
it 'uses nil for the user_guid' do
79+
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }
80+
81+
enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
82+
83+
job_record = VCAP::CloudController::PollableJobModel.find(delayed_job_guid: enqueued_job.guid)
84+
expect(job_record.user_guid).to be_nil
85+
end
86+
end
87+
end
88+
end
89+
90+
context 'when dynamic job priorities are disabled' do
91+
context 'when there are several active jobs for the current user' do
92+
let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({ guid: 'user-guid' }) }) }
93+
let(:pollable_job) { PollableJobWrapper.new(job) }
94+
95+
before do
96+
stub_const('VCAP::CloudController::SecurityContext', security_context)
97+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
98+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
99+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
100+
end
101+
102+
it 'does not change its delayed job\'s base priority' do
103+
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }
104+
105+
enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
106+
expect(enqueued_job.priority).to eq(20)
107+
end
108+
end
109+
end
110+
41111
context 'reusing a pollable job' do
42112
let!(:existing) { VCAP::CloudController::PollableJobModel.make }
43113
let(:pollable_job) { PollableJobWrapper.new(job, existing_guid: existing.guid) }
@@ -60,6 +130,30 @@ class BigException < StandardError
60130
expect(job_record.cf_api_error).to be_nil
61131
end
62132

133+
context 'when dynamic job priorities are enabled' do
134+
before do
135+
TestConfig.config[:jobs][:enable_dynamic_job_priorities] = true
136+
end
137+
138+
context 'when there are several active jobs for the current user' do
139+
let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({ guid: 'user-guid' }) }) }
140+
141+
before do
142+
stub_const('VCAP::CloudController::SecurityContext', security_context)
143+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
144+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
145+
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
146+
end
147+
148+
it 'does not change its delayed job\'s base priority' do
149+
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }
150+
151+
enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
152+
expect(enqueued_job.priority).to eq(20)
153+
end
154+
end
155+
end
156+
63157
context 'when the job defines its state' do
64158
before do
65159
job.define_singleton_method(:pollable_job_state) do

0 commit comments

Comments
 (0)