Skip to content

Commit 1bb6304

Browse files
committed
Raw port of Actress
needs more work and discussions
1 parent ab03fae commit 1bb6304

File tree

4 files changed

+351
-0
lines changed

4 files changed

+351
-0
lines changed

Gemfile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@ source 'https://rubygems.org'
22

33
gemspec
44

5+
group :actress do
6+
gem 'algebrick'
7+
gem 'atomic'
8+
end
9+
510
group :development do
611
gem 'rake', '~> 10.2.2'
712
gem 'countloc', '~> 0.4.0', platforms: :mri

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
require 'concurrent/supervisor'
2929
require 'concurrent/timer_task'
3030
require 'concurrent/tvar'
31+
require 'concurrent/actress'
3132

3233
# Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell,
3334
# F#, C#, Java, and classic concurrency patterns.

lib/concurrent/actress.rb

Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
require 'algebrick'
2+
require 'atomic'
3+
require 'logger'
4+
5+
module Concurrent
6+
module Actress
7+
Error = Class.new(StandardError)
8+
9+
class ActressTerminated < Error
10+
include Algebrick::TypeCheck
11+
12+
def initialize(reference)
13+
Type! reference, Reference
14+
super reference.path
15+
end
16+
end
17+
18+
def self.current
19+
Thread.current[:__current_actress__]
20+
end
21+
22+
module CoreDelegations
23+
def path
24+
core.path
25+
end
26+
27+
def parent
28+
core.parent
29+
end
30+
31+
def terminated?
32+
core.terminated?
33+
end
34+
35+
def reference
36+
core.reference
37+
end
38+
39+
alias_method :ref, :reference
40+
end
41+
42+
class Reference
43+
include Algebrick::TypeCheck
44+
include Algebrick::Types
45+
include CoreDelegations
46+
47+
attr_reader :core
48+
private :core
49+
50+
def initialize(core)
51+
@core = Type! core, Core
52+
end
53+
54+
55+
def tell(message)
56+
message message, nil
57+
end
58+
59+
alias_method :<<, :tell
60+
61+
def ask(message, ivar = IVar.new)
62+
message message, ivar
63+
end
64+
65+
def message(message, ivar = nil)
66+
core.on_envelope Envelope[message,
67+
ivar ? Some[IVar][ivar] : None,
68+
Actress.current ? Some[Reference][Actress.current] : None]
69+
return ivar || self
70+
end
71+
72+
def to_s
73+
"#<#{self.class} #{path}>"
74+
end
75+
76+
alias_method :inspect, :to_s
77+
78+
def ==(other)
79+
Type? other, self.class and other.send(:core) == core
80+
end
81+
end
82+
83+
include Algebrick::Types
84+
85+
Envelope = Algebrick.type do
86+
fields! message: Object,
87+
ivar: Maybe[IVar],
88+
sender: Maybe[Reference]
89+
end
90+
91+
module Envelope
92+
def sender_path
93+
sender.maybe { |reference| reference.path } || 'outside-actress'
94+
end
95+
96+
def reject!(error)
97+
ivar.maybe { |v| v.fail error }
98+
end
99+
end
100+
101+
class Core
102+
include Algebrick::TypeCheck
103+
104+
attr_reader :reference, :name, :path, :logger, :parent_core
105+
private :parent_core
106+
107+
def initialize(parent, name, actress_class, *args, &block)
108+
@mailbox = Array.new
109+
@one_by_one = OneByOne.new
110+
@executor = Concurrent.configuration.global_task_pool # TODO configurable
111+
@parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
112+
@name = (Type! name, String, Symbol).to_s
113+
@children = Atomic.new []
114+
@path = @parent_core ? File.join(@parent_core.path, @name) : @name
115+
@logger = Logger.new($stderr) # TODO add proper logging
116+
@logger.progname = @path
117+
@reference = Reference.new self
118+
# noinspection RubyArgCount
119+
@terminated = Event.new
120+
@mutex = Mutex.new
121+
122+
@actress_class = Child! actress_class, Abstract
123+
schedule_execution do
124+
parent_core.add_child reference if parent_core
125+
@actress = actress_class.new self, *args, &block # FIXME it may fail
126+
end
127+
end
128+
129+
def parent
130+
@parent_core.reference
131+
end
132+
133+
def children
134+
@children.get
135+
end
136+
137+
def add_child(child)
138+
Type! child, Reference
139+
@children.update { |o| [*o, child] }
140+
end
141+
142+
def remove_child(child)
143+
Type! child, Reference
144+
@children.update { |o| o - [child] }
145+
end
146+
147+
def on_envelope(envelope)
148+
schedule_execution { execute_on_envelope envelope }
149+
end
150+
151+
def terminated?
152+
@terminated.set?
153+
end
154+
155+
def terminate!
156+
guard!
157+
@terminated.set
158+
parent_core.remove_child reference if parent_core
159+
end
160+
161+
def guard!
162+
raise 'can be called only inside this actor' unless Actress.current == reference
163+
end
164+
165+
private
166+
167+
def process?
168+
unless @mailbox.empty? || @receive_envelope_scheduled
169+
@receive_envelope_scheduled = true
170+
schedule_execution { receive_envelope }
171+
end
172+
end
173+
174+
def receive_envelope
175+
envelope = @mailbox.shift
176+
177+
if terminated?
178+
# FIXME make sure that it cannot be GCed before all messages are rejected after termination
179+
reject_envelope envelope
180+
logger.debug "rejected #{envelope.message} from #{envelope.sender_path}"
181+
return
182+
end
183+
logger.debug "received #{envelope.message} from #{envelope.sender_path}"
184+
185+
result = @actress.on_envelope envelope
186+
envelope.ivar.maybe { |iv| iv.set result }
187+
rescue => error
188+
logger.error error
189+
envelope.ivar.maybe { |iv| iv.fail error }
190+
ensure
191+
@receive_envelope_scheduled = false
192+
process?
193+
end
194+
195+
def schedule_execution
196+
@one_by_one.post(@executor) do
197+
begin
198+
# TODO enable this mutex only on JRuby
199+
@mutex.lock # only for JRuby
200+
Thread.current[:__current_actress__] = reference
201+
yield
202+
rescue => e
203+
puts e
204+
ensure
205+
Thread.current[:__current_actress__] = nil
206+
@mutex.unlock # only for JRuby
207+
end
208+
end
209+
end
210+
211+
def execute_on_envelope(envelope)
212+
if terminated?
213+
reject_envelope envelope
214+
else
215+
@mailbox.push envelope
216+
end
217+
process?
218+
end
219+
220+
def create_and_set_actor(actress_class, block, *args)
221+
parent_core.add_child reference if parent_core
222+
@actress = actress_class.new self, *args, &block # FIXME may fail
223+
end
224+
225+
def reject_envelope(envelope)
226+
envelope.reject! ActressTerminated.new(reference)
227+
end
228+
end
229+
230+
class Abstract
231+
include Algebrick::TypeCheck
232+
extend Algebrick::TypeCheck
233+
include Algebrick::Matching
234+
include CoreDelegations
235+
236+
attr_reader :core
237+
238+
def self.new(core, *args, &block)
239+
allocate.tap do |actress|
240+
actress.__send__ :pre_initialize, core
241+
actress.__send__ :initialize, *args, &block
242+
end
243+
end
244+
245+
def on_message(message)
246+
raise NotImplementedError
247+
end
248+
249+
def logger
250+
core.logger
251+
end
252+
253+
def on_envelope(envelope)
254+
@envelope = envelope
255+
on_message envelope.message
256+
ensure
257+
@envelope = nil
258+
end
259+
260+
def spawn(actress_class, name, *args, &block)
261+
Actress.spawn(actress_class, name, *args, &block)
262+
end
263+
264+
def children
265+
core.children
266+
end
267+
268+
def terminate!
269+
core.terminate!
270+
end
271+
272+
private
273+
274+
def pre_initialize(core)
275+
@core = Type! core, Core
276+
end
277+
278+
def envelope
279+
@envelope or raise 'envelope not set'
280+
end
281+
end
282+
283+
class Root < Abstract
284+
def on_message(message)
285+
# ignore
286+
end
287+
end
288+
289+
ROOT = Core.new(nil, '/', Root).reference
290+
291+
def self.spawn(actress_class, name, *args, &block)
292+
Core.new(Actress.current || ROOT, name, actress_class, *args, &block).reference
293+
end
294+
end
295+
end

spec/concurrent/actress_spec.rb

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
require 'spec_helper'
2+
require_relative 'dereferenceable_shared'
3+
require_relative 'observable_shared'
4+
5+
module Concurrent
6+
7+
describe Actress do
8+
Child = Algebrick.atom
9+
Terminate = Algebrick.atom
10+
11+
class Ping < Actress::Abstract
12+
13+
def initialize(queue)
14+
@queue = queue
15+
end
16+
17+
def on_message(message)
18+
match message,
19+
on(Terminate) { terminate! },
20+
on(Child) { spawn Ping, :pong, @queue },
21+
(on(any) do
22+
@queue << message
23+
message
24+
end)
25+
26+
end
27+
end
28+
29+
it 'works' do
30+
queue = Queue.new
31+
actor = Actress.spawn Ping, :ping, queue
32+
33+
actor << 'a' << 1
34+
queue.pop.should eq 'a'
35+
actor.ask(2).value.should eq 2
36+
37+
actor.parent.should eq Actress::ROOT
38+
Actress::ROOT.path.should eq '/'
39+
actor.path.should eq '/ping'
40+
child = actor.ask(Child).value
41+
child.path.should eq '/ping/pong'
42+
queue.clear
43+
child.ask(3)
44+
queue.pop.should eq 3
45+
46+
actor << Terminate
47+
actor.ask(:blow_up).wait.rejected?.should be_true
48+
end
49+
end
50+
end

0 commit comments

Comments
 (0)