Skip to content

Commit 6749230

Browse files
committed
Add GenericEnqueuer for consistent job priorities
Some of the delayed jobs enqueue other delayed jobs. An example for this is an app delete job which enqueues other jobs like blobstore delete and buildpack cache cleanup. If CAPI is configured with dedicated job priorities or when dynamic job priorities is enabled, those secondary jobs might end up with a higher priority then the primary job. This might result in less important jobs like blobstore delete to have a higher priority than more important ones like service instance create. This change introduces a `GenericEnqueuer` which follows the singleton pattern to ensure that always the same enqueuer instance is used in context of the current thread. `GenericEnqueuer` will be initialized and destroyed in the CCJob wrapper and therefore every job which gets enqueued by `GenericEnqueuer.shared.enqueue(job)` will have the same priority. Additionally the `enqueue` and `enqueue_pollable` methods allow an optional `priority_increment` which increases the priority value and thus makes the job less important. With this we can give secondary jobs a lower priority then their parent useful for e.g. blobstore delete jobs. If a job has a dedicated priority configured this priority will be added to the current priority and `priority_increment`, ensuring that the priorities of the primary job are correctly applied. For reoccurring jobs the priority can be locked with the `preserve_priority` parameter which will ensure the current/previous priority is used again. | Job Type | Preserve Priority | Config Priority | Increment | Final Priority | |------------------------|-------------------|-----------------|-----------|--------------------------------------| | **Parent Job** | ❌ No | `100` | `nil` | **100** | | **Sub-Job A** | ❌ No | `200` | `50` | **350** (100+200+50) | | **Sub-Job B** | ❌ No | `nil` | `50` | **150** (100+50) | | **Sub-Job C** | ❌ No | `nil` | `nil` | **100** (inherits parent) | | **Tertiary Job A1** | ❌ No | `50` | `20` | **420** (350+50+20) | | **Tertiary Job B1** | ✅ Yes | `10` (ignored) | `30` | **150** (preserved from Sub-Job B) | | **Tertiary Job C1** | ❌ No | `nil` | `50` | **150** (100+50) | | **Re-enqueued Job** | ✅ Yes | `20` (ignored) | `100` | **42** (original preserved priority) |
1 parent fad2da1 commit 6749230

32 files changed

+310
-66
lines changed

app/actions/app_delete.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def route_mappings_to_delete(app)
9797

9898
def delete_buildpack_cache(app)
9999
delete_job = Jobs::V3::BuildpackCacheDelete.new(app.guid)
100-
Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(delete_job)
100+
Jobs::GenericEnqueuer.shared.enqueue(delete_job, priority_increment: Jobs::REDUCED_PRIORITY)
101101
end
102102

103103
def logger

app/actions/buildpack_delete.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ def delete(buildpacks)
88
end
99
if buildpack.key
1010
blobstore_delete = Jobs::Runtime::BlobstoreDelete.new(buildpack.key, :buildpack_blobstore)
11-
Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(blobstore_delete)
11+
Jobs::GenericEnqueuer.shared.enqueue(blobstore_delete, priority_increment: Jobs::REDUCED_PRIORITY)
1212
end
1313
end
1414

app/actions/droplet_copy.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def copy_buildpack_droplet(new_droplet)
6060
new_droplet.buildpack_lifecycle_data(reload: true)
6161

6262
copy_job = Jobs::V3::DropletBitsCopier.new(@source_droplet.guid, new_droplet.guid)
63-
Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(copy_job)
63+
Jobs::GenericEnqueuer.shared.enqueue(copy_job)
6464
end
6565
end
6666
end

app/actions/droplet_delete.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def delete(droplets)
1616

1717
if droplet.blobstore_key
1818
blobstore_delete = Jobs::Runtime::BlobstoreDelete.new(droplet.blobstore_key, :droplet_blobstore)
19-
Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(blobstore_delete)
19+
Jobs::GenericEnqueuer.shared.enqueue(blobstore_delete, priority_increment: Jobs::REDUCED_PRIORITY)
2020
end
2121

2222
Repositories::DropletEventRepository.record_delete(

app/actions/mixins/bindings_delete.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
require 'jobs/queues'
22
require 'jobs/enqueuer'
3+
require 'jobs/generic_enqueuer'
34
require 'jobs/v3/delete_binding_job'
45
require 'jobs/v3/delete_service_binding_job_factory'
56

@@ -19,7 +20,7 @@ def delete_bindings(bindings, user_audit_info:)
1920
result = binding_delete_action.delete(binding)
2021
unless result[:finished]
2122
polling_job = DeleteBindingJob.new(type, binding.guid, user_audit_info:)
22-
Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue_pollable(polling_job)
23+
Jobs::GenericEnqueuer.shared.enqueue_pollable(polling_job)
2324
unbinding_operation_in_progress!(binding)
2425
end
2526
rescue StandardError => e

app/actions/package_copy.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def copy(destination_app_guid:, source_package:, user_audit_info:, record_event:
2020
package.db.transaction do
2121
package.save
2222

23-
@enqueued_job = Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(Jobs::V3::PackageBitsCopier.new(source_package.guid, package.guid)) if source_package.type == 'bits'
23+
@enqueued_job = Jobs::GenericEnqueuer.shared.enqueue(Jobs::V3::PackageBitsCopier.new(source_package.guid, package.guid)) if source_package.type == 'bits'
2424

2525
record_audit_event(package, source_package, user_audit_info) if record_event
2626
end

app/actions/package_delete.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require 'jobs/generic_enqueuer'
2+
13
module VCAP::CloudController
24
class PackageDelete
35
def initialize(user_audit_info)
@@ -10,7 +12,7 @@ def delete(packages)
1012
packages.each do |package|
1113
unless package.docker?
1214
package_src_delete_job = create_package_source_deletion_job(package)
13-
Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(package_src_delete_job) if package_src_delete_job
15+
Jobs::GenericEnqueuer.shared.enqueue(package_src_delete_job, priority_increment: Jobs::REDUCED_PRIORITY) if package_src_delete_job
1416
end
1517

1618
package.destroy

app/actions/routing/route_delete.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def delete_sync(route:, recursive:)
1515
end
1616

1717
def delete_async(route:, recursive:)
18-
Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(do_delete(recursive, route))
18+
Jobs::GenericEnqueuer.shared.enqueue(do_delete(recursive, route))
1919
end
2020

2121
def delete_unmapped_route(route:)

app/actions/service_broker_create.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def create(message)
3131
service_event_repository.record_broker_event_with_request(:create, broker, message.audit_hash)
3232

3333
synchronization_job = SynchronizeBrokerCatalogJob.new(broker.guid, user_audit_info: service_event_repository.user_audit_info)
34-
pollable_job = Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue_pollable(synchronization_job)
34+
pollable_job = Jobs::GenericEnqueuer.shared.enqueue_pollable(synchronization_job)
3535
end
3636

3737
{ pollable_job: }

app/actions/services/locks/deleter_lock.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def unlock_and_destroy!
4949

5050
def enqueue_and_unlock!(attributes_to_update, job)
5151
service_instance.save_and_update_operation(attributes_to_update)
52-
Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(job)
52+
Jobs::GenericEnqueuer.shared.enqueue(job)
5353
@needs_unlock = false
5454
end
5555

0 commit comments

Comments
 (0)