diff --git a/README.md b/README.md index d636cd12..c18f3f08 100644 --- a/README.md +++ b/README.md @@ -540,6 +540,7 @@ end | Counter | `delayed_jobs_total` | Total number of delayed jobs executed | `job_name` | | Gauge | `delayed_jobs_enqueued` | Number of enqueued delayed jobs | - | | Gauge | `delayed_jobs_pending` | Number of pending delayed jobs | - | +| Gauge | `delayed_jobs_ready` | Number of ready delayed jobs      | - | | Counter | `delayed_failed_jobs_total` | Total number failed delayed jobs executed | `job_name` | | Counter | `delayed_jobs_max_attempts_reached_total` | Total number of delayed jobs that reached max attempts | - | | Summary | `delayed_job_duration_seconds_summary` | Summary of the time it takes jobs to execute | `status` | @@ -653,6 +654,9 @@ installation, you'll need to start the instrumentation: # e.g. config/initializers/good_job.rb require 'prometheus_exporter/instrumentation' PrometheusExporter::Instrumentation::GoodJob.start + +# or, to collect metrics labelled by their queue name +PrometheusExporter::Instrumentation::GoodJob.start(collect_by_queue: true) ``` #### Metrics collected by GoodJob Instrumentation diff --git a/lib/prometheus_exporter/instrumentation/delayed_job.rb b/lib/prometheus_exporter/instrumentation/delayed_job.rb index 9978affa..0cfd50c1 100644 --- a/lib/prometheus_exporter/instrumentation/delayed_job.rb +++ b/lib/prometheus_exporter/instrumentation/delayed_job.rb @@ -15,13 +15,15 @@ def register_plugin(client: nil, include_module_name: false) lifecycle.around(:invoke_job) do |job, *args, &block| max_attempts = Delayed::Worker.max_attempts enqueued_count = Delayed::Job.where(queue: job.queue).count - pending_count = - Delayed::Job.where(attempts: 0, locked_at: nil, queue: job.queue).count + pending_count = Delayed::Job.where(attempts: 0, locked_at: nil, queue: job.queue).count + ready_count = Delayed::Job.where(queue: job.queue, run_at: ..Time.current, locked_at: nil, failed_at: nil).count + instrumenter.call( job, max_attempts, enqueued_count, pending_count, + ready_count, include_module_name, *args, &block @@ -38,7 +40,7 @@ def initialize(client: nil) @client = client || PrometheusExporter::Client.default end - def call(job, max_attempts, enqueued_count, pending_count, include_module_name, *args, &block) + def call(job, max_attempts, enqueued_count, pending_count, ready_count, include_module_name, *args, &block) success = false start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) latency = Time.current - job.run_at @@ -60,6 +62,7 @@ def call(job, max_attempts, enqueued_count, pending_count, include_module_name, max_attempts: max_attempts, enqueued: enqueued_count, pending: pending_count, + ready: ready_count ) end end diff --git a/lib/prometheus_exporter/instrumentation/good_job.rb b/lib/prometheus_exporter/instrumentation/good_job.rb index 6342c340..f3df705b 100644 --- a/lib/prometheus_exporter/instrumentation/good_job.rb +++ b/lib/prometheus_exporter/instrumentation/good_job.rb @@ -3,25 +3,46 @@ # collects stats from GoodJob module PrometheusExporter::Instrumentation class GoodJob < PeriodicStats - def self.start(client: nil, frequency: 30) + TotalCounter = Struct.new do + def count(relation) + relation.size + end + end + + QueueCounter = Struct.new(:queue_names) do + def initialize(queue_names) + @empty_queues = queue_names.to_h { |name| [name, 0] } + end + + def count(relation) + @empty_queues.merge(relation.group(:queue_name).size) + end + end + + def self.start(client: nil, frequency: 30, collect_by_queue: false) good_job_collector = new client ||= PrometheusExporter::Client.default - worker_loop { client.send_json(good_job_collector.collect) } + worker_loop do + client.send_json(good_job_collector.collect(collect_by_queue)) + end super end - def collect + def collect(by_queue = false) + counter = by_queue ? QueueCounter.new(::GoodJob::Job.distinct.pluck(:queue_name)) : TotalCounter.new + { type: "good_job", - scheduled: ::GoodJob::Job.scheduled.size, - retried: ::GoodJob::Job.retried.size, - queued: ::GoodJob::Job.queued.size, - running: ::GoodJob::Job.running.size, - finished: ::GoodJob::Job.finished.size, - succeeded: ::GoodJob::Job.succeeded.size, - discarded: ::GoodJob::Job.discarded.size, + by_queue: by_queue, + scheduled: counter.count(::GoodJob::Job.scheduled), + retried: counter.count(::GoodJob::Job.retried), + queued: counter.count(::GoodJob::Job.queued), + running: counter.count(::GoodJob::Job.running), + finished: counter.count(::GoodJob::Job.finished), + succeeded: counter.count(::GoodJob::Job.succeeded), + discarded: counter.count(::GoodJob::Job.discarded) } end end diff --git a/lib/prometheus_exporter/server/delayed_job_collector.rb b/lib/prometheus_exporter/server/delayed_job_collector.rb index 85bc1721..8ea78c72 100644 --- a/lib/prometheus_exporter/server/delayed_job_collector.rb +++ b/lib/prometheus_exporter/server/delayed_job_collector.rb @@ -13,6 +13,7 @@ def initialize @delayed_job_attempts_summary = nil @delayed_jobs_enqueued = nil @delayed_jobs_pending = nil + @delayed_jobs_ready = nil end def type @@ -48,6 +49,7 @@ def collect(obj) @delayed_job_attempts_summary.observe(obj["attempts"], counter_labels) if obj["success"] @delayed_jobs_enqueued.observe(obj["enqueued"], gauge_labels) @delayed_jobs_pending.observe(obj["pending"], gauge_labels) + @delayed_jobs_ready.observe(obj["ready"], gauge_labels) end def metrics @@ -62,6 +64,7 @@ def metrics @delayed_job_attempts_summary, @delayed_jobs_enqueued, @delayed_jobs_pending, + @delayed_jobs_ready, ] else [] @@ -102,6 +105,11 @@ def ensure_delayed_job_metrics "Number of pending delayed jobs.", ) + @delayed_jobs_ready = + PrometheusExporter::Metric::Gauge.new( + "delayed_jobs_ready", "Number of ready delayed jobs." + ) + @delayed_failed_jobs_total = PrometheusExporter::Metric::Counter.new( "delayed_failed_jobs_total", diff --git a/lib/prometheus_exporter/server/good_job_collector.rb b/lib/prometheus_exporter/server/good_job_collector.rb index e2fc4186..256740ee 100644 --- a/lib/prometheus_exporter/server/good_job_collector.rb +++ b/lib/prometheus_exporter/server/good_job_collector.rb @@ -23,21 +23,9 @@ def type end def metrics - return [] if good_job_metrics.length == 0 - - good_job_metrics.map do |metric| - labels = metric.fetch("custom_labels", {}) - - GOOD_JOB_GAUGES.map do |name, help| - value = metric[name.to_s] - - if value - gauge = gauges[name] ||= PrometheusExporter::Metric::Gauge.new("good_job_#{name}", help) - gauge.observe(value, labels) - end - end - end + return [] if good_job_metrics.length.zero? + good_job_metrics.each(&method(:process_metric)) gauges.values end @@ -48,5 +36,24 @@ def collect(object) private attr_reader :good_job_metrics, :gauges + + def process_metric(metric) + labels = metric.fetch("custom_labels", {}) + + GOOD_JOB_GAUGES.each do |name, help| + next unless (value = metric[name.to_s]) + + gauge = gauges[name] ||= PrometheusExporter::Metric::Gauge.new("good_job_#{name}", help) + observe_metric(gauge, metric, labels, value) + end + end + + def observe_metric(gauge, metric, labels, value) + if metric["by_queue"] + value.each { |queue_name, count| gauge.observe(count, labels.merge(queue_name: queue_name)) } + else + gauge.observe(value, labels) + end + end end end diff --git a/lib/prometheus_exporter/server/metrics_container.rb b/lib/prometheus_exporter/server/metrics_container.rb index 1e0f2ddf..41eff7b3 100644 --- a/lib/prometheus_exporter/server/metrics_container.rb +++ b/lib/prometheus_exporter/server/metrics_container.rb @@ -34,14 +34,14 @@ def size(&blk) end alias_method :length, :size - def map(&blk) - wrap_expire(:map, &blk) - end - def each(&blk) wrap_expire(:each, &blk) end + def map(&blk) + wrap_expire(:map, &blk) + end + def expire(time: nil, new_metric: nil) time ||= get_time