Skip to content

Commit ce03fb0

Browse files
bougymanBryceARich
andauthored
feat: Adds graceful shutdown when INT/TERM/QUIT signal is received (#18)
* refactor: Lowers AbcSize for building workers * chore: move instance methods to an instance method module and update impacted tests * feat: Adds Dry::Configurable extension, and makes #logger a configurable setting * feat: Adds graceful termination of workers * style: Corrects rubocop AbcSize offense * style: Separate shutdown method for clarity * fix: Adds logging of worker options * fix: Makes #logger a method to keep the One True Logger pattern alive * fix: Adds guard around signal trap defining * style: Cleaner syntax for the instance logger definition --------- Co-authored-by: Bryce Rich <[email protected]>
1 parent 5f86233 commit ce03fb0

File tree

4 files changed

+186
-56
lines changed

4 files changed

+186
-56
lines changed

Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
PATH
22
remote: .
33
specs:
4-
leopard (0.1.0)
4+
leopard (0.1.5)
55
concurrent-ruby (~> 1.1)
66
dry-configurable (~> 1.3)
77
dry-monads (~> 1.9)

examples/echo_endpoint.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,21 @@
77
class EchoService
88
include Rubyists::Leopard::NatsApiServer
99

10+
def initialize(a_var = 1)
11+
logger.info "EchoService initialized with a_var: #{a_var}"
12+
end
13+
1014
endpoint(:echo) { |msg| Success(msg.data) }
1115
end
1216

1317
if __FILE__ == $PROGRAM_NAME
1418
EchoService.run(
1519
nats_url: 'nats://localhost:4222',
16-
service_opts: { name: 'example.echo', version: '1.0.0' },
20+
service_opts: {
21+
name: 'example.echo',
22+
version: '1.0.0',
23+
instance_args: [2],
24+
},
1725
instances: 4,
1826
)
1927
end

lib/leopard/nats_api_server.rb

Lines changed: 124 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
require 'nats/client'
44
require 'dry/monads'
5+
require 'dry/configurable'
56
require 'concurrent'
67
require_relative '../leopard'
78
require_relative 'message_wrapper'
@@ -14,10 +15,14 @@ module NatsApiServer
1415

1516
def self.included(base)
1617
base.extend(ClassMethods)
18+
base.include(InstanceMethods)
1719
base.extend(Dry::Monads[:result])
18-
base.include(SemanticLogger::Loggable)
20+
base.extend(Dry::Configurable)
21+
base.setting :logger, default: Rubyists::Leopard.logger, reader: true
1922
end
2023

24+
Endpoint = Struct.new(:name, :subject, :queue, :group, :handler)
25+
2126
module ClassMethods
2227
def endpoints = @endpoints ||= []
2328
def groups = @groups ||= {}
@@ -33,13 +38,7 @@ def middleware = @middleware ||= []
3338
#
3439
# @return [void]
3540
def endpoint(name, subject: nil, queue: nil, group: nil, &handler)
36-
endpoints << {
37-
name:,
38-
subject: subject || name,
39-
queue:,
40-
group:,
41-
handler:,
42-
}
41+
endpoints << Endpoint.new(name:, subject: subject || name, queue:, group:, handler:)
4342
end
4443

4544
# Define a group for organizing endpoints.
@@ -75,11 +74,12 @@ def use(klass, *args, &block)
7574
# @return [void]
7675
def run(nats_url:, service_opts:, instances: 1, blocking: true)
7776
logger.info 'Booting NATS API server...'
78-
# Return the thread pool if non-blocking
79-
pool = spawn_instances(nats_url, service_opts, instances)
77+
workers = Concurrent::Array.new
78+
pool = spawn_instances(nats_url, service_opts, instances, workers)
79+
logger.info 'Setting up signal trap...'
80+
trap_signals(workers, pool)
8081
return pool unless blocking
8182

82-
# Otherwise, just sleep the main thread forever
8383
sleep
8484
end
8585

@@ -92,16 +92,86 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true)
9292
# @param count [Integer] The number of instances to spawn.
9393
#
9494
# @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads.
95-
def spawn_instances(url, opts, count)
95+
def spawn_instances(url, opts, count, workers)
9696
pool = Concurrent::FixedThreadPool.new(count)
97+
@instance_args = opts.delete(:instance_args) || nil
98+
logger.info "Building #{count} workers with options: #{opts.inspect}, instance_args: #{@instance_args}"
9799
count.times do
98100
eps = endpoints.dup
99101
gps = groups.dup
100-
pool.post { setup_worker(url, opts, eps, gps) }
102+
pool.post { build_worker(url, opts, eps, gps, workers) }
101103
end
102104
pool
103105
end
104106

107+
# Builds a worker instance and sets it up with the NATS server.
108+
#
109+
# @param url [String] The URL of the NATS server.
110+
# @param opts [Hash] Options for the NATS service.
111+
# @param eps [Array<Hash>] The list of endpoints to add.
112+
# @param gps [Hash] The groups to add.
113+
# @param workers [Array] The array to store worker instances.
114+
#
115+
# @return [void]
116+
def build_worker(url, opts, eps, gps, workers)
117+
worker = @instance_args ? new(*@instance_args) : new
118+
workers << worker
119+
worker.setup_worker(url, opts, eps, gps)
120+
end
121+
122+
# Shuts down the NATS API server gracefully.
123+
#
124+
# @param workers [Array] The array of worker instances to stop.
125+
# @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads.
126+
#
127+
# @return [Proc] A lambda that performs the shutdown operations.
128+
def shutdown(workers, pool)
129+
lambda do
130+
logger.warn 'Draining worker subscriptions...'
131+
workers.each(&:stop)
132+
logger.warn 'All workers stopped, shutting down pool...'
133+
pool.shutdown
134+
logger.warn 'Pool is shut down, waiting for termination!'
135+
pool.wait_for_termination
136+
logger.warn 'Bye bye!'
137+
wake_main_thread
138+
end
139+
end
140+
141+
# Sets up signal traps for graceful shutdown of the NATS API server.
142+
#
143+
# @param workers [Array] The array of worker instances to stop on signal.
144+
# @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads.
145+
#
146+
# @return [void]
147+
def trap_signals(workers, pool)
148+
return if @trapped
149+
150+
%w[INT TERM QUIT].each do |sig|
151+
trap(sig) do
152+
logger.warn "Received #{sig} signal, shutting down..."
153+
Thread.new { shutdown(workers, pool).call }
154+
end
155+
end
156+
@trapped = true
157+
end
158+
159+
# Wakes up the main thread to allow it to continue execution after the server is stopped.
160+
# This is useful when the server is running in a blocking mode.
161+
# If the main thread is not blocked, this method does nothing.
162+
#
163+
# @return [void]
164+
def wake_main_thread
165+
Thread.main.wakeup
166+
rescue ThreadError
167+
nil
168+
end
169+
end
170+
171+
module InstanceMethods
172+
# Returns the logger configured for the NATS API server.
173+
def logger = self.class.logger
174+
105175
# Sets up a worker thread for the NATS API server.
106176
# This method connects to the NATS server, adds the service, groups, and endpoints,
107177
# and keeps the worker thread alive.
@@ -113,62 +183,80 @@ def spawn_instances(url, opts, count)
113183
#
114184
# @return [void]
115185
def setup_worker(url, opts, eps, gps)
116-
client = NATS.connect url
117-
service = client.services.add(**opts)
118-
group_map = add_groups(service, gps)
119-
add_endpoints service, eps, group_map
120-
# Keep the worker thread alive
186+
@thread = Thread.current
187+
@client = NATS.connect url
188+
@service = @client.services.add(**opts)
189+
group_map = add_groups(gps)
190+
add_endpoints eps, group_map
121191
sleep
122192
end
123193

194+
# Stops the NATS API server worker.
195+
def stop
196+
@service&.stop
197+
@client&.close
198+
@thread&.wakeup
199+
rescue ThreadError
200+
nil
201+
end
202+
203+
private
204+
124205
# Adds groups to the NATS service.
125206
#
126-
# @param service [NATS::Service] The NATS service to add groups to.
127207
# @param gps [Hash] The groups to add, where keys are group names and values are group definitions.
128208
#
129209
# @return [Hash] A map of group names to their created group objects.
130-
def add_groups(service, gps)
210+
def add_groups(gps)
131211
created = {}
132-
gps.each_key { |name| build_group(service, gps, created, name) }
212+
gps.each_key { |name| build_group(gps, created, name) }
133213
created
134214
end
135215

136216
# Builds a group in the NATS service.
137217
#
138-
# @param service [NATS::Service] The NATS service to add the group to.
139218
# @param defs [Hash] The group definitions, where keys are group names and values are group definitions.
140219
# @param cache [Hash] A cache to store already created groups.
141220
# @param name [String] The name of the group to build.
142221
#
143222
# @return [NATS::Group] The created group object.
144-
def build_group(service, defs, cache, name)
223+
def build_group(defs, cache, name)
145224
return cache[name] if cache.key?(name)
146225

147226
gdef = defs[name]
148227
raise ArgumentError, "Group #{name} not defined" unless gdef
149228

150-
parent = gdef[:parent] ? build_group(service, defs, cache, gdef[:parent]) : service
229+
parent = gdef[:parent] ? build_group(defs, cache, gdef[:parent]) : @service
151230
cache[name] = parent.groups.add(gdef[:name], queue: gdef[:queue])
152231
end
153232

154233
# Adds endpoints to the NATS service.
155234
#
156-
# @param service [NATS::Service] The NATS service to add endpoints to.
157235
# @param endpoints [Array<Hash>] The list of endpoints to add.
158236
# @param group_map [Hash] A map of group names to their created group objects.
159237
#
160238
# @return [void]
161-
def add_endpoints(service, endpoints, group_map)
239+
def add_endpoints(endpoints, group_map)
162240
endpoints.each do |ep|
163-
parent = ep[:group] ? group_map[ep[:group]] : service
164-
raise ArgumentError, "Group #{ep[:group]} not defined" if ep[:group] && parent.nil?
165-
166-
parent.endpoints.add(
167-
ep[:name], subject: ep[:subject], queue: ep[:queue]
168-
) do |raw_msg|
169-
wrapper = MessageWrapper.new(raw_msg)
170-
dispatch_with_middleware(wrapper, ep[:handler])
171-
end
241+
grp = ep.group
242+
parent = grp ? group_map[grp] : @service
243+
raise ArgumentError, "Group #{grp} not defined" if grp && parent.nil?
244+
245+
build_endpoint(parent, ep)
246+
end
247+
end
248+
249+
# Builds an endpoint in the NATS service.
250+
#
251+
# @param parent [NATS::Group] The parent group or service to add the endpoint to.
252+
# @param ept [Endpoint] The endpoint definition containing name, subject, queue, and handler.
253+
# NOTE: Named ept because `endpoint` is a DSL method we expose, to avoid confusion.
254+
#
255+
# @return [void]
256+
def build_endpoint(parent, ept)
257+
parent.endpoints.add(ept.name, subject: ept.subject, queue: ept.queue) do |raw_msg|
258+
wrapper = MessageWrapper.new(raw_msg)
259+
dispatch_with_middleware(wrapper, ept.handler)
172260
end
173261
end
174262

@@ -180,7 +268,7 @@ def add_endpoints(service, endpoints, group_map)
180268
# @return [void]
181269
def dispatch_with_middleware(wrapper, handler)
182270
app = ->(w) { handle_message(w.raw, handler) }
183-
middleware.reverse_each do |(klass, args, blk)|
271+
self.class.middleware.reverse_each do |(klass, args, blk)|
184272
app = klass.new(app, *args, &blk)
185273
end
186274
app.call(wrapper)
@@ -203,7 +291,6 @@ def handle_message(raw_msg, handler)
203291

204292
# Processes the result of the handler execution.
205293
#
206-
#
207294
# @param wrapper [MessageWrapper] The message wrapper containing the raw message.
208295
# @param result [Dry::Monads::Result] The result of the handler execution.
209296
#

0 commit comments

Comments
 (0)