Skip to content

Commit a05ac6b

Browse files
committed
Merge pull request #95 from jdantonio/fix/async-initialization
Safer initialization of the Async internal synchronization objects.
2 parents 78af7c6 + 71df73c commit a05ac6b

File tree

3 files changed

+90
-36
lines changed

3 files changed

+90
-36
lines changed

README.md

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,6 @@ task = Concurrent::ScheduledTask.execute(2){ Ticker.new.get_year_end_closing('IN
126126
task.state #=> :pending
127127
sleep(3) # do other stuff
128128
task.value #=> 25.96
129-
130-
# Async
131-
ticker = Ticker.new
132-
ticker.extend(Concurrent::Async)
133-
hpq = ticker.async.get_year_end_closing('HPQ', 2013)
134-
ibm = ticker.await.get_year_end_closing('IBM', 2013)
135-
hpq.value #=> 27.98
136-
ibm.value #=> 187.57
137129
```
138130

139131
## Contributors

lib/concurrent/async.rb

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
require 'thread'
22
require 'concurrent/configuration'
3+
require 'concurrent/delay'
34
require 'concurrent/ivar'
45
require 'concurrent/future'
56
require 'concurrent/executor/thread_pool_executor'
67

78
module Concurrent
89

10+
InitializationError = Class.new(StandardError)
11+
912
# A mixin module that provides simple asynchronous behavior to any standard
1013
# class/object or object.
1114
#
@@ -36,6 +39,10 @@ module Concurrent
3639
# class Echo
3740
# include Concurrent::Async
3841
#
42+
# def initialize
43+
# init_mutex # initialize the internal synchronization objects
44+
# end
45+
#
3946
# def echo(msg)
4047
# sleep(rand)
4148
# print "#{msg}\n"
@@ -52,6 +59,7 @@ module Concurrent
5259
# @example Monkey-patching an existing object
5360
# numbers = 1_000_000.times.collect{ rand }
5461
# numbers.extend(Concurrent::Async)
62+
# numbers.init_mutex # initialize the internal synchronization objects
5563
#
5664
# future = numbers.async.max
5765
# future.state #=> :pending
@@ -61,6 +69,16 @@ module Concurrent
6169
# future.state #=> :fulfilled
6270
# future.value #=> 0.999999138918843
6371
#
72+
# @note This module depends on several internal synchronization objects that
73+
# must be initialized prior to calling any of the async/await/executor methods.
74+
# The best practice is to call `init_mutex` from within the constructor
75+
# of the including class. A less ideal but acceptable practice is for the
76+
# thread creating the asynchronous object to explicitly call the `init_mutex`
77+
# method prior to calling any of the async/await/executor methods. If
78+
# `init_mutex` is *not* called explicitly the async/await/executor methods
79+
# will raize a `Concurrent::InitializationError`. This is the only way
80+
# thread-safe initialization can be guaranteed.
81+
#
6482
# @note Thread safe guarantees can only be made when asynchronous method calls
6583
# are not mixed with synchronous method calls. Use only synchronous calls
6684
# when the object is used exclusively on a single thread. Use only
@@ -142,7 +160,7 @@ def method_missing(method, *args, &block)
142160
ivar = Concurrent::IVar.new
143161
value, reason = nil, nil
144162
begin
145-
mutex.synchronize do
163+
@mutex.synchronize do
146164
value = @delegate.send(method, *args, &block)
147165
end
148166
rescue => reason
@@ -154,11 +172,6 @@ def method_missing(method, *args, &block)
154172

155173
self.send(method, *args)
156174
end
157-
158-
# The lock used when delegating methods to the wrapped object.
159-
#
160-
# @!visibility private
161-
attr_reader :mutex # :nodoc:
162175
end
163176

164177
# Delegates asynchronous, thread-safe method calls to the wrapped object.
@@ -194,22 +207,15 @@ def method_missing(method, *args, &block)
194207

195208
self.define_singleton_method(method) do |*args|
196209
Async::validate_argc(@delegate, method, *args)
197-
Concurrent::Future.execute(executor: @executor) do
198-
mutex.synchronize do
210+
Concurrent::Future.execute(executor: @executor.value) do
211+
@mutex.synchronize do
199212
@delegate.send(method, *args, &block)
200213
end
201214
end
202215
end
203216

204217
self.send(method, *args)
205218
end
206-
207-
private
208-
209-
# The lock used when delegating methods to the wrapped object.
210-
#
211-
# @!visibility private
212-
attr_reader :mutex # :nodoc:
213219
end
214220

215221
# Causes the chained method call to be performed asynchronously on the
@@ -230,17 +236,19 @@ def method_missing(method, *args, &block)
230236
# either `async` or `await`. The mutable nature of Ruby references
231237
# (and object orientation in general) prevent any other thread safety
232238
# guarantees. Do NOT mix non-protected method calls with protected
233-
# method call. Use ONLY protected method calls when sharing the object
239+
# method call. Use *only* protected method calls when sharing the object
234240
# between threads.
235241
#
236242
# @return [Concurrent::Future] the pending result of the asynchronous operation
237243
#
244+
# @raise [Concurrent::InitializationError] `#init_mutex` has not been called
238245
# @raise [NameError] the object does not respond to `method` method
239246
# @raise [ArgumentError] the given `args` do not match the arity of `method`
240247
#
241248
# @see Concurrent::Future
242249
def async
243-
@__async_delegator__ ||= AsyncDelegator.new(self, executor, await.mutex)
250+
raise InitializationError.new('#init_mutex was never called') unless @mutex
251+
@__async_delegator__.value
244252
end
245253
alias_method :future, :async
246254

@@ -262,29 +270,51 @@ def async
262270
# either `async` or `await`. The mutable nature of Ruby references
263271
# (and object orientation in general) prevent any other thread safety
264272
# guarantees. Do NOT mix non-protected method calls with protected
265-
# method call. Use ONLY protected method calls when sharing the object
273+
# method call. Use *only* protected method calls when sharing the object
266274
# between threads.
267275
#
268276
# @return [Concurrent::IVar] the completed result of the synchronous operation
269277
#
278+
# @raise [Concurrent::InitializationError] `#init_mutex` has not been called
270279
# @raise [NameError] the object does not respond to `method` method
271280
# @raise [ArgumentError] the given `args` do not match the arity of `method`
272281
#
273282
# @see Concurrent::IVar
274283
def await
275-
@__await_delegator__ ||= AwaitDelegator.new(self, Mutex.new)
284+
raise InitializationError.new('#init_mutex was never called') unless @mutex
285+
@__await_delegator__.value
276286
end
277287
alias_method :delay, :await
278288

289+
# Set a new executor
290+
#
291+
# @raise [Concurrent::InitializationError] `#init_mutex` has not been called
292+
# @raise [ArgumentError] executor has already been set
279293
def executor=(executor)
280-
raise ArgumentError.new('executor has already been set') unless @__async__executor__.nil?
281-
@__async__executor__ = executor
294+
raise InitializationError.new('#init_mutex was never called') unless @mutex
295+
@__async__executor__.reconfigure { executor } or
296+
raise ArgumentError.new('executor has already been set')
282297
end
283298

284-
private
285-
286-
def executor
287-
@__async__executor__ ||= Concurrent.configuration.global_task_pool
299+
# Initialize the internal mutex and other synchronization objects. This method
300+
# *must* be called from the constructor of the including class or explicitly
301+
# by the caller prior to calling any other methods. If `init_mutex` is *not*
302+
# called explicitly the async/await/executor methods will raize a
303+
# `Concurrent::InitializationError`. This is the only way thread-safe
304+
# initialization can be guaranteed.
305+
#
306+
# @note This method *must* be called from the constructor of the including
307+
# class or explicitly by the caller prior to calling any other methods.
308+
# This is the only way thread-safe initialization can be guaranteed.
309+
#
310+
# @raise [Concurrent::InitializationError] when called more than once
311+
def init_mutex
312+
raise InitializationError.new('#init_mutex was already called') if @mutex
313+
(@mutex = Mutex.new).lock
314+
@__async__executor__ = Delay.new{ Concurrent.configuration.global_operation_pool }
315+
@__await_delegator__ = Delay.new{ AwaitDelegator.new(self, @mutex) }
316+
@__async_delegator__ = Delay.new{ AsyncDelegator.new(self, @__async__executor__, @mutex) }
317+
@mutex.unlock
288318
end
289319
end
290320
end

spec/concurrent/async_spec.rb

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ module Concurrent
1010
Class.new do
1111
include Concurrent::Async
1212
attr_accessor :accessor
13+
def initialize
14+
init_mutex
15+
end
1316
def echo(msg)
1417
msg
1518
end
@@ -103,7 +106,7 @@ def many(*args, &block) nil; end
103106
context 'executor' do
104107

105108
it 'returns the default executor when #executor= has never been called' do
106-
Concurrent.configuration.should_receive(:global_task_pool).
109+
Concurrent.configuration.should_receive(:global_operation_pool).
107110
and_return(ImmediateExecutor.new)
108111
subject = async_class.new
109112
subject.async.echo(:foo)
@@ -117,10 +120,10 @@ def many(*args, &block) nil; end
117120
subject.async.echo(:foo)
118121
end
119122

120-
it 'raises an exception if #executor= is called multiple times' do
123+
it 'raises an exception if #executor= is called after initialization complete' do
121124
executor = ImmediateExecutor.new
122125
subject = async_class.new
123-
subject.executor = executor
126+
subject.async.echo(:foo)
124127
expect {
125128
subject.executor = executor
126129
}.to raise_error(ArgumentError)
@@ -303,6 +306,7 @@ def many(*args, &block) nil; end
303306
object = Class.new {
304307
include Concurrent::Async
305308
attr_reader :bucket
309+
def initialize() init_mutex; end
306310
def gather(seconds, first, *rest)
307311
sleep(seconds)
308312
(@bucket ||= []).concat([first])
@@ -315,6 +319,34 @@ def gather(seconds, first, *rest)
315319
object.await.gather(0, :c, :d)
316320
object.bucket.should eq [:a, :b, :c, :d]
317321
end
322+
323+
context 'raises an InitializationError' do
324+
325+
let(:async_class) do
326+
Class.new do
327+
include Concurrent::Async
328+
def echo(msg) msg; end
329+
end
330+
end
331+
332+
it 'when #async is called before #init_mutex' do
333+
expect {
334+
async_class.new.async.echo(:foo)
335+
}.to raise_error(Concurrent::InitializationError)
336+
end
337+
338+
it 'when #await is called before #init_mutex' do
339+
expect {
340+
async_class.new.async.echo(:foo)
341+
}.to raise_error(Concurrent::InitializationError)
342+
end
343+
344+
it 'when #executor= is called before #init_mutex' do
345+
expect {
346+
async_class.new.executor = Concurrent::ImmediateExecutor.new
347+
}.to raise_error(Concurrent::InitializationError)
348+
end
349+
end
318350
end
319351
end
320352
end

0 commit comments

Comments
 (0)