Skip to content

Commit 0969f9c

Browse files
committed
All messages should have same priority
It's now possible to send actor << job1 << job2 <<:terminate! and be sure that both jobs are processed first.
1 parent 041196d commit 0969f9c

File tree

4 files changed

+28
-28
lines changed

4 files changed

+28
-28
lines changed

lib/concurrent/actor/behaviour.rb

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,35 +33,43 @@ module Behaviour
3333
require 'concurrent/actor/behaviour/terminates_children'
3434

3535
def self.basic_behaviour_definition
36-
[*base,
37-
*user_messages(:terminate!)]
36+
[*base(:terminate!),
37+
*linking,
38+
*user_messages]
3839
end
3940

40-
def self.restarting_behaviour_definition
41-
[*base,
41+
def self.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one)
42+
[*base(:pause!),
43+
*linking,
4244
*supervised,
43-
[Behaviour::Supervising, [:reset!, :one_for_one]],
44-
*user_messages(:pause!)]
45+
*supervising(handle, strategy),
46+
*user_messages]
4547
end
4648

47-
def self.base
48-
[[SetResults, [:terminate!]],
49+
def self.base(on_error)
50+
[[SetResults, [on_error]],
4951
# has to be before Termination to be able to remove children form terminated actor
5052
[RemovesChild, []],
5153
[Termination, []],
52-
[TerminatesChildren, []],
53-
[Linking, []]]
54+
[TerminatesChildren, []]]
55+
end
56+
57+
# @see '' its source code
58+
def self.linking
59+
[[Linking, []]]
5460
end
5561

5662
def self.supervised
5763
[[Supervised, []],
5864
[Pausing, []]]
5965
end
6066

61-
def self.user_messages(on_error)
62-
[[Buffer, []],
63-
[SetResults, [on_error]],
64-
[Awaits, []],
67+
def self.supervising(handle = :reset!, strategy = :one_for_one)
68+
[[Behaviour::Supervising, [handle, strategy]]]
69+
end
70+
71+
def self.user_messages
72+
[[Awaits, []],
6573
[ExecutesContext, []],
6674
[ErrorsOnUnknownMessage, []]]
6775
end

lib/concurrent/actor/behaviour/pausing.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def on_event(event)
6666
@buffer.each { |envelope| reject_envelope envelope }
6767
@buffer.clear
6868
when :resumed, :reset
69-
@buffer.each { |envelope| core.schedule_execution { pass envelope } }
69+
@buffer.each { |envelope| core.schedule_execution { core.process_envelope envelope } }
7070
@buffer.clear
7171
end
7272
super event

lib/concurrent/actor/root.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ def dead_letter_routing
2828
end
2929

3030
def behaviour_definition
31-
[*Behaviour.base,
32-
[Behaviour::Supervising, [:reset!, :one_for_one]],
33-
*Behaviour.user_messages(:just_log)]
31+
[*Behaviour.base(:just_log),
32+
*Behaviour.supervising,
33+
*Behaviour.user_messages]
3434
end
3535
end
3636
end

spec/concurrent/actor_spec.rb

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -282,13 +282,7 @@ def on_message(message)
282282
describe 'pausing' do
283283
it 'pauses on error' do
284284
queue = Queue.new
285-
resuming_behaviour = Behaviour.restarting_behaviour_definition.map do |c, args|
286-
if Behaviour::Supervising == c
287-
[c, [:resume!, :one_for_one]]
288-
else
289-
[c, args]
290-
end
291-
end
285+
resuming_behaviour = Behaviour.restarting_behaviour_definition(:resume!)
292286

293287
test = AdHoc.spawn name: :tester, behaviour_definition: resuming_behaviour do
294288
actor = AdHoc.spawn name: :pausing,
@@ -302,9 +296,7 @@ def on_message(message)
302296
actor << nil
303297
queue << actor.ask(:add)
304298

305-
-> m do
306-
queue << m
307-
end
299+
-> m { queue << m }
308300
end
309301

310302
expect(queue.pop).to eq :init

0 commit comments

Comments
 (0)