Skip to content

Commit 8e5ede4

Browse files
committed
Merged PR #69: Do not create another thread for each post call to Agent
2 parents f51eddb + 54441b1 commit 8e5ede4

File tree

2 files changed

+33
-35
lines changed

2 files changed

+33
-35
lines changed

lib/concurrent/agent.rb

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,13 @@ class Agent
3838
include Concurrent::Observable
3939
include Logging
4040

41-
# The default timeout value (in seconds); used when no timeout option
42-
# is given at initialization
43-
TIMEOUT = 5
44-
4541
attr_reader :timeout, :task_executor, :operation_executor
4642

4743
# Initialize a new Agent with the given initial value and provided options.
4844
#
4945
# @param [Object] initial the initial value
5046
# @param [Hash] opts the options used to define the behavior at update and deref
5147
#
52-
# @option opts [Fixnum] :timeout (TIMEOUT) maximum number of seconds before an update is cancelled
53-
#
5448
# @option opts [Boolean] :operation (false) when `true` will execute the future on the global
5549
# operation pool (for long-running operations), when `false` will execute the future on the
5650
# global task pool (for short-running tasks)
@@ -65,7 +59,6 @@ def initialize(initial, opts = {})
6559
@value = initial
6660
@rescuers = []
6761
@validator = Proc.new { |result| true }
68-
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
6962
self.observers = CopyOnWriteObserverSet.new
7063
@serialized_execution = SerializedExecution.new
7164
@task_executor = OptionsParser.get_task_executor_from(opts)
@@ -145,12 +138,19 @@ def post(&block)
145138
# Update the current value with the result of the given block operation,
146139
# block can do blocking calls
147140
#
141+
# @param [Fixnum, nil] timeout maximum number of seconds before an update is cancelled
142+
#
148143
# @yield the operation to be performed with the current value in order to calculate
149144
# the new value
150145
# @yieldparam [Object] value the current value
151146
# @yieldreturn [Object] the new value
152147
# @return [true, nil] nil when no block is given
153-
def post_off(&block)
148+
def post_off(timeout = nil, &block)
149+
block = if timeout
150+
lambda { |value| Concurrent::timeout(timeout) { block.call(value) } }
151+
else
152+
block
153+
end
154154
post_on(@operation_executor, &block)
155155
end
156156

@@ -203,10 +203,8 @@ def work(&handler) # :nodoc:
203203
validator, value = mutex.synchronize { [@validator, @value] }
204204

205205
begin
206-
result, valid = Concurrent::timeout(@timeout) do
207-
result = handler.call(value)
208-
[result, validator.call(result)]
209-
end
206+
result = handler.call(value)
207+
valid = validator.call(result)
210208
rescue Exception => ex
211209
exception = ex
212210
end

spec/concurrent/agent_spec.rb

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ module Concurrent
1919
end.new
2020
end
2121

22-
context '#send_off' do
22+
context '#post_off' do
2323
subject { Agent.new 2, executor: executor }
2424

2525
it 'executes post and post-off in order' do
@@ -28,6 +28,14 @@ module Concurrent
2828
subject.await
2929
expect(subject.value).to eq 12
3030
end
31+
32+
it 'times out' do
33+
ex = nil
34+
subject.post_off(0.1) { |v| sleep(0.2); ex = true }
35+
subject.await
36+
sleep 0.3
37+
expect(ex).to eq nil
38+
end
3139
end
3240

3341
context 'behavior' do
@@ -71,14 +79,6 @@ def trigger_observable(observable)
7179
expect(Agent.new(10).value).to eq 10
7280
end
7381

74-
it 'sets the timeout to the given value' do
75-
expect(Agent.new(0, timeout: 5).timeout).to eq 5
76-
end
77-
78-
it 'sets the timeout to the default when nil' do
79-
expect(Agent.new(0).timeout).to eq Agent::TIMEOUT
80-
end
81-
8282
it 'uses the executor given with the :executor option' do
8383
expect(executor).to receive(:post).with(any_args).and_return(0)
8484
agent = Agent.new(0, executor: executor)
@@ -164,9 +164,9 @@ def trigger_observable(observable)
164164
subject.post { nil }
165165
sleep(0.1)
166166
expect(subject.
167-
instance_variable_get(:@serialized_execution).
168-
instance_variable_get(:@stash).
169-
size).to eq 2
167+
instance_variable_get(:@serialized_execution).
168+
instance_variable_get(:@stash).
169+
size).to eq 2
170170
end
171171

172172
it 'does not add to the queue when no block is given' do
@@ -221,7 +221,7 @@ def trigger_observable(observable)
221221
it 'passes the current value to the handler' do
222222
latch = Concurrent::CountDownLatch.new(5)
223223
Agent.new(latch.count, executor: executor).post do |i|
224-
i.times{ latch.count_down }
224+
i.times { latch.count_down }
225225
end
226226
expect(latch.wait(1)).to be_truthy
227227
end
@@ -252,7 +252,7 @@ def trigger_observable(observable)
252252

253253
it 'passes the new value to the validator' do
254254
expected = Concurrent::AtomicFixnum.new(0)
255-
latch = Concurrent::CountDownLatch.new(1)
255+
latch = Concurrent::CountDownLatch.new(1)
256256
subject.validate { |v| expected.value = v; latch.count_down; true }
257257
subject.post { 10 }
258258
latch.wait(1)
@@ -282,7 +282,7 @@ def trigger_observable(observable)
282282

283283
it 'calls the first exception block with a matching class' do
284284
expected = nil
285-
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
285+
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
286286
rescue(StandardError) { |ex| expected = 1 }.
287287
rescue(StandardError) { |ex| expected = 2 }.
288288
rescue(StandardError) { |ex| expected = 3 }
@@ -292,7 +292,7 @@ def trigger_observable(observable)
292292

293293
it 'matches all with a rescue with no class given' do
294294
expected = nil
295-
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
295+
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
296296
rescue(LoadError) { |ex| expected = 1 }.
297297
rescue { |ex| expected = 2 }.
298298
rescue(StandardError) { |ex| expected = 3 }
@@ -302,23 +302,23 @@ def trigger_observable(observable)
302302

303303
it 'searches associated rescue handlers in order' do
304304
expected = nil
305-
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
305+
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
306306
rescue(ArgumentError) { |ex| expected = 1 }.
307307
rescue(LoadError) { |ex| expected = 2 }.
308308
rescue(StandardError) { |ex| expected = 3 }
309309
agent.post { raise ArgumentError }
310310
expect(expected).to eq 1
311311

312312
expected = nil
313-
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
313+
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
314314
rescue(ArgumentError) { |ex| expected = 1 }.
315315
rescue(LoadError) { |ex| expected = 2 }.
316316
rescue(StandardError) { |ex| expected = 3 }
317317
agent.post { raise LoadError }
318318
expect(expected).to eq 2
319319

320320
expected = nil
321-
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
321+
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
322322
rescue(ArgumentError) { |ex| expected = 1 }.
323323
rescue(LoadError) { |ex| expected = 2 }.
324324
rescue(StandardError) { |ex| expected = 3 }
@@ -328,7 +328,7 @@ def trigger_observable(observable)
328328

329329
it 'passes the exception object to the matched block' do
330330
expected = nil
331-
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
331+
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
332332
rescue(ArgumentError) { |ex| expected = ex }.
333333
rescue(LoadError) { |ex| expected = ex }.
334334
rescue(StandardError) { |ex| expected = ex }
@@ -338,7 +338,7 @@ def trigger_observable(observable)
338338

339339
it 'ignores rescuers without a block' do
340340
expected = nil
341-
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
341+
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new).
342342
rescue(StandardError).
343343
rescue(StandardError) { |ex| expected = ex }
344344
agent.post { raise StandardError }
@@ -464,15 +464,15 @@ def trigger_observable(observable)
464464
end
465465

466466
it 'aliases #catch for #rescue' do
467-
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new)
467+
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new)
468468
expected = nil
469469
agent.catch { expected = true }
470470
agent.post { raise StandardError }
471471
expect(agent).to be_truthy
472472
end
473473

474474
it 'aliases #on_error for #rescue' do
475-
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new)
475+
agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new)
476476
expected = nil
477477
agent.on_error { expected = true }
478478
agent.post { raise StandardError }

0 commit comments

Comments
 (0)