Skip to content

Commit b165ff9

Browse files
committed
Moved executor classes into own files.
1 parent ce6884c commit b165ff9

14 files changed

+422
-396
lines changed
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
require 'concurrent/errors'
2+
require 'concurrent/executor/executor_service'
3+
require 'concurrent/synchronization/object'
4+
require 'concurrent/utility/at_exit'
5+
6+
module Concurrent
7+
8+
# @!macro abstract_executor_service_public_api
9+
# @!visibility private
10+
class AbstractExecutorService < Synchronization::Object
11+
include ExecutorService
12+
13+
# The set of possible fallback policies that may be set at thread pool creation.
14+
FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze
15+
16+
# @!macro executor_service_attr_reader_fallback_policy
17+
attr_reader :fallback_policy
18+
19+
# Create a new thread pool.
20+
def initialize(*args, &block)
21+
super(&nil)
22+
synchronize { ns_initialize(*args, &block) }
23+
end
24+
25+
# @!macro executor_service_method_shutdown
26+
def shutdown
27+
raise NotImplementedError
28+
end
29+
30+
# @!macro executor_service_method_kill
31+
def kill
32+
raise NotImplementedError
33+
end
34+
35+
# @!macro executor_service_method_wait_for_termination
36+
def wait_for_termination(timeout = nil)
37+
raise NotImplementedError
38+
end
39+
40+
# @!macro executor_service_method_running_question
41+
def running?
42+
synchronize { ns_running? }
43+
end
44+
45+
# @!macro executor_service_method_shuttingdown_question
46+
def shuttingdown?
47+
synchronize { ns_shuttingdown? }
48+
end
49+
50+
# @!macro executor_service_method_shutdown_question
51+
def shutdown?
52+
synchronize { ns_shutdown? }
53+
end
54+
55+
# @!macro executor_service_method_auto_terminate_question
56+
def auto_terminate?
57+
synchronize { ns_auto_terminate? }
58+
end
59+
60+
# @!macro executor_service_method_auto_terminate_setter
61+
def auto_terminate=(value)
62+
synchronize { self.ns_auto_terminate = value }
63+
end
64+
65+
protected
66+
67+
# Handler which executes the `fallback_policy` once the queue size
68+
# reaches `max_queue`.
69+
#
70+
# @param [Array] args the arguments to the task which is being handled.
71+
#
72+
# @!visibility private
73+
def handle_fallback(*args)
74+
case fallback_policy
75+
when :abort
76+
raise RejectedExecutionError
77+
when :discard
78+
false
79+
when :caller_runs
80+
begin
81+
yield(*args)
82+
rescue => ex
83+
# let it fail
84+
log DEBUG, ex
85+
end
86+
true
87+
else
88+
fail "Unknown fallback policy #{fallback_policy}"
89+
end
90+
end
91+
92+
def execute(*args, &task)
93+
raise NotImplementedError
94+
end
95+
96+
# @!macro [attach] executor_service_method_shutdown_execution
97+
#
98+
# Callback method called when an orderly shutdown has completed.
99+
# The default behavior is to signal all waiting threads.
100+
def shutdown_execution
101+
# do nothing
102+
end
103+
104+
# @!macro [attach] executor_service_method_kill_execution
105+
#
106+
# Callback method called when the executor has been killed.
107+
# The default behavior is to do nothing.
108+
def kill_execution
109+
# do nothing
110+
end
111+
112+
protected
113+
114+
def ns_auto_terminate?
115+
!!@auto_terminate
116+
end
117+
118+
def ns_auto_terminate=(value)
119+
case value
120+
when true
121+
AtExit.add(self) { terminate_at_exit }
122+
@auto_terminate = true
123+
when false
124+
AtExit.delete(self)
125+
@auto_terminate = false
126+
else
127+
raise ArgumentError
128+
end
129+
end
130+
131+
def terminate_at_exit
132+
kill # TODO be gentle first
133+
wait_for_termination(10)
134+
end
135+
end
136+
end

0 commit comments

Comments
 (0)