Skip to content

Commit 2367129

Browse files
committed
Revert "Remove EventMachine from Puma"
This reverts commit 1187814.
1 parent b4a64e3 commit 2367129

File tree

8 files changed

+270
-23
lines changed

8 files changed

+270
-23
lines changed

lib/cloud_controller/metrics/periodic_updater.rb

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,23 @@ def initialize(start_time, log_counter, logger, statsd_updater, prometheus_updat
1616

1717
def setup_updates
1818
update!
19-
Concurrent::TimerTask.new(execution_interval: 600) { catch_error { update_user_count } }.execute
20-
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_job_queue_length } }.execute
21-
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_job_queue_load } }.execute
22-
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_failed_job_count } }.execute
23-
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_vitals } }.execute
24-
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_log_counts } }.execute
25-
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_task_stats } }.execute
26-
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_deploying_count } }.execute
27-
Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_webserver_stats } }.execute
19+
EM.add_periodic_timer(600) { catch_error { update_user_count } }
20+
EM.add_periodic_timer(30) { catch_error { update_job_queue_length } }
21+
EM.add_periodic_timer(30) { catch_error { update_job_queue_load } }
22+
EM.add_periodic_timer(30) { catch_error { update_thread_info } }
23+
EM.add_periodic_timer(30) { catch_error { update_failed_job_count } }
24+
EM.add_periodic_timer(30) { catch_error { update_vitals } }
25+
EM.add_periodic_timer(30) { catch_error { update_log_counts } }
26+
EM.add_periodic_timer(30) { catch_error { update_task_stats } }
27+
EM.add_periodic_timer(30) { catch_error { update_deploying_count } }
28+
EM.add_periodic_timer(30) { catch_error { update_webserver_stats } }
2829
end
2930

3031
def update!
3132
update_user_count
3233
update_job_queue_length
3334
update_job_queue_load
35+
update_thread_info
3436
update_failed_job_count
3537
update_vitals
3638
update_log_counts
@@ -108,6 +110,13 @@ def update_job_queue_load
108110
@prometheus_updater.update_job_queue_load(pending_job_load_by_queue)
109111
end
110112

113+
def update_thread_info
114+
return unless VCAP::CloudController::Config.config.get(:webserver) == 'thin'
115+
116+
local_thread_info = thread_info_thin
117+
[@statsd_updater, @prometheus_updater].each { |u| u.update_thread_info_thin(local_thread_info) }
118+
end
119+
111120
def update_failed_job_count
112121
jobs_by_queue_with_count = Delayed::Job.where(Sequel.lit('failed_at IS NOT NULL')).group_and_count(:queue)
113122

@@ -164,5 +173,24 @@ def update_webserver_stats
164173
end
165174
@prometheus_updater.update_webserver_stats_puma(worker_count, worker_stats)
166175
end
176+
177+
def thread_info_thin
178+
threadqueue = EM.instance_variable_get(:@threadqueue) || []
179+
resultqueue = EM.instance_variable_get(:@resultqueue) || []
180+
{
181+
thread_count: Thread.list.size,
182+
event_machine: {
183+
connection_count: EventMachine.connection_count,
184+
threadqueue: {
185+
size: threadqueue.size,
186+
num_waiting: threadqueue.is_a?(Array) ? 0 : threadqueue.num_waiting
187+
},
188+
resultqueue: {
189+
size: resultqueue.size,
190+
num_waiting: resultqueue.is_a?(Array) ? 0 : resultqueue.num_waiting
191+
}
192+
}
193+
}
194+
end
167195
end
168196
end

lib/cloud_controller/metrics/prometheus_updater.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ def self.allow_pid_label
4141
{ type: :gauge, name: :cc_deployments_in_progress_total, docstring: 'Number of in progress deployments', aggregation: :most_recent }
4242
].freeze
4343

44+
THIN_METRICS = [
45+
{ type: :gauge, name: :cc_thread_info_thread_count, docstring: 'Thread count' },
46+
{ type: :gauge, name: :cc_thread_info_event_machine_connection_count, docstring: 'EventMachine connection count' },
47+
{ type: :gauge, name: :cc_thread_info_event_machine_threadqueue_size, docstring: 'EventMachine thread queue size' },
48+
{ type: :gauge, name: :cc_thread_info_event_machine_threadqueue_num_waiting, docstring: 'EventMachine num waiting in thread' },
49+
{ type: :gauge, name: :cc_thread_info_event_machine_resultqueue_size, docstring: 'EventMachine queue size' },
50+
{ type: :gauge, name: :cc_thread_info_event_machine_resultqueue_num_waiting, docstring: 'EventMachine requests waiting in queue' }
51+
].freeze
52+
4453
PUMA_METRICS = [
4554
{ type: :gauge, name: :cc_puma_worker_count, docstring: 'Puma worker count', aggregation: :most_recent },
4655
{ type: :gauge, name: :cc_puma_worker_started_at, docstring: 'Puma worker: started_at', labels: %i[index pid], aggregation: :most_recent },
@@ -79,6 +88,7 @@ def initialize(registry: Prometheus::Client.registry, cc_worker: false)
7988
return if cc_worker
8089

8190
METRICS.each { |metric| register(metric) }
91+
THIN_METRICS.each { |metric| register(metric) } if VCAP::CloudController::Config.config&.get(:webserver) == 'thin'
8292
PUMA_METRICS.each { |metric| register(metric) } if VCAP::CloudController::Config.config&.get(:webserver) == 'puma'
8393
end
8494

@@ -126,6 +136,15 @@ def update_job_queue_load(update_job_queue_load)
126136
end
127137
end
128138

139+
def update_thread_info_thin(thread_info)
140+
update_gauge_metric(:cc_thread_info_thread_count, thread_info[:thread_count])
141+
update_gauge_metric(:cc_thread_info_event_machine_connection_count, thread_info[:event_machine][:connection_count])
142+
update_gauge_metric(:cc_thread_info_event_machine_threadqueue_size, thread_info[:event_machine][:threadqueue][:size])
143+
update_gauge_metric(:cc_thread_info_event_machine_threadqueue_num_waiting, thread_info[:event_machine][:threadqueue][:num_waiting])
144+
update_gauge_metric(:cc_thread_info_event_machine_resultqueue_size, thread_info[:event_machine][:resultqueue][:size])
145+
update_gauge_metric(:cc_thread_info_event_machine_resultqueue_num_waiting, thread_info[:event_machine][:resultqueue][:num_waiting])
146+
end
147+
129148
def update_failed_job_count(failed_jobs_by_queue)
130149
failed_jobs_by_queue.each do |key, value|
131150
update_gauge_metric(:cc_failed_jobs_total, value, labels: { queue: key.to_s.underscore })

lib/cloud_controller/runners/puma_runner.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,14 @@ def initialize(config, app, logger, periodic_updater, request_logs)
5959
events = Puma::Events.new
6060
events.after_booted do
6161
prometheus_updater.update_gauge_metric(:cc_db_connection_pool_timeouts_total, 0, labels: { process_type: 'main' })
62-
periodic_updater.setup_updates
62+
Thread.new do
63+
EM.run { periodic_updater.setup_updates }
64+
end
65+
end
66+
events.after_stopped do
67+
EM.stop
6368
end
69+
6470
@puma_launcher = Puma::Launcher.new(puma_config, log_writer:, events:)
6571
end
6672

spec/request/internal/metrics_spec.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,19 @@
119119
end
120120
end
121121

122+
context 'cc_thread_info' do
123+
it 'reports thread info' do
124+
get '/internal/v4/metrics', nil
125+
126+
expect(last_response.body).to match(/cc_thread_info_thread_count [0-9][0-9]*\.\d+/)
127+
expect(last_response.body).to match(/cc_thread_info_event_machine_connection_count [0-9][0-9]*\.\d+/)
128+
expect(last_response.body).to match(/cc_thread_info_event_machine_threadqueue_size [0-9][0-9]*\.\d+/)
129+
expect(last_response.body).to match(/cc_thread_info_event_machine_threadqueue_num_waiting [0-9][0-9]*\.\d+/)
130+
expect(last_response.body).to match(/cc_thread_info_event_machine_resultqueue_size [0-9][0-9]*\.\d+/)
131+
expect(last_response.body).to match(/cc_thread_info_event_machine_resultqueue_num_waiting [0-9][0-9]*\.\d+/)
132+
end
133+
end
134+
122135
context 'cc_failed_job_count' do
123136
before do
124137
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now + 1.day })

spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb

Lines changed: 116 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,27 @@ module VCAP::CloudController::Metrics
66
let(:periodic_updater) { PeriodicUpdater.new(start_time, log_counter, logger, statsd_updater, prometheus_updater) }
77
let(:statsd_updater) { double(:statsd_updater) }
88
let(:prometheus_updater) { double(:prometheus_updater) }
9+
let(:threadqueue) { double(EventMachine::Queue, size: 20, num_waiting: 0) }
10+
let(:resultqueue) { double(EventMachine::Queue, size: 0, num_waiting: 1) }
911
let(:start_time) { Time.now.utc - 90 }
1012
let(:log_counter) { double(:log_counter, counts: {}) }
1113
let(:logger) { double(:logger) }
1214

15+
before do
16+
allow(EventMachine).to receive(:connection_count).and_return(123)
17+
18+
allow(EventMachine).to receive(:instance_variable_get) do |instance_var|
19+
case instance_var
20+
when :@threadqueue
21+
threadqueue
22+
when :@resultqueue
23+
resultqueue
24+
else
25+
raise "Unexpected call: #{instance_var}"
26+
end
27+
end
28+
end
29+
1330
describe 'task stats' do
1431
before do
1532
allow(statsd_updater).to receive(:update_task_stats)
@@ -56,6 +73,7 @@ module VCAP::CloudController::Metrics
5673
allow(statsd_updater).to receive(:update_user_count)
5774
allow(statsd_updater).to receive(:update_job_queue_length)
5875
allow(statsd_updater).to receive(:update_job_queue_load)
76+
allow(statsd_updater).to receive(:update_thread_info_thin)
5977
allow(statsd_updater).to receive(:update_failed_job_count)
6078
allow(statsd_updater).to receive(:update_vitals)
6179
allow(statsd_updater).to receive(:update_log_counts)
@@ -65,11 +83,14 @@ module VCAP::CloudController::Metrics
6583
allow(prometheus_updater).to receive(:update_user_count)
6684
allow(prometheus_updater).to receive(:update_job_queue_length)
6785
allow(prometheus_updater).to receive(:update_job_queue_load)
86+
allow(prometheus_updater).to receive(:update_thread_info_thin)
6887
allow(prometheus_updater).to receive(:update_failed_job_count)
6988
allow(prometheus_updater).to receive(:update_vitals)
7089
allow(prometheus_updater).to receive(:update_log_counts)
7190
allow(prometheus_updater).to receive(:update_task_stats)
7291
allow(prometheus_updater).to receive(:update_deploying_count)
92+
93+
allow(EventMachine).to receive(:add_periodic_timer)
7394
end
7495

7596
it 'bumps the number of users and sets periodic timer' do
@@ -92,6 +113,11 @@ module VCAP::CloudController::Metrics
92113
periodic_updater.setup_updates
93114
end
94115

116+
it 'updates thread count and event machine queues' do
117+
expect(periodic_updater).to receive(:update_thread_info).once
118+
periodic_updater.setup_updates
119+
end
120+
95121
it 'updates the vitals' do
96122
expect(periodic_updater).to receive(:update_vitals).once
97123
periodic_updater.setup_updates
@@ -112,16 +138,15 @@ module VCAP::CloudController::Metrics
112138
periodic_updater.setup_updates
113139
end
114140

115-
context 'when Concurrent::TimerTasks are run' do
141+
context 'when EventMachine periodic_timer tasks are run' do
116142
before do
117143
@periodic_timers = []
118144

119-
allow(Concurrent::TimerTask).to receive(:new) do |opts, &block|
145+
allow(EventMachine).to receive(:add_periodic_timer) do |interval, &block|
120146
@periodic_timers << {
121-
interval: opts[:execution_interval],
122-
block: block
147+
interval:,
148+
block:
123149
}
124-
double('TimerTask', execute: nil, shutdown: nil, kill: nil, running?: false)
125150
end
126151

127152
periodic_updater.setup_updates
@@ -151,37 +176,45 @@ module VCAP::CloudController::Metrics
151176
@periodic_timers[2][:block].call
152177
end
153178

154-
it 'bumps the length of cc failed job queues and sets periodic timer' do
179+
it 'updates thread count and event machine queues' do
155180
expect(periodic_updater).to receive(:catch_error).once.and_call_original
156-
expect(periodic_updater).to receive(:update_failed_job_count).once
181+
expect(periodic_updater).to receive(:update_thread_info).once
157182
expect(@periodic_timers[3][:interval]).to eq(30)
158183

159184
@periodic_timers[3][:block].call
160185
end
161186

162-
it 'updates the vitals' do
187+
it 'bumps the length of cc failed job queues and sets periodic timer' do
163188
expect(periodic_updater).to receive(:catch_error).once.and_call_original
164-
expect(periodic_updater).to receive(:update_vitals).once
189+
expect(periodic_updater).to receive(:update_failed_job_count).once
165190
expect(@periodic_timers[4][:interval]).to eq(30)
166191

167192
@periodic_timers[4][:block].call
168193
end
169194

170-
it 'updates the log counts' do
195+
it 'updates the vitals' do
171196
expect(periodic_updater).to receive(:catch_error).once.and_call_original
172-
expect(periodic_updater).to receive(:update_log_counts).once
197+
expect(periodic_updater).to receive(:update_vitals).once
173198
expect(@periodic_timers[5][:interval]).to eq(30)
174199

175200
@periodic_timers[5][:block].call
176201
end
177202

178-
it 'updates the task stats' do
203+
it 'updates the log counts' do
179204
expect(periodic_updater).to receive(:catch_error).once.and_call_original
180-
expect(periodic_updater).to receive(:update_task_stats).once
205+
expect(periodic_updater).to receive(:update_log_counts).once
181206
expect(@periodic_timers[6][:interval]).to eq(30)
182207

183208
@periodic_timers[6][:block].call
184209
end
210+
211+
it 'updates the task stats' do
212+
expect(periodic_updater).to receive(:catch_error).once.and_call_original
213+
expect(periodic_updater).to receive(:update_task_stats).once
214+
expect(@periodic_timers[7][:interval]).to eq(30)
215+
216+
@periodic_timers[7][:block].call
217+
end
185218
end
186219
end
187220

@@ -505,6 +538,75 @@ module VCAP::CloudController::Metrics
505538
end
506539
end
507540

541+
describe '#update_thread_info' do
542+
before do
543+
allow(statsd_updater).to receive(:update_thread_info_thin)
544+
allow(prometheus_updater).to receive(:update_thread_info_thin)
545+
end
546+
547+
it 'contains EventMachine data and send it to all updaters' do
548+
expected_thread_info = {
549+
thread_count: Thread.list.size,
550+
event_machine: {
551+
connection_count: 123,
552+
threadqueue: {
553+
size: 20,
554+
num_waiting: 0
555+
},
556+
resultqueue: {
557+
size: 0,
558+
num_waiting: 1
559+
}
560+
}
561+
}
562+
563+
periodic_updater.update_thread_info
564+
565+
expect(statsd_updater).to have_received(:update_thread_info_thin).with(expected_thread_info)
566+
expect(prometheus_updater).to have_received(:update_thread_info_thin).with(expected_thread_info)
567+
end
568+
569+
context 'when resultqueue and/or threadqueue is not a queue' do
570+
let(:resultqueue) { [] }
571+
let(:threadqueue) { nil }
572+
573+
it 'does not blow up' do
574+
expected_thread_info = {
575+
thread_count: Thread.list.size,
576+
event_machine: {
577+
connection_count: 123,
578+
threadqueue: {
579+
size: 0,
580+
num_waiting: 0
581+
},
582+
resultqueue: {
583+
size: 0,
584+
num_waiting: 0
585+
}
586+
}
587+
}
588+
589+
periodic_updater.update_thread_info
590+
591+
expect(statsd_updater).to have_received(:update_thread_info_thin).with(expected_thread_info)
592+
expect(prometheus_updater).to have_received(:update_thread_info_thin).with(expected_thread_info)
593+
end
594+
end
595+
596+
context 'when Puma is configured as webserver' do
597+
before do
598+
TestConfig.override(webserver: 'puma')
599+
end
600+
601+
it 'does not send EventMachine data to updaters' do
602+
periodic_updater.update_thread_info
603+
604+
expect(statsd_updater).not_to have_received(:update_thread_info_thin)
605+
expect(prometheus_updater).not_to have_received(:update_thread_info_thin)
606+
end
607+
end
608+
end
609+
508610
describe '#update_vitals' do
509611
before do
510612
allow(statsd_updater).to receive(:update_vitals)
@@ -634,6 +736,7 @@ module VCAP::CloudController::Metrics
634736
expect(periodic_updater).to receive(:update_user_count).once
635737
expect(periodic_updater).to receive(:update_job_queue_length).once
636738
expect(periodic_updater).to receive(:update_job_queue_load).once
739+
expect(periodic_updater).to receive(:update_thread_info).once
637740
expect(periodic_updater).to receive(:update_failed_job_count).once
638741
expect(periodic_updater).to receive(:update_vitals).once
639742
expect(periodic_updater).to receive(:update_log_counts).once

0 commit comments

Comments
 (0)