Skip to content

Commit 846ff10

Browse files
committed
Updated Async to use SerializedExecution.
1 parent 71a8a6f commit 846ff10

File tree

2 files changed

+43
-79
lines changed

2 files changed

+43
-79
lines changed

lib/concurrent/async.rb

Lines changed: 39 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
require 'concurrent/delay'
44
require 'concurrent/errors'
55
require 'concurrent/ivar'
6-
require 'concurrent/future'
7-
require 'concurrent/executor/thread_pool_executor'
6+
require 'concurrent/executor/immediate_executor'
7+
require 'concurrent/executor/serialized_execution'
88

99
module Concurrent
1010

@@ -124,69 +124,24 @@ def validate_argc(obj, method, *args)
124124
end
125125
module_function :validate_argc
126126

127-
# Delegates synchronous, thread-safe method calls to the wrapped object.
128-
#
129-
# @!visibility private
130-
class AwaitDelegator # :nodoc:
131-
132-
# Create a new delegator object wrapping the given `delegate` and
133-
# protecting it with the given `mutex`.
134-
#
135-
# @param [Object] delegate the object to wrap and delegate method calls to
136-
# @param [Mutex] mutex the mutex lock to use when delegating method calls
137-
def initialize(delegate, mutex)
138-
@delegate = delegate
139-
@mutex = mutex
140-
end
141-
142-
# Delegates method calls to the wrapped object. For performance,
143-
# dynamically defines the given method on the delegator so that
144-
# all future calls to `method` will not be directed here.
145-
#
146-
# @param [Symbol] method the method being called
147-
# @param [Array] args zero or more arguments to the method
148-
#
149-
# @return [IVar] the result of the method call
150-
#
151-
# @raise [NameError] the object does not respond to `method` method
152-
# @raise [ArgumentError] the given `args` do not match the arity of `method`
153-
def method_missing(method, *args, &block)
154-
super unless @delegate.respond_to?(method)
155-
Async::validate_argc(@delegate, method, *args)
156-
157-
self.define_singleton_method(method) do |*args|
158-
Async::validate_argc(@delegate, method, *args)
159-
ivar = Concurrent::IVar.new
160-
value, reason = nil, nil
161-
begin
162-
@mutex.synchronize do
163-
value = @delegate.send(method, *args, &block)
164-
end
165-
rescue => reason
166-
# caught
167-
ensure
168-
return ivar.complete(reason.nil?, value, reason)
169-
end
170-
end
171-
172-
self.send(method, *args)
173-
end
174-
end
175-
176127
# Delegates asynchronous, thread-safe method calls to the wrapped object.
177128
#
178129
# @!visibility private
179130
class AsyncDelegator # :nodoc:
180131

181-
# Create a new delegator object wrapping the given `delegate` and
182-
# protecting it with the given `mutex`.
132+
# Create a new delegator object wrapping the given delegate,
133+
# protecting it with the given serializer, and executing it on the
134+
# given executor. Block if necessary.
183135
#
184136
# @param [Object] delegate the object to wrap and delegate method calls to
185-
# @param [Mutex] mutex the mutex lock to use when delegating method calls
186-
def initialize(delegate, executor, mutex)
137+
# @param [Concurrent::Delay] executor a `Delay` wrapping the executor on which to execute delegated method calls
138+
# @param [Concurrent::SerializedExecution] serializer the serializer to use when delegating method calls
139+
# @param [Boolean] blocking will block awaiting result when `true`
140+
def initialize(delegate, executor, serializer, blocking = false)
187141
@delegate = delegate
188142
@executor = executor
189-
@mutex = mutex
143+
@serializer = serializer
144+
@blocking = blocking
190145
end
191146

192147
# Delegates method calls to the wrapped object. For performance,
@@ -206,11 +161,19 @@ def method_missing(method, *args, &block)
206161

207162
self.define_singleton_method(method) do |*args|
208163
Async::validate_argc(@delegate, method, *args)
209-
Concurrent::Future.execute(executor: @executor.value) do
210-
@mutex.synchronize do
211-
@delegate.send(method, *args, &block)
164+
ivar = Concurrent::IVar.new
165+
value, reason = nil, nil
166+
@serializer.post(@executor.value) do
167+
begin
168+
value = @delegate.send(method, *args, &block)
169+
rescue => reason
170+
# caught
171+
ensure
172+
ivar.complete(reason.nil?, value, reason)
212173
end
213174
end
175+
ivar.value if @blocking
176+
ivar
214177
end
215178

216179
self.send(method, *args)
@@ -219,9 +182,9 @@ def method_missing(method, *args, &block)
219182

220183
# Causes the chained method call to be performed asynchronously on the
221184
# global thread pool. The method called by this method will return a
222-
# `Future` object in the `:pending` state and the method call will have
185+
# future object in the `:pending` state and the method call will have
223186
# been scheduled on the global thread pool. The final disposition of the
224-
# method call can be obtained by inspecting the returned `Future`.
187+
# method call can be obtained by inspecting the returned future.
225188
#
226189
# Before scheduling the method on the global thread pool a best-effort
227190
# attempt will be made to validate that the method exists on the object
@@ -238,15 +201,15 @@ def method_missing(method, *args, &block)
238201
# method call. Use *only* protected method calls when sharing the object
239202
# between threads.
240203
#
241-
# @return [Concurrent::Future] the pending result of the asynchronous operation
204+
# @return [Concurrent::IVar] the pending result of the asynchronous operation
242205
#
243206
# @raise [Concurrent::InitializationError] `#init_mutex` has not been called
244207
# @raise [NameError] the object does not respond to `method` method
245208
# @raise [ArgumentError] the given `args` do not match the arity of `method`
246209
#
247-
# @see Concurrent::Future
210+
# @see Concurrent::IVar
248211
def async
249-
raise InitializationError.new('#init_mutex was never called') unless @__async__mutex__
212+
raise InitializationError.new('#init_mutex was never called') unless @__async_initialized__
250213
@__async_delegator__.value
251214
end
252215
alias_method :future, :async
@@ -280,7 +243,7 @@ def async
280243
#
281244
# @see Concurrent::IVar
282245
def await
283-
raise InitializationError.new('#init_mutex was never called') unless @__async__mutex__
246+
raise InitializationError.new('#init_mutex was never called') unless @__async_initialized__
284247
@__await_delegator__.value
285248
end
286249
alias_method :delay, :await
@@ -290,12 +253,12 @@ def await
290253
# @raise [Concurrent::InitializationError] `#init_mutex` has not been called
291254
# @raise [ArgumentError] executor has already been set
292255
def executor=(executor)
293-
raise InitializationError.new('#init_mutex was never called') unless @__async__mutex__
294-
@__async__executor__.reconfigure { executor } or
256+
raise InitializationError.new('#init_mutex was never called') unless @__async_initialized__
257+
@__async_executor__.reconfigure { executor } or
295258
raise ArgumentError.new('executor has already been set')
296259
end
297260

298-
# Initialize the internal mutex and other synchronization objects. This method
261+
# Initialize the internal serializer and other synchronization objects. This method
299262
# *must* be called from the constructor of the including class or explicitly
300263
# by the caller prior to calling any other methods. If `init_mutex` is *not*
301264
# called explicitly the async/await/executor methods will raize a
@@ -308,12 +271,14 @@ def executor=(executor)
308271
#
309272
# @raise [Concurrent::InitializationError] when called more than once
310273
def init_mutex
311-
raise InitializationError.new('#init_mutex was already called') if @__async__mutex__
312-
(@__async__mutex__ = Mutex.new).lock
313-
@__async__executor__ = Delay.new{ Concurrent.configuration.global_operation_pool }
314-
@__await_delegator__ = Delay.new{ AwaitDelegator.new(self, @__async__mutex__) }
315-
@__async_delegator__ = Delay.new{ AsyncDelegator.new(self, @__async__executor__, @__async__mutex__) }
316-
@__async__mutex__.unlock
274+
raise InitializationError.new('#init_mutex was already called') if @__async_initialized__
275+
@__async_initialized__ = true
276+
serializer = Concurrent::SerializedExecution.new
277+
@__async_executor__ = Delay.new{ Concurrent.configuration.global_operation_pool }
278+
@__await_delegator__ = Delay.new{ AsyncDelegator.new(
279+
self, Delay.new{ Concurrent::ImmediateExecutor.new }, serializer, true) }
280+
@__async_delegator__ = Delay.new{ AsyncDelegator.new(
281+
self, @__async_executor__, serializer, false) }
317282
end
318283
end
319284
end

spec/concurrent/async_spec.rb

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,13 @@ def many(*args, &block) nil; end
150150
}.to raise_error(StandardError)
151151
end
152152

153-
it 'returns a :pending Future' do
153+
it 'returns a :pending IVar' do
154154
val = subject.async.wait(5)
155-
val.should be_a Concurrent::Future
155+
val.should be_a Concurrent::IVar
156156
val.should be_pending
157157
end
158158

159-
it 'runs the Future on the memoized executor' do
159+
it 'runs the future on the memoized executor' do
160160
executor = ImmediateExecutor.new
161161
executor.should_receive(:post).with(any_args)
162162
subject = async_class.new
@@ -202,7 +202,7 @@ def many(*args, &block) nil; end
202202

203203
it 'is aliased as #future' do
204204
val = subject.future.wait(5)
205-
val.should be_a Concurrent::Future
205+
val.should be_a Concurrent::IVar
206206
end
207207

208208
context '#method_missing' do
@@ -315,7 +315,6 @@ def gather(seconds, first, *rest)
315315
}.new
316316

317317
object.async.gather(0.5, :a, :b)
318-
sleep(0.1)
319318
object.await.gather(0, :c, :d)
320319
object.bucket.should eq [:a, :b, :c, :d]
321320
end

0 commit comments

Comments
 (0)