Skip to content

Commit 5306029

Browse files
authored
Improve draining of delayed worker (#3999)
This change improves the draining behavior of the delayed workers: - Add shutdown_delayed_worker method to Drain class which configurable timeout - Add jobs:clear_pending_locks rake task to clear pending job locks in the delayed jobs table for the given worker. Additionally it allows operators to set the number of delayed worker threads and the grace period after which threads will be killed via the jobs rake task.
1 parent b7a6a54 commit 5306029

File tree

9 files changed

+256
-43
lines changed

9 files changed

+256
-43
lines changed

lib/cloud_controller/config_schemas/base/api_schema.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,7 @@ class ApiSchema < VCAP::Config
341341
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
342342
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
343343
optional(:diego_sync) => { timeout_in_seconds: Integer },
344-
optional(:priorities) => Hash,
345-
optional(:number_of_worker_threads) => Integer
344+
optional(:priorities) => Hash
346345
},
347346

348347
# perm settings no longer have any effect but are preserved here

lib/cloud_controller/config_schemas/base/worker_schema.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,7 @@ class WorkerSchema < VCAP::Config
166166
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
167167
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
168168
optional(:diego_sync) => { timeout_in_seconds: Integer },
169-
optional(:priorities) => Hash,
170-
optional(:number_of_worker_threads) => Integer
169+
optional(:priorities) => Hash
171170
},
172171

173172
volume_services_enabled: bool,

lib/cloud_controller/drain.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,23 @@ def shutdown_cc(pid_path)
3838
log_shutdown_error(pid, process_name) unless wait_for_shutdown(pid, process_name, CCNG_FINAL_TIMEOUT)
3939
end
4040

41+
def shutdown_delayed_worker(pid_path, timeout=15)
42+
pid = File.read(pid_path).to_i
43+
process_name = File.basename(pid_path, '.pid')
44+
45+
# Initiate shutdown.
46+
send_signal('TERM', pid, process_name)
47+
48+
# Wait some additional time for delayed worker to be terminated; otherwise write an error log message.
49+
log_shutdown_error(pid, process_name) unless wait_for_shutdown(pid, process_name, timeout)
50+
51+
# force shutdown
52+
return if terminated?(pid, process_name)
53+
54+
log_info("Forcefully shutting down process '#{process_name}' with pid '#{pid}'")
55+
send_signal('KILL', pid, process_name)
56+
end
57+
4158
private
4259

4360
def send_signal(signal, pid, process_name)

lib/delayed_job/delayed_worker.rb

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ def initialize(options)
99
worker_name: options[:name],
1010
quiet: true
1111
}
12+
return unless options[:num_threads] && options[:num_threads].to_i > 0
13+
14+
@queue_options[:num_threads] = options[:num_threads].to_i
15+
@queue_options[:grace_period_seconds] = options[:thread_grace_period_seconds].to_i if options[:thread_grace_period_seconds] && options[:thread_grace_period_seconds].to_i > 0
1216
end
1317

1418
def start_working
@@ -17,16 +21,36 @@ def start_working
1721

1822
logger = Steno.logger('cc-worker')
1923
logger.info("Starting job with options #{@queue_options}")
24+
setup_app_log_emitter(config, logger)
25+
26+
worker = get_initialized_delayed_worker(config, logger)
27+
worker.start
28+
end
2029

30+
def clear_locks!
31+
config = RakeConfig.config
32+
BackgroundJobEnvironment.new(config).setup_environment(readiness_port)
33+
34+
logger = Steno.logger('cc-worker-clear-locks')
35+
logger.info("Clearing pending locks with options {#{@queue_options.map { |k, v| "#{k}: #{v.inspect}" }.join(', ')}}")
2136
setup_app_log_emitter(config, logger)
37+
38+
worker = get_initialized_delayed_worker(config, logger)
39+
Delayed::Job.clear_locks!(worker.name)
40+
41+
# Clear locks for all threads when using a threaded worker
42+
worker.names_with_threads.each { |name| Delayed::Job.clear_locks!(name) } if worker.respond_to?(:names_with_threads)
43+
end
44+
45+
private
46+
47+
def get_initialized_delayed_worker(config, logger)
2248
Delayed::Worker.destroy_failed_jobs = false
2349
Delayed::Worker.max_attempts = 3
2450
Delayed::Worker.max_run_time = config.get(:jobs, :global, :timeout_in_seconds) + 1
2551
Delayed::Worker.logger = logger
2652

27-
num_worker_threads = config.get(:jobs, :number_of_worker_threads)
28-
unless num_worker_threads.nil?
29-
@queue_options[:num_threads] = num_worker_threads
53+
unless @queue_options[:num_threads].nil?
3054
# Dynamically alias Delayed::Worker to ThreadedWorker to ensure plugins etc are working correctly
3155
Delayed.module_eval do
3256
remove_const(:Worker) if const_defined?(:Worker)
@@ -36,11 +60,9 @@ def start_working
3660

3761
worker = Delayed::Worker.new(@queue_options)
3862
worker.name = @queue_options[:worker_name]
39-
worker.start
63+
worker
4064
end
4165

42-
private
43-
4466
def setup_app_log_emitter(config, logger)
4567
VCAP::AppLogEmitter.fluent_emitter = fluent_emitter(config) if config.get(:fluent)
4668
if config.get(:loggregator) && config.get(

lib/delayed_job/threaded_worker.rb

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ def start
2828
raise SignalException.new('INT') if self.class.raise_signal_exceptions && self.class.raise_signal_exceptions != :term
2929
end
3030

31-
say "Starting threaded delayed worker with #{@num_threads} threads"
31+
say "Starting threaded delayed worker with #{@num_threads} threads and grace period of #{@grace_period_seconds} seconds"
3232

3333
@num_threads.times do |thread_index|
3434
thread = Thread.new do
35-
Thread.current[:thread_name] = "thread:#{thread_index + 1}"
35+
Thread.current[:thread_index] = thread_index
3636
threaded_start
3737
rescue Exception => e # rubocop:disable Lint/RescueException
3838
say "Unexpected error: #{e.message}\n#{e.backtrace.join("\n")}", 'error'
@@ -49,10 +49,13 @@ def start
4949
raise 'Unexpected error occurred in one of the worker threads' if @unexpected_error
5050
end
5151

52+
def names_with_threads
53+
base_name = name
54+
@num_threads.times.map { |thread_index| extended_name_with_thread_index(base_name, thread_index) }
55+
end
56+
5257
def name
53-
base_name = super
54-
thread_name = Thread.current[:thread_name]
55-
thread_name.nil? ? base_name : "#{base_name} #{thread_name}"
58+
extended_name_with_thread_index(super)
5659
end
5760

5861
def stop
@@ -64,7 +67,7 @@ def stop
6467
Thread.new do
6568
t.join(@grace_period_seconds)
6669
if t.alive?
67-
say "Killing thread '#{t[:thread_name]}'"
70+
say "Killing thread '#{t[:thread_index]}'"
6871
t.kill
6972
end
7073
end
@@ -98,5 +101,18 @@ def threaded_start
98101
end
99102
end
100103
end
104+
105+
private
106+
107+
def extended_name_with_thread_index(base_name, thread_index=nil)
108+
raise ArgumentError.new('base_name cannot be nil or empty') if base_name.nil? || base_name.empty?
109+
110+
thread_index = Thread.current[:thread_index] if thread_index.nil?
111+
thread_name = "thread:#{thread_index + 1}" if thread_index.is_a? Integer
112+
113+
return base_name if thread_name.nil?
114+
115+
"#{base_name} #{thread_name}"
116+
end
101117
end
102118
end

lib/tasks/jobs.rake

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,18 @@ namespace :jobs do
1111
end
1212
end
1313

14+
desc 'Clear pending locks for the current delayed worker.'
15+
task :clear_pending_locks, [:name] => :environment do |_t, args|
16+
puts RUBY_DESCRIPTION
17+
puts "Clearing pending locks for worker: #{args.name}"
18+
args.with_defaults(name: ENV.fetch('HOSTNAME', nil))
19+
20+
RakeConfig.context = :worker
21+
22+
CloudController::DelayedWorker.new(queues: [],
23+
name: args.name).clear_locks!
24+
end
25+
1426
desc 'Start a delayed_job worker that works on jobs that require access to local resources.'
1527

1628
task :local, [:name] => :environment do |_t, args|
@@ -25,9 +37,11 @@ namespace :jobs do
2537
end
2638

2739
desc 'Start a delayed_job worker.'
28-
task :generic, [:name] => :environment do |_t, args|
40+
task :generic, %i[name num_threads thread_grace_period_seconds] => :environment do |_t, args|
2941
puts RUBY_DESCRIPTION
3042
args.with_defaults(name: ENV.fetch('HOSTNAME', nil))
43+
args.with_defaults(num_threads: nil)
44+
args.with_defaults(thread_grace_period_seconds: nil)
3145

3246
RakeConfig.context = :worker
3347
queues = [
@@ -50,7 +64,6 @@ namespace :jobs do
5064
'prune_excess_app_revisions'
5165
]
5266

53-
CloudController::DelayedWorker.new(queues: queues,
54-
name: args.name).start_working
67+
CloudController::DelayedWorker.new(queues: queues, name: args.name, num_threads: args.num_threads, thread_grace_period_seconds: args.thread_grace_period_seconds).start_working
5568
end
5669
end

spec/unit/lib/cloud_controller/drain_spec.rb

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ def log_contents
1414
end
1515

1616
let(:pid) { 23_456 }
17+
let(:pid_name) { 'pidfile' }
1718
let(:pid_dir) { Dir.mktmpdir }
18-
let(:pid_path) { File.join(pid_dir, 'pidfile') }
19+
let(:pid_path) { File.join(pid_dir, pid_name) }
1920

2021
before do
2122
File.write(pid_path, pid)
@@ -38,7 +39,7 @@ def log_contents
3839
drain.shutdown_nginx(pid_path)
3940

4041
log_contents do |log|
41-
expect(log).to match(/Sending signal '\w+' to process '\w+' with pid '\d+'/)
42+
expect(log).to include("Sending signal 'QUIT' to process '#{pid_name}' with pid '#{pid}'")
4243
end
4344
end
4445

@@ -50,8 +51,9 @@ def log_contents
5051

5152
expect(drain).to have_received(:sleep).twice
5253
log_contents do |log|
53-
expect(log).to match(/Waiting \d+s for process '\w+' with pid '\d+' to shutdown/)
54-
expect(log).to match(/Process '\w+' with pid '\d+' is not running/)
54+
expect(log).to include("Waiting 30s for process '#{pid_name}' with pid '#{pid}' to shutdown")
55+
expect(log).to include("Waiting 29s for process '#{pid_name}' with pid '#{pid}' to shutdown")
56+
expect(log).to include("Process '#{pid_name}' with pid '#{pid}' is not running")
5557
end
5658
end
5759

@@ -84,7 +86,7 @@ def log_contents
8486

8587
expect(drain).to have_received(:sleep).exactly(40).times
8688
log_contents do |log|
87-
expect(log).to match(/Process '\w+' with pid '\d+' is still running - this indicates an error in the shutdown procedure!/)
89+
expect(log).to include("Process '#{pid_name}' with pid '#{pid}' is still running - this indicates an error in the shutdown procedure!")
8890
end
8991
end
9092
end
@@ -96,7 +98,7 @@ def log_contents
9698
drain.shutdown_cc(pid_path)
9799

98100
log_contents do |log|
99-
expect(log).to match(/Sending signal '\w+' to process '\w+' with pid '\d+'/)
101+
expect(log).to match("Sending signal 'TERM' to process '#{pid_name}' with pid '#{pid}'")
100102
end
101103
end
102104

@@ -107,7 +109,53 @@ def log_contents
107109

108110
expect(drain).to have_received(:sleep).exactly(20).times
109111
log_contents do |log|
110-
expect(log).to match(/Process '\w+' with pid '\d+' is still running - this indicates an error in the shutdown procedure!/)
112+
expect(log).to match("Process '#{pid_name}' with pid '#{pid}' is still running - this indicates an error in the shutdown procedure!")
113+
end
114+
end
115+
end
116+
117+
describe '#shutdown_delayed_worker' do
118+
it 'sends TERM to the delayed worker process specified in the pid file' do
119+
expect(Process).to receive(:kill).with('TERM', pid)
120+
121+
drain.shutdown_delayed_worker(pid_path)
122+
123+
log_contents do |log|
124+
expect(log).to include("Sending signal 'TERM' to process '#{pid_name}' with pid '#{pid}'")
125+
end
126+
end
127+
128+
it 'waits 15s after sending TERM' do
129+
allow(Process).to receive(:getpgid).with(pid).and_return(1)
130+
131+
drain.shutdown_delayed_worker(pid_path)
132+
133+
expect(drain).to have_received(:sleep).exactly(15).times
134+
log_contents do |log|
135+
expect(log).to include("Process '#{pid_name}' with pid '#{pid}' is still running - this indicates an error in the shutdown procedure!")
136+
end
137+
end
138+
139+
it 'waits for the given timeout after sending TERM' do
140+
allow(Process).to receive(:getpgid).with(pid).and_return(1)
141+
142+
drain.shutdown_delayed_worker(pid_path, 30)
143+
144+
expect(drain).to have_received(:sleep).exactly(30).times
145+
log_contents do |log|
146+
expect(log).to include("Process '#{pid_name}' with pid '#{pid}' is still running - this indicates an error in the shutdown procedure!")
147+
end
148+
end
149+
150+
it 'sends KILL to the delayed worker process if it is still running after 15s' do
151+
allow(Process).to receive(:getpgid).with(pid).and_return(1)
152+
allow(Process).to receive(:kill).with('TERM', pid)
153+
expect(Process).to receive(:kill).with('KILL', pid)
154+
155+
drain.shutdown_delayed_worker(pid_path)
156+
157+
log_contents do |log|
158+
expect(log).to include("Forcefully shutting down process '#{pid_name}' with pid '#{pid}'")
111159
end
112160
end
113161
end

0 commit comments

Comments
 (0)