From b4a364a44b302db7dab9cd07c67f5bb0095927dc Mon Sep 17 00:00:00 2001 From: Andriichuk Date: Thu, 23 Oct 2025 21:39:55 +0100 Subject: [PATCH] Add configurable HTTP health-check server --- README.md | 25 ++++++ lib/puma/plugin/solid_queue.rb | 2 + lib/solid_queue.rb | 4 +- lib/solid_queue/configuration.rb | 42 ++++++++- lib/solid_queue/health_server.rb | 142 +++++++++++++++++++++++++++++++ test/unit/health_server_test.rb | 126 +++++++++++++++++++++++++++ 6 files changed, 336 insertions(+), 5 deletions(-) create mode 100644 lib/solid_queue/health_server.rb create mode 100644 test/unit/health_server_test.rb diff --git a/README.md b/README.md index 92a018d4..24b23145 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite, - [Failed jobs and retries](#failed-jobs-and-retries) - [Error reporting on jobs](#error-reporting-on-jobs) - [Puma plugin](#puma-plugin) +- [Health-check HTTP server](#health-check-http-server) - [Jobs and transactional integrity](#jobs-and-transactional-integrity) - [Recurring tasks](#recurring-tasks) - [Inspiration](#inspiration) @@ -603,6 +604,30 @@ that you set in production only. This is what Rails 8's default Puma config look **Note**: phased restarts are not supported currently because the plugin requires [app preloading](https://github.com/puma/puma?tab=readme-ov-file#cluster-mode) to work. +## Health-check HTTP server + +Solid Queue provides a tiny HTTP health-check server that runs as a supervised process. + +- Endpoints: + - `/` and `/health`: + - Returns `200 OK` with body `OK` when the supervisor and all supervised processes (workers, dispatchers, scheduler, and the health server itself) have fresh heartbeats. + - Returns `503 Service Unavailable` with body `Unhealthy` if any supervised process (or the supervisor) has a stale heartbeat. + - Any other path: returns `404 Not Found` + - Configure via `config/queue.yml` under `health_server:`. 
Both `host` and `port` are required. + +Enable and configure via process configuration: + +```yml +production: + health_server: + host: 0.0.0.0 + port: 9393 +``` + +Note: +- This runs under the supervisor just like workers/dispatchers. +- When the Puma plugin is active (`plugin :solid_queue` in `puma.rb`), the configured health server is skipped to avoid running multiple HTTP servers in the same process tree. A warning is logged. If you need the health server, run Solid Queue outside Puma (for example, via `bin/jobs`) or disable the plugin on that instance. + ## Jobs and transactional integrity :warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed and vice versa, and ensuring that your job won't be enqueued until the transaction within which you're enqueuing it is committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another active job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you. Because this can be quite tricky and many people shouldn't need to worry about it, by default Solid Queue is configured in a different database as the main app. 
diff --git a/lib/puma/plugin/solid_queue.rb b/lib/puma/plugin/solid_queue.rb index 434b8f65..a89a4ecf 100644 --- a/lib/puma/plugin/solid_queue.rb +++ b/lib/puma/plugin/solid_queue.rb @@ -13,6 +13,7 @@ def start(launcher) if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7") launcher.events.on_booted do + SolidQueue.puma_plugin = true @solid_queue_pid = fork do Thread.new { monitor_puma } SolidQueue::Supervisor.start @@ -23,6 +24,7 @@ def start(launcher) launcher.events.on_restart { stop_solid_queue } else launcher.events.after_booted do + SolidQueue.puma_plugin = true @solid_queue_pid = fork do Thread.new { monitor_puma } SolidQueue::Supervisor.start diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index e0d51c8c..a9d49e40 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -41,9 +41,11 @@ module SolidQueue mattr_accessor :clear_finished_jobs_after, default: 1.day mattr_accessor :default_concurrency_control_period, default: 3.minutes + mattr_accessor :puma_plugin, default: false + delegate :on_start, :on_stop, :on_exit, to: Supervisor - [ Dispatcher, Scheduler, Worker ].each do |process| + [ Dispatcher, Scheduler, Worker, HealthServer ].each do |process| define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block| process.on_start(&block) end diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index a002b41d..a3e250f0 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -7,10 +7,11 @@ class Configuration validate :ensure_configured_processes validate :ensure_valid_recurring_tasks validate :ensure_correctly_sized_thread_pool + validate :ensure_valid_health_server class Process < Struct.new(:kind, :attributes) def instantiate - "SolidQueue::#{kind.to_s.titleize}".safe_constantize.new(**attributes) + "SolidQueue::#{kind.to_s.camelize}".safe_constantize.new(**attributes) end end @@ -38,7 +39,7 @@ def initialize(**options) def configured_processes if 
# Health-server process definitions to hand to the supervisor.
#
# Returns an empty list when no `health_server` section is configured, or
# when the Puma plugin flag (`SolidQueue.puma_plugin`) is set; otherwise a
# one-element list wrapping the configured options.
#
# The configuration is checked *before* the Puma flag so that the
# "configured but skipped" warning is only logged when a health server was
# actually requested.
def health_server
  options = health_server_options
  return [] unless options

  if SolidQueue.puma_plugin
    SolidQueue.logger&.warn("SolidQueue health server is configured but Puma plugin is active; skipping starting health server to avoid duplicate servers")
    return []
  end

  [ Process.new(:health_server, options) ]
end

# Validation hook: when a `health_server` section is present, both `host`
# and `port` must be given. Adds one error per missing setting; does
# nothing when the section is absent.
def ensure_valid_health_server
  server_options = health_server_options
  return unless server_options

  # A missing key reads as nil, which is not `present?`, so no separate
  # `key?` check is needed.
  errors.add(:base, "Health server: host is required") unless server_options[:host].present?
  errors.add(:base, "Health server: port is required") unless server_options[:port].present?
end
require "socket"
require "logger"

module SolidQueue
  # Tiny single-threaded HTTP server exposing a health endpoint.
  #
  # * `GET /` and `GET /health` answer `200 OK` when #system_healthy? is
  #   true and `503 Service Unavailable` otherwise.
  # * Any other path answers `404 Not Found`.
  #
  # Connections are served one at a time from the accept loop in #run, so
  # every per-connection operation is bounded (read timeout, request size)
  # and per-connection I/O failures are contained instead of ending the loop.
  class HealthServer < Processes::Base
    include Processes::Runnable

    # Seconds a client gets to deliver its request line before the
    # connection is dropped. Keeps an idle client from stalling the loop.
    REQUEST_READ_TIMEOUT = 2

    # Upper bound on the bytes read while looking for the request line.
    MAX_REQUEST_BYTES = 8192

    # Request paths that report health.
    HEALTH_PATHS = [ "/", "/health" ].freeze

    attr_reader :host, :port, :logger

    # host::   interface to bind to
    # port::   TCP port to bind to
    # logger:: optional logger; defaults to an INFO-level stdout logger
    # Remaining options are forwarded to the superclass.
    def initialize(host:, port:, logger: nil, **options)
      @host = host
      @port = port
      @logger = logger || default_logger
      @server = nil

      super(**options)
    end

    # Superclass metadata extended with the bound host and port.
    def metadata
      super.merge(host: host, port: port)
    end

    # Truthy while the thread stored in @thread is alive. @thread is not
    # assigned in this class (presumably by Processes::Runnable when run
    # in a thread - verify); when it is unset this returns nil.
    def running?
      @thread&.alive?
    end

    private
      # Binds the listening socket and serves connections until
      # `shutting_down?` is true. Any failure, including a bind error, is
      # handed to `handle_thread_error`; the shutdown callbacks always run.
      def run
        @server = TCPServer.new(host, port)
        log_info("listening on #{host}:#{port}")

        loop do
          break if shutting_down?

          # Wake at least once a second so the shutdown flag is re-checked
          # even when neither the listener nor the self-pipe is readable.
          readables, = IO.select([ @server, self_pipe[:reader] ].compact, nil, nil, 1)
          next unless readables

          drain_self_pipe if readables.include?(self_pipe[:reader])
          handle_connection if readables.include?(@server)
        end
      rescue Exception => exception
        handle_thread_error(exception)
      ensure
        SolidQueue.instrument(:shutdown_process, process: self) do
          run_callbacks(:shutdown) { shutdown }
        end
      end

      # Accepts one pending connection, if any, and answers it.
      #
      # I/O errors on the client socket (reset, broken pipe, ...) are logged
      # and swallowed: one misbehaving client must not stop the server.
      def handle_connection
        socket = @server.accept_nonblock(exception: false)
        return unless socket.is_a?(::TCPSocket)

        begin
          request_line = read_request_line(socket)
          # nil means the client never produced a request in time: just hang up.
          socket.write(build_response(request_line)) if request_line
        rescue IOError, SystemCallError => error
          log_warn("connection error: #{error.class}: #{error.message}")
        ensure
          close_quietly(socket)
        end
      end

      # Reads the first request line without ever blocking indefinitely.
      #
      # Returns the line without its terminator (possibly empty when the
      # peer closed before sending anything), or nil when nothing complete
      # arrived within REQUEST_READ_TIMEOUT seconds.
      def read_request_line(socket)
        deadline = monotonic_now + REQUEST_READ_TIMEOUT
        buffer = String.new

        until buffer.include?("\n") || buffer.bytesize >= MAX_REQUEST_BYTES
          chunk = socket.read_nonblock(MAX_REQUEST_BYTES - buffer.bytesize, exception: false)

          case chunk
          when :wait_readable
            remaining = deadline - monotonic_now
            return nil unless remaining.positive? && IO.select([ socket ], nil, nil, remaining)
          when nil
            break # peer closed its side; use whatever was received
          else
            buffer << chunk
          end
        end

        buffer[/\A[^\r\n]*/]
      end

      # Builds the full HTTP/1.1 response for the given request line.
      # The query string is ignored when matching the path, and HEAD
      # requests get the same headers as GET but no body.
      def build_response(request_line)
        verb, target = request_line.split(" ")
        path = (target || "/").split("?", 2).first

        if !HEALTH_PATHS.include?(path)
          status_line = "HTTP/1.1 404 Not Found"
          body = "Not Found"
        elsif system_healthy?
          status_line = "HTTP/1.1 200 OK"
          body = "OK"
        else
          status_line = "HTTP/1.1 503 Service Unavailable"
          body = "Unhealthy"
        end

        headers = [
          "Content-Type: text/plain",
          "Content-Length: #{body.bytesize}",
          "Connection: close"
        ].join("\r\n")

        payload = verb == "HEAD" ? "" : body
        "#{status_line}\r\n#{headers}\r\n\r\n#{payload}"
      end

      # Closes the listening socket (best effort) and forgets it.
      def shutdown
        @server&.close
      rescue StandardError
        # Closing is best effort; the socket is being discarded anyway.
      ensure
        @server = nil
      end

      def set_procline
        procline "http #{host}:#{port}"
      end

      def default_logger
        Logger.new($stdout).tap do |stdout_logger|
          stdout_logger.level = Logger::INFO
          stdout_logger.progname = "SolidQueueHTTP"
        end
      end

      def log_info(message)
        logger&.info(message)
      end

      def log_warn(message)
        logger&.warn(message)
      end

      # Closes a client socket, ignoring errors from an already-broken peer.
      def close_quietly(socket)
        socket.close
      rescue StandardError
        # Nothing useful can be done if closing fails.
      end

      # Monotonic clock reading in seconds. `::Process` is spelled out
      # because this file also refers to SolidQueue::Process, which a bare
      # `Process` would resolve to inside this namespace.
      def monotonic_now
        ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
      end

      # Empties the self-pipe so a wake-up byte does not keep the select
      # call in #run returning immediately.
      def drain_self_pipe
        loop { self_pipe[:reader].read_nonblock(11) }
      rescue Errno::EAGAIN, Errno::EINTR, IO::EWOULDBLOCKWaitReadable
      end

      # True when neither the supervisor nor any of its supervisees matches
      # the `SolidQueue::Process.prunable` scope. With no supervisor record
      # (e.g. when run unsupervised in unit tests) the system is considered
      # healthy. Any error while checking is logged and reported as unhealthy.
      def system_healthy?
        wrap_in_app_executor do
          supervisor_record = process&.supervisor
          return true unless supervisor_record

          supervisor_alive = SolidQueue::Process.where(id: supervisor_record.id).merge(SolidQueue::Process.prunable).none?

          # Supervisees include this health server itself.
          supervisees_alive = supervisor_record.supervisees.merge(SolidQueue::Process.prunable).none?

          supervisor_alive && supervisees_alive
        end
      rescue StandardError => error
        log_warn("health check error: #{error.class}: #{error.message}")
        false
      end
  end
end
stopped here + @server = SolidQueue::HealthServer.new(host: @host, port: @port, logger: Logger.new(IO::NULL)) + end + + def test_supervisor_starts_health_server_from_configuration + @server.stop # ensure no unsupervised health server is registered + other_port = available_port(@host) + pid = run_supervisor_as_fork(health_server: { host: @host, port: other_port }, workers: [], dispatchers: []) + wait_for_registered_processes(2, timeout: 2) # supervisor + health server + + assert_registered_processes(kind: "HealthServer", count: 1) + + # Verify it responds to HTTP + wait_for_server_on(other_port) + response = http_get_on(other_port, "/health") + assert_equal "200", response.code + assert_equal "OK", response.body + ensure + terminate_process(pid) if pid + end + + def test_supervisor_skips_health_server_when_puma_plugin_is_active + SolidQueue.puma_plugin = true + + original_logger = SolidQueue.logger + SolidQueue.logger = ActiveSupport::Logger.new($stdout) + + @server.stop # ensure no unsupervised health server is registered + pid = nil + pid = run_supervisor_as_fork(health_server: { host: @host, port: available_port(@host) }, workers: [], dispatchers: []) + # Expect only supervisor to register + wait_for_registered_processes(1, timeout: 2) + assert_equal 0, find_processes_registered_as("HealthServer").count + ensure + SolidQueue.logger = original_logger if defined?(original_logger) + SolidQueue.puma_plugin = false + terminate_process(pid) if pid + end + + private + def http_get(path) + Net::HTTP.start(@host, @port) do |http| + http.get(path) + end + end + + def wait_for_server + # Try to connect for up to 1 second + deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 1.0 + begin + Net::HTTP.start(@host, @port) { |http| http.head("/") } + rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH + raise if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline + sleep 0.05 + retry + end + end + + def wait_for_server_on(port) + deadline = 
Process.clock_gettime(Process::CLOCK_MONOTONIC) + 1.0 + begin + Net::HTTP.start(@host, port) { |http| http.head("/") } + rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH + raise if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline + sleep 0.05 + retry + end + end + + def http_get_on(port, path) + Net::HTTP.start(@host, port) do |http| + http.get(path) + end + end + + def available_port(host) + tcp = TCPServer.new(host, 0) + port = tcp.addr[1] + tcp.close + port + end +end