Skip to content

Commit 85e4fab

Browse files
committed
Split behaviors into files
1 parent 827402b commit 85e4fab

14 files changed

+408
-340
lines changed

lib/concurrent/actor/behaviour.rb

Lines changed: 19 additions & 339 deletions
Original file line numberDiff line numberDiff line change
@@ -1,344 +1,23 @@
11
module Concurrent
22
module Actor
33

4-
# TODO split this into files
54
# TODO document dependencies
5+
# TODO callbacks to context
66
module Behaviour
77
MESSAGE_PROCESSED = Object.new
88

9-
class Abstract
10-
include TypeCheck
11-
include InternalDelegations
12-
13-
attr_reader :core, :subsequent
14-
15-
def initialize(core, subsequent)
16-
@core = Type! core, Core
17-
@subsequent = Type! subsequent, Abstract, NilClass
18-
end
19-
20-
def on_envelope(envelope)
21-
pass envelope
22-
end
23-
24-
def pass(envelope)
25-
subsequent.on_envelope envelope
26-
end
27-
28-
def on_event(event)
29-
subsequent.on_event event if subsequent
30-
end
31-
32-
def broadcast(event)
33-
core.broadcast(event)
34-
end
35-
36-
def reject_envelope(envelope)
37-
envelope.reject! ActorTerminated.new(reference)
38-
dead_letter_routing << envelope unless envelope.ivar
39-
log Logging::DEBUG, "rejected #{envelope.message} from #{envelope.sender_path}"
40-
end
41-
end
42-
43-
class Termination < Abstract
44-
45-
# @!attribute [r] terminated
46-
# @return [Event] event which will become set when actor is terminated.
47-
attr_reader :terminated
48-
49-
def initialize(core, subsequent)
50-
super core, subsequent
51-
@terminated = Event.new
52-
end
53-
54-
# @note Actor rejects envelopes when terminated.
55-
# @return [true, false] if actor is terminated
56-
def terminated?
57-
@terminated.set?
58-
end
59-
60-
def on_envelope(envelope)
61-
if terminated?
62-
reject_envelope envelope
63-
MESSAGE_PROCESSED
64-
else
65-
if envelope.message == :terminate!
66-
terminate!
67-
else
68-
pass envelope
69-
end
70-
end
71-
end
72-
73-
# Terminates the actor. Any Envelope received after termination is rejected.
74-
# Terminates all its children, does not wait until they are terminated.
75-
def terminate!
76-
return nil if terminated?
77-
@terminated.set
78-
broadcast(:terminated)
79-
parent << :remove_child if parent
80-
nil
81-
end
82-
end
83-
84-
class TerminateChildren < Abstract
85-
def on_event(event)
86-
children.each { |ch| ch << :terminate! } if event == :terminated
87-
super event
88-
end
89-
end
90-
91-
class Linking < Abstract
92-
def initialize(core, subsequent)
93-
super core, subsequent
94-
@linked = Set.new
95-
end
96-
97-
def on_envelope(envelope)
98-
case envelope.message
99-
when :link
100-
link envelope.sender
101-
when :unlink
102-
unlink envelope.sender
103-
else
104-
pass envelope
105-
end
106-
end
107-
108-
def link(ref)
109-
@linked.add(ref)
110-
true
111-
end
112-
113-
def unlink(ref)
114-
@linked.delete(ref)
115-
true
116-
end
117-
118-
def on_event(event)
119-
@linked.each { |a| a << event }
120-
@linked.clear if event == :terminated
121-
super event
122-
end
123-
end
124-
125-
class Supervising < Abstract
126-
attr_reader :supervisor
127-
128-
def initialize(core, subsequent)
129-
super core, subsequent
130-
@supervisor = nil
131-
end
132-
133-
def on_envelope(envelope)
134-
case envelope.message
135-
when :supervise
136-
supervise envelope.sender
137-
when :supervisor
138-
supervisor
139-
when :un_supervise
140-
un_supervise envelope.sender
141-
else
142-
pass envelope
143-
end
144-
end
145-
146-
def supervise(ref)
147-
@supervisor = ref
148-
behaviour!(Linking).link ref
149-
true
150-
end
151-
152-
def un_supervise(ref)
153-
if @supervisor == ref
154-
behaviour!(Linking).unlink ref
155-
@supervisor = nil
156-
true
157-
else
158-
false
159-
end
160-
end
161-
162-
def on_event(event)
163-
@supervisor = nil if event == :terminated
164-
super event
165-
end
166-
end
167-
168-
# pause on error ask its parent
169-
# handling
170-
# :continue
171-
# :reset will only rebuild context
172-
# :restart drops messaged and as :reset
173-
# TODO callbacks
174-
175-
class Pausing < Abstract
176-
def initialize(core, subsequent)
177-
super core, subsequent
178-
@paused = false
179-
@buffer = []
180-
end
181-
182-
def on_envelope(envelope)
183-
case envelope.message
184-
when :pause!
185-
from_supervisor?(envelope) { pause! }
186-
when :resume!
187-
from_supervisor?(envelope) { resume! }
188-
when :reset!
189-
from_supervisor?(envelope) { reset! }
190-
# when :restart! TODO
191-
# from_supervisor?(envelope) { reset! }
192-
else
193-
if @paused
194-
@buffer << envelope
195-
MESSAGE_PROCESSED
196-
else
197-
pass envelope
198-
end
199-
end
200-
end
201-
202-
def pause!(error = nil)
203-
@paused = true
204-
broadcast(error || :paused)
205-
true
206-
end
207-
208-
def resume!
209-
@buffer.each { |envelope| core.schedule_execution { pass envelope } }
210-
@buffer.clear
211-
@paused = false
212-
broadcast(:resumed)
213-
true
214-
end
215-
216-
def from_supervisor?(envelope)
217-
if behaviour!(Supervising).supervisor == envelope.sender
218-
yield
219-
else
220-
false
221-
end
222-
end
223-
224-
def reset!
225-
core.allocate_context
226-
core.build_context
227-
broadcast(:reset)
228-
resume!
229-
true
230-
end
231-
232-
def on_event(event)
233-
if event == :terminated
234-
@buffer.each { |envelope| reject_envelope envelope }
235-
@buffer.clear
236-
end
237-
super event
238-
end
239-
end
240-
241-
class RemoveChild < Abstract
242-
def on_envelope(envelope)
243-
if envelope.message == :remove_child
244-
core.remove_child envelope.sender
245-
else
246-
pass envelope
247-
end
248-
end
249-
end
250-
251-
class SetResults < Abstract
252-
attr_reader :error_strategy
253-
254-
def initialize(core, subsequent, error_strategy)
255-
super core, subsequent
256-
@error_strategy = Match! error_strategy, :just_log, :terminate, :pause
257-
end
258-
259-
def on_envelope(envelope)
260-
result = pass envelope
261-
if result != MESSAGE_PROCESSED && !envelope.ivar.nil?
262-
envelope.ivar.set result
263-
end
264-
nil
265-
rescue => error
266-
log Logging::ERROR, error
267-
case error_strategy
268-
when :terminate
269-
terminate!
270-
when :pause
271-
behaviour!(Pausing).pause!(error)
272-
else
273-
raise
274-
end
275-
envelope.ivar.fail error unless envelope.ivar.nil?
276-
end
277-
end
278-
279-
class Buffer < Abstract
280-
def initialize(core, subsequent)
281-
super core, subsequent
282-
@buffer = []
283-
@receive_envelope_scheduled = false
284-
end
285-
286-
def on_envelope(envelope)
287-
@buffer.push envelope
288-
process_envelopes?
289-
MESSAGE_PROCESSED
290-
end
291-
292-
# Ensures that only one envelope processing is scheduled with #schedule_execution,
293-
# this allows other scheduled blocks to be executed before next envelope processing.
294-
# Simply put this ensures that Core is still responsive to internal calls (like add_child)
295-
# even though the Actor is flooded with messages.
296-
def process_envelopes?
297-
unless @buffer.empty? || @receive_envelope_scheduled
298-
@receive_envelope_scheduled = true
299-
receive_envelope
300-
end
301-
end
302-
303-
def receive_envelope
304-
envelope = @buffer.shift
305-
return nil unless envelope
306-
pass envelope
307-
ensure
308-
@receive_envelope_scheduled = false
309-
core.schedule_execution { process_envelopes? }
310-
end
311-
312-
def on_event(event)
313-
if event == :terminated
314-
@buffer.each { |envelope| reject_envelope envelope }
315-
@buffer.clear
316-
end
317-
super event
318-
end
319-
end
320-
321-
class Await < Abstract
322-
def on_envelope(envelope)
323-
if envelope.message == :await
324-
true
325-
else
326-
pass envelope
327-
end
328-
end
329-
end
330-
331-
class DoContext < Abstract
332-
def on_envelope(envelope)
333-
context.on_envelope envelope
334-
end
335-
end
336-
337-
class ErrorOnUnknownMessage < Abstract
338-
def on_envelope(envelope)
339-
raise UnknownMessage, envelope
340-
end
341-
end
9+
require 'concurrent/actor/behaviour/abstract'
10+
require 'concurrent/actor/behaviour/awaits'
11+
require 'concurrent/actor/behaviour/buffer'
12+
require 'concurrent/actor/behaviour/errors_on_unknown_message'
13+
require 'concurrent/actor/behaviour/executes_context'
14+
require 'concurrent/actor/behaviour/linking'
15+
require 'concurrent/actor/behaviour/pausing'
16+
require 'concurrent/actor/behaviour/removes_child'
17+
require 'concurrent/actor/behaviour/sets_results'
18+
require 'concurrent/actor/behaviour/supervising'
19+
require 'concurrent/actor/behaviour/termination'
20+
require 'concurrent/actor/behaviour/terminates_children'
34221

34322
def self.basic_behaviour
34423
[*base,
@@ -353,9 +32,10 @@ def self.restarting_behaviour
35332

35433
def self.base
35534
[[SetResults, [:terminate]],
356-
[RemoveChild, []],
35+
# has to be before Termination to be able to remove children form terminated actor
36+
[RemovesChild, []],
35737
[Termination, []],
358-
[TerminateChildren, []],
38+
[TerminatesChildren, []],
35939
[Linking, []]]
36040
end
36141

@@ -367,9 +47,9 @@ def self.supervising
36747
def self.user_messages(on_error)
36848
[[Buffer, []],
36949
[SetResults, [on_error]],
370-
[Await, []],
371-
[DoContext, []],
372-
[ErrorOnUnknownMessage, []]]
50+
[Awaits, []],
51+
[ExecutesContext, []],
52+
[ErrorsOnUnknownMessage, []]]
37353
end
37454
end
37555
end

0 commit comments

Comments
 (0)