From 5886421dd6eb26702fa749a01358134407953ac6 Mon Sep 17 00:00:00 2001 From: Ben Garcia <49890553+benngarcia@users.noreply.github.com> Date: Tue, 23 Jan 2024 12:32:26 +0900 Subject: [PATCH 1/3] delayed_jobs_ready metric added to DelayedJob plugin delayed_jobs_ready instance variable added to metrics --- README.md | 1 + .../instrumentation/delayed_job.rb | 11 ++++++++--- .../server/delayed_job_collector.rb | 8 ++++++++ 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index d636cd12..45de28d4 100644 --- a/README.md +++ b/README.md @@ -540,6 +540,7 @@ end | Counter | `delayed_jobs_total` | Total number of delayed jobs executed | `job_name` | | Gauge | `delayed_jobs_enqueued` | Number of enqueued delayed jobs | - | | Gauge | `delayed_jobs_pending` | Number of pending delayed jobs | - | +| Gauge | `delayed_jobs_ready` | Number of ready delayed jobs      | - | | Counter | `delayed_failed_jobs_total` | Total number failed delayed jobs executed | `job_name` | | Counter | `delayed_jobs_max_attempts_reached_total` | Total number of delayed jobs that reached max attempts | - | | Summary | `delayed_job_duration_seconds_summary` | Summary of the time it takes jobs to execute | `status` | diff --git a/lib/prometheus_exporter/instrumentation/delayed_job.rb b/lib/prometheus_exporter/instrumentation/delayed_job.rb index 9978affa..0c640283 100644 --- a/lib/prometheus_exporter/instrumentation/delayed_job.rb +++ b/lib/prometheus_exporter/instrumentation/delayed_job.rb @@ -4,6 +4,8 @@ module PrometheusExporter::Instrumentation class DelayedJob JOB_CLASS_REGEXP = /job_class: ((\w+:{0,2})+)/.freeze + RuntimeMetric = Struct.new(:max_attempts, :enqueued_count, :pending_count, :ready_count) + class << self def register_plugin(client: nil, include_module_name: false) instrumenter = self.new(client: client) @@ -15,13 +17,15 @@ def register_plugin(client: nil, include_module_name: false) lifecycle.around(:invoke_job) do |job, *args, &block| max_attempts = Delayed::Worker.max_attempts enqueued_count = Delayed::Job.where(queue: job.queue).count - pending_count = - Delayed::Job.where(attempts: 0, locked_at: nil, queue: job.queue).count + pending_count = Delayed::Job.where(attempts: 0, locked_at: nil, queue: job.queue).count + ready_count = Delayed::Job.where(queue: job.queue, run_at: ..Time.current, locked_at: nil, failed_at: nil).count + instrumenter.call( job, max_attempts, enqueued_count, pending_count, + ready_count, include_module_name, *args, &block @@ -38,7 +42,7 @@ def initialize(client: nil) @client = client || PrometheusExporter::Client.default end - def call(job, max_attempts, enqueued_count, pending_count, include_module_name, *args, &block) + def call(job, max_attempts, enqueued_count, pending_count, ready_count, include_module_name, *args, &block) success = false start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) latency = Time.current - job.run_at @@ -60,6 +64,7 @@ def call(job, max_attempts, enqueued_count, pending_count, include_module_name, max_attempts: max_attempts, enqueued: enqueued_count, pending: pending_count, + ready: ready_count ) end end diff --git a/lib/prometheus_exporter/server/delayed_job_collector.rb b/lib/prometheus_exporter/server/delayed_job_collector.rb index 85bc1721..8ea78c72 100644 --- a/lib/prometheus_exporter/server/delayed_job_collector.rb +++ b/lib/prometheus_exporter/server/delayed_job_collector.rb @@ -13,6 +13,7 @@ def initialize @delayed_job_attempts_summary = nil @delayed_jobs_enqueued = nil @delayed_jobs_pending = nil + @delayed_jobs_ready = nil end def type @@ -48,6 +49,7 @@ def collect(obj) @delayed_job_attempts_summary.observe(obj["attempts"], counter_labels) if obj["success"] @delayed_jobs_enqueued.observe(obj["enqueued"], gauge_labels) @delayed_jobs_pending.observe(obj["pending"], gauge_labels) + @delayed_jobs_ready.observe(obj["ready"], gauge_labels) end def metrics @@ -62,6 +64,7 @@ def metrics @delayed_job_attempts_summary, @delayed_jobs_enqueued, @delayed_jobs_pending, + @delayed_jobs_ready, ] else [] @@ -102,6 +105,11 @@ def ensure_delayed_job_metrics "Number of pending delayed jobs.", ) + @delayed_jobs_ready = + PrometheusExporter::Metric::Gauge.new( + "delayed_jobs_ready", "Number of ready delayed jobs." + ) + @delayed_failed_jobs_total = PrometheusExporter::Metric::Counter.new( "delayed_failed_jobs_total", From 7ce7b1acd425f8008ba3968c411b09acd885e0b7 Mon Sep 17 00:00:00 2001 From: Ben Garcia <49890553+benngarcia@users.noreply.github.com> Date: Tue, 23 Jan 2024 13:12:05 +0900 Subject: [PATCH 2/3] collect_by_queue added as optional argument to GoodJob#Start don't forget the documentation! RuntimeMetric unused struct removed GoodJobCollector refactor empty? changed to length.zero? to respect expiry metrics_container changes reverted since I didn't use any enumerable methods post refactor --- README.md | 3 ++ .../instrumentation/delayed_job.rb | 2 -- .../instrumentation/good_job.rb | 27 ++++++++------ .../server/good_job_collector.rb | 35 +++++++++++-------- .../server/metrics_container.rb | 8 ++--- 5 files changed, 45 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 45de28d4..c18f3f08 100644 --- a/README.md +++ b/README.md @@ -654,6 +654,9 @@ installation, you'll need to start the instrumentation: # e.g. config/initializers/good_job.rb require 'prometheus_exporter/instrumentation' PrometheusExporter::Instrumentation::GoodJob.start + +# or, to collect metrics labelled by their queue name +PrometheusExporter::Instrumentation::GoodJob.start(collect_by_queue: true) ``` #### Metrics collected by GoodJob Instrumentation diff --git a/lib/prometheus_exporter/instrumentation/delayed_job.rb b/lib/prometheus_exporter/instrumentation/delayed_job.rb index 0c640283..0cfd50c1 100644 --- a/lib/prometheus_exporter/instrumentation/delayed_job.rb +++ b/lib/prometheus_exporter/instrumentation/delayed_job.rb @@ -4,8 +4,6 @@ module PrometheusExporter::Instrumentation class DelayedJob JOB_CLASS_REGEXP = /job_class: ((\w+:{0,2})+)/.freeze - RuntimeMetric = Struct.new(:max_attempts, :enqueued_count, :pending_count, :ready_count) - class << self def register_plugin(client: nil, include_module_name: false) instrumenter = self.new(client: client) diff --git a/lib/prometheus_exporter/instrumentation/good_job.rb b/lib/prometheus_exporter/instrumentation/good_job.rb index 6342c340..4e65a74c 100644 --- a/lib/prometheus_exporter/instrumentation/good_job.rb +++ b/lib/prometheus_exporter/instrumentation/good_job.rb @@ -3,25 +3,32 @@ # collects stats from GoodJob module PrometheusExporter::Instrumentation class GoodJob < PeriodicStats - def self.start(client: nil, frequency: 30) + COUNT_BY_QUEUE = ->(collection) { collection.group(:queue_name).size } + COUNT_ALL = ->(collection) { collection.size } + + def self.start(client: nil, frequency: 30, collect_by_queue: false) good_job_collector = new client ||= PrometheusExporter::Client.default - worker_loop { client.send_json(good_job_collector.collect) } + worker_loop do + client.send_json(good_job_collector.collect(collect_by_queue)) + end super end - def collect + def collect(by_queue = false) + count_method = by_queue ? COUNT_BY_QUEUE : COUNT_ALL { type: "good_job", - scheduled: ::GoodJob::Job.scheduled.size, - retried: ::GoodJob::Job.retried.size, - queued: ::GoodJob::Job.queued.size, - running: ::GoodJob::Job.running.size, - finished: ::GoodJob::Job.finished.size, - succeeded: ::GoodJob::Job.succeeded.size, - discarded: ::GoodJob::Job.discarded.size, + by_queue: by_queue, + scheduled: ::GoodJob::Job.scheduled.yield_self(&count_method), + retried: ::GoodJob::Job.retried.yield_self(&count_method), + queued: ::GoodJob::Job.queued.yield_self(&count_method), + running: ::GoodJob::Job.running.yield_self(&count_method), + finished: ::GoodJob::Job.finished.yield_self(&count_method), + succeeded: ::GoodJob::Job.succeeded.yield_self(&count_method), + discarded: ::GoodJob::Job.discarded.yield_self(&count_method) } end end diff --git a/lib/prometheus_exporter/server/good_job_collector.rb b/lib/prometheus_exporter/server/good_job_collector.rb index e2fc4186..256740ee 100644 --- a/lib/prometheus_exporter/server/good_job_collector.rb +++ b/lib/prometheus_exporter/server/good_job_collector.rb @@ -23,21 +23,9 @@ def type end def metrics - return [] if good_job_metrics.length == 0 - - good_job_metrics.map do |metric| - labels = metric.fetch("custom_labels", {}) - - GOOD_JOB_GAUGES.map do |name, help| - value = metric[name.to_s] - - if value - gauge = gauges[name] ||= PrometheusExporter::Metric::Gauge.new("good_job_#{name}", help) - gauge.observe(value, labels) - end - end - end + return [] if good_job_metrics.length.zero? + good_job_metrics.each(&method(:process_metric)) gauges.values end @@ -48,5 +36,24 @@ def collect(object) private attr_reader :good_job_metrics, :gauges + + def process_metric(metric) + labels = metric.fetch("custom_labels", {}) + + GOOD_JOB_GAUGES.each do |name, help| + next unless (value = metric[name.to_s]) + + gauge = gauges[name] ||= PrometheusExporter::Metric::Gauge.new("good_job_#{name}", help) + observe_metric(gauge, metric, labels, value) + end + end + + def observe_metric(gauge, metric, labels, value) + if metric["by_queue"] + value.each { |queue_name, count| gauge.observe(count, labels.merge(queue_name: queue_name)) } + else + gauge.observe(value, labels) + end + end end end diff --git a/lib/prometheus_exporter/server/metrics_container.rb b/lib/prometheus_exporter/server/metrics_container.rb index 1e0f2ddf..41eff7b3 100644 --- a/lib/prometheus_exporter/server/metrics_container.rb +++ b/lib/prometheus_exporter/server/metrics_container.rb @@ -34,14 +34,14 @@ def size(&blk) end alias_method :length, :size - def map(&blk) - wrap_expire(:map, &blk) - end - def each(&blk) wrap_expire(:each, &blk) end + def map(&blk) + wrap_expire(:map, &blk) + end + def expire(time: nil, new_metric: nil) time ||= get_time From b29f9fe6fe0bdf8164e808454f5cb5ab9fd054c3 Mon Sep 17 00:00:00 2001 From: Omar Shammas Date: Mon, 26 Aug 2024 16:55:44 -0700 Subject: [PATCH 3/3] fix bug, handle when queue is empty When we do `group(:queue_name).size` it returns the count by queue => {"queue_a"=>3, "queue_d"=>1} The problem is when a queue is empty it will simply be excluded from the results instead of returning count of 0. So the result we want should be => {"queue_a"=>3, "queue_b"=>0, "queue_c"=>0, "queue_d"=>1} Without returning 0, the queue count in prometheus metrics will be the last non-zero value meaning we can't auto-scale down the workers. Bug Fix Refactor --- .../instrumentation/good_job.rb | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/lib/prometheus_exporter/instrumentation/good_job.rb b/lib/prometheus_exporter/instrumentation/good_job.rb index 4e65a74c..f3df705b 100644 --- a/lib/prometheus_exporter/instrumentation/good_job.rb +++ b/lib/prometheus_exporter/instrumentation/good_job.rb @@ -3,8 +3,21 @@ # collects stats from GoodJob module PrometheusExporter::Instrumentation class GoodJob < PeriodicStats - COUNT_BY_QUEUE = ->(collection) { collection.group(:queue_name).size } - COUNT_ALL = ->(collection) { collection.size } + TotalCounter = Struct.new do + def count(relation) + relation.size + end + end + + QueueCounter = Struct.new(:queue_names) do + def initialize(queue_names) + @empty_queues = queue_names.to_h { |name| [name, 0] } + end + + def count(relation) + @empty_queues.merge(relation.group(:queue_name).size) + end + end def self.start(client: nil, frequency: 30, collect_by_queue: false) good_job_collector = new @@ -18,17 +31,18 @@ def self.start(client: nil, frequency: 30, collect_by_queue: false) end def collect(by_queue = false) - count_method = by_queue ? COUNT_BY_QUEUE : COUNT_ALL + counter = by_queue ? QueueCounter.new(::GoodJob::Job.distinct.pluck(:queue_name)) : TotalCounter.new + { type: "good_job", by_queue: by_queue, - scheduled: ::GoodJob::Job.scheduled.yield_self(&count_method), - retried: ::GoodJob::Job.retried.yield_self(&count_method), - queued: ::GoodJob::Job.queued.yield_self(&count_method), - running: ::GoodJob::Job.running.yield_self(&count_method), - finished: ::GoodJob::Job.finished.yield_self(&count_method), - succeeded: ::GoodJob::Job.succeeded.yield_self(&count_method), - discarded: ::GoodJob::Job.discarded.yield_self(&count_method) + scheduled: counter.count(::GoodJob::Job.scheduled), + retried: counter.count(::GoodJob::Job.retried), + queued: counter.count(::GoodJob::Job.queued), + running: counter.count(::GoodJob::Job.running), + finished: counter.count(::GoodJob::Job.finished), + succeeded: counter.count(::GoodJob::Job.succeeded), + discarded: counter.count(::GoodJob::Job.discarded) } end end