Skip to content

Commit 5cf5a60

Browse files
committed
Agent now uses executor option instead of UsesGlobalThreadPool.
1 parent 2bb9c05 commit 5cf5a60

File tree

2 files changed

+86
-79
lines changed

2 files changed

+86
-79
lines changed

lib/concurrent/agent.rb

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
require 'thread'
22
require 'observer'
33

4+
require 'concurrent/configuration'
45
require 'concurrent/dereferenceable'
5-
require 'concurrent/uses_global_thread_pool'
66
require 'concurrent/utilities'
77

88
module Concurrent
@@ -34,7 +34,7 @@ module Concurrent
3434
# @return [Fixnum] the maximum number of seconds before an update is cancelled
3535
class Agent
3636
include Dereferenceable
37-
include UsesGlobalThreadPool
37+
include OptionsParser
3838

3939
# The default timeout value (in seconds); used when no timeout option
4040
# is given at initialization
@@ -47,6 +47,11 @@ class Agent
4747
# @param [Object] initial the initial value
4848
# @param [Hash] opts the options used to define the behavior at update and deref
4949
# @option opts [Fixnum] :timeout (TIMEOUT) maximum number of seconds before an update is cancelled
50+
# @option opts [Boolean] :operation (false) when +true+ will execute the future on the global
51+
# operation pool (for long-running operations), when +false+ will execute the future on the
52+
# global task pool (for short-running tasks)
53+
# @option opts [object] :executor when provided will run all operations on
54+
# this executor rather than the global thread pool (overrides :operation)
5055
# @option opts [String] :dup_on_deref (false) call +#dup+ before returning the data
5156
# @option opts [String] :freeze_on_deref (false) call +#freeze+ before returning the data
5257
# @option opts [String] :copy_on_deref (nil) call the given +Proc+ passing the internal value and
@@ -57,6 +62,7 @@ def initialize(initial, opts = {})
5762
@validator = Proc.new { |result| true }
5863
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
5964
@observers = CopyOnWriteObserverSet.new
65+
@executor = get_executor_from_options(opts)
6066
init_mutex
6167
set_deref_options(opts)
6268
end
@@ -116,7 +122,7 @@ def validate(&block)
116122
# @yieldparam [Object] value the current value
117123
# @yieldreturn [Object] the new value
118124
def post(&block)
119-
Agent.thread_pool.post{ work(&block) } unless block.nil?
125+
@executor.post{ work(&block) } unless block.nil?
120126
end
121127

122128
# Update the current value with the result of the given block operation

spec/concurrent/agent_spec.rb

Lines changed: 77 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
require 'spec_helper'
22
require_relative 'dereferenceable_shared'
3-
require_relative 'uses_global_thread_pool_shared'
43

54
module Concurrent
65

76
describe Agent do
87

9-
subject { Agent.new(0) }
8+
let(:executor) { PerThreadExecutor.new }
9+
10+
subject { Agent.new(0, executor: executor) }
1011

1112
let(:observer) do
1213
Class.new do
@@ -17,25 +18,17 @@ module Concurrent
1718
end.new
1819
end
1920

20-
before(:each) do
21-
Agent.thread_pool = PerThreadExecutor.new
22-
end
23-
2421
context 'behavior' do
2522

26-
# uses_global_thread_pool
27-
28-
let!(:thread_pool_user) { Agent }
29-
30-
it_should_behave_like Concurrent::UsesGlobalThreadPool
31-
3223
# dereferenceable
3324

3425
def dereferenceable_subject(value, opts = {})
26+
opts = opts.merge(executor: executor)
3527
Agent.new(value, opts)
3628
end
3729

3830
def dereferenceable_observable(opts = {})
31+
opts = opts.merge(executor: executor)
3932
Agent.new(0, opts)
4033
end
4134

@@ -60,6 +53,14 @@ def execute_dereferenceable(subject)
6053
it 'sets the timeout to the default when nil' do
6154
Agent.new(0).timeout.should eq Agent::TIMEOUT
6255
end
56+
57+
it 'uses the executor given with the :executor option'
58+
59+
it 'uses the global operation pool when :operation is true'
60+
61+
it 'uses the global task pool when :task is true'
62+
63+
it 'uses the global task pool by default'
6364
end
6465

6566
context '#rescue' do
@@ -116,15 +117,15 @@ def execute_dereferenceable(subject)
116117
context '#post' do
117118

118119
it 'adds the given block to the queue' do
119-
Agent.thread_pool.should_receive(:post).with(no_args).exactly(3).times
120+
executor.should_receive(:post).with(no_args).exactly(3).times
120121
subject.post { sleep(100) }
121122
subject.post { nil }
122123
subject.post { nil }
123124
sleep(0.1)
124125
end
125126

126127
it 'does not add to the queue when no block is given' do
127-
Agent.thread_pool.should_receive(:post).with(no_args).exactly(2).times
128+
executor.should_receive(:post).with(no_args).exactly(2).times
128129
subject.post { sleep(100) }
129130
subject.post
130131
subject.post { nil }
@@ -145,7 +146,7 @@ def execute_dereferenceable(subject)
145146

146147
it 'passes the current value to the handler' do
147148
@expected = nil
148-
Agent.new(10).post { |i| @expected = i }
149+
Agent.new(10, executor: executor).post { |i| @expected = i }
149150
sleep(0.1)
150151
@expected.should eq 10
151152
end
@@ -157,7 +158,7 @@ def execute_dereferenceable(subject)
157158
end
158159

159160
it 'rejects the handler after timeout reached' do
160-
agent = Agent.new(0, timeout: 0.1)
161+
agent = Agent.new(0, timeout: 0.1, executor: executor)
161162
agent.post { sleep(1); 10 }
162163
agent.value.should eq 0
163164
end
@@ -182,21 +183,21 @@ def execute_dereferenceable(subject)
182183
end
183184

184185
it 'sets the new value when the validator returns true' do
185-
agent = Agent.new(0).validate { true }
186+
agent = Agent.new(0, executor: executor).validate { true }
186187
agent.post { 10 }
187188
sleep(0.1)
188189
agent.value.should eq 10
189190
end
190191

191192
it 'does not change the value when the validator returns false' do
192-
agent = Agent.new(0).validate { false }
193+
agent = Agent.new(0, executor: executor).validate { false }
193194
agent.post { 10 }
194195
sleep(0.1)
195196
agent.value.should eq 0
196197
end
197198

198199
it 'does not change the value when the validator raises an exception' do
199-
agent = Agent.new(0).validate { raise StandardError }
200+
agent = Agent.new(0, executor: executor).validate { raise StandardError }
200201
agent.post { 10 }
201202
sleep(0.1)
202203
agent.value.should eq 0
@@ -208,85 +209,85 @@ def execute_dereferenceable(subject)
208209
it 'calls the first exception block with a matching class' do
209210
@expected = nil
210211
subject.
211-
rescue(StandardError) { |ex| @expected = 1 }.
212-
rescue(StandardError) { |ex| @expected = 2 }.
213-
rescue(StandardError) { |ex| @expected = 3 }
214-
subject.post { raise StandardError }
215-
sleep(0.1)
216-
@expected.should eq 1
217-
end
212+
rescue(StandardError) { |ex| @expected = 1 }.
213+
rescue(StandardError) { |ex| @expected = 2 }.
214+
rescue(StandardError) { |ex| @expected = 3 }
215+
subject.post { raise StandardError }
216+
sleep(0.1)
217+
@expected.should eq 1
218+
end
218219

219220
it 'matches all with a rescue with no class given' do
220221
@expected = nil
221222
subject.
222-
rescue(LoadError) { |ex| @expected = 1 }.
223-
rescue { |ex| @expected = 2 }.
224-
rescue(StandardError) { |ex| @expected = 3 }
225-
subject.post { raise NoMethodError }
226-
sleep(0.1)
227-
@expected.should eq 2
228-
end
223+
rescue(LoadError) { |ex| @expected = 1 }.
224+
rescue { |ex| @expected = 2 }.
225+
rescue(StandardError) { |ex| @expected = 3 }
226+
subject.post { raise NoMethodError }
227+
sleep(0.1)
228+
@expected.should eq 2
229+
end
229230

230231
it 'searches associated rescue handlers in order' do
231232
@expected = nil
232233
subject.
233-
rescue(ArgumentError) { |ex| @expected = 1 }.
234-
rescue(LoadError) { |ex| @expected = 2 }.
235-
rescue(StandardError) { |ex| @expected = 3 }
236-
subject.post { raise ArgumentError }
237-
sleep(0.1)
238-
@expected.should eq 1
234+
rescue(ArgumentError) { |ex| @expected = 1 }.
235+
rescue(LoadError) { |ex| @expected = 2 }.
236+
rescue(StandardError) { |ex| @expected = 3 }
237+
subject.post { raise ArgumentError }
238+
sleep(0.1)
239+
@expected.should eq 1
239240

240-
@expected = nil
241-
subject.
242-
rescue(ArgumentError) { |ex| @expected = 1 }.
241+
@expected = nil
242+
subject.
243+
rescue(ArgumentError) { |ex| @expected = 1 }.
243244
rescue(LoadError) { |ex| @expected = 2 }.
244245
rescue(StandardError) { |ex| @expected = 3 }
245-
subject.post { raise LoadError }
246-
sleep(0.1)
247-
@expected.should eq 2
246+
subject.post { raise LoadError }
247+
sleep(0.1)
248+
@expected.should eq 2
248249

249-
@expected = nil
250-
subject.
250+
@expected = nil
251+
subject.
251252
rescue(ArgumentError) { |ex| @expected = 1 }.
252-
rescue(LoadError) { |ex| @expected = 2 }.
253-
rescue(StandardError) { |ex| @expected = 3 }
254-
subject.post { raise StandardError }
255-
sleep(0.1)
256-
@expected.should eq 3
257-
end
253+
rescue(LoadError) { |ex| @expected = 2 }.
254+
rescue(StandardError) { |ex| @expected = 3 }
255+
subject.post { raise StandardError }
256+
sleep(0.1)
257+
@expected.should eq 3
258+
end
258259

259260
it 'passes the exception object to the matched block' do
260261
@expected = nil
261262
subject.
262-
rescue(ArgumentError) { |ex| @expected = ex }.
263-
rescue(LoadError) { |ex| @expected = ex }.
264-
rescue(StandardError) { |ex| @expected = ex }
265-
subject.post { raise StandardError }
266-
sleep(0.1)
267-
@expected.should be_a(StandardError)
268-
end
263+
rescue(ArgumentError) { |ex| @expected = ex }.
264+
rescue(LoadError) { |ex| @expected = ex }.
265+
rescue(StandardError) { |ex| @expected = ex }
266+
subject.post { raise StandardError }
267+
sleep(0.1)
268+
@expected.should be_a(StandardError)
269+
end
269270

270271
it 'ignores rescuers without a block' do
271272
@expected = nil
272273
subject.
273-
rescue(StandardError).
274-
rescue(StandardError) { |ex| @expected = ex }
275-
subject.post { raise StandardError }
276-
sleep(0.1)
277-
@expected.should be_a(StandardError)
278-
end
274+
rescue(StandardError).
275+
rescue(StandardError) { |ex| @expected = ex }
276+
subject.post { raise StandardError }
277+
sleep(0.1)
278+
@expected.should be_a(StandardError)
279+
end
279280

280281
it 'supresses the exception if no rescue matches' do
281282
lambda {
282283
subject.
283-
rescue(ArgumentError) { |ex| @expected = ex }.
284-
rescue(NotImplementedError) { |ex| @expected = ex }.
285-
rescue(NoMethodError) { |ex| @expected = ex }
284+
rescue(ArgumentError) { |ex| @expected = ex }.
285+
rescue(NotImplementedError) { |ex| @expected = ex }.
286+
rescue(NoMethodError) { |ex| @expected = ex }
286287
subject.post { raise StandardError }
287288
sleep(0.1)
288289
}.should_not raise_error
289-
end
290+
end
290291

291292
it 'suppresses exceptions thrown from rescue handlers' do
292293
lambda {
@@ -300,15 +301,15 @@ def execute_dereferenceable(subject)
300301
context 'observation' do
301302

302303
it 'notifies all observers when the value changes' do
303-
agent = Agent.new(0)
304+
agent = Agent.new(0, executor: executor)
304305
agent.add_observer(observer)
305306
agent.post { 10 }
306307
sleep(0.1)
307308
observer.value.should eq 10
308309
end
309310

310311
it 'does not notify removed observers when the value changes' do
311-
agent = Agent.new(0)
312+
agent = Agent.new(0, executor: executor)
312313
agent.add_observer(observer)
313314
agent.delete_observer(observer)
314315
agent.post { 10 }
@@ -317,7 +318,7 @@ def execute_dereferenceable(subject)
317318
end
318319

319320
it 'does not notify observers when validation fails' do
320-
agent = Agent.new(0)
321+
agent = Agent.new(0, executor: executor)
321322
agent.validate { false }
322323
agent.add_observer(observer)
323324
agent.post { 10 }
@@ -326,7 +327,7 @@ def execute_dereferenceable(subject)
326327
end
327328

328329
it 'does not notify observers when the handler raises an exception' do
329-
agent = Agent.new(0)
330+
agent = Agent.new(0, executor: executor)
330331
agent.add_observer(observer)
331332
agent.post { raise StandardError }
332333
sleep(0.1)
@@ -337,7 +338,7 @@ def execute_dereferenceable(subject)
337338
context 'aliases' do
338339

339340
it 'aliases #deref for #value' do
340-
Agent.new(10).deref.should eq 10
341+
Agent.new(10, executor: executor).deref.should eq 10
341342
end
342343

343344
it 'aliases #validates for :validate' do
@@ -381,7 +382,7 @@ def execute_dereferenceable(subject)
381382
end
382383

383384
it 'aliases #add_watch for #add_observer' do
384-
agent = Agent.new(0)
385+
agent = Agent.new(0, executor: executor)
385386
agent.add_watch(observer)
386387
agent.post { 10 }
387388
sleep(0.1)

0 commit comments

Comments
 (0)