Skip to content

Commit b17b70a

Browse files
committed
Ensure proper synchronization of Actor instances
1 parent 2693941 commit b17b70a

File tree

4 files changed

+100
-46
lines changed

4 files changed

+100
-46
lines changed

lib/concurrent/actor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
require 'concurrent/executor/serialized_execution'
33
require 'concurrent/ivar'
44
require 'concurrent/logging'
5+
require 'concurrent/atomic/synchronization'
56

67
module Concurrent
78
# TODO https://github.com/celluloid/celluloid/wiki/Supervision-Groups

lib/concurrent/actor/core.rb

Lines changed: 47 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ module Actor
1010
class Core
1111
include TypeCheck
1212
include Concurrent::Logging
13+
include Synchronization
1314

1415
# @!attribute [r] reference
1516
# @return [Reference] reference to this actor which can be safely passed around
@@ -42,53 +43,51 @@ class Core
4243
# can be used to hook actor instance to any logging system
4344
# @param [Proc] block for class instantiation
4445
def initialize(opts = {}, &block)
45-
# @mutex = Mutex.new
46-
# @mutex.lock
47-
# FIXME make initialization safe!
48-
49-
@mailbox = Array.new
50-
@serialized_execution = SerializedExecution.new
51-
@executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor
52-
@children = Set.new
53-
@context_class = Child! opts.fetch(:class), AbstractContext
54-
allocate_context
55-
@reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self
56-
@name = (Type! opts.fetch(:name), String, Symbol).to_s
57-
58-
parent = opts[:parent]
59-
@parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
60-
if @parent_core.nil? && @name != '/'
61-
raise 'only root has no parent'
62-
end
46+
synchronize do
47+
@mailbox = Array.new
48+
@serialized_execution = SerializedExecution.new
49+
@executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor
50+
@children = Set.new
51+
@context_class = Child! opts.fetch(:class), AbstractContext
52+
allocate_context
53+
@reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self
54+
@name = (Type! opts.fetch(:name), String, Symbol).to_s
55+
56+
parent = opts[:parent]
57+
@parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
58+
if @parent_core.nil? && @name != '/'
59+
raise 'only root has no parent'
60+
end
6361

64-
@path = @parent_core ? File.join(@parent_core.path, @name) : @name
65-
@logger = opts[:logger]
62+
@path = @parent_core ? File.join(@parent_core.path, @name) : @name
63+
@logger = opts[:logger]
6664

67-
@parent_core.add_child reference if @parent_core
65+
@parent_core.add_child reference if @parent_core
6866

69-
initialize_behaviours opts
67+
initialize_behaviours opts
7068

71-
@args = opts.fetch(:args, [])
72-
@block = block
73-
initialized = Type! opts[:initialized], IVar, NilClass
69+
@args = opts.fetch(:args, [])
70+
@block = block
71+
initialized = Type! opts[:initialized], IVar, NilClass
7472

75-
messages = []
76-
messages << :link if opts[:link]
77-
messages << :supervise if opts[:supervise]
73+
messages = []
74+
messages << :link if opts[:link]
75+
messages << :supervise if opts[:supervise]
7876

79-
schedule_execution do
80-
begin
81-
build_context
77+
schedule_execution do
78+
begin
79+
build_context
8280

83-
messages.each do |message|
84-
handle_envelope Envelope.new(message, nil, parent, reference)
85-
end
81+
messages.each do |message|
82+
handle_envelope Envelope.new(message, nil, parent, reference)
83+
end
8684

87-
initialized.set true if initialized
88-
rescue => ex
89-
log ERROR, ex
90-
@first_behaviour.terminate!
91-
initialized.fail ex if initialized
85+
initialized.set true if initialized
86+
rescue => ex
87+
log ERROR, ex
88+
@first_behaviour.terminate!
89+
initialized.fail ex if initialized
90+
end
9291
end
9392
end
9493
end
@@ -148,13 +147,15 @@ def log(level, message = nil, &block)
148147
# sets Actress.current
149148
def schedule_execution
150149
@serialized_execution.post(@executor) do
151-
begin
152-
Thread.current[:__current_actor__] = reference
153-
yield
154-
rescue => e
155-
log FATAL, e
156-
ensure
157-
Thread.current[:__current_actor__] = nil
150+
synchronize do
151+
begin
152+
Thread.current[:__current_actor__] = reference
153+
yield
154+
rescue => e
155+
log FATAL, e
156+
ensure
157+
Thread.current[:__current_actor__] = nil
158+
end
158159
end
159160
end
160161

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
module Concurrent
2+
3+
# Safe synchronization under JRuby, prevents reading uninitialized @mutex variable.
4+
# @note synchronized needs to be called in #initialize for this module to work properly
5+
# @example usage
6+
# class AClass
7+
# include Synchronized
8+
#
9+
# def initialize
10+
# synchronize do
11+
# # body of the constructor ...
12+
# end
13+
# end
14+
#
15+
# def a_method
16+
# synchronize do
17+
# # body of a_method ...
18+
# end
19+
# end
20+
# end
21+
module Synchronization
22+
23+
engine = defined?(RUBY_ENGINE) && RUBY_ENGINE
24+
25+
case engine
26+
when 'jruby'
27+
require 'jruby'
28+
29+
def synchronize
30+
JRuby.reference0(self).synchronized { yield }
31+
end
32+
33+
when 'rbx'
34+
35+
def synchronize
36+
Rubinius.lock(self)
37+
yield
38+
ensure
39+
Rubinius.unlock(self)
40+
end
41+
42+
else
43+
44+
def synchronize
45+
@mutex ||= Mutex.new
46+
@mutex.synchronize { yield }
47+
end
48+
49+
end
50+
end
51+
end

lib/concurrent/atomics.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@
88
require 'concurrent/atomic/count_down_latch'
99
require 'concurrent/atomic/event'
1010
require 'concurrent/atomic/thread_local_var'
11+
require 'concurrent/atomic/synchronization'

0 commit comments

Comments
 (0)