Skip to content

Commit a5a3024

Browse files
committed
Add ErlangActor implementation
1 parent 85763d2 commit a5a3024

File tree

12 files changed

+2668
-16
lines changed

12 files changed

+2668
-16
lines changed

docs-source/erlang_actor.in.md

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
## Examples
2+
3+
The simplest example is to use the actor as an asynchronous execution.
4+
Although, `Promises.future { 1 + 1 }` is better suited for that purpose.
5+
6+
```ruby
7+
actor = Concurrent::ErlangActor.spawn(:on_thread, name: 'addition') { 1 + 1 }
8+
actor.terminated.value!
9+
```
10+
11+
Let's send some messages and maintain some internal state
12+
which is what actors are good for.
13+
14+
```ruby
15+
actor = Concurrent::ErlangActor.spawn(:on_thread, name: 'sum') do
16+
sum = 0 # internal state
17+
# receive and sum the messages until the actor gets :done
18+
while true
19+
message = receive
20+
break if message == :done
21+
# if the message is asked and not only told,
22+
# reply with a current sum
23+
reply sum += message
24+
end
25+
sum
26+
end
27+
```
28+
29+
The actor can be either told a message asynchronously,
30+
or asked. The ask method will block until actor replies.
31+
32+
```ruby
33+
# tell returns immediately returning the actor
34+
actor.tell(1).tell(1)
35+
# blocks, waiting for the answer
36+
actor.ask 10
37+
# stop the actor
38+
actor.tell :done
39+
actor.terminated.value!
40+
```
41+
42+
### Receiving
43+
44+
Simplest message receive.
45+
46+
```ruby
47+
actor = Concurrent::ErlangActor.spawn(:on_thread) { receive }
48+
actor.tell :m
49+
actor.terminated.value!
50+
```
51+
52+
which also works for actor on pool,
53+
because if no block is given it will use a default block `{ |v| v }`
54+
55+
```ruby
56+
actor = Concurrent::ErlangActor.spawn(:on_pool) { receive { |v| v } }
57+
# can simply be following
58+
actor = Concurrent::ErlangActor.spawn(:on_pool) { receive }
59+
actor.tell :m
60+
actor.terminated.value!
61+
```
62+
63+
TBA
64+
65+
### Actor types
66+
67+
There are two types of actors.
68+
The type is specified when calling spawn as a first argument,
69+
`Concurrent::ErlangActor.spawn(:on_thread, ...` or
70+
`Concurrent::ErlangActor.spawn(:on_pool, ...`.
71+
72+
The main difference is in how receive method returns.
73+
74+
- `:on_thread` it blocks the thread until message is available,
75+
then it returns or calls the provided block first.
76+
77+
- However, `:on_pool` it has to free up the thread on the receive
78+
call back to the pool. Therefore the call to receive ends the
79+
execution of current scope. The receive has to be given block
80+
or blocks that act as a continuations and are called
81+
when there is message available.
82+
83+
Let's have a look at how the bodies of actors differ between the types:
84+
85+
```ruby
86+
ping = Concurrent::ErlangActor.spawn(:on_thread) { reply receive }
87+
ping.ask 42
88+
```
89+
90+
It first calls receive, which blocks the thread of the actor.
91+
When it returns the received message is passed an an argument to reply,
92+
which replies the same value back to the ask method.
93+
Then the actor terminates normally, because there is nothing else to do.
94+
95+
However when running on pool a block with code which should be evaluated
96+
after the message is received has to be provided.
97+
98+
```ruby
99+
ping = Concurrent::ErlangActor.spawn(:on_pool) { receive { |m| reply m } }
100+
ping.ask 42
101+
```
102+
103+
It starts by calling receive which will remember the given block for later
104+
execution when a message is available and stops executing the current scope.
105+
Later when a message becomes available the previously provided block is given
106+
the message and called. The result of the block is the final value of the
107+
normally terminated actor.
108+
109+
The direct blocking style of `:on_thread` is simpler to write and more straight
110+
forward however it has limitations. Each `:on_thread` actor creates a Thread
111+
taking time and resources.
112+
There is also a limited number of threads the Ruby process can create
113+
so you may hit the limit and fail to create more threads and therefore actors.
114+
115+
Since the `:on_pool` actor runs on a poll of threads, its creations
116+
is faster and cheaper and it does not create new threads.
117+
Therefore there is no limit (only RAM) on how many actors can be created.
118+
119+
To simplify, if you need only few actors `:on_thread` is fine.
120+
However if you will be creating hundreds of actors or
121+
they will be short-lived `:on_pool` should be used.
122+
123+
### Erlang behaviour
124+
125+
The actor matches Erlang processes in behaviour.
126+
Therefore it supports the usual Erlang actor linking, monitoring, exit behaviour, etc.
127+
128+
```ruby
129+
actor = Concurrent::ErlangActor.spawn(:on_thread) do
130+
spawn(link: true) do # equivalent of spawn_link in Erlang
131+
terminate :err # equivalent of exit in Erlang
132+
end
133+
trap # equivalent of process_flag(trap_exit, true)
134+
receive
135+
end
136+
actor.terminated.value!
137+
```
138+
139+
### TODO
140+
141+
* receives
142+
* More erlang behaviour examples
143+
* Back pressure with bounded mailbox
144+
* _op methods
145+
* types of actors

docs-source/erlang_actor.init.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
require 'concurrent-edge'
2+
3+
def do_stuff(*args)
4+
sleep 0.01
5+
:stuff
6+
end
7+

docs-source/erlang_actor.out.md

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
## Examples
2+
3+
The simplest example is to use the actor as an asynchronous execution.
4+
Although, `Promises.future { 1 + 1 }` is better suited for that purpose.
5+
6+
```ruby
7+
actor = Concurrent::ErlangActor.spawn(:on_thread, name: 'addition') { 1 + 1 }
8+
# => #<Concurrent::ErlangActor::Pid:0x000002 addition>
9+
actor.terminated.value! # => 2
10+
```
11+
12+
Let's send some messages and maintain some internal state
13+
which is what actors are good for.
14+
15+
```ruby
16+
actor = Concurrent::ErlangActor.spawn(:on_thread, name: 'sum') do
17+
sum = 0 # internal state
18+
# receive and sum the messages until the actor gets :done
19+
while true
20+
message = receive
21+
break if message == :done
22+
# if the message is asked and not only told,
23+
# reply with a current sum
24+
reply sum += message
25+
end
26+
sum
27+
end
28+
# => #<Concurrent::ErlangActor::Pid:0x000003 sum>
29+
```
30+
31+
The actor can be either told a message asynchronously,
32+
or asked. The ask method will block until actor replies.
33+
34+
```ruby
35+
# tell returns immediately returning the actor
36+
actor.tell(1).tell(1)
37+
# => #<Concurrent::ErlangActor::Pid:0x000003 sum>
38+
# blocks, waiting for the answer
39+
actor.ask 10 # => 12
40+
# stop the actor
41+
actor.tell :done
42+
# => #<Concurrent::ErlangActor::Pid:0x000003 sum>
43+
actor.terminated.value! # => 12
44+
```
45+
46+
### Actor types
47+
48+
There are two types of actors.
49+
The type is specified when calling spawn as a first argument,
50+
`Concurrent::ErlangActor.spawn(:on_thread, ...` or
51+
`Concurrent::ErlangActor.spawn(:on_pool, ...`.
52+
53+
The main difference is in how receive method returns.
54+
55+
- `:on_thread` it blocks the thread until message is available,
56+
then it returns or calls the provided block first.
57+
58+
- However, `:on_pool` it has to free up the thread on the receive
59+
call back to the pool. Therefore the call to receive ends the
60+
execution of current scope. The receive has to be given block
61+
or blocks that act as a continuations and are called
62+
when there is message available.
63+
64+
Let's have a look at how the bodies of actors differ between the types:
65+
66+
```ruby
67+
ping = Concurrent::ErlangActor.spawn(:on_thread) { reply receive }
68+
# => #<Concurrent::ErlangActor::Pid:0x000004>
69+
ping.ask 42 # => 42
70+
```
71+
72+
It first calls receive, which blocks the thread of the actor.
73+
When it returns the received message is passed an an argument to reply,
74+
which replies the same value back to the ask method.
75+
Then the actor terminates normally, because there is nothing else to do.
76+
77+
However when running on pool a block with code which should be evaluated
78+
after the message is received has to be provided.
79+
80+
```ruby
81+
ping = Concurrent::ErlangActor.spawn(:on_pool) { receive { |m| reply m } }
82+
# => #<Concurrent::ErlangActor::Pid:0x000005>
83+
ping.ask 42 # => 42
84+
```
85+
86+
It starts by calling receive which will remember the given block for later
87+
execution when a message is available and stops executing the current scope.
88+
Later when a message becomes available the previously provided block is given
89+
the message and called. The result of the block is the final value of the
90+
normally terminated actor.
91+
92+
The direct blocking style of `:on_thread` is simpler to write and more straight
93+
forward however it has limitations. Each `:on_thread` actor creates a Thread
94+
taking time and resources.
95+
There is also a limited number of threads the Ruby process can create
96+
so you may hit the limit and fail to create more threads and therefore actors.
97+
98+
Since the `:on_pool` actor runs on a poll of threads, its creations
99+
is faster and cheaper and it does not create new threads.
100+
Therefore there is no limit (only RAM) on how many actors can be created.
101+
102+
To simplify, if you need only few actors `:on_thread` is fine.
103+
However if you will be creating hundreds of actors or
104+
they will be short-lived `:on_pool` should be used.
105+
106+
### Erlang behaviour
107+
108+
The actor matches Erlang processes in behaviour.
109+
Therefore it supports the usual Erlang actor linking, monitoring, exit behaviour, etc.
110+
111+
```ruby
112+
actor = Concurrent::ErlangActor.spawn(:on_thread) do
113+
spawn(link: true) do # equivalent of spawn_link in Erlang
114+
terminate :err # equivalent of exit in Erlang
115+
end
116+
trap # equivalent of process_flag(trap_exit, true)
117+
receive
118+
end
119+
# => #<Concurrent::ErlangActor::Pid:0x000006>
120+
actor.terminated.value!
121+
# => #<Concurrent::ErlangActor::Exit:0x000007
122+
# @from=#<Concurrent::ErlangActor::Pid:0x000008>,
123+
# @link_terminated=true,
124+
# @reason=:err>
125+
```
126+
127+
### TODO
128+
129+
* More erlang behaviour examples
130+
* Back pressure with bounded mailbox
131+
* _op methods
132+
* types of actors

lib-edge/concurrent-edge.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@
1313
require 'concurrent/edge/channel'
1414

1515
require 'concurrent/edge/processing_actor'
16+
require 'concurrent/edge/erlang_actor'

lib-edge/concurrent/actor/reference.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ def ask(message, future = Concurrent::Promises.resolvable_future)
5555
message message, future
5656
end
5757

58+
# @!visibility privated
59+
alias_method :ask_op, :ask
60+
5861
# Sends the message synchronously and blocks until the message
5962
# is processed. Raises on error.
6063
#

lib-edge/concurrent/edge/channel.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ def to_s
4949
def any.===(other)
5050
true
5151
end
52+
53+
def any.to_s
54+
'ANY'
55+
end
5256
end
5357

5458
# Create channel.
@@ -164,7 +168,7 @@ def pop_op_matching(matcher, probe = Promises.resolvable_future)
164168
#
165169
# @!macro channel.warn.blocks
166170
# @!macro channel.param.timeout
167-
# @!macro promises.param.timeout_value
171+
# @param [Object] timeout_value a value returned by the method when it times out
168172
# @return [Object, nil] message or nil when timed out
169173
def pop(timeout = nil, timeout_value = nil)
170174
pop_matching ANY, timeout, timeout_value

0 commit comments

Comments
 (0)