Skip to content

Commit de789ad

Browse files
committed
Merge pull request #283 from ruby-concurrency/synchronization-executor-service
Synchronization ExecutorService
2 parents 37edfe3 + 9538c40 commit de789ad

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+829
-686
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#!/usr/bin/env ruby
2+
3+
$: << File.expand_path('../../lib', __FILE__)
4+
5+
require 'benchmark'
6+
require 'concurrent/executors'
7+
8+
COUNT = 100_000
9+
10+
EXECUTORS = [
11+
[Concurrent::JavaCachedThreadPool],
12+
[Concurrent::JavaFixedThreadPool, 10],
13+
[Concurrent::JavaSingleThreadExecutor],
14+
[Concurrent::JavaThreadPoolExecutor]
15+
]
16+
17+
Benchmark.bmbm do |x|
18+
EXECUTORS.each do |executor_class, *args|
19+
x.report(executor_class.to_s) do
20+
if args.empty?
21+
executor = executor_class.new
22+
else
23+
executor = executor_class.new(*args)
24+
end
25+
COUNT.times { executor.post{} }
26+
end
27+
end
28+
end

ext/com/concurrent_ruby/ext/SynchronizationLibrary.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,14 @@ public IRubyObject initialize(ThreadContext context, IRubyObject[] args, Block b
5555
}
5656
}
5757

58-
@JRubyMethod(name = "synchronize", visibility = Visibility.PRIVATE)
58+
@JRubyMethod(name = "synchronize", visibility = Visibility.PROTECTED)
5959
public IRubyObject rubySynchronize(ThreadContext context, Block block) {
6060
synchronized (this) {
6161
return block.yield(context, null);
6262
}
6363
}
6464

65-
@JRubyMethod(name = "ns_wait", optional = 1, visibility = Visibility.PRIVATE)
65+
@JRubyMethod(name = "ns_wait", optional = 1, visibility = Visibility.PROTECTED)
6666
public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) {
6767
Ruby runtime = context.runtime;
6868
if (args.length > 1) {
@@ -94,19 +94,19 @@ public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) {
9494
return this;
9595
}
9696

97-
@JRubyMethod(name = "ns_signal", visibility = Visibility.PRIVATE)
97+
@JRubyMethod(name = "ns_signal", visibility = Visibility.PROTECTED)
9898
public IRubyObject nsSignal(ThreadContext context) {
9999
notify();
100100
return this;
101101
}
102102

103-
@JRubyMethod(name = "ns_broadcast", visibility = Visibility.PRIVATE)
103+
@JRubyMethod(name = "ns_broadcast", visibility = Visibility.PROTECTED)
104104
public IRubyObject nsBroadcast(ThreadContext context) {
105105
notifyAll();
106106
return this;
107107
}
108108

109-
@JRubyMethod(name = "ensure_ivar_visibility!", visibility = Visibility.PRIVATE)
109+
@JRubyMethod(name = "ensure_ivar_visibility!", visibility = Visibility.PROTECTED)
110110
public IRubyObject ensureIvarVisibilityBang(ThreadContext context) {
111111
return context.nil;
112112
}

lib/concurrent/actor/core.rb

Lines changed: 58 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require 'concurrent/executors'
2+
13
module Concurrent
24
module Actor
35

@@ -47,58 +49,7 @@ class Core < Synchronization::Object
4749
# any logging system
4850
# @param [Proc] block for class instantiation
4951
def initialize(opts = {}, &block)
50-
super(&nil) # TODO use ns_initialize
51-
synchronize do
52-
@mailbox = Array.new
53-
@serialized_execution = SerializedExecution.new
54-
@children = Set.new
55-
56-
@context_class = Child! opts.fetch(:class), AbstractContext
57-
allocate_context
58-
59-
@executor = Type! opts.fetch(:executor, Concurrent.global_io_executor), Executor
60-
raise ArgumentError, 'ImmediateExecutor is not supported' if @executor.is_a? ImmediateExecutor
61-
62-
@reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self
63-
@name = (Type! opts.fetch(:name), String, Symbol).to_s
64-
65-
parent = opts[:parent]
66-
@parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
67-
if @parent_core.nil? && @name != '/'
68-
raise 'only root has no parent'
69-
end
70-
71-
@path = @parent_core ? File.join(@parent_core.path, @name) : @name
72-
@logger = opts[:logger]
73-
74-
@parent_core.add_child reference if @parent_core
75-
76-
initialize_behaviours opts
77-
78-
@args = opts.fetch(:args, [])
79-
@block = block
80-
initialized = Type! opts[:initialized], Edge::CompletableFuture, NilClass
81-
82-
messages = []
83-
messages << :link if opts[:link]
84-
messages << :supervise if opts[:supervise]
85-
86-
schedule_execution do
87-
begin
88-
build_context
89-
90-
messages.each do |message|
91-
handle_envelope Envelope.new(message, nil, parent, reference)
92-
end
93-
94-
initialized.success reference if initialized
95-
rescue => ex
96-
log ERROR, ex
97-
@first_behaviour.terminate!
98-
initialized.fail ex if initialized
99-
end
100-
end
101-
end
52+
super
10253
end
10354

10455
# @return [Reference, nil] of parent actor
@@ -199,6 +150,60 @@ def build_context
199150
@context.send :initialize, *@args, &@block
200151
end
201152

153+
protected
154+
155+
def ns_initialize(opts, &block)
156+
@mailbox = Array.new
157+
@serialized_execution = SerializedExecution.new
158+
@children = Set.new
159+
160+
@context_class = Child! opts.fetch(:class), AbstractContext
161+
allocate_context
162+
163+
@executor = Type! opts.fetch(:executor, Concurrent.global_io_executor), Concurrent::AbstractExecutorService
164+
raise ArgumentError, 'ImmediateExecutor is not supported' if @executor.is_a? ImmediateExecutor
165+
166+
@reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self
167+
@name = (Type! opts.fetch(:name), String, Symbol).to_s
168+
169+
parent = opts[:parent]
170+
@parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
171+
if @parent_core.nil? && @name != '/'
172+
raise 'only root has no parent'
173+
end
174+
175+
@path = @parent_core ? File.join(@parent_core.path, @name) : @name
176+
@logger = opts[:logger]
177+
178+
@parent_core.add_child reference if @parent_core
179+
180+
initialize_behaviours opts
181+
182+
@args = opts.fetch(:args, [])
183+
@block = block
184+
initialized = Type! opts[:initialized], Edge::CompletableFuture, NilClass
185+
186+
messages = []
187+
messages << :link if opts[:link]
188+
messages << :supervise if opts[:supervise]
189+
190+
schedule_execution do
191+
begin
192+
build_context
193+
194+
messages.each do |message|
195+
handle_envelope Envelope.new(message, nil, parent, reference)
196+
end
197+
198+
initialized.success reference if initialized
199+
rescue => ex
200+
log ERROR, ex
201+
@first_behaviour.terminate!
202+
initialized.fail ex if initialized
203+
end
204+
end
205+
end
206+
202207
private
203208

204209
def handle_envelope(envelope)
@@ -215,7 +220,7 @@ def initialize_behaviours(opts)
215220
end
216221
@behaviours = {}
217222
@first_behaviour = @behaviour_definition.reverse.
218-
reduce(nil) { |last, (behaviour, args)| @behaviours[behaviour] = behaviour.new(self, last, *args) }
223+
reduce(nil) { |last, (behaviour, args)| @behaviours[behaviour] = behaviour.new(self, last, *args) }
219224
end
220225
end
221226
end

lib/concurrent/agent.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
require 'thread'
2-
32
require 'concurrent/dereferenceable'
43
require 'concurrent/observable'
54
require 'concurrent/logging'
5+
require 'concurrent/executor/executor'
66

77
module Concurrent
88

lib/concurrent/atomic/count_down_latch.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def count
5858
synchronize { @count }
5959
end
6060

61-
private
61+
protected
6262

6363
def ns_initialize(count)
6464
@count = count

lib/concurrent/atomic/cyclic_barrier.rb

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ def wait(timeout = nil)
6262
end
6363
end
6464

65-
6665
# resets the barrier to its initial state
6766
# If there is at least one waiting thread, it will be woken up, the `wait`
6867
# method will return false and the barrier will be broken
@@ -83,7 +82,7 @@ def broken?
8382
synchronize { @generation.status != :waiting }
8483
end
8584

86-
private
85+
protected
8786

8887
def ns_generation_done(generation, status, continue = true)
8988
generation.status = status
@@ -101,6 +100,5 @@ def ns_initialize(parties, &block)
101100
@action = block
102101
ns_next_generation
103102
end
104-
105103
end
106104
end

lib/concurrent/atomic/event.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def wait(timeout = nil)
7070
end
7171
end
7272

73-
private
73+
protected
7474

7575
def ns_set
7676
unless @set

lib/concurrent/dataflow.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
require 'concurrent/future'
22
require 'concurrent/atomic/atomic_fixnum'
3-
require 'concurrent/executor/per_thread_executor'
43

54
module Concurrent
65

lib/concurrent/delay.rb

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require 'thread'
22
require 'concurrent/configuration'
33
require 'concurrent/obligation'
4+
require 'concurrent/executor/executor'
45
require 'concurrent/executor/immediate_executor'
56
require 'concurrent/synchronization'
67

@@ -68,19 +69,9 @@ class Delay < Synchronization::Object
6869
# @raise [ArgumentError] if no block is given
6970
def initialize(opts = {}, &block)
7071
raise ArgumentError.new('no block given') unless block_given?
71-
72-
super()
73-
init_obligation(self)
74-
set_deref_options(opts)
75-
@task_executor = Executor.executor_from_options(opts)
76-
77-
@task = block
78-
@state = :pending
79-
@computing = false
72+
super(opts, &block)
8073
end
8174

82-
protected :synchronize
83-
8475
# Return the value this object represents after applying the options
8576
# specified by the `#set_deref_options` method. If the delayed operation
8677
# raised an exception this method will return nil. The execption object
@@ -169,6 +160,18 @@ def reconfigure(&block)
169160
end
170161
end
171162

163+
protected
164+
165+
def ns_initialize(opts, &block)
166+
init_obligation(self)
167+
set_deref_options(opts)
168+
@task_executor = Executor.executor_from_options(opts)
169+
170+
@task = block
171+
@state = :pending
172+
@computing = false
173+
end
174+
172175
private
173176

174177
# @!visibility private

0 commit comments

Comments
 (0)