Skip to content

Commit 13d3414

Browse files
committed
Add configurable HTTP health-check server
1 parent bd6b377 commit 13d3414

File tree

6 files changed

+306
-5
lines changed

6 files changed

+306
-5
lines changed

README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
2828
- [Failed jobs and retries](#failed-jobs-and-retries)
2929
- [Error reporting on jobs](#error-reporting-on-jobs)
3030
- [Puma plugin](#puma-plugin)
31+
- [Health-check HTTP server](#health-check-http-server)
3132
- [Jobs and transactional integrity](#jobs-and-transactional-integrity)
3233
- [Recurring tasks](#recurring-tasks)
3334
- [Inspiration](#inspiration)
@@ -603,6 +604,28 @@ that you set in production only. This is what Rails 8's default Puma config look
603604

604605
**Note**: phased restarts are not supported currently because the plugin requires [app preloading](https://github.com/puma/puma?tab=readme-ov-file#cluster-mode) to work.
605606

607+
## Health-check HTTP server
608+
609+
Solid Queue provides a tiny HTTP health-check server that now runs as a supervised process.
610+
611+
- Endpoints:
612+
- `/` and `/health`: returns `200 OK` with body `OK`
613+
- Any other path: returns `404 Not Found`
614+
- Configure via `config/queue.yml` under `health_servers:`. Both `host` and `port` are required.
615+
616+
Enable and configure via process configuration:
617+
618+
```yml
619+
production:
620+
health_servers:
621+
- host: 0.0.0.0
622+
port: 9393
623+
```
624+
625+
Note:
626+
- This runs under the supervisor just like workers/dispatchers.
627+
- When the Puma plugin is active (`plugin :solid_queue` in `puma.rb`), configured `health_servers` are skipped to avoid running multiple HTTP servers in the same process tree. A warning is logged. If you need the health server, run Solid Queue outside Puma (for example, via `bin/jobs`) or disable the plugin on that instance.
628+
606629
## Jobs and transactional integrity
607630
:warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed and vice versa, and ensuring that your job won't be enqueued until the transaction within which you're enqueuing it is committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another active job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you. Because this can be quite tricky and many people shouldn't need to worry about it, by default Solid Queue is configured in a different database as the main app.
608631

lib/puma/plugin/solid_queue.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ def start(launcher)
1313

1414
if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
1515
launcher.events.on_booted do
16+
SolidQueue.puma_plugin = true
1617
@solid_queue_pid = fork do
1718
Thread.new { monitor_puma }
1819
SolidQueue::Supervisor.start
@@ -23,6 +24,7 @@ def start(launcher)
2324
launcher.events.on_restart { stop_solid_queue }
2425
else
2526
launcher.events.after_booted do
27+
SolidQueue.puma_plugin = true
2628
@solid_queue_pid = fork do
2729
Thread.new { monitor_puma }
2830
SolidQueue::Supervisor.start

lib/solid_queue.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@ module SolidQueue
4141
mattr_accessor :clear_finished_jobs_after, default: 1.day
4242
mattr_accessor :default_concurrency_control_period, default: 3.minutes
4343

44+
mattr_accessor :puma_plugin, default: false
45+
4446
delegate :on_start, :on_stop, :on_exit, to: Supervisor
4547

46-
[ Dispatcher, Scheduler, Worker ].each do |process|
48+
[ Dispatcher, Scheduler, Worker, HealthServer ].each do |process|
4749
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
4850
process.on_start(&block)
4951
end

lib/solid_queue/configuration.rb

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ class Configuration
77
validate :ensure_configured_processes
88
validate :ensure_valid_recurring_tasks
99
validate :ensure_correctly_sized_thread_pool
10+
validate :ensure_valid_health_servers
1011

1112
class Process < Struct.new(:kind, :attributes)
1213
def instantiate
13-
"SolidQueue::#{kind.to_s.titleize}".safe_constantize.new(**attributes)
14+
"SolidQueue::#{kind.to_s.camelize}".safe_constantize.new(**attributes)
1415
end
1516
end
1617

@@ -38,7 +39,7 @@ def initialize(**options)
3839
def configured_processes
3940
if only_work? then workers
4041
else
41-
dispatchers + workers + schedulers
42+
dispatchers + workers + schedulers + health_servers
4243
end
4344
end
4445

@@ -129,6 +130,30 @@ def schedulers
129130
end
130131
end
131132

133+
def health_servers
134+
if SolidQueue.puma_plugin
135+
SolidQueue.logger&.warn("SolidQueue health server is configured but Puma plugin is active; skipping starting health server to avoid duplicate servers")
136+
return []
137+
end
138+
139+
health_servers_options.flat_map do |server_options|
140+
processes = server_options.fetch(:processes, 1)
141+
processes.times.map { Process.new(:health_server, server_options) }
142+
end
143+
end
144+
145+
def ensure_valid_health_servers
146+
health_servers_options.each_with_index do |server_options, index|
147+
unless server_options[:host].present?
148+
errors.add(:base, "Health server ##{index + 1}: host is required")
149+
end
150+
151+
unless server_options.key?(:port) && server_options[:port].present?
152+
errors.add(:base, "Health server ##{index + 1}: port is required")
153+
end
154+
end
155+
end
156+
132157
def workers_options
133158
@workers_options ||= processes_config.fetch(:workers, [])
134159
.map { |options| options.dup.symbolize_keys }
@@ -139,6 +164,11 @@ def dispatchers_options
139164
.map { |options| options.dup.symbolize_keys }
140165
end
141166

167+
def health_servers_options
168+
@health_servers_options ||= processes_config.fetch(:health_servers, [])
169+
.map { |options| options.dup.symbolize_keys }
170+
end
171+
142172
def recurring_tasks
143173
@recurring_tasks ||= recurring_tasks_config.map do |id, options|
144174
RecurringTask.from_configuration(id, **options) if options&.has_key?(:schedule)
@@ -147,8 +177,8 @@ def recurring_tasks
147177

148178
def processes_config
149179
@processes_config ||= config_from \
150-
options.slice(:workers, :dispatchers).presence || options[:config_file],
151-
keys: [ :workers, :dispatchers ],
180+
options.slice(:workers, :dispatchers, :health_servers).presence || options[:config_file],
181+
keys: [ :workers, :dispatchers, :health_servers ],
152182
fallback: { workers: [ WORKER_DEFAULTS ], dispatchers: [ DISPATCHER_DEFAULTS ] }
153183
end
154184

lib/solid_queue/health_server.rb

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# frozen_string_literal: true
2+
3+
require "socket"
4+
require "logger"
5+
6+
module SolidQueue
7+
class HealthServer < Processes::Base
8+
include Processes::Runnable
9+
10+
attr_reader :host, :port, :logger
11+
12+
def initialize(host:, port:, logger: nil, **options)
13+
@host = host
14+
@port = port
15+
@logger = logger || default_logger
16+
@server = nil
17+
18+
super(**options)
19+
end
20+
21+
def metadata
22+
super.merge(host: host, port: port)
23+
end
24+
25+
def running?
26+
@thread&.alive?
27+
end
28+
29+
private
30+
def run
31+
begin
32+
@server = TCPServer.new(host, port)
33+
log_info("listening on #{host}:#{port}")
34+
35+
loop do
36+
break if shutting_down?
37+
38+
readables, = IO.select([ @server, self_pipe[:reader] ].compact, nil, nil, 1)
39+
next unless readables
40+
41+
if readables.include?(self_pipe[:reader])
42+
drain_self_pipe
43+
end
44+
45+
if readables.include?(@server)
46+
handle_connection
47+
end
48+
end
49+
rescue Exception => exception
50+
handle_thread_error(exception)
51+
ensure
52+
SolidQueue.instrument(:shutdown_process, process: self) do
53+
run_callbacks(:shutdown) { shutdown }
54+
end
55+
end
56+
end
57+
58+
def handle_connection
59+
socket = @server.accept_nonblock(exception: false)
60+
return unless socket.is_a?(::TCPSocket)
61+
62+
begin
63+
request_line = socket.gets
64+
path = request_line&.split(" ")&.at(1) || "/"
65+
66+
if path == "/" || path == "/health"
67+
body = "OK"
68+
status_line = "HTTP/1.1 200 OK"
69+
else
70+
body = "Not Found"
71+
status_line = "HTTP/1.1 404 Not Found"
72+
end
73+
74+
headers = [
75+
"Content-Type: text/plain",
76+
"Content-Length: #{body.bytesize}",
77+
"Connection: close"
78+
].join("\r\n")
79+
80+
socket.write("#{status_line}\r\n#{headers}\r\n\r\n#{body}")
81+
ensure
82+
begin
83+
socket.close
84+
rescue StandardError
85+
end
86+
end
87+
end
88+
89+
def shutdown
90+
begin
91+
@server&.close
92+
rescue StandardError
93+
ensure
94+
@server = nil
95+
end
96+
end
97+
98+
def set_procline
99+
procline "http #{host}:#{port}"
100+
end
101+
102+
def default_logger
103+
logger = Logger.new($stdout)
104+
logger.level = Logger::INFO
105+
logger.progname = "SolidQueueHTTP"
106+
logger
107+
end
108+
109+
def log_info(message)
110+
logger&.info(message)
111+
end
112+
113+
def drain_self_pipe
114+
loop { self_pipe[:reader].read_nonblock(11) }
115+
rescue Errno::EAGAIN, Errno::EINTR, IO::EWOULDBLOCKWaitReadable
116+
end
117+
end
118+
end

test/unit/health_server_test.rb

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
# frozen_string_literal: true
2+
3+
require "test_helper"
4+
require "net/http"
5+
require "socket"
6+
require "stringio"
7+
8+
class HealthServerTest < ActiveSupport::TestCase
9+
self.use_transactional_tests = false
10+
def setup
11+
@host = "127.0.0.1"
12+
@port = available_port(@host)
13+
@server = SolidQueue::HealthServer.new(host: @host, port: @port, logger: Logger.new(IO::NULL))
14+
@server.start
15+
wait_for_server
16+
end
17+
18+
def teardown
19+
@server.stop if defined?(@server)
20+
end
21+
22+
def test_health_endpoint_returns_ok
23+
response = http_get("/health")
24+
assert_equal "200", response.code
25+
assert_equal "OK", response.body
26+
end
27+
28+
def test_root_endpoint_returns_ok
29+
response = http_get("/")
30+
assert_equal "200", response.code
31+
assert_equal "OK", response.body
32+
end
33+
34+
def test_unknown_path_returns_not_found
35+
response = http_get("/unknown")
36+
assert_equal "404", response.code
37+
assert_equal "Not Found", response.body
38+
end
39+
40+
def test_stop_stops_server
41+
assert @server.running?, "server should be running before stop"
42+
@server.stop
43+
assert_not @server.running?, "server should not be running after stop"
44+
ensure
45+
# Avoid double-stop in teardown if we stopped here
46+
@server = SolidQueue::HealthServer.new(host: @host, port: @port, logger: Logger.new(IO::NULL))
47+
end
48+
49+
def test_supervisor_starts_health_server_from_configuration
50+
@server.stop # ensure no unsupervised health server is registered
51+
other_port = available_port(@host)
52+
pid = run_supervisor_as_fork(health_servers: [ { host: @host, port: other_port } ], workers: [], dispatchers: [])
53+
wait_for_registered_processes(2, timeout: 2) # supervisor + health server
54+
55+
assert_registered_processes(kind: "HealthServer", count: 1)
56+
57+
# Verify it responds to HTTP
58+
wait_for_server_on(other_port)
59+
response = http_get_on(other_port, "/health")
60+
assert_equal "200", response.code
61+
assert_equal "OK", response.body
62+
ensure
63+
terminate_process(pid) if pid
64+
end
65+
66+
def test_supervisor_skips_health_server_when_puma_plugin_is_active
67+
SolidQueue.puma_plugin = true
68+
69+
original_logger = SolidQueue.logger
70+
SolidQueue.logger = ActiveSupport::Logger.new($stdout)
71+
72+
@server.stop # ensure no unsupervised health server is registered
73+
pid = nil
74+
pid = run_supervisor_as_fork(health_servers: [ { host: @host, port: available_port(@host) } ], workers: [], dispatchers: [])
75+
# Expect only supervisor to register
76+
wait_for_registered_processes(1, timeout: 2)
77+
assert_equal 0, find_processes_registered_as("HealthServer").count
78+
ensure
79+
SolidQueue.logger = original_logger if defined?(original_logger)
80+
SolidQueue.puma_plugin = false
81+
terminate_process(pid) if pid
82+
end
83+
84+
private
85+
def http_get(path)
86+
Net::HTTP.start(@host, @port) do |http|
87+
http.get(path)
88+
end
89+
end
90+
91+
def wait_for_server
92+
# Try to connect for up to 1 second
93+
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 1.0
94+
begin
95+
Net::HTTP.start(@host, @port) { |http| http.head("/") }
96+
rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH
97+
raise if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
98+
sleep 0.05
99+
retry
100+
end
101+
end
102+
103+
def wait_for_server_on(port)
104+
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 1.0
105+
begin
106+
Net::HTTP.start(@host, port) { |http| http.head("/") }
107+
rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH
108+
raise if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
109+
sleep 0.05
110+
retry
111+
end
112+
end
113+
114+
def http_get_on(port, path)
115+
Net::HTTP.start(@host, port) do |http|
116+
http.get(path)
117+
end
118+
end
119+
120+
def available_port(host)
121+
tcp = TCPServer.new(host, 0)
122+
port = tcp.addr[1]
123+
tcp.close
124+
port
125+
end
126+
end

0 commit comments

Comments
 (0)