Skip to content

Commit 42d7c00

Browse files
authored
Introduce Async::Task#defer_stop for graceful shutdown. (#310)
1 parent 5aef5b7 commit 42d7c00

File tree

2 files changed

+103
-2
lines changed

2 files changed

+103
-2
lines changed

lib/async/task.rb

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ def initialize(parent = Task.current?, finished: nil, **options, &block)
6767
@status = :initialized
6868
@result = nil
6969
@finished = finished
70+
71+
@defer_stop = nil
7072
end
7173

7274
def reactor
@@ -212,6 +214,13 @@ def stop(later = false)
212214
return stopped!
213215
end
214216

217+
# If we are deferring stop...
218+
if @defer_stop == false
219+
# Don't stop now... but update the state so we know we need to stop later.
220+
@defer_stop = true
221+
return false
222+
end
223+
215224
# If the fiber is alive, we need to stop it:
216225
if @fiber&.alive?
217226
if self.current?
@@ -239,6 +248,41 @@ def stop(later = false)
239248
end
240249
end
241250

251+
# Defer the handling of stop. During the execution of the given block, if a stop is requested, it will be deferred until the block exits. This is useful for ensuring graceful shutdown of servers and other long-running tasks. You should wrap the response handling code in a defer_stop block to ensure that the task is stopped when the response is complete but not before.
252+
#
253+
# You can nest calls to defer_stop, but the stop will only be deferred until the outermost block exits.
254+
#
255+
# If stop is invoked a second time, it will be immediately executed.
256+
#
257+
# @yields {} The block of code to execute.
258+
def defer_stop
259+
# Tri-state variable for controlling stop:
260+
# - nil: defer_stop has not been called.
261+
# - false: defer_stop has been called and we are not stopping.
262+
# - true: defer_stop has been called and we will stop when exiting the block.
263+
if @defer_stop.nil?
264+
# If we are not deferring stop already, we can defer it now:
265+
@defer_stop = false
266+
267+
begin
268+
yield
269+
rescue Stop
270+
# If we are exiting due to a stop, we shouldn't try to invoke stop again:
271+
@defer_stop = nil
272+
raise
273+
ensure
274+
# If we were asked to stop, we should do so now:
275+
if @defer_stop
276+
@defer_stop = nil
277+
self.stop
278+
end
279+
end
280+
else
281+
# If we are deferring stop already, entering it again is a no-op.
282+
yield
283+
end
284+
end
285+
242286
# Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available.
243287
# @returns [Task]
244288
# @raises[RuntimeError] If task was not {set!} for the current fiber.

test/async/task.rb

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -841,13 +841,70 @@ def sleep_forever
841841

842842
it "can gets in a task" do
843843
IO.pipe do |input, output|
844-
Async do
844+
Async do
845845
Async do
846846
expect(input.gets).to be == "hello\n"
847847
end
848848
output.puts "hello"
849849
end
850850
end
851851
end
852-
852+
853+
with '#defer_stop' do
854+
it "can defer stopping a task" do
855+
child_task = reactor.async do |task|
856+
task.defer_stop do
857+
sleep
858+
end
859+
end
860+
861+
reactor.run_once(0)
862+
863+
child_task.stop
864+
expect(child_task).to be(:running?)
865+
866+
child_task.stop
867+
expect(child_task).to be(:stopped?)
868+
end
869+
870+
it "will stop the task if it was deferred" do
871+
condition = Async::Notification.new
872+
873+
child_task = reactor.async do |task|
874+
task.defer_stop do
875+
condition.wait
876+
end
877+
end
878+
879+
reactor.run_once(0)
880+
881+
child_task.stop(true)
882+
expect(child_task).to be(:running?)
883+
884+
reactor.async do
885+
condition.signal
886+
end
887+
888+
reactor.run_once(0)
889+
expect(child_task).to be(:stopped?)
890+
end
891+
892+
it "can defer stop in a deferred stop" do
893+
child_task = reactor.async do |task|
894+
task.defer_stop do
895+
task.defer_stop do
896+
sleep
897+
end
898+
end
899+
end
900+
901+
reactor.run_once(0)
902+
903+
child_task.stop
904+
expect(child_task).to be(:running?)
905+
906+
child_task.stop
907+
expect(child_task).to be(:stopped?)
908+
end
909+
end
853910
end

0 commit comments

Comments
 (0)