Skip to content

Commit 9d5a757

Browse files
committed
wip: RubySingleThreadExecutor now gets run/stop/kill from Executor mixin.
1 parent 1fd07c2 commit 9d5a757

File tree

2 files changed

+34
-43
lines changed

2 files changed

+34
-43
lines changed

lib/concurrent/executor/executor.rb

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require 'concurrent/atomic/event'
2+
13
module Concurrent
24

35
# An exception class raised when the maximum queue size is reached and the
@@ -6,12 +8,18 @@ module Concurrent
68

79
module Executor
810

11+
# Is the thread pool running?
12+
#
13+
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
914
def running?
10-
true
15+
! @stop_event.set?
1116
end
1217

18+
# Is the thread pool shutdown?
19+
#
20+
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
1321
def shutdown?
14-
false
22+
@stop_event.set?
1523
end
1624

1725
def shutdown
@@ -20,7 +28,17 @@ def shutdown
2028
def kill
2129
end
2230

31+
# Block until thread pool shutdown is complete or until `timeout` seconds have
32+
# passed.
33+
#
34+
# @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
35+
# must be called before this method (or on another thread).
36+
#
37+
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
38+
#
39+
# @return [Boolean] `true` if shutdown complete or false on `timeout`
2340
def wait_for_termination(timeout)
41+
@stopped_event.wait(timeout.to_i)
2442
end
2543

2644
def post(*args, &task)
@@ -38,7 +56,12 @@ def <<(task)
3856

3957
protected
4058

59+
attr_reader :mutex, :stop_event, :stopped_event
60+
4161
def init_executor
62+
@mutex = Mutex.new
63+
@stop_event = Event.new
64+
@stopped_event = Event.new
4265
end
4366

4467
def execute(*args, &task)
Lines changed: 9 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
require 'thread'
2-
31
require_relative 'executor'
4-
require 'concurrent/atomic/event'
52

63
module Concurrent
74

@@ -15,38 +12,9 @@ class RubySingleThreadExecutor
1512
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
1613
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
1714
def initialize(opts = {})
18-
@mutex = Mutex.new
1915
@queue = Queue.new
2016
@thread = nil
21-
@stop = Event.new
22-
@stopped = Event.new
23-
end
24-
25-
# Is the thread pool running?
26-
#
27-
# @return [Boolean] `true` when running, `false` when shutting down or shutdown
28-
def running?
29-
! @stop.set?
30-
end
31-
32-
# Is the thread pool shutdown?
33-
#
34-
# @return [Boolean] `true` when shutdown, `false` when shutting down or running
35-
def shutdown?
36-
@stopped.set?
37-
end
38-
39-
# Block until thread pool shutdown is complete or until `timeout` seconds have
40-
# passed.
41-
#
42-
# @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
43-
# must be called before this method (or on another thread).
44-
#
45-
# @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
46-
#
47-
# @return [Boolean] `true` if shutdown complete or false on `timeout`
48-
def wait_for_termination(timeout)
49-
@stopped.wait(timeout.to_i)
17+
init_executor
5018
end
5119

5220
# Submit a task to the thread pool for asynchronous processing.
@@ -61,7 +29,7 @@ def wait_for_termination(timeout)
6129
# @raise [ArgumentError] if no task is given
6230
def post(*args, &task)
6331
raise ArgumentError.new('no block given') unless block_given?
64-
@mutex.synchronize do
32+
mutex.synchronize do
6533
break false unless running?
6634
supervise
6735
@queue << [args, task]
@@ -73,11 +41,11 @@ def post(*args, &task)
7341
# but no new tasks will be accepted. Has no additional effect if the
7442
# thread pool is not running.
7543
def shutdown
76-
@mutex.synchronize do
44+
mutex.synchronize do
7745
return unless running?
78-
@stop.set
46+
stop_event.set
7947
@queue << :stop
80-
@stopped.set unless alive?
48+
stopped_event.set unless alive?
8149
end
8250
end
8351

@@ -86,12 +54,12 @@ def shutdown
8654
# will be accepted. Has no additional effect if the thread pool is
8755
# not running.
8856
def kill
89-
@mutex.synchronize do
57+
mutex.synchronize do
9058
return if shutdown?
91-
@stop.set
59+
stop_event.set
9260
@queue.clear
9361
@thread.kill if alive?
94-
@stopped.set
62+
stopped_event.set unless alive?
9563
end
9664
end
9765

@@ -122,7 +90,7 @@ def work
12290
# let it fail
12391
end
12492
end
125-
@stopped.set
93+
stopped_event.set
12694
end
12795
end
12896
end

0 commit comments

Comments
 (0)