Skip to content

Commit 113fe07

Browse files
committed
Actor migrated to Edge::Future and integration added
1 parent 701a6b6 commit 113fe07

File tree

13 files changed

+66
-74
lines changed

13 files changed

+66
-74
lines changed

lib/concurrent/actor.rb

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
require 'concurrent/configuration'
2-
require 'concurrent/delay'
32
require 'concurrent/executor/serialized_execution'
4-
require 'concurrent/ivar'
53
require 'concurrent/logging'
64
require 'concurrent/synchronization'
5+
require 'concurrent/edge/future'
76

87
module Concurrent
98
# TODO https://github.com/celluloid/celluloid/wiki/Supervision-Groups ?
@@ -40,9 +39,9 @@ def self.current
4039
Thread.current[:__current_actor__]
4140
end
4241

43-
@root = Delay.new do
44-
Core.new(parent: nil, name: '/', class: Root, initialized: ivar = IVar.new).reference.tap do
45-
ivar.no_error!
42+
@root = Concurrent.delay do
43+
Core.new(parent: nil, name: '/', class: Root, initialized: future = Concurrent.future).reference.tap do
44+
future.wait!
4645
end
4746
end
4847

@@ -77,7 +76,7 @@ def self.spawn(*args, &block)
7776

7877
# as {.spawn} but it'll raise when Actor not initialized properly
7978
def self.spawn!(*args, &block)
80-
spawn(spawn_optionify(*args).merge(initialized: ivar = IVar.new), &block).tap { ivar.no_error! }
79+
spawn(spawn_optionify(*args).merge(initialized: future = Concurrent.future), &block).tap { future.wait! }
8180
end
8281

8382
# @overload spawn_optionify(context_class, name, *args)

lib/concurrent/actor/behaviour/abstract.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def broadcast(event)
3838

3939
def reject_envelope(envelope)
4040
envelope.reject! ActorTerminated.new(reference)
41-
dead_letter_routing << envelope unless envelope.ivar
41+
dead_letter_routing << envelope unless envelope.future
4242
log Logging::DEBUG, "rejected #{envelope.message} from #{envelope.sender_path}"
4343
end
4444
end

lib/concurrent/actor/behaviour/sets_results.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
module Concurrent
22
module Actor
33
module Behaviour
4-
# Collects returning value and sets the IVar in the {Envelope} or error on failure.
4+
# Collects returning value and sets the CompletableFuture in the {Envelope} or error on failure.
55
class SetResults < Abstract
66
attr_reader :error_strategy
77

@@ -12,8 +12,8 @@ def initialize(core, subsequent, error_strategy)
1212

1313
def on_envelope(envelope)
1414
result = pass envelope
15-
if result != MESSAGE_PROCESSED && !envelope.ivar.nil?
16-
envelope.ivar.set result
15+
if result != MESSAGE_PROCESSED && !envelope.future.nil?
16+
envelope.future.success result
1717
end
1818
nil
1919
rescue => error
@@ -28,7 +28,7 @@ def on_envelope(envelope)
2828
else
2929
raise
3030
end
31-
envelope.ivar.fail error unless envelope.ivar.nil?
31+
envelope.future.fail error unless envelope.future.nil?
3232
end
3333
end
3434
end

lib/concurrent/actor/behaviour/termination.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,18 @@ module Behaviour
77
class Termination < Abstract
88

99
# @!attribute [r] terminated
10-
# @return [Event] event which will become set when actor is terminated.
10+
# @return [Edge::Event] event which will become set when actor is terminated.
1111
attr_reader :terminated
1212

1313
def initialize(core, subsequent)
1414
super core, subsequent
15-
@terminated = Event.new
15+
@terminated = Concurrent.event
1616
end
1717

1818
# @note Actor rejects envelopes when terminated.
1919
# @return [true, false] if actor is terminated
2020
def terminated?
21-
@terminated.set?
21+
@terminated.completed?
2222
end
2323

2424
def on_envelope(envelope)
@@ -43,7 +43,7 @@ def on_envelope(envelope)
4343
# Terminates all its children, does not wait until they are terminated.
4444
def terminate!
4545
return true if terminated?
46-
terminated.set
46+
terminated.complete
4747
broadcast(:terminated) # TODO do not end up in Dead Letter Router
4848
parent << :remove_child if parent
4949
true

lib/concurrent/actor/context.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class AbstractContext
2020

2121
# @abstract override to define Actor's behaviour
2222
# @param [Object] message
23-
# @return [Object] a result which will be used to set the IVar supplied to Reference#ask
23+
# @return [Object] a result which will be used to set the Future supplied to Reference#ask
2424
# @note self should not be returned (or sent to other actors), {#reference} should be used
2525
# instead
2626
def on_message(message)
@@ -46,7 +46,7 @@ def pass
4646
end
4747

4848
# Defines an actor responsible for dead letters. Any rejected message send
49-
# with {Reference#tell} is sent there, a message with ivar is considered
49+
# with {Reference#tell} is sent there, a message with future is considered
5050
# already monitored for failures. Default behaviour is to use
5151
# {AbstractContext#dead_letter_routing} of the parent, so if no
5252
# {AbstractContext#dead_letter_routing} method is overridden in

lib/concurrent/actor/core.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class Core < Synchronization::Object
4040
# @option opts [Array<Array(Behavior::Abstract, Array<Object>)>]
4141
# behaviour_definition, array of pairs where each pair is behaviour
4242
# class and its args, see {Behaviour.basic_behaviour_definition}
43-
# @option opts [IVar, nil] initialized, if present it'll be set or failed
43+
# @option opts [CompletableFuture, nil] initialized, if present it'll be set or failed
4444
# after {Context} initialization
4545
# @option opts [Proc, nil] logger a proc accepting (level, progname,
4646
# message = nil, &block) params, can be used to hook actor instance to
@@ -77,7 +77,7 @@ def initialize(opts = {}, &block)
7777

7878
@args = opts.fetch(:args, [])
7979
@block = block
80-
initialized = Type! opts[:initialized], IVar, NilClass
80+
initialized = Type! opts[:initialized], Edge::CompletableFuture, NilClass
8181

8282
messages = []
8383
messages << :link if opts[:link]
@@ -91,7 +91,7 @@ def initialize(opts = {}, &block)
9191
handle_envelope Envelope.new(message, nil, parent, reference)
9292
end
9393

94-
initialized.set reference if initialized
94+
initialized.success reference if initialized
9595
rescue => ex
9696
log ERROR, ex
9797
@first_behaviour.terminate!

lib/concurrent/actor/envelope.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@ class Envelope
55

66
# @!attribute [r] message
77
# @return [Object] a message
8-
# @!attribute [r] ivar
9-
# @return [IVar] an ivar which becomes resolved after message is processed
8+
# @!attribute [r] future
9+
# @return [Edge::Future] a future which becomes resolved after message is processed
1010
# @!attribute [r] sender
1111
# @return [Reference, Thread] an actor or thread sending the message
1212
# @!attribute [r] address
1313
# @return [Reference] where this message will be delivered
1414

15-
attr_reader :message, :ivar, :sender, :address
15+
attr_reader :message, :future, :sender, :address
1616

17-
def initialize(message, ivar, sender, address)
17+
def initialize(message, future, sender, address)
1818
@message = message
19-
@ivar = Type! ivar, IVar, NilClass
19+
@future = Type! future, Edge::CompletableFuture, NilClass
2020
@sender = Type! sender, Reference, Thread
2121
@address = Type! address, Reference
2222
end
@@ -34,7 +34,7 @@ def address_path
3434
end
3535

3636
def reject!(error)
37-
ivar.fail error unless ivar.nil?
37+
future.fail error unless future.nil?
3838
end
3939
end
4040
end

lib/concurrent/actor/internal_delegations.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def dead_letter_routing
2525
end
2626

2727
def redirect(reference, envelope = self.envelope)
28-
reference.message(envelope.message, envelope.ivar)
28+
reference.message(envelope.message, envelope.future)
2929
Behaviour::MESSAGE_PROCESSED
3030
end
3131

lib/concurrent/actor/reference.rb

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,13 @@ def tell(message)
3232
#
3333
# sends message to the actor and asks for the result of its processing, returns immediately
3434
# @param [Object] message
35-
# @param [Ivar] ivar to be fulfilled be message's processing result
36-
# @return [IVar] supplied ivar
37-
def ask(message, ivar = IVar.new)
38-
message message, ivar
35+
# @param [Edge::Future] future to be fulfilled be message's processing result
36+
# @return [Edge::Future] supplied future
37+
def ask(message, future = Concurrent.future)
38+
message message, future
39+
# # @return [Future] a future
40+
# def ask(message)
41+
# message message, ConcurrentNext.promise
3942
end
4043

4144
# @note it's a good practice to use tell whenever possible. Ask should be used only for
@@ -45,17 +48,26 @@ def ask(message, ivar = IVar.new)
4548
#
4649
# sends message to the actor and asks for the result of its processing, blocks
4750
# @param [Object] message
48-
# @param [Ivar] ivar to be fulfilled be message's processing result
51+
# @param [Edge::Future] future to be fulfilled be message's processing result
4952
# @return [Object] message's processing result
50-
# @raise [Exception] ivar.reason if ivar is #rejected?
51-
def ask!(message, ivar = IVar.new)
52-
ask(message, ivar).value!
53+
# @raise [Exception] future.reason if future is #rejected?
54+
def ask!(message, future = Concurrent.future)
55+
ask(message, future).value!
56+
# # @param [Object] message
57+
# # @return [Object] message's processing result
58+
# # @raise [Exception] future.reason if future is #failed?
59+
# def ask!(message)
60+
# ask(message).value!
5361
end
5462

55-
# behaves as {#tell} when no ivar and as {#ask} when ivar
56-
def message(message, ivar = nil)
57-
core.on_envelope Envelope.new(message, ivar, Actor.current || Thread.current, self)
58-
return ivar || self
63+
# behaves as {#tell} when no future and as {#ask} when future
64+
def message(message, future = nil)
65+
core.on_envelope Envelope.new(message, future, Actor.current || Thread.current, self)
66+
return future || self
67+
# # behaves as {#tell} when no promise and as {#ask} when promise
68+
# def message(message, promise = nil)
69+
# core.on_envelope Envelope.new(message, promise, Actor.current || Thread.current, self)
70+
# return promise ? promise.future : self
5971
end
6072

6173
# @see AbstractContext#dead_letter_routing

lib/concurrent/configuration.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ def self.new_fast_executor(opts = {})
121121
auto_terminate: opts.fetch(:auto_terminate, true),
122122
idletime: 60, # 1 minute
123123
max_queue: 0, # unlimited
124-
fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
124+
fallback_policy: :abort # shouldn't matter -- 0 max queue
125125
)
126126
end
127127

@@ -133,7 +133,7 @@ def self.new_io_executor(opts = {})
133133
auto_terminate: opts.fetch(:auto_terminate, true),
134134
idletime: 60, # 1 minute
135135
max_queue: 0, # unlimited
136-
fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
136+
fallback_policy: :abort # shouldn't matter -- 0 max queue
137137
)
138138
end
139139

0 commit comments

Comments
 (0)