Skip to content

Commit dd4e7bf

Browse files
committed
Improved initialization synchronization.
1 parent 42320af commit dd4e7bf

File tree

9 files changed

+89
-89
lines changed

9 files changed

+89
-89
lines changed

ext/com/concurrent_ruby/ext/SynchronizationLibrary.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,14 @@ public IRubyObject initialize(ThreadContext context, IRubyObject[] args, Block b
5555
}
5656
}
5757

58-
@JRubyMethod(name = "synchronize", visibility = Visibility.PRIVATE)
58+
@JRubyMethod(name = "synchronize", visibility = Visibility.PROTECTED)
5959
public IRubyObject rubySynchronize(ThreadContext context, Block block) {
6060
synchronized (this) {
6161
return block.yield(context, null);
6262
}
6363
}
6464

65-
@JRubyMethod(name = "ns_wait", optional = 1, visibility = Visibility.PRIVATE)
65+
@JRubyMethod(name = "ns_wait", optional = 1, visibility = Visibility.PROTECTED)
6666
public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) {
6767
Ruby runtime = context.runtime;
6868
if (args.length > 1) {
@@ -94,19 +94,19 @@ public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) {
9494
return this;
9595
}
9696

97-
@JRubyMethod(name = "ns_signal", visibility = Visibility.PRIVATE)
97+
@JRubyMethod(name = "ns_signal", visibility = Visibility.PROTECTED)
9898
public IRubyObject nsSignal(ThreadContext context) {
9999
notify();
100100
return this;
101101
}
102102

103-
@JRubyMethod(name = "ns_broadcast", visibility = Visibility.PRIVATE)
103+
@JRubyMethod(name = "ns_broadcast", visibility = Visibility.PROTECTED)
104104
public IRubyObject nsBroadcast(ThreadContext context) {
105105
notifyAll();
106106
return this;
107107
}
108108

109-
@JRubyMethod(name = "ensure_ivar_visibility!", visibility = Visibility.PRIVATE)
109+
@JRubyMethod(name = "ensure_ivar_visibility!", visibility = Visibility.PROTECTED)
110110
public IRubyObject ensureIvarVisibilityBang(ThreadContext context) {
111111
return context.nil;
112112
}

lib/concurrent/actor/core.rb

Lines changed: 56 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -47,58 +47,7 @@ class Core < Synchronization::Object
4747
# any logging system
4848
# @param [Proc] block for class instantiation
4949
def initialize(opts = {}, &block)
50-
super(&nil) # TODO use ns_initialize
51-
synchronize do
52-
@mailbox = Array.new
53-
@serialized_execution = SerializedExecution.new
54-
@children = Set.new
55-
56-
@context_class = Child! opts.fetch(:class), AbstractContext
57-
allocate_context
58-
59-
@executor = Type! opts.fetch(:executor, Concurrent.global_io_executor), Executor
60-
raise ArgumentError, 'ImmediateExecutor is not supported' if @executor.is_a? ImmediateExecutor
61-
62-
@reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self
63-
@name = (Type! opts.fetch(:name), String, Symbol).to_s
64-
65-
parent = opts[:parent]
66-
@parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
67-
if @parent_core.nil? && @name != '/'
68-
raise 'only root has no parent'
69-
end
70-
71-
@path = @parent_core ? File.join(@parent_core.path, @name) : @name
72-
@logger = opts[:logger]
73-
74-
@parent_core.add_child reference if @parent_core
75-
76-
initialize_behaviours opts
77-
78-
@args = opts.fetch(:args, [])
79-
@block = block
80-
initialized = Type! opts[:initialized], Edge::CompletableFuture, NilClass
81-
82-
messages = []
83-
messages << :link if opts[:link]
84-
messages << :supervise if opts[:supervise]
85-
86-
schedule_execution do
87-
begin
88-
build_context
89-
90-
messages.each do |message|
91-
handle_envelope Envelope.new(message, nil, parent, reference)
92-
end
93-
94-
initialized.success reference if initialized
95-
rescue => ex
96-
log ERROR, ex
97-
@first_behaviour.terminate!
98-
initialized.fail ex if initialized
99-
end
100-
end
101-
end
50+
super
10251
end
10352

10453
# @return [Reference, nil] of parent actor
@@ -199,6 +148,60 @@ def build_context
199148
@context.send :initialize, *@args, &@block
200149
end
201150

151+
protected
152+
153+
def ns_initialize(opts, &block)
154+
@mailbox = Array.new
155+
@serialized_execution = SerializedExecution.new
156+
@children = Set.new
157+
158+
@context_class = Child! opts.fetch(:class), AbstractContext
159+
allocate_context
160+
161+
@executor = Type! opts.fetch(:executor, Concurrent.global_io_executor), Executor
162+
raise ArgumentError, 'ImmediateExecutor is not supported' if @executor.is_a? ImmediateExecutor
163+
164+
@reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self
165+
@name = (Type! opts.fetch(:name), String, Symbol).to_s
166+
167+
parent = opts[:parent]
168+
@parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
169+
if @parent_core.nil? && @name != '/'
170+
raise 'only root has no parent'
171+
end
172+
173+
@path = @parent_core ? File.join(@parent_core.path, @name) : @name
174+
@logger = opts[:logger]
175+
176+
@parent_core.add_child reference if @parent_core
177+
178+
initialize_behaviours opts
179+
180+
@args = opts.fetch(:args, [])
181+
@block = block
182+
initialized = Type! opts[:initialized], Edge::CompletableFuture, NilClass
183+
184+
messages = []
185+
messages << :link if opts[:link]
186+
messages << :supervise if opts[:supervise]
187+
188+
schedule_execution do
189+
begin
190+
build_context
191+
192+
messages.each do |message|
193+
handle_envelope Envelope.new(message, nil, parent, reference)
194+
end
195+
196+
initialized.success reference if initialized
197+
rescue => ex
198+
log ERROR, ex
199+
@first_behaviour.terminate!
200+
initialized.fail ex if initialized
201+
end
202+
end
203+
end
204+
202205
private
203206

204207
def handle_envelope(envelope)
@@ -215,7 +218,7 @@ def initialize_behaviours(opts)
215218
end
216219
@behaviours = {}
217220
@first_behaviour = @behaviour_definition.reverse.
218-
reduce(nil) { |last, (behaviour, args)| @behaviours[behaviour] = behaviour.new(self, last, *args) }
221+
reduce(nil) { |last, (behaviour, args)| @behaviours[behaviour] = behaviour.new(self, last, *args) }
219222
end
220223
end
221224
end

lib/concurrent/atomic/count_down_latch.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def count
5858
synchronize { @count }
5959
end
6060

61-
private
61+
protected
6262

6363
def ns_initialize(count)
6464
@count = count

lib/concurrent/atomic/cyclic_barrier.rb

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ def wait(timeout = nil)
6262
end
6363
end
6464

65-
6665
# resets the barrier to its initial state
6766
# If there is at least one waiting thread, it will be woken up, the `wait`
6867
# method will return false and the barrier will be broken
@@ -83,7 +82,7 @@ def broken?
8382
synchronize { @generation.status != :waiting }
8483
end
8584

86-
private
85+
protected
8786

8887
def ns_generation_done(generation, status, continue = true)
8988
generation.status = status
@@ -101,6 +100,5 @@ def ns_initialize(parties, &block)
101100
@action = block
102101
ns_next_generation
103102
end
104-
105103
end
106104
end

lib/concurrent/atomic/event.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def wait(timeout = nil)
7070
end
7171
end
7272

73-
private
73+
protected
7474

7575
def ns_set
7676
unless @set

lib/concurrent/delay.rb

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,9 @@ class Delay < Synchronization::Object
6868
# @raise [ArgumentError] if no block is given
6969
def initialize(opts = {}, &block)
7070
raise ArgumentError.new('no block given') unless block_given?
71-
72-
super()
73-
init_obligation(self)
74-
set_deref_options(opts)
75-
@task_executor = Executor.executor_from_options(opts)
76-
77-
@task = block
78-
@state = :pending
79-
@computing = false
71+
super(opts, &block)
8072
end
8173

82-
protected :synchronize
83-
8474
# Return the value this object represents after applying the options
8575
# specified by the `#set_deref_options` method. If the delayed operation
8676
# raised an exception this method will return nil. The execption object
@@ -169,6 +159,18 @@ def reconfigure(&block)
169159
end
170160
end
171161

162+
protected
163+
164+
def ns_initialize(opts, &block)
165+
init_obligation(self)
166+
set_deref_options(opts)
167+
@task_executor = Executor.executor_from_options(opts)
168+
169+
@task = block
170+
@state = :pending
171+
@computing = false
172+
end
173+
172174
private
173175

174176
# @!visibility private

lib/concurrent/executor/per_thread_executor.rb

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,6 @@ module Concurrent
1919
class PerThreadExecutor < Synchronization::Object
2020
include Executor
2121

22-
# Creates a new executor
23-
def initialize
24-
@running = Concurrent::AtomicBoolean.new(true)
25-
@stopped = Concurrent::Event.new
26-
@count = Concurrent::AtomicFixnum.new(0)
27-
end
28-
2922
# @!macro executor_method_post
3023
def self.post(*args)
3124
raise ArgumentError.new('no block given') unless block_given?
@@ -97,5 +90,13 @@ def kill
9790
def wait_for_termination(timeout = nil)
9891
@stopped.wait(timeout)
9992
end
93+
94+
protected
95+
96+
def ns_initialize
97+
@running = Concurrent::AtomicBoolean.new(true)
98+
@stopped = Concurrent::Event.new
99+
@count = Concurrent::AtomicFixnum.new(0)
100+
end
100101
end
101102
end

lib/concurrent/ivar.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ class IVar < Synchronization::Object
5757
# @option opts [String] :copy_on_deref (nil) call the given `Proc` passing
5858
# the internal value and returning the value returned from the proc
5959
def initialize(value = NO_VALUE, opts = {})
60-
super(&nil)
60+
#FIXME use ns_initialize
61+
super(&:nil)
6162
init_obligation(self)
6263
self.observers = CopyOnWriteObserverSet.new
6364
set_deref_options(opts)
@@ -88,7 +89,7 @@ def add_observer(observer = nil, func = :update, &block)
8889
func = :call
8990
end
9091

91-
mutex.synchronize do
92+
synchronize do
9293
if event.set?
9394
direct_notification = true
9495
else
@@ -151,7 +152,7 @@ def try_set(value = NO_VALUE, &block)
151152

152153
# @!visibility private
153154
def complete(success, value, reason) # :nodoc:
154-
mutex.synchronize do
155+
synchronize do
155156
raise MultipleAssignmentError if [:fulfilled, :rejected].include? @state
156157
set_state(success, value, reason)
157158
event.set

lib/concurrent/timer_task.rb

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -267,11 +267,6 @@ def timeout_interval=(value)
267267
end
268268
end
269269

270-
if Concurrent.on_jruby?
271-
#FIXME: JRuby seems to handle privacy different here
272-
public :synchronize
273-
end
274-
275270
private :post, :<<
276271

277272
protected

0 commit comments

Comments
 (0)