Skip to content

Commit b1fd807

Browse files
committed
Replaced Executor with ExecutorService.
1 parent 880290a commit b1fd807

26 files changed

+161
-151
lines changed

examples/benchmark_thread_pool_implementations.rb

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,21 @@
88
COUNT = 100_000
99

1010
EXECUTORS = [
11-
Concurrent::JavaThreadPoolExecutor
11+
[Concurrent::JavaCachedThreadPool],
12+
[Concurrent::JavaFixedThreadPool, 10],
13+
[Concurrent::JavaSingleThreadExecutor],
14+
[Concurrent::JavaThreadPoolExecutor]
1215
]
1316

14-
def test_executor(executor_class, count)
15-
executor = executor_class.new
16-
count.times { executor.post{} }
17-
end
18-
1917
Benchmark.bmbm do |x|
20-
EXECUTORS.each do |executor_class|
21-
x.report(executor_class.to_s) { test_executor(executor_class, COUNT) }
18+
EXECUTORS.each do |executor_class, *args|
19+
x.report(executor_class.to_s) do
20+
if args.empty?
21+
executor = executor_class.new
22+
else
23+
executor = executor_class.new(*args)
24+
end
25+
COUNT.times { executor.post{} }
26+
end
2227
end
2328
end

lib/concurrent/actor/core.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require 'concurrent/executors'
2+
13
module Concurrent
24
module Actor
35

@@ -158,7 +160,7 @@ def ns_initialize(opts, &block)
158160
@context_class = Child! opts.fetch(:class), AbstractContext
159161
allocate_context
160162

161-
@executor = Type! opts.fetch(:executor, Concurrent.global_io_executor), Executor
163+
@executor = Type! opts.fetch(:executor, Concurrent.global_io_executor), Concurrent::AbstractExecutorService
162164
raise ArgumentError, 'ImmediateExecutor is not supported' if @executor.is_a? ImmediateExecutor
163165

164166
@reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self

lib/concurrent/agent.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
require 'thread'
2-
32
require 'concurrent/dereferenceable'
43
require 'concurrent/observable'
54
require 'concurrent/logging'
5+
require 'concurrent/executor/executor'
66

77
module Concurrent
88

lib/concurrent/dataflow.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
require 'concurrent/future'
22
require 'concurrent/atomic/atomic_fixnum'
3-
require 'concurrent/executor/per_thread_executor'
43

54
module Concurrent
65

lib/concurrent/delay.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require 'thread'
22
require 'concurrent/configuration'
33
require 'concurrent/obligation'
4+
require 'concurrent/executor/executor'
45
require 'concurrent/executor/immediate_executor'
56
require 'concurrent/synchronization'
67

lib/concurrent/executor/executor.rb

Lines changed: 2 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
1-
require 'concurrent/errors'
2-
require 'concurrent/logging'
3-
require 'concurrent/at_exit'
4-
require 'concurrent/atomic/event'
1+
require 'concurrent/executor/executor_service'
52

63
module Concurrent
74

85
module Executor
9-
include Logging
106

117
# Get the requested `Executor` based on the values set in the options hash.
128
#
@@ -58,91 +54,11 @@ def self.executor(executor_identifier)
5854
when :task
5955
Kernel.warn '[DEPRECATED] use `executor: :io` instead'
6056
Concurrent.global_io_executor
61-
when Executor
57+
when Concurrent::ExecutorService
6258
executor_identifier
6359
else
6460
raise ArgumentError, "executor not recognized by '#{executor_identifier}'"
6561
end
6662
end
67-
68-
# @!macro [attach] executor_method_post
69-
#
70-
# Submit a task to the executor for asynchronous processing.
71-
#
72-
# @param [Array] args zero or more arguments to be passed to the task
73-
#
74-
# @yield the asynchronous task to perform
75-
#
76-
# @return [Boolean] `true` if the task is queued, `false` if the executor
77-
# is not running
78-
#
79-
# @raise [ArgumentError] if no task is given
80-
def post(*args, &task)
81-
raise NotImplementedError
82-
end
83-
84-
# @!macro [attach] executor_method_left_shift
85-
#
86-
# Submit a task to the executor for asynchronous processing.
87-
#
88-
# @param [Proc] task the asynchronous task to perform
89-
#
90-
# @return [self] returns itself
91-
def <<(task)
92-
post(&task)
93-
self
94-
end
95-
96-
# @!macro [attach] executor_module_method_can_overflow_question
97-
#
98-
# Does the task queue have a maximum size?
99-
#
100-
# @return [Boolean] True if the task queue has a maximum size else false.
101-
#
102-
# @note Always returns `false`
103-
def can_overflow?
104-
false
105-
end
106-
107-
# @!macro [attach] executor_module_method_serialized_question
108-
#
109-
# Does this executor guarantee serialization of its operations?
110-
#
111-
# @return [Boolean] True if the executor guarantees that all operations
112-
# will be post in the order they are received and no two operations may
113-
# occur simultaneously. Else false.
114-
#
115-
# @note Always returns `false`
116-
def serialized?
117-
false
118-
end
119-
end
120-
121-
# Indicates that the including `Executor` or `ExecutorService` guarantees
122-
# that all operations will occur in the order they are post and that no
123-
# two operations may occur simultaneously. This module provides no
124-
# functionality and provides no guarantees. That is the responsibility
125-
# of the including class. This module exists solely to allow the including
126-
# object to be interrogated for its serialization status.
127-
#
128-
# @example
129-
# class Foo
130-
# include Concurrent::SerialExecutor
131-
# end
132-
#
133-
# foo = Foo.new
134-
#
135-
# foo.is_a? Concurrent::Executor #=> true
136-
# foo.is_a? Concurrent::SerialExecutor #=> true
137-
# foo.serialized? #=> true
138-
module SerialExecutor
139-
include Executor
140-
141-
# @!macro executor_module_method_serialized_question
142-
#
143-
# @note Always returns `true`
144-
def serialized?
145-
true
146-
end
14763
end
14864
end

lib/concurrent/executor/executor_service.rb

Lines changed: 98 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,105 @@
1+
require 'concurrent/errors'
12
require 'concurrent/logging'
3+
require 'concurrent/at_exit'
4+
require 'concurrent/atomic/event'
25
require 'concurrent/synchronization'
3-
require 'concurrent/executor/executor'
46

57
module Concurrent
68

7-
class AbstractExecutorService < Synchronization::Object
8-
include Executor
9+
module ExecutorService
910
include Logging
1011

11-
# The set of possible fallback policies that may be set at thread pool creation.
12-
FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze
13-
14-
attr_reader :fallback_policy
15-
16-
def initialize(*args, &block)
17-
super
12+
# @!macro [attach] executor_service_method_post
13+
#
14+
# Submit a task to the executor for asynchronous processing.
15+
#
16+
# @param [Array] args zero or more arguments to be passed to the task
17+
#
18+
# @yield the asynchronous task to perform
19+
#
20+
# @return [Boolean] `true` if the task is queued, `false` if the executor
21+
# is not running
22+
#
23+
# @raise [ArgumentError] if no task is given
24+
def post(*args, &task)
25+
raise NotImplementedError
1826
end
1927

20-
# @!macro [attach] executor_service_method_running_question
28+
# @!macro [attach] executor_service_method_left_shift
2129
#
22-
# Is the executor running?
30+
# Submit a task to the executor for asynchronous processing.
2331
#
24-
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
25-
def running?
26-
raise NotImplementedError
32+
# @param [Proc] task the asynchronous task to perform
33+
#
34+
# @return [self] returns itself
35+
def <<(task)
36+
post(&task)
37+
self
2738
end
2839

29-
# @!macro [attach] executor_service_method_shuttingdown_question
40+
# @!macro [attach] executor_service_method_can_overflow_question
3041
#
31-
# Is the executor shuttingdown?
42+
# Does the task queue have a maximum size?
3243
#
33-
# @return [Boolean] `true` when not running and not shutdown, else `false`
34-
def shuttingdown?
35-
raise NotImplementedError
44+
# @return [Boolean] True if the task queue has a maximum size else false.
45+
#
46+
# @note Always returns `false`
47+
def can_overflow?
48+
false
3649
end
3750

38-
# @!macro [attach] executor_service_method_shutdown_question
51+
# @!macro [attach] executor_service_method_serialized_question
3952
#
40-
# Is the executor shutdown?
53+
# Does this executor guarantee serialization of its operations?
4154
#
42-
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
43-
def shutdown?
44-
raise NotImplementedError
55+
# @return [Boolean] True if the executor guarantees that all operations
56+
# will be post in the order they are received and no two operations may
57+
# occur simultaneously. Else false.
58+
#
59+
# @note Always returns `false`
60+
def serialized?
61+
false
62+
end
63+
end
64+
65+
# Indicates that the including `ExecutorService` guarantees
66+
# that all operations will occur in the order they are post and that no
67+
# two operations may occur simultaneously. This module provides no
68+
# functionality and provides no guarantees. That is the responsibility
69+
# of the including class. This module exists solely to allow the including
70+
# object to be interrogated for its serialization status.
71+
#
72+
# @example
73+
# class Foo
74+
# include Concurrent::SerialExecutor
75+
# end
76+
#
77+
# foo = Foo.new
78+
#
79+
# foo.is_a? Concurrent::ExecutorService #=> true
80+
# foo.is_a? Concurrent::SerialExecutor #=> true
81+
# foo.serialized? #=> true
82+
module SerialExecutorService
83+
include ExecutorService
84+
85+
# @!macro executor_module_method_serialized_question
86+
#
87+
# @note Always returns `true`
88+
def serialized?
89+
true
90+
end
91+
end
92+
93+
class AbstractExecutorService < Synchronization::Object
94+
include ExecutorService
95+
96+
# The set of possible fallback policies that may be set at thread pool creation.
97+
FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze
98+
99+
attr_reader :fallback_policy
100+
101+
def initialize(*args, &block)
102+
super
45103
end
46104

47105
# @!macro [attach] executor_service_method_shutdown
@@ -78,14 +136,29 @@ def wait_for_termination(timeout = nil)
78136
raise NotImplementedError
79137
end
80138

139+
# @!macro [attach] executor_service_method_running_question
140+
#
141+
# Is the executor running?
142+
#
143+
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
81144
def running?
82145
synchronize { ns_running? }
83146
end
84147

148+
# @!macro [attach] executor_service_method_shuttingdown_question
149+
#
150+
# Is the executor shuttingdown?
151+
#
152+
# @return [Boolean] `true` when not running and not shutdown, else `false`
85153
def shuttingdown?
86154
synchronize { ns_shuttingdown? }
87155
end
88156

157+
# @!macro [attach] executor_service_method_shutdown_question
158+
#
159+
# Is the executor shutdown?
160+
#
161+
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
89162
def shutdown?
90163
synchronize { ns_shutdown? }
91164
end
@@ -238,7 +311,6 @@ def ns_shutdown?
238311
if Concurrent.on_jruby?
239312

240313
class JavaExecutorService < AbstractExecutorService
241-
include Executor
242314
java_import 'java.lang.Runnable'
243315

244316
FALLBACK_POLICY_CLASSES = {

lib/concurrent/executor/immediate_executor.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
require 'concurrent/atomic/event'
2-
require 'concurrent/executor/executor'
2+
require 'concurrent/executor/executor_service'
33

44
module Concurrent
55

@@ -14,7 +14,7 @@ module Concurrent
1414
#
1515
# @note Intended for use primarily in testing and debugging.
1616
class ImmediateExecutor
17-
include SerialExecutor
17+
include SerialExecutorService
1818

1919
# Creates a new executor
2020
def initialize

lib/concurrent/executor/indirect_immediate_executor.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
require 'concurrent/executor/executor'
1+
require 'concurrent/executor/immediate_executor'
2+
require 'concurrent/executor/simple_executor_service'
23

34
module Concurrent
45
# An executor service which runs all operations on a new thread, blocking
@@ -19,7 +20,7 @@ class IndirectImmediateExecutor < ImmediateExecutor
1920
# Creates a new executor
2021
def initialize
2122
super
22-
@internal_executor = PerThreadExecutor.new
23+
@internal_executor = SimpleExecutorService.new
2324
end
2425

2526
# @!macro executor_method_post

lib/concurrent/executor/java_single_thread_executor.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ module Concurrent
66
# @!macro single_thread_executor
77
# @!macro thread_pool_options
88
class JavaSingleThreadExecutor < JavaExecutorService
9-
include SerialExecutor
9+
include SerialExecutorService
1010

1111
# Create a new thread pool.
1212
#

0 commit comments

Comments
 (0)