Skip to content

Commit 5181f45

Browse files
committed
Initial implementation of ActorContext, ActorRef, and the #spawn factory.
1 parent 7c525e2 commit 5181f45

File tree

7 files changed

+434
-0
lines changed

7 files changed

+434
-0
lines changed

lib/concurrent.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
require 'concurrent/tvar'
3333
require 'concurrent/utilities'
3434

35+
require 'concurrent/actor_context'
36+
require 'concurrent/simple_actor_ref'
37+
3538
require 'concurrent/cached_thread_pool'
3639
require 'concurrent/fixed_thread_pool'
3740
require 'concurrent/immediate_executor'

lib/concurrent/actor_context.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
require 'concurrent/simple_actor_ref'
2+
3+
module Concurrent
4+
5+
module ActorContext
6+
7+
def on_start
8+
end
9+
10+
def on_restart
11+
end
12+
13+
def on_shutdown
14+
end
15+
16+
def self.included(base)
17+
18+
def base.spawn
19+
Concurrent::SimpleActorRef.new(self.new)
20+
end
21+
end
22+
end
23+
end

lib/concurrent/actor_ref.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
require 'concurrent/utilities'
2+
3+
module Concurrent
4+
5+
module ActorRef
6+
7+
def running?
8+
true
9+
end
10+
11+
def shutdown?
12+
false
13+
end
14+
15+
def post(*msg, &block)
16+
raise NotImplementedError
17+
end
18+
19+
def post!(*msg)
20+
raise NotImplementedError
21+
end
22+
23+
def <<(message)
24+
post(*message)
25+
self
26+
end
27+
end
28+
end

lib/concurrent/simple_actor_ref.rb

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
require 'thread'
2+
3+
require 'concurrent/actor_ref'
4+
require 'concurrent/ivar'
5+
6+
module Concurrent
7+
8+
class SimpleActorRef
9+
include ActorRef
10+
11+
def initialize(actor)
12+
@actor = actor
13+
@mutex = Mutex.new
14+
@queue = Queue.new
15+
@thread = nil
16+
@stopped = false
17+
end
18+
19+
def running?
20+
@mutex.synchronize{ @stopped == false }
21+
end
22+
23+
def shutdown?
24+
@mutex.synchronize{ @stopped == true }
25+
end
26+
27+
def post(*msg, &block)
28+
raise ArgumentError.new('message cannot be empty') if msg.empty?
29+
@mutex.synchronize do
30+
supervise unless @stopped == true
31+
end
32+
ivar = IVar.new
33+
@queue.push(Message.new(msg, ivar, block))
34+
ivar
35+
end
36+
37+
def post!(seconds, *msg)
38+
raise Concurrent::TimeoutError if seconds == 0
39+
ivar = self.post(*msg)
40+
ivar.value(seconds)
41+
if ivar.incomplete?
42+
raise Concurrent::TimeoutError
43+
elsif ivar.reason
44+
raise ivar.reason
45+
end
46+
ivar.value
47+
end
48+
49+
def shutdown
50+
@mutex.synchronize do
51+
@stopped = true
52+
if @thread && @thread.alive?
53+
@thread.kill
54+
@actor.on_shutdown
55+
end
56+
end
57+
end
58+
59+
private
60+
61+
Message = Struct.new(:payload, :ivar, :callback)
62+
63+
def supervise
64+
if @thread.nil?
65+
@actor.on_start
66+
@thread = new_worker_thread
67+
elsif ! @thread.alive?
68+
@actor.on_restart
69+
@thread = new_worker_thread
70+
end
71+
end
72+
73+
def new_worker_thread
74+
Thread.new do
75+
Thread.current.abort_on_exception = true
76+
run_message_loop
77+
end
78+
end
79+
80+
def run_message_loop
81+
loop do
82+
message = @queue.pop
83+
result = ex = nil
84+
85+
begin
86+
result = @actor.receive(*message.payload)
87+
rescue => ex
88+
# suppress
89+
ensure
90+
message.ivar.complete(ex.nil?, result, ex)
91+
message.callback.call(Time.now, result, ex) if message.callback
92+
end
93+
end
94+
end
95+
end
96+
end

spec/concurrent/actor_context_spec.rb

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
require 'spec_helper'
2+
3+
module Concurrent
4+
5+
describe ActorContext do
6+
7+
let(:described_class) do
8+
Class.new do
9+
include ActorContext
10+
end
11+
end
12+
13+
context 'callbacks' do
14+
15+
subject { described_class.new }
16+
17+
specify { subject.should respond_to :on_start }
18+
19+
specify { subject.should respond_to :on_restart }
20+
21+
specify { subject.should respond_to :on_shutdown }
22+
end
23+
24+
context '#spawn' do
25+
26+
it 'returns an ActorRef' do
27+
described_class.spawn.should be_a ActorRef
28+
end
29+
end
30+
end
31+
end

spec/concurrent/actor_ref_shared.rb

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
require 'spec_helper'
2+
3+
def shared_actor_test_class
4+
Class.new do
5+
include Concurrent::ActorContext
6+
def receive(*msg)
7+
case msg.first
8+
when :poison
9+
raise StandardError
10+
when :terminate
11+
Thread.current.kill
12+
when :sleep
13+
sleep(msg.last)
14+
when :check
15+
msg[1].set(msg.last)
16+
else
17+
msg.first
18+
end
19+
end
20+
end
21+
end
22+
23+
share_examples_for :actor_ref do
24+
25+
it 'includes ActorRef' do
26+
subject.should be_a Concurrent::ActorRef
27+
end
28+
29+
context 'running and shutdown' do
30+
31+
specify { subject.should respond_to :shutdown }
32+
33+
specify { subject.should be_running }
34+
35+
specify { subject.should_not be_shutdown }
36+
37+
specify do
38+
subject.shutdown
39+
sleep(0.1)
40+
subject.should be_shutdown
41+
end
42+
end
43+
44+
context '#post' do
45+
46+
it 'raises an exception when the message is empty' do
47+
expect {
48+
subject.post
49+
}.to raise_error(ArgumentError)
50+
end
51+
52+
it 'returns an IVar' do
53+
subject.post(:foo).should be_a Concurrent::IVar
54+
end
55+
56+
it 'fulfills the IVar when message is processed' do
57+
ivar = subject.post(:foo)
58+
sleep(0.1)
59+
ivar.should be_fulfilled
60+
ivar.value.should eq :foo
61+
end
62+
63+
it 'rejects the IVar when message processing fails' do
64+
ivar = subject.post(:poison)
65+
sleep(0.1)
66+
ivar.should be_rejected
67+
ivar.reason.should be_a StandardError
68+
end
69+
end
70+
71+
context '#<<' do
72+
73+
it 'posts the message' do
74+
ivar = Concurrent::IVar.new
75+
subject << [:check, ivar, :foo]
76+
ivar.value(0.1).should eq :foo
77+
end
78+
79+
it 'returns self' do
80+
(subject << [1,2,3,4]).should eq subject
81+
end
82+
end
83+
84+
context '#post with callback' do
85+
86+
specify 'on success calls the callback with time and value' do
87+
expected_value = expected_reason = nil
88+
subject.post(:foo) do |time, value, reason|
89+
expected_value = value
90+
expected_reason = reason
91+
end
92+
sleep(0.1)
93+
94+
expected_value.should eq :foo
95+
expected_reason.should be_nil
96+
end
97+
98+
specify 'on failure calls the callback with time and reason' do
99+
expected_value = expected_reason = nil
100+
subject.post(:poison) do |time, value, reason|
101+
expected_value = value
102+
expected_reason = reason
103+
end
104+
sleep(0.1)
105+
106+
expected_value.should be_nil
107+
expected_reason.should be_a StandardError
108+
end
109+
end
110+
111+
context '#post!' do
112+
113+
it 'raises an exception when the message is empty' do
114+
expect {
115+
subject.post!(1)
116+
}.to raise_error(ArgumentError)
117+
end
118+
119+
it 'blocks for up to the given number of seconds' do
120+
start = Time.now.to_f
121+
begin
122+
subject.post!(1, :sleep, 2)
123+
rescue
124+
end
125+
delta = Time.now.to_f - start
126+
delta.should >= 1
127+
delta.should <= 2
128+
end
129+
130+
it 'blocks forever when the timeout is nil' do
131+
start = Time.now.to_f
132+
begin
133+
subject.post!(nil, :sleep, 1)
134+
rescue
135+
end
136+
delta = Time.now.to_f - start
137+
delta.should > 1
138+
end
139+
140+
it 'raises a TimeoutError when timeout is zero' do
141+
expect {
142+
subject.post!(0, :foo)
143+
}.to raise_error(Concurrent::TimeoutError)
144+
end
145+
146+
it 'raises a TimeoutError when the timeout is reached' do
147+
expect {
148+
subject.post!(1, :sleep, 10)
149+
}.to raise_error(Concurrent::TimeoutError)
150+
end
151+
152+
it 'returns the result of success processing' do
153+
subject.post!(1, :foo).should eq :foo
154+
end
155+
156+
it 'bubbles exceptions thrown during processing' do
157+
expect {
158+
subject.post!(1, :poison)
159+
}.to raise_error(StandardError)
160+
end
161+
end
162+
end

0 commit comments

Comments
 (0)