Skip to content

Commit 39e5e08

Browse files
committed
ErlangActor
* add Functions * guarantee ask always returns * other minor improvements
1 parent c030575 commit 39e5e08

File tree

5 files changed

+486
-126
lines changed

5 files changed

+486
-126
lines changed

docs-source/erlang_actor.in.md

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,118 @@ actor.tell :m
6060
actor.terminated.value!
6161
```
6262

63-
TBA
63+
The received message type can be limited.
64+
65+
```ruby
66+
Concurrent::ErlangActor.
67+
spawn(:on_thread) { receive(Numeric).succ }.
68+
tell('junk'). # ignored message
69+
tell(42).
70+
terminated.value!
71+
```
72+
73+
On pool it requires a block.
74+
75+
```ruby
76+
Concurrent::ErlangActor.
77+
spawn(:on_pool) { receive(Numeric) { |v| v.succ } }.
78+
tell('junk'). # ignored message
79+
tell(42).
80+
terminated.value!
81+
```
82+
83+
By the way, the body written for on pool actor will work for on thread actor
84+
as well.
85+
86+
```ruby
87+
Concurrent::ErlangActor.
88+
spawn(:on_thread) { receive(Numeric) { |v| v.succ } }.
89+
tell('junk'). # ignored message
90+
tell(42).
91+
terminated.value!
92+
```
93+
94+
The `receive` method can be also used to dispatch based on the received message.
95+
96+
```ruby
97+
actor = Concurrent::ErlangActor.spawn(:on_thread) do
98+
while true
99+
receive(on(Symbol) { |s| reply s.to_s },
100+
on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ },
101+
# put last works as else
102+
on(ANY) do |v|
103+
reply :bad_message
104+
terminate [:bad_message, v]
105+
end)
106+
end
107+
end
108+
actor.ask 1
109+
actor.ask 2
110+
actor.ask :value
111+
# this malformed message will terminate the actor
112+
actor.ask -1
113+
# the actor is no longer alive, so ask fails
114+
actor.ask "junk" rescue $!
115+
actor.terminated.result
116+
```
117+
118+
And a same thing for the actor on pool.
119+
Since it cannot loop it will call the body method repeatedly.
120+
121+
```ruby
122+
module Behaviour
123+
def body
124+
receive(on(Symbol) do |s|
125+
reply s.to_s
126+
body # call again
127+
end,
128+
on(And[Numeric, -> v { v >= 0 }]) do |v|
129+
reply v.succ
130+
body # call again
131+
end,
132+
# put last works as else
133+
on(ANY) do |v|
134+
reply :bad_message
135+
terminate [:bad_message, v]
136+
end)
137+
end
138+
end
139+
140+
actor = Concurrent::ErlangActor.spawn(:on_pool, environment: Behaviour) { body }
141+
actor.ask 1
142+
actor.ask 2
143+
actor.ask :value
144+
# this malformed message will terminate the actor
145+
actor.ask -1
146+
# the actor is no longer alive, so ask fails
147+
actor.ask "junk" rescue $!
148+
actor.terminated.result
149+
```
150+
151+
Since the behavior is stable in this case we can simplify with the `:keep` option
152+
that will keep the receive rules until another receive is called
153+
replacing the kept rules.
154+
155+
```ruby
156+
actor = Concurrent::ErlangActor.spawn(:on_pool) do
157+
receive(on(Symbol) { |s| reply s.to_s },
158+
on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ },
159+
# put last works as else
160+
on(ANY) do |v|
161+
reply :bad_message
162+
terminate [:bad_message, v]
163+
end,
164+
keep: true)
165+
end
166+
actor.ask 1
167+
actor.ask 2
168+
actor.ask :value
169+
# this malformed message will terminate the actor
170+
actor.ask -1
171+
# the actor is no longer alive, so ask fails
172+
actor.ask "junk" rescue $!
173+
actor.terminated.result
174+
```
64175

65176
### Actor types
66177

@@ -143,3 +254,6 @@ actor.terminated.value!
143254
* Back pressure with bounded mailbox
144255
* _op methods
145256
* types of actors
257+
* always use timeout
258+
* drop and log unrecognized messages, or just terminate
259+
* Functions module

docs-source/erlang_actor.init.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
require 'concurrent-edge'
22

3+
include Concurrent::ErlangActor::EnvironmentConstants
4+
35
def do_stuff(*args)
46
sleep 0.01
57
:stuff

docs-source/erlang_actor.out.md

Lines changed: 136 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ Although, `Promises.future { 1 + 1 }` is better suited for that purpose.
55

66
```ruby
77
actor = Concurrent::ErlangActor.spawn(:on_thread, name: 'addition') { 1 + 1 }
8-
# => #<Concurrent::ErlangActor::Pid:0x000002 addition>
8+
# => #<Concurrent::ErlangActor::Pid:0x000002 addition terminated normally with 2>
99
actor.terminated.value! # => 2
1010
```
1111

@@ -25,7 +25,7 @@ actor = Concurrent::ErlangActor.spawn(:on_thread, name: 'sum') do
2525
end
2626
sum
2727
end
28-
# => #<Concurrent::ErlangActor::Pid:0x000003 sum>
28+
# => #<Concurrent::ErlangActor::Pid:0x000003 sum running>
2929
```
3030

3131
The actor can be either told a message asynchronously,
@@ -34,12 +34,12 @@ or asked. The ask method will block until actor replies.
3434
```ruby
3535
# tell returns immediately returning the actor
3636
actor.tell(1).tell(1)
37-
# => #<Concurrent::ErlangActor::Pid:0x000003 sum>
37+
# => #<Concurrent::ErlangActor::Pid:0x000003 sum running>
3838
# blocks, waiting for the answer
3939
actor.ask 10 # => 12
4040
# stop the actor
4141
actor.tell :done
42-
# => #<Concurrent::ErlangActor::Pid:0x000003 sum>
42+
# => #<Concurrent::ErlangActor::Pid:0x000003 sum running>
4343
actor.terminated.value! # => 12
4444
```
4545

@@ -49,9 +49,9 @@ Simplest message receive.
4949

5050
```ruby
5151
actor = Concurrent::ErlangActor.spawn(:on_thread) { receive }
52-
# => #<Concurrent::ErlangActor::Pid:0x000004>
52+
# => #<Concurrent::ErlangActor::Pid:0x000004 running>
5353
actor.tell :m
54-
# => #<Concurrent::ErlangActor::Pid:0x000004>
54+
# => #<Concurrent::ErlangActor::Pid:0x000004 running>
5555
actor.terminated.value! # => :m
5656
```
5757

@@ -60,16 +60,133 @@ because if no block is given it will use a default block `{ |v| v }`
6060

6161
```ruby
6262
actor = Concurrent::ErlangActor.spawn(:on_pool) { receive { |v| v } }
63-
# => #<Concurrent::ErlangActor::Pid:0x000005>
63+
# => #<Concurrent::ErlangActor::Pid:0x000005 running>
6464
# can simply be following
6565
actor = Concurrent::ErlangActor.spawn(:on_pool) { receive }
66-
# => #<Concurrent::ErlangActor::Pid:0x000006>
66+
# => #<Concurrent::ErlangActor::Pid:0x000006 running>
6767
actor.tell :m
68-
# => #<Concurrent::ErlangActor::Pid:0x000006>
68+
# => #<Concurrent::ErlangActor::Pid:0x000006 running>
6969
actor.terminated.value! # => :m
7070
```
7171

72-
TBA
72+
The received message type can be limited.
73+
74+
```ruby
75+
Concurrent::ErlangActor.
76+
spawn(:on_thread) { receive(Numeric).succ }.
77+
tell('junk'). # ignored message
78+
tell(42).
79+
terminated.value! # => 43
80+
```
81+
82+
On pool it requires a block.
83+
84+
```ruby
85+
Concurrent::ErlangActor.
86+
spawn(:on_pool) { receive(Numeric) { |v| v.succ } }.
87+
tell('junk'). # ignored message
88+
tell(42).
89+
terminated.value! # => 43
90+
```
91+
92+
By the way, the body written for on pool actor will work for on thread actor
93+
as well.
94+
95+
```ruby
96+
Concurrent::ErlangActor.
97+
spawn(:on_thread) { receive(Numeric) { |v| v.succ } }.
98+
tell('junk'). # ignored message
99+
tell(42).
100+
terminated.value! # => 43
101+
```
102+
103+
The `receive` method can be also used to dispatch based on the received message.
104+
105+
```ruby
106+
actor = Concurrent::ErlangActor.spawn(:on_thread) do
107+
while true
108+
receive(on(Symbol) { |s| reply s.to_s },
109+
on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ },
110+
# put last works as else
111+
on(ANY) do |v|
112+
reply :bad_message
113+
terminate [:bad_message, v]
114+
end)
115+
end
116+
end
117+
# => #<Concurrent::ErlangActor::Pid:0x000007 running>
118+
actor.ask 1 # => 2
119+
actor.ask 2 # => 3
120+
actor.ask :value # => "value"
121+
# this malformed message will terminate the actor
122+
actor.ask -1 # => :bad_message
123+
# the actor is no longer alive, so ask fails
124+
actor.ask "junk" rescue $!
125+
# => #<Concurrent::ErlangActor::NoActor: #<Concurrent::ErlangActor::Pid:0x000007 terminated because of [:bad_message, -1]>>
126+
actor.terminated.result # => [false, nil, [:bad_message, -1]]
127+
```
128+
129+
And a same thing for the actor on pool.
130+
Since it cannot loop it will call the body method repeatedly.
131+
132+
```ruby
133+
module Behaviour
134+
def body
135+
receive(on(Symbol) do |s|
136+
reply s.to_s
137+
body # call again
138+
end,
139+
on(And[Numeric, -> v { v >= 0 }]) do |v|
140+
reply v.succ
141+
body # call again
142+
end,
143+
# put last works as else
144+
on(ANY) do |v|
145+
reply :bad_message
146+
terminate [:bad_message, v]
147+
end)
148+
end
149+
end # => :body
150+
151+
actor = Concurrent::ErlangActor.spawn(:on_pool, environment: Behaviour) { body }
152+
# => #<Concurrent::ErlangActor::Pid:0x000008 running>
153+
actor.ask 1 # => 2
154+
actor.ask 2 # => 3
155+
actor.ask :value # => "value"
156+
# this malformed message will terminate the actor
157+
actor.ask -1 # => :bad_message
158+
# the actor is no longer alive, so ask fails
159+
actor.ask "junk" rescue $!
160+
# => #<Concurrent::ErlangActor::NoActor: #<Concurrent::ErlangActor::Pid:0x000008 terminated because of [:bad_message, -1]>>
161+
actor.terminated.result # => [false, nil, [:bad_message, -1]]
162+
```
163+
164+
Since the behavior is stable in this case we can simplify with the `:keep` option
165+
that will keep the receive rules until another receive is called
166+
replacing the kept rules.
167+
168+
```ruby
169+
actor = Concurrent::ErlangActor.spawn(:on_pool) do
170+
receive(on(Symbol) { |s| reply s.to_s },
171+
on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ },
172+
# put last works as else
173+
on(ANY) do |v|
174+
reply :bad_message
175+
terminate [:bad_message, v]
176+
end,
177+
keep: true)
178+
end
179+
# => #<Concurrent::ErlangActor::Pid:0x000009 running>
180+
actor.ask 1 # => 2
181+
actor.ask 2 # => 3
182+
actor.ask :value # => "value"
183+
# this malformed message will terminate the actor
184+
actor.ask -1 # => :bad_message
185+
# the actor is no longer alive, so ask fails
186+
actor.ask "junk" rescue $!
187+
# => #<Concurrent::ErlangActor::NoActor: #<Concurrent::ErlangActor::Pid:0x000009 terminated because of [:bad_message, -1]>>
188+
actor.terminated.result # => [false, nil, [:bad_message, -1]]
189+
```
73190

74191
### Actor types
75192

@@ -93,7 +210,7 @@ Let's have a look at how the bodies of actors differ between the types:
93210

94211
```ruby
95212
ping = Concurrent::ErlangActor.spawn(:on_thread) { reply receive }
96-
# => #<Concurrent::ErlangActor::Pid:0x000007>
213+
# => #<Concurrent::ErlangActor::Pid:0x00000a running>
97214
ping.ask 42 # => 42
98215
```
99216

@@ -107,7 +224,7 @@ after the message is received has to be provided.
107224

108225
```ruby
109226
ping = Concurrent::ErlangActor.spawn(:on_pool) { receive { |m| reply m } }
110-
# => #<Concurrent::ErlangActor::Pid:0x000008>
227+
# => #<Concurrent::ErlangActor::Pid:0x00000b running>
111228
ping.ask 42 # => 42
112229
```
113230

@@ -144,10 +261,11 @@ actor = Concurrent::ErlangActor.spawn(:on_thread) do
144261
trap # equivalent of process_flag(trap_exit, true)
145262
receive
146263
end
147-
# => #<Concurrent::ErlangActor::Pid:0x000009>
264+
# => #<Concurrent::ErlangActor::Pid:0x00000c running>
148265
actor.terminated.value!
149-
# => #<Concurrent::ErlangActor::Exit:0x00000a
150-
# @from=#<Concurrent::ErlangActor::Pid:0x00000b>,
266+
# => #<Concurrent::ErlangActor::Exit:0x00000d
267+
# @from=
268+
# #<Concurrent::ErlangActor::Pid:0x00000e terminated because of err>,
151269
# @link_terminated=true,
152270
# @reason=:err>
153271
```
@@ -159,3 +277,6 @@ actor.terminated.value!
159277
* Back pressure with bounded mailbox
160278
* _op methods
161279
* types of actors
280+
* always use timeout
281+
* drop and log unrecognized messages, or just terminate
282+
* Functions module

0 commit comments

Comments
 (0)