Skip to content

Commit 1c9e37a

Browse files
committed
Added #join method to SimpleActorRef and shared specs.
1 parent 0b08c40 commit 1c9e37a

File tree

2 files changed

+62
-6
lines changed

2 files changed

+62
-6
lines changed

lib/concurrent/simple_actor_ref.rb

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require 'thread'
22

33
require 'concurrent/actor_ref'
4+
require 'concurrent/event'
45
require 'concurrent/ivar'
56

67
module Concurrent
@@ -13,25 +14,25 @@ def initialize(actor, opts = {})
1314
@mutex = Mutex.new
1415
@queue = Queue.new
1516
@thread = nil
16-
@stopped = false
17+
@stop_event = Event.new
1718
@abort_on_exception = opts.fetch(:abort_on_exception, true)
1819
@reset_on_error = opts.fetch(:reset_on_error, true)
1920
@exception_class = opts.fetch(:rescue_exception, false) ? Exception : StandardError
2021
@observers = CopyOnNotifyObserverSet.new
2122
end
2223

2324
def running?
24-
@mutex.synchronize{ @stopped == false }
25+
! @stop_event.set?
2526
end
2627

2728
def shutdown?
28-
@mutex.synchronize{ @stopped == true }
29+
@stop_event.set?
2930
end
3031

3132
def post(*msg, &block)
3233
raise ArgumentError.new('message cannot be empty') if msg.empty?
3334
@mutex.synchronize do
34-
supervise unless @stopped == true
35+
supervise unless shutdown?
3536
end
3637
ivar = IVar.new
3738
@queue.push(Message.new(msg, ivar, block))
@@ -52,15 +53,19 @@ def post!(seconds, *msg)
5253

5354
def shutdown
5455
@mutex.synchronize do
55-
return if @stopped
56-
@stopped = true
56+
return if shutdown?
5757
if @thread && @thread.alive?
5858
@thread.kill
5959
@actor.on_shutdown
6060
end
61+
@stop_event.set
6162
end
6263
end
6364

65+
def join(timeout = nil)
66+
@stop_event.wait(timeout)
67+
end
68+
6469
private
6570

6671
Message = Struct.new(:payload, :ivar, :callback)

spec/concurrent/actor_ref_shared.rb

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,57 @@ def receive(*msg)
169169
end
170170
end
171171

172+
context '#join' do
173+
174+
it 'blocks until shutdown when no limit is given' do
175+
start = Time.now
176+
subject << :foo # start the actor's thread
177+
Thread.new{ sleep(1); subject.shutdown }
178+
subject.join
179+
stop = Time.now
180+
181+
subject.should be_shutdown
182+
stop.should >= start + 1
183+
stop.should <= start + 2
184+
end
185+
186+
it 'blocks for no more than the given number of seconds' do
187+
start = Time.now
188+
subject << :foo # start the actor's thread
189+
Thread.new{ sleep(5); subject.shutdown }
190+
subject.join(1)
191+
stop = Time.now
192+
193+
stop.should >= start + 1
194+
stop.should <= start + 2
195+
end
196+
197+
it 'returns true when shutdown has completed before timeout' do
198+
subject << :foo # start the actor's thread
199+
Thread.new{ sleep(1); subject.shutdown }
200+
subject.join.should be_true
201+
end
202+
203+
it 'returns false on timeout' do
204+
subject << :foo # start the actor's thread
205+
Thread.new{ sleep(5); subject.shutdown }
206+
subject.join(1).should be_false
207+
end
208+
209+
it 'returns immediately when already shutdown' do
210+
start = Time.now
211+
subject << :foo # start the actor's thread
212+
sleep(0.1)
213+
subject.shutdown
214+
sleep(0.1)
215+
216+
start = Time.now
217+
subject.join
218+
Time.now.should >= start
219+
Time.now.should <= start + 0.1
220+
end
221+
end
222+
172223
context 'observation' do
173224

174225
let(:observer_class) do

0 commit comments

Comments
 (0)