Skip to content

Commit e6d858e

Browse files
committed
Add ProcessingActor
1 parent 594e582 commit e6d858e

File tree

8 files changed

+497
-97
lines changed

8 files changed

+497
-97
lines changed

doc/promises.in.md

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -540,9 +540,6 @@ The `ask` method returns future.
540540
```ruby
541541
actor.ask(2).then(&:succ).value!
542542
```
543-
## ProcessingActor
544-
545-
> *TODO: Documentation to be added in few days*
546543

547544
## Channel
548545

@@ -582,6 +579,86 @@ result = (
582579
result.value!
583580
```
584581

582+
## ProcessingActor
583+
584+
There is also a new implementation of actors based on the Channel and the
585+
ability of promises to simulate process. The actor runs as a process but also
586+
does not occupy a thread per actor as previous Concurrent::Actor
587+
implementation. This implementation is close to Erlang actors, therefore OTP
588+
can be ported for this actors (and it's planned).
589+
590+
The simplest actor is a one which just computes without even receiving a
591+
message.
592+
593+
```ruby
594+
actor = Concurrent::ProcessingActor.act(an_argument = 2) do |actor, number|
595+
number ** 3
596+
end
597+
actor.termination.value!
598+
```
599+
Let's receive some messages though.
600+
601+
```ruby
602+
add_2_messages = Concurrent::ProcessingActor.act do |actor|
603+
# Receive two messages then terminate normally with the sum.
604+
(actor.receive & actor.receive).then do |a, b|
605+
a + b
606+
end
607+
end
608+
add_2_messages.tell 1
609+
add_2_messages.termination.resolved?
610+
add_2_messages.tell 3
611+
add_2_messages.termination.value!
612+
```
613+
614+
Actors can also be used to apply back pressure to a producer. Let's start by
615+
defining an actor which a mailbox of size 2.
616+
617+
```ruby
618+
slow_counter = -> (actor, count) do
619+
actor.receive.then do |command, number|
620+
sleep 0.1
621+
case command
622+
when :add
623+
slow_counter.call actor, count + number
624+
when :done
625+
# terminate
626+
count
627+
end
628+
end
629+
end
630+
631+
actor = Concurrent::ProcessingActor.act_listening(
632+
Concurrent::Promises::Channel.new(2),
633+
0,
634+
&slow_counter)
635+
```
636+
637+
Now we can create a producer which will push messages only when there is a
638+
space available in the mailbox. We use promises to free a thread during waiting
639+
on a free space in the mailbox.
640+
641+
```ruby
642+
produce = -> receiver, i do
643+
if i < 10
644+
receiver.
645+
# send a message to the actor, resolves only after the message is
646+
# accepted by the actor's mailbox
647+
tell([:add, i]).
648+
# send incremented message when the above message is accepted
649+
then(i+1, &produce)
650+
else
651+
receiver.tell(:done)
652+
# do not continue
653+
end
654+
end
655+
656+
Concurrent::Promises.future(actor, 0, &produce).run.wait!
657+
658+
actor.termination.value!
659+
```
660+
661+
585662
# Use-cases
586663

587664
## Simple background processing

0 commit comments

Comments
 (0)