Skip to content

Commit e647272

Browse files
authored
Add prometheus metrics to cc workers (#4205)
* Publish cc-worker metrics: implemented a small Puma webserver, which is started in a separate thread with the first worker process. Using the exporter middleware provided by the Prometheus client, the metrics stored in the registry are published under `/metrics`. All processes use the same DirectFileStore registry, so that metrics can be published by a single webserver. * Turn off publishing metrics by default and make it configurable * Use the worker process index as the pid for the metrics store * Fix code style and unit tests * Make the metrics endpoint available on localhost only
1 parent b29f15d commit e647272

File tree

11 files changed

+294
-102
lines changed

11 files changed

+294
-102
lines changed

lib/cloud_controller/config_schemas/base/worker_schema.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ class WorkerSchema < VCAP::Config
3737

3838
log_audit_events: bool,
3939

40+
directories: {
41+
tmpdir: String
42+
},
43+
4044
stacks_file: String,
4145
newrelic_enabled: bool,
4246

@@ -139,6 +143,9 @@ class WorkerSchema < VCAP::Config
139143
optional(:port) => Integer
140144
},
141145

146+
optional(:publish_metrics) => bool,
147+
optional(:prometheus_port) => Integer,
148+
142149
skip_cert_verify: bool,
143150

144151
optional(:routing_api) => {

lib/cloud_controller/db.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ def self.add_connection_expiration_extension(db, opts)
7171
end
7272

7373
def self.add_connection_metrics_extension(db)
74-
# only add the metrics for api processes. Otherwise e.g. rake db:migrate would also initialize metric updaters, which need additional config
75-
return if Object.const_defined?(:RakeConfig)
74+
# only add the metrics for api and cc-worker processes. Otherwise e.g. rake db:migrate would also initialize metric updaters, which need additional config
75+
return if Object.const_defined?(:RakeConfig) && RakeConfig.context != :worker
7676

7777
db.extension(:connection_metrics)
7878
# so that we gather connection metrics from the beginning

lib/cloud_controller/dependency_locator.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ def prometheus_updater
7474
@dependencies[:prometheus_updater] || register(:prometheus_updater, VCAP::CloudController::Metrics::PrometheusUpdater.new)
7575
end
7676

77+
def cc_worker_prometheus_updater
78+
@dependencies[:cc_worker_prometheus_updater] || register(:cc_worker_prometheus_updater, VCAP::CloudController::Metrics::PrometheusUpdater.new(cc_worker: true))
79+
end
80+
7781
def statsd_updater
7882
@dependencies[:statsd_updater] || register(:statsd_updater, VCAP::CloudController::Metrics::StatsdUpdater.new(statsd_client))
7983
end
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Storing the metrics of several worker processes on cc-worker VMs in a DirectFileStore residing in a single
# directory did not work: the processes are isolated by bpm, so several of them saw the same pid inside their
# containers. That pid is used for the store's filenames, and multiple processes writing to the same files
# corrupted the data. Requiring this file overrides MetricStore#process_id to prefer the `INDEX` environment
# variable (provided for cc-worker processes) over the actual pid.
module CustomProcessId
  # Returns the bpm-provided worker index when present, otherwise the real pid.
  def process_id
    worker_index = ENV['INDEX']
    worker_index.nil? ? Process.pid : worker_index.to_i
  end
end

module Prometheus
  module Client
    module DataStores
      class DirectFileStore
        class MetricStore
          # prepend so our process_id takes precedence over the gem's implementation
          prepend CustomProcessId
        end
      end
    end
  end
end

lib/cloud_controller/metrics/prometheus_updater.rb

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,7 @@ def self.allow_pid_label
3636
{ type: :gauge, name: :cc_running_tasks_total, docstring: 'Total running tasks', aggregation: :most_recent },
3737
{ type: :gauge, name: :cc_running_tasks_memory_bytes, docstring: 'Total memory consumed by running tasks', aggregation: :most_recent },
3838
{ type: :gauge, name: :cc_users_total, docstring: 'Number of users', aggregation: :most_recent },
39-
{ type: :gauge, name: :cc_deployments_in_progress_total, docstring: 'Number of in progress deployments', aggregation: :most_recent },
40-
{ type: :gauge, name: :cc_acquired_db_connections_total, labels: %i[process_type], docstring: 'Number of acquired DB connections' },
41-
{ type: :histogram, name: :cc_db_connection_hold_duration_seconds, docstring: 'The time threads were holding DB connections', buckets: CONNECTION_DURATION_BUCKETS },
42-
# cc_connection_pool_timeouts_total must be a gauge metric, because otherwise we cannot match them with processes
43-
{ type: :gauge, name: :cc_db_connection_pool_timeouts_total, labels: %i[process_type],
44-
docstring: 'Number of threads which failed to acquire a free DB connection from the pool within the timeout' },
45-
{ type: :gauge, name: :cc_open_db_connections_total, labels: %i[process_type], docstring: 'Number of open DB connections (acquired + available)' },
46-
{ type: :histogram, name: :cc_db_connection_wait_duration_seconds, docstring: 'The time threads were waiting for an available DB connection',
47-
buckets: CONNECTION_DURATION_BUCKETS }
39+
{ type: :gauge, name: :cc_deployments_in_progress_total, docstring: 'Number of in progress deployments', aggregation: :most_recent }
4840
].freeze
4941

5042
THIN_METRICS = [
@@ -63,12 +55,27 @@ def self.allow_pid_label
6355
{ type: :gauge, name: :cc_puma_worker_backlog, docstring: 'Puma worker: backlog', labels: %i[index pid], aggregation: :most_recent }
6456
].freeze
6557

66-
def initialize(registry=Prometheus::Client.registry)
58+
DB_CONNECTION_POOL_METRICS = [
59+
{ type: :gauge, name: :cc_acquired_db_connections_total, labels: %i[process_type], docstring: 'Number of acquired DB connections' },
60+
{ type: :histogram, name: :cc_db_connection_hold_duration_seconds, docstring: 'The time threads were holding DB connections', buckets: CONNECTION_DURATION_BUCKETS },
61+
# cc_connection_pool_timeouts_total must be a gauge metric, because otherwise we cannot match them with processes
62+
{ type: :gauge, name: :cc_db_connection_pool_timeouts_total, labels: %i[process_type],
63+
docstring: 'Number of threads which failed to acquire a free DB connection from the pool within the timeout' },
64+
{ type: :gauge, name: :cc_open_db_connections_total, labels: %i[process_type], docstring: 'Number of open DB connections (acquired + available)' },
65+
{ type: :histogram, name: :cc_db_connection_wait_duration_seconds, docstring: 'The time threads were waiting for an available DB connection',
66+
buckets: CONNECTION_DURATION_BUCKETS }
67+
].freeze
68+
69+
def initialize(registry: Prometheus::Client.registry, cc_worker: false)
6770
self.class.allow_pid_label
6871

6972
@registry = registry
7073

7174
# Register all metrics, to initialize them for discoverability
75+
DB_CONNECTION_POOL_METRICS.each { |metric| register(metric) }
76+
77+
return if cc_worker
78+
7279
METRICS.each { |metric| register(metric) }
7380
THIN_METRICS.each { |metric| register(metric) } if VCAP::CloudController::Config.config&.get(:webserver) == 'thin'
7481
PUMA_METRICS.each { |metric| register(metric) } if VCAP::CloudController::Config.config&.get(:webserver) == 'puma'

lib/delayed_job/delayed_worker.rb

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
require 'delayed_job/threaded_worker'
2+
require 'rack'
3+
require 'puma'
4+
require 'prometheus/middleware/exporter'
25

36
class CloudController::DelayedWorker
47
def initialize(options)
@@ -9,6 +12,8 @@ def initialize(options)
912
worker_name: options[:name],
1013
quiet: true
1114
}
15+
16+
@publish_metrics = options.fetch(:publish_metrics, false)
1217
return unless options[:num_threads] && options[:num_threads].to_i > 0
1318

1419
@queue_options[:num_threads] = options[:num_threads].to_i
@@ -17,6 +22,7 @@ def initialize(options)
1722

1823
def start_working
1924
config = RakeConfig.config
25+
setup_metrics(config) if @publish_metrics
2026
BackgroundJobEnvironment.new(config).setup_environment(readiness_port)
2127

2228
logger = Steno.logger('cc-worker')
@@ -92,4 +98,40 @@ def readiness_port
9298
# True only for the first generic (non-api) worker process on a VM, identified
# by the bpm-provided INDEX environment variable.
def is_first_generic_worker_on_machine?
  return false if RakeConfig.context == :api

  ENV['INDEX']&.to_i == 1
end
101+
102+
# Points the Prometheus client at a per-VM DirectFileStore shared by all worker
# processes and, on the first generic worker per machine, starts the /metrics
# webserver via setup_webserver.
def setup_metrics(config)
  metrics_dir = File.join(config.get(:directories, :tmpdir), 'prometheus')
  file_store = Prometheus::Client::DataStores::DirectFileStore.new(dir: metrics_dir)
  Prometheus::Client.config.data_store = file_store

  setup_webserver(config, metrics_dir) if is_first_generic_worker_on_machine?

  # Initialize the timeout metric with 0 for discoverability — on a healthy
  # system it likely never gets updated otherwise.
  updater = CloudController::DependencyLocator.instance.cc_worker_prometheus_updater
  updater.update_gauge_metric(:cc_db_connection_pool_timeouts_total, 0, labels: { process_type: 'cc-worker' })
end
111+
112+
# Starts a background Puma server (localhost only) that serves the shared
# metric registry under /metrics and answers 404 for every other path.
def setup_webserver(config, prometheus_dir)
  FileUtils.mkdir_p(prometheus_dir)

  # Reset metrics on startup: remove stale store files left by a previous
  # process generation.
  Dir.glob(File.join(prometheus_dir, '*.bin')).each do |stale_file|
    File.unlink(stale_file)
  end

  rack_app = Rack::Builder.new do
    use Prometheus::Middleware::Exporter, path: '/metrics'

    map '/' do
      # Return 404 for any other request
      run ->(_env) { ['404', { 'Content-Type' => 'text/plain' }, ['Not Found']] }
    end
  end

  # Run the server in its own thread so the worker loop is not blocked;
  # bind to loopback only so metrics are not exposed externally.
  Thread.new do
    puma_server = Puma::Server.new(rack_app)
    puma_server.add_tcp_listener '127.0.0.1', config.get(:prometheus_port) || 9394
    puma_server.run
  end
end
95137
end

lib/sequel/extensions/connection_metrics.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ def self.extended(pool)
2323

2424
pool.instance_exec do
2525
sync do
26-
@prometheus_updater = CloudController::DependencyLocator.instance.prometheus_updater
26+
@prometheus_updater = if process_type == 'cc-worker'
27+
CloudController::DependencyLocator.instance.cc_worker_prometheus_updater
28+
else
29+
CloudController::DependencyLocator.instance.prometheus_updater
30+
end
2731
@connection_info = {}
2832
end
2933
end

lib/tasks/jobs.rake

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,15 @@ namespace :jobs do
6464
'prune_excess_app_revisions'
6565
]
6666

67-
CloudController::DelayedWorker.new(queues: queues, name: args.name, num_threads: args.num_threads, thread_grace_period_seconds: args.thread_grace_period_seconds).start_working
67+
ENV['PROCESS_TYPE'] = 'cc-worker'
68+
require 'cloud_controller/metrics/custom_process_id'
69+
70+
publish_metrics = RakeConfig.config.get(:publish_metrics) || false
71+
72+
CloudController::DelayedWorker.new(queues: queues,
73+
name: args.name,
74+
num_threads: args.num_threads,
75+
thread_grace_period_seconds: args.thread_grace_period_seconds,
76+
publish_metrics: publish_metrics).start_working
6877
end
6978
end

spec/unit/lib/cloud_controller/metrics/prometheus_updater_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
module VCAP::CloudController::Metrics
55
RSpec.describe PrometheusUpdater do
6-
let(:updater) { PrometheusUpdater.new(prom_client) }
6+
let(:updater) { PrometheusUpdater.new(registry: prom_client) }
77
let(:tmpdir) { Dir.mktmpdir }
88
let(:prom_client) do
99
Prometheus::Client.config.data_store = Prometheus::Client::DataStores::DirectFileStore.new(dir: tmpdir)

spec/unit/lib/delayed_job/delayed_worker_spec.rb

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,26 @@
6464
worker_instance = CloudController::DelayedWorker.new(options)
6565
expect(worker_instance.instance_variable_get(:@queue_options)).not_to include(:grace_period_seconds)
6666
end
67+
68+
describe 'publish metrics' do
69+
context 'when not set' do
70+
it 'does not publish metrics' do
71+
worker_instance = CloudController::DelayedWorker.new(options)
72+
expect(worker_instance.instance_variable_get(:@publish_metrics)).to be(false)
73+
end
74+
end
75+
76+
context 'when set to true' do
77+
before do
78+
options[:publish_metrics] = true
79+
end
80+
81+
it 'publishes metrics' do
82+
worker_instance = CloudController::DelayedWorker.new(options)
83+
expect(worker_instance.instance_variable_get(:@publish_metrics)).to be(true)
84+
end
85+
end
86+
end
6787
end
6888

6989
describe '#start_working' do
@@ -120,6 +140,47 @@
120140
cc_delayed_worker.start_working
121141
end
122142
end
143+
144+
describe 'publish metrics' do
145+
before do
146+
allow(Prometheus::Client::DataStores::DirectFileStore).to receive(:new)
147+
end
148+
149+
context 'when set to false' do
150+
before do
151+
options[:publish_metrics] = false
152+
end
153+
154+
it 'does not publish metrics' do
155+
cc_delayed_worker.start_working
156+
expect(Prometheus::Client::DataStores::DirectFileStore).not_to have_received(:new)
157+
end
158+
end
159+
160+
context 'when set to true' do
161+
before do
162+
options[:publish_metrics] = true
163+
end
164+
165+
it 'publishes metrics' do
166+
cc_delayed_worker.start_working
167+
expect(Prometheus::Client::DataStores::DirectFileStore).to have_received(:new)
168+
end
169+
170+
context 'when first worker on machine' do
171+
before do
172+
allow(cc_delayed_worker).to receive(:is_first_generic_worker_on_machine?).and_return(true)
173+
allow(cc_delayed_worker).to receive(:readiness_port)
174+
allow(cc_delayed_worker).to receive(:setup_webserver)
175+
end
176+
177+
it 'sets up a webserver' do
178+
cc_delayed_worker.start_working
179+
expect(cc_delayed_worker).to have_received(:setup_webserver)
180+
end
181+
end
182+
end
183+
end
123184
end
124185

125186
describe '#clear_locks!' do

0 commit comments

Comments
 (0)