Skip to content

Commit 0592010

Browse files
committed
Merge pull request #48 from jdantonio/actor-context
Initial implementation of ActorContext and the #spawn function
2 parents 4f8594e + c76c050 commit 0592010

File tree

8 files changed

+712
-0
lines changed

8 files changed

+712
-0
lines changed

lib/concurrent.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
require 'concurrent/tvar'
3333
require 'concurrent/utilities'
3434

35+
require 'concurrent/actor_context'
36+
require 'concurrent/simple_actor_ref'
37+
3538
require 'concurrent/cached_thread_pool'
3639
require 'concurrent/fixed_thread_pool'
3740
require 'concurrent/immediate_executor'

lib/concurrent/actor.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ def initialize(queue)
179179
#
180180
# @deprecated +Actor+ is being replaced with a completely new framework prior to v1.0.0
181181
def self.pool(count, *args, &block)
182+
warn '[DEPRECATED] `Actor` is deprecated and will be replaced with `ActorContext`.'
182183
raise ArgumentError.new('count must be greater than zero') unless count > 0
183184
mailbox = Queue.new
184185
actors = count.times.collect do
@@ -211,13 +212,15 @@ def self.pool(count, *args, &block)
211212
#
212213
# @!visibility public
213214
def act(*message)
215+
warn '[DEPRECATED] `Actor` is deprecated and will be replaced with `ActorContext`.'
214216
raise NotImplementedError.new("#{self.class} does not implement #act")
215217
end
216218

217219
# @!visibility private
218220
#
219221
# @deprecated +Actor+ is being replaced with a completely new framework prior to v1.0.0
220222
def on_run # :nodoc:
223+
warn '[DEPRECATED] `Actor` is deprecated and will be replaced with `ActorContext`.'
221224
queue.clear
222225
end
223226

lib/concurrent/actor_context.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
require 'concurrent/simple_actor_ref'
2+
3+
module Concurrent
4+
5+
module ActorContext
6+
7+
def on_start
8+
end
9+
10+
def on_reset
11+
end
12+
13+
def on_shutdown
14+
end
15+
16+
def self.included(base)
17+
18+
class << base
19+
protected :new
20+
21+
def spawn(opts = {})
22+
args = opts.fetch(:args, [])
23+
Concurrent::SimpleActorRef.new(self.new(*args), opts)
24+
end
25+
end
26+
end
27+
end
28+
end

lib/concurrent/actor_ref.rb

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
require 'concurrent/copy_on_notify_observer_set'
2+
3+
module Concurrent
4+
5+
module ActorRef
6+
7+
#NOTE: Required API methods
8+
# Must be implemented in all subclasses
9+
#def post(*msg, &block)
10+
#def post!(*msg)
11+
#def running?
12+
#def shutdown?
13+
#def shutdown
14+
#def join(timeout = nil)
15+
16+
def <<(message)
17+
post(*message)
18+
self
19+
end
20+
21+
def add_observer(*args)
22+
@observers.add_observer(*args)
23+
end
24+
25+
def delete_observer(*args)
26+
@observers.delete_observer(*args)
27+
end
28+
29+
def delete_observers
30+
@observers.delete_observers
31+
end
32+
33+
protected
34+
35+
def observers
36+
@observers ||= CopyOnNotifyObserverSet.new
37+
end
38+
end
39+
end

lib/concurrent/simple_actor_ref.rb

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
require 'thread'
2+
3+
require 'concurrent/actor_ref'
4+
require 'concurrent/event'
5+
require 'concurrent/ivar'
6+
7+
module Concurrent
8+
9+
class SimpleActorRef
10+
include ActorRef
11+
12+
def initialize(actor, opts = {})
13+
@actor = actor
14+
@mutex = Mutex.new
15+
@queue = Queue.new
16+
@thread = nil
17+
@stop_event = Event.new
18+
@abort_on_exception = opts.fetch(:abort_on_exception, true)
19+
@reset_on_error = opts.fetch(:reset_on_error, true)
20+
@exception_class = opts.fetch(:rescue_exception, false) ? Exception : StandardError
21+
@observers = CopyOnNotifyObserverSet.new
22+
end
23+
24+
def running?
25+
! @stop_event.set?
26+
end
27+
28+
def shutdown?
29+
@stop_event.set?
30+
end
31+
32+
def post(*msg, &block)
33+
raise ArgumentError.new('message cannot be empty') if msg.empty?
34+
@mutex.synchronize do
35+
supervise unless shutdown?
36+
end
37+
ivar = IVar.new
38+
@queue.push(Message.new(msg, ivar, block))
39+
ivar
40+
end
41+
42+
def post!(seconds, *msg)
43+
raise Concurrent::TimeoutError if seconds == 0
44+
ivar = self.post(*msg)
45+
ivar.value(seconds)
46+
if ivar.incomplete?
47+
raise Concurrent::TimeoutError
48+
elsif ivar.reason
49+
raise ivar.reason
50+
end
51+
ivar.value
52+
end
53+
54+
def shutdown
55+
@mutex.synchronize do
56+
return if shutdown?
57+
if @thread && @thread.alive?
58+
@thread.kill
59+
@actor.on_shutdown
60+
end
61+
@stop_event.set
62+
end
63+
end
64+
65+
def join(timeout = nil)
66+
@stop_event.wait(timeout)
67+
end
68+
69+
private
70+
71+
Message = Struct.new(:payload, :ivar, :callback)
72+
73+
def supervise
74+
if @thread.nil?
75+
@actor.on_start
76+
@thread = new_worker_thread
77+
elsif ! @thread.alive?
78+
@actor.on_reset
79+
@thread = new_worker_thread
80+
end
81+
end
82+
83+
def new_worker_thread
84+
Thread.new do
85+
Thread.current.abort_on_exception = @abort_on_exception
86+
run_message_loop
87+
end
88+
end
89+
90+
def run_message_loop
91+
loop do
92+
message = @queue.pop
93+
result = ex = nil
94+
95+
begin
96+
result = @actor.receive(*message.payload)
97+
rescue @exception_class => ex
98+
@actor.on_reset if @reset_on_error
99+
ensure
100+
now = Time.now
101+
message.ivar.complete(ex.nil?, result, ex)
102+
message.callback.call(now, result, ex) if message.callback
103+
observers.notify_observers(now, message.payload, result, ex)
104+
end
105+
end
106+
end
107+
end
108+
end

spec/concurrent/actor_context_spec.rb

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
require 'spec_helper'
2+
3+
module Concurrent
4+
5+
describe ActorContext do
6+
7+
let(:described_class) do
8+
Class.new do
9+
include ActorContext
10+
end
11+
end
12+
13+
it 'protects #initialize' do
14+
expect {
15+
described_class.new
16+
}.to raise_error(NoMethodError)
17+
end
18+
19+
context 'callbacks' do
20+
21+
subject { described_class.send(:new) }
22+
23+
specify { subject.should respond_to :on_start }
24+
25+
specify { subject.should respond_to :on_reset }
26+
27+
specify { subject.should respond_to :on_shutdown }
28+
end
29+
30+
context '#spawn' do
31+
32+
it 'returns an ActorRef' do
33+
described_class.spawn.should be_a ActorRef
34+
end
35+
end
36+
end
37+
end

0 commit comments

Comments
 (0)