Skip to content

Commit a655a6f

Browse files
authored
Add new metric job_queue_load for cc-worker (#3694)
* Add new metric job_queue_load for cc-worker * Adjust code based on review
1 parent c4bd744 commit a655a6f

File tree

7 files changed

+217
-10
lines changed

7 files changed

+217
-10
lines changed

lib/cloud_controller/metrics/periodic_updater.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def setup_updates
1818
update!
1919
EM.add_periodic_timer(600) { catch_error { update_user_count } }
2020
EM.add_periodic_timer(30) { catch_error { update_job_queue_length } }
21+
EM.add_periodic_timer(30) { catch_error { update_job_queue_load } }
2122
EM.add_periodic_timer(30) { catch_error { update_thread_info } }
2223
EM.add_periodic_timer(30) { catch_error { update_failed_job_count } }
2324
EM.add_periodic_timer(30) { catch_error { update_vitals } }
@@ -30,6 +31,7 @@ def setup_updates
3031
def update!
3132
update_user_count
3233
update_job_queue_length
34+
update_job_queue_load
3335
update_thread_info
3436
update_failed_job_count
3537
update_vitals
@@ -91,6 +93,23 @@ def update_job_queue_length
9193
@prometheus_updater.update_job_queue_length(pending_job_count_by_queue)
9294
end
9395

96+
def update_job_queue_load
97+
jobs_by_queue_with_run_now = Delayed::Job.
98+
where(Sequel.lit('run_at <= ?', Time.now)).
99+
where(Sequel.lit('failed_at IS NULL')).group_and_count(:queue)
100+
101+
total = 0
102+
pending_job_load_by_queue = jobs_by_queue_with_run_now.each_with_object({}) do |row, hash|
103+
@known_job_queues[row[:queue].to_sym] = 0
104+
total += row[:count]
105+
hash[row[:queue].to_sym] = row[:count]
106+
end
107+
108+
pending_job_load_by_queue.reverse_merge!(@known_job_queues)
109+
@statsd_updater.update_job_queue_load(pending_job_load_by_queue, total)
110+
@prometheus_updater.update_job_queue_load(pending_job_load_by_queue)
111+
end
112+
94113
def update_thread_info
95114
return unless VCAP::CloudController::Config.config.get(:webserver) == 'thin'
96115

lib/cloud_controller/metrics/prometheus_updater.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ def self.allow_pid_label
1919

2020
METRICS = [
2121
{ type: :gauge, name: :cc_job_queues_length_total, docstring: 'Job queues length of worker processes', labels: [:queue], aggregation: :most_recent },
22+
{ type: :gauge, name: :cc_job_queues_load_total, docstring: 'Number of background jobs ready to run now ', labels: [:queue], aggregation: :most_recent },
2223
{ type: :gauge, name: :cc_failed_jobs_total, docstring: 'Number of failed jobs of worker processes', labels: [:queue], aggregation: :most_recent },
2324
{ type: :counter, name: :cc_staging_requests_total, docstring: 'Number of staging requests' },
2425
{ type: :histogram, name: :cc_staging_succeeded_duration_seconds, docstring: 'Durations of successful staging events', buckets: DURATION_BUCKETS },
@@ -102,6 +103,12 @@ def update_job_queue_length(pending_job_count_by_queue)
102103
end
103104
end
104105

106+
def update_job_queue_load(update_job_queue_load)
107+
update_job_queue_load.each do |key, value|
108+
update_gauge_metric(:cc_job_queues_load_total, value, labels: { queue: key.to_s.underscore })
109+
end
110+
end
111+
105112
def update_thread_info_thin(thread_info)
106113
update_gauge_metric(:cc_thread_info_thread_count, thread_info[:thread_count])
107114
update_gauge_metric(:cc_thread_info_event_machine_connection_count, thread_info[:event_machine][:connection_count])

lib/cloud_controller/metrics/statsd_updater.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,15 @@ def update_job_queue_length(pending_job_count_by_queue, total)
2323
end
2424
end
2525

26+
def update_job_queue_load(pending_job_load_by_queue, total)
27+
@statsd.batch do |batch|
28+
pending_job_load_by_queue.each do |key, value|
29+
batch.gauge("cc.job_queue_load.#{key}", value)
30+
end
31+
batch.gauge('cc.job_queue_load.total', total)
32+
end
33+
end
34+
2635
def update_thread_info_thin(thread_info)
2736
@statsd.batch do |batch|
2837
batch.gauge('cc.thread_info.thread_count', thread_info[:thread_count])

spec/request/internal/metrics_spec.rb

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,42 @@
8383
end
8484
end
8585

86+
context 'cc_job_queue_load' do
87+
before do
88+
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now })
89+
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now })
90+
end
91+
92+
after do
93+
Delayed::Job.dataset.delete
94+
end
95+
96+
it 'includes job queue load metric labelled for each queue' do
97+
get '/internal/v4/metrics', nil
98+
99+
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 1\.0/)
100+
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 1\.0/)
101+
end
102+
end
103+
104+
context 'cc_job_queue_load_not_ready_to_run_now' do
105+
before do
106+
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now + 1.minute })
107+
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now + 1.minute })
108+
end
109+
110+
after do
111+
Delayed::Job.dataset.delete
112+
end
113+
114+
it 'includes job queue load metric labelled for each queue' do
115+
get '/internal/v4/metrics', nil
116+
117+
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 0\.0/)
118+
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 0\.0/)
119+
end
120+
end
121+
86122
context 'cc_thread_info' do
87123
it 'reports thread info' do
88124
get '/internal/v4/metrics', nil

spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb

Lines changed: 102 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ module VCAP::CloudController::Metrics
7272
before do
7373
allow(statsd_updater).to receive(:update_user_count)
7474
allow(statsd_updater).to receive(:update_job_queue_length)
75+
allow(statsd_updater).to receive(:update_job_queue_load)
7576
allow(statsd_updater).to receive(:update_thread_info_thin)
7677
allow(statsd_updater).to receive(:update_failed_job_count)
7778
allow(statsd_updater).to receive(:update_vitals)
@@ -81,6 +82,7 @@ module VCAP::CloudController::Metrics
8182

8283
allow(prometheus_updater).to receive(:update_user_count)
8384
allow(prometheus_updater).to receive(:update_job_queue_length)
85+
allow(prometheus_updater).to receive(:update_job_queue_load)
8486
allow(prometheus_updater).to receive(:update_thread_info_thin)
8587
allow(prometheus_updater).to receive(:update_failed_job_count)
8688
allow(prometheus_updater).to receive(:update_vitals)
@@ -101,6 +103,11 @@ module VCAP::CloudController::Metrics
101103
periodic_updater.setup_updates
102104
end
103105

106+
it 'bumps the load of cc job queues and sets periodic timer' do
107+
expect(periodic_updater).to receive(:update_job_queue_load).once
108+
periodic_updater.setup_updates
109+
end
110+
104111
it 'bumps the length of cc failed job queues and sets periodic timer' do
105112
expect(periodic_updater).to receive(:update_failed_job_count).once
106113
periodic_updater.setup_updates
@@ -161,45 +168,53 @@ module VCAP::CloudController::Metrics
161168
@periodic_timers[1][:block].call
162169
end
163170

164-
it 'updates thread count and event machine queues' do
171+
it 'bumps the load of cc job queues and sets periodic timer' do
165172
expect(periodic_updater).to receive(:catch_error).once.and_call_original
166-
expect(periodic_updater).to receive(:update_thread_info).once
173+
expect(periodic_updater).to receive(:update_job_queue_load).once
167174
expect(@periodic_timers[2][:interval]).to eq(30)
168175

169176
@periodic_timers[2][:block].call
170177
end
171178

172-
it 'bumps the length of cc failed job queues and sets periodic timer' do
179+
it 'updates thread count and event machine queues' do
173180
expect(periodic_updater).to receive(:catch_error).once.and_call_original
174-
expect(periodic_updater).to receive(:update_failed_job_count).once
181+
expect(periodic_updater).to receive(:update_thread_info).once
175182
expect(@periodic_timers[3][:interval]).to eq(30)
176183

177184
@periodic_timers[3][:block].call
178185
end
179186

180-
it 'updates the vitals' do
187+
it 'bumps the length of cc failed job queues and sets periodic timer' do
181188
expect(periodic_updater).to receive(:catch_error).once.and_call_original
182-
expect(periodic_updater).to receive(:update_vitals).once
189+
expect(periodic_updater).to receive(:update_failed_job_count).once
183190
expect(@periodic_timers[4][:interval]).to eq(30)
184191

185192
@periodic_timers[4][:block].call
186193
end
187194

188-
it 'updates the log counts' do
195+
it 'updates the vitals' do
189196
expect(periodic_updater).to receive(:catch_error).once.and_call_original
190-
expect(periodic_updater).to receive(:update_log_counts).once
197+
expect(periodic_updater).to receive(:update_vitals).once
191198
expect(@periodic_timers[5][:interval]).to eq(30)
192199

193200
@periodic_timers[5][:block].call
194201
end
195202

196-
it 'updates the task stats' do
203+
it 'updates the log counts' do
197204
expect(periodic_updater).to receive(:catch_error).once.and_call_original
198-
expect(periodic_updater).to receive(:update_task_stats).once
205+
expect(periodic_updater).to receive(:update_log_counts).once
199206
expect(@periodic_timers[6][:interval]).to eq(30)
200207

201208
@periodic_timers[6][:block].call
202209
end
210+
211+
it 'updates the task stats' do
212+
expect(periodic_updater).to receive(:catch_error).once.and_call_original
213+
expect(periodic_updater).to receive(:update_task_stats).once
214+
expect(@periodic_timers[7][:interval]).to eq(30)
215+
216+
@periodic_timers[7][:block].call
217+
end
203218
end
204219
end
205220

@@ -353,6 +368,82 @@ module VCAP::CloudController::Metrics
353368
end
354369
end
355370

371+
describe '#update_job_queue_load' do
372+
before do
373+
allow(statsd_updater).to receive(:update_job_queue_load)
374+
allow(prometheus_updater).to receive(:update_job_queue_load)
375+
end
376+
377+
context 'when there are pending jobs ready to run in the local queue' do
378+
it 'emits the correct count' do
379+
Delayed::Job.enqueue(
380+
VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1),
381+
queue: VCAP::CloudController::Jobs::Queues.local(VCAP::CloudController::Config.config),
382+
run_at: Time.now
383+
)
384+
periodic_updater.update_job_queue_load
385+
386+
expected_pending_job_queue_load = {
387+
'cc-api-0': 1
388+
}
389+
expected_total = 1
390+
391+
expect(statsd_updater).to have_received(:update_job_queue_load).with(expected_pending_job_queue_load, expected_total)
392+
expect(prometheus_updater).to have_received(:update_job_queue_load).with(expected_pending_job_queue_load)
393+
end
394+
end
395+
396+
context 'when local queue does not have pending jobs' do
397+
it 'emits the local queue load as 0 for discoverability' do
398+
periodic_updater.update_job_queue_load
399+
400+
expected_pending_job_queue_load = {
401+
'cc-api-0': 0
402+
}
403+
expected_total = 0
404+
405+
expect(statsd_updater).to have_received(:update_job_queue_load).with(expected_pending_job_queue_load, expected_total)
406+
expect(prometheus_updater).to have_received(:update_job_queue_load).with(expected_pending_job_queue_load)
407+
end
408+
end
409+
410+
it 'includes the load of the delayed job queue and the total' do
411+
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), queue: 'cc_local', run_at: Time.now)
412+
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), queue: 'cc_local', run_at: Time.now)
413+
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), queue: 'cc_generic', run_at: Time.now)
414+
415+
periodic_updater.update_job_queue_load
416+
417+
expected_pending_job_queue_load = {
418+
cc_local: 2,
419+
cc_generic: 1,
420+
'cc-api-0': 0
421+
}
422+
expected_total = 3
423+
424+
expect(statsd_updater).to have_received(:update_job_queue_load).with(expected_pending_job_queue_load, expected_total)
425+
expect(prometheus_updater).to have_received(:update_job_queue_load).with(expected_pending_job_queue_load)
426+
end
427+
428+
it 'does not contain failed jobs in job queue load metric' do
429+
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), queue: 'cc_local', failed_at: Time.now + 60, run_at: Time.now)
430+
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), queue: 'cc_local', run_at: Time.now)
431+
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), queue: 'cc_generic', failed_at: Time.now + 60, run_at: Time.now)
432+
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), queue: 'cc_generic', run_at: Time.now)
433+
434+
periodic_updater.update_job_queue_load
435+
436+
expected_pending_job_queue_load = {
437+
cc_local: 1,
438+
cc_generic: 1,
439+
'cc-api-0': 0
440+
}
441+
expected_total = 2
442+
expect(statsd_updater).to have_received(:update_job_queue_load).with(expected_pending_job_queue_load, expected_total)
443+
expect(prometheus_updater).to have_received(:update_job_queue_load).with(expected_pending_job_queue_load)
444+
end
445+
end
446+
356447
describe '#update_failed_job_count' do
357448
before do
358449
allow(statsd_updater).to receive(:update_failed_job_count)
@@ -644,6 +735,7 @@ module VCAP::CloudController::Metrics
644735
it 'calls all update methods' do
645736
expect(periodic_updater).to receive(:update_user_count).once
646737
expect(periodic_updater).to receive(:update_job_queue_length).once
738+
expect(periodic_updater).to receive(:update_job_queue_load).once
647739
expect(periodic_updater).to receive(:update_thread_info).once
648740
expect(periodic_updater).to receive(:update_failed_job_count).once
649741
expect(periodic_updater).to receive(:update_vitals).once

spec/unit/lib/cloud_controller/metrics/prometheus_updater_spec.rb

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,24 @@ module VCAP::CloudController::Metrics
7878
end
7979
end
8080

81+
describe '#update_job_queue_load' do
82+
it 'records the load of the delayed job queues and total' do
83+
expected_local_load = 5
84+
expected_generic_load = 6
85+
86+
pending_job_load_by_queue = {
87+
cc_local: expected_local_load,
88+
cc_generic: expected_generic_load
89+
}
90+
91+
updater.update_job_queue_load(pending_job_load_by_queue)
92+
93+
metric = prom_client.get :cc_job_queues_load_total
94+
expect(metric.get(labels: { queue: 'cc_local' })).to eq 5
95+
expect(metric.get(labels: { queue: 'cc_generic' })).to eq 6
96+
end
97+
end
98+
8199
describe '#update_failed_job_count' do
82100
it 'records the number of failed jobs in the delayed job queue and the total to statsd' do
83101
expected_local_length = 5

spec/unit/lib/cloud_controller/metrics/statsd_updater_spec.rb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,32 @@ module VCAP::CloudController::Metrics
6060
end
6161
end
6262

63+
describe '#update_job_queue_load' do
64+
let(:batch) { double(:batch) }
65+
66+
before do
67+
allow(statsd_client).to receive(:batch).and_yield(batch)
68+
allow(batch).to receive(:gauge)
69+
end
70+
71+
it 'emits the load of the delayed job queues and total to statsd' do
72+
expected_local_load = 5
73+
expected_generic_load = 6
74+
total = expected_local_load + expected_generic_load
75+
76+
pending_job_load_by_queue = {
77+
cc_local: expected_local_load,
78+
cc_generic: expected_generic_load
79+
}
80+
81+
updater.update_job_queue_load(pending_job_load_by_queue, total)
82+
83+
expect(batch).to have_received(:gauge).with('cc.job_queue_load.cc_local', expected_local_load)
84+
expect(batch).to have_received(:gauge).with('cc.job_queue_load.cc_generic', expected_generic_load)
85+
expect(batch).to have_received(:gauge).with('cc.job_queue_load.total', total)
86+
end
87+
end
88+
6389
describe '#update_failed_job_count' do
6490
let(:batch) { double(:batch) }
6591

0 commit comments

Comments
 (0)