Skip to content

Commit 36f167d

Browse files
committed
Merge pull request #37 from jdantonio/async-await
Async and Await
2 parents 96beb64 + 3b10496 commit 36f167d

File tree

5 files changed

+574
-1
lines changed

5 files changed

+574
-1
lines changed

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
require 'concurrent/actor'
1212
require 'concurrent/agent'
13+
require 'concurrent/async'
1314
require 'concurrent/channel'
1415
require 'concurrent/dataflow'
1516
require 'concurrent/delay'

lib/concurrent/async.rb

Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
require 'thread'
2+
require 'concurrent/ivar'
3+
require 'concurrent/future'
4+
5+
module Concurrent
6+
7+
# A mixin module that provides simple asynchronous behavior to any standard
8+
# class/object or object.
9+
#
10+
# Scenario:
11+
# As a stateful, plain old Ruby class/object
12+
# I want safe, asynchronous behavior
13+
# So my long-running methods don't block the main thread
14+
#
15+
# Stateful, mutable objects must be managed carefully when used asynchronously.
16+
# But Ruby is an object-oriented language so designing with objects and classes
17+
# plays to Ruby's strengths and is often more natural to many Ruby programmers.
18+
# The +Async+ module is a way to mix simple yet powerful asynchronous capabilities
19+
# into any plain old Ruby object or class. These capabilities provide a reasonable
20+
# level of thread safe guarantees when used correctly.
21+
#
22+
# When this module is mixed into a class or object it provides to new methods:
23+
# +async+ and +await+. These methods are thread safe with respect to the enclosing
24+
# object. The former method allows methods to be called asynchronously by posting
25+
# to the global thread pool. The latter allows a method to be called synchronously
26+
# on the current thread but does so safely with respect to any pending asynchronous
27+
# method calls. Both methods return an +Obligation+ which can be inspected for
28+
# the result of the method call. Calling a method with +async+ will return a
29+
# +:pending+ +Obligation+ whereas +await+ will return a +:complete+ +Obligation+.
30+
#
31+
# Very loosely based on the +async+ and +await+ keywords in C#.
32+
#
33+
# @example Defining an asynchronous class
34+
# class Echo
35+
# include Concurrent::Async
36+
#
37+
# def echo(msg)
38+
# sleep(rand)
39+
# print "#{msg}\n"
40+
# nil
41+
# end
42+
# end
43+
#
44+
# horn = Echo.new
45+
# horn.echo('zero') # synchronous, not thread-safe
46+
#
47+
# horn.async.echo('one') # asynchronous, non-blocking, thread-safe
48+
# horn.await.echo('two') # synchronous, blocking, thread-safe
49+
#
50+
# @example Monkey-patching an existing object
51+
# numbers = 1_000_000.times.collect{ rand }
52+
# numbers.extend(Concurrent::Async)
53+
#
54+
# future = numbers.async.max
55+
# future.state #=> :pending
56+
#
57+
# sleep(2)
58+
#
59+
# future.state #=> :fulfilled
60+
# future.value #=> 0.999999138918843
61+
#
62+
# @note Thread safe guarantees can only be made when asynchronous method calls
63+
# are not mixed with synchronous method calls. Use only synchronous calls
64+
# when the object is used exclusively on a single thread. Use only
65+
# asynchronous calls when the object is shared between threads.
66+
#
67+
# @since 0.6.0
68+
#
69+
# @see Concurrent::Obligation
70+
#
71+
# @see http://msdn.microsoft.com/en-us/library/hh191443.aspx Asynchronous Programming with Async and Await (C# and Visual Basic)
72+
# @see http://msdn.microsoft.com/en-us/magazine/jj991977.aspx Best Practices in Asynchronous Programming
73+
module Async
74+
75+
# Check for the presence of a method on an object and determine if a given
76+
# set of arguments matches the required arity.
77+
#
78+
# @param [Object] obj the object to check against
79+
# @param [Symbol] method the method to check the object for
80+
# @param [Array] args zero or more arguments for the arity check
81+
#
82+
# @raise [NameError] the object does not respond to +method+ method
83+
# @raise [ArgumentError] the given +args+ do not match the arity of +method+
84+
#
85+
# @note This check is imperfect because of the way Ruby reports the arity of
86+
# methods with a variable number of arguments. It is possible to determine
87+
# if too few arguments are given but impossible to determine if too many
88+
# arguments are given. This check may also fail to recognize dynamic behavior
89+
# of the object, such as methods simulated with +method_missing+.
90+
#
91+
# @see http://www.ruby-doc.org/core-2.1.1/Method.html#method-i-arity Method#arity
92+
# @see http://ruby-doc.org/core-2.1.0/Object.html#method-i-respond_to-3F Object#respond_to?
93+
# @see http://www.ruby-doc.org/core-2.1.0/BasicObject.html#method-i-method_missing BasicObject#method_missing
94+
def validate_argc(obj, method, *args)
95+
argc = args.length
96+
arity = obj.method(method).arity
97+
98+
if arity >= 0 && argc != arity
99+
raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity})")
100+
elsif arity < 0 && (arity = (arity + 1).abs) > argc
101+
raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity}..*)")
102+
end
103+
end
104+
module_function :validate_argc
105+
106+
# Delegates synchronous, thread-safe method calls to the wrapped object.
107+
#
108+
# @!visibility private
109+
class AwaitDelegator # :nodoc:
110+
111+
# Create a new delegator object wrapping the given +delegate+ and
112+
# protecting it with the given +mutex+.
113+
#
114+
# @param [Object] delegate the object to wrap and delegate method calls to
115+
# @param [Mutex] mutex the mutex lock to use when delegating method calls
116+
def initialize(delegate, mutex)
117+
@delegate = delegate
118+
@mutex = mutex
119+
end
120+
121+
# Delegates method calls to the wrapped object. For performance,
122+
# dynamically defines the given method on the delegator so that
123+
# all future calls to +method+ will not be directed here.
124+
#
125+
# @param [Symbol] method the method being called
126+
# @param [Array] args zero or more arguments to the method
127+
#
128+
# @return [IVar] the result of the method call
129+
#
130+
# @raise [NameError] the object does not respond to +method+ method
131+
# @raise [ArgumentError] the given +args+ do not match the arity of +method+
132+
def method_missing(method, *args, &block)
133+
super unless @delegate.respond_to?(method)
134+
Async::validate_argc(@delegate, method, *args)
135+
136+
self.define_singleton_method(method) do |*args|
137+
Async::validate_argc(@delegate, method, *args)
138+
ivar = Concurrent::IVar.new
139+
value, reason = nil, nil
140+
begin
141+
mutex.synchronize do
142+
value = @delegate.send(method, *args, &block)
143+
end
144+
rescue => reason
145+
# caught
146+
ensure
147+
return ivar.complete(reason.nil?, value, reason)
148+
end
149+
end
150+
151+
self.send(method, *args)
152+
end
153+
154+
# The lock used when delegating methods to the wrapped object.
155+
#
156+
# @!visibility private
157+
attr_reader :mutex # :nodoc:
158+
end
159+
160+
# Delegates asynchronous, thread-safe method calls to the wrapped object.
161+
#
162+
# @!visibility private
163+
class AsyncDelegator # :nodoc:
164+
165+
# Create a new delegator object wrapping the given +delegate+ and
166+
# protecting it with the given +mutex+.
167+
#
168+
# @param [Object] delegate the object to wrap and delegate method calls to
169+
# @param [Mutex] mutex the mutex lock to use when delegating method calls
170+
def initialize(delegate, mutex)
171+
@delegate = delegate
172+
@mutex = mutex
173+
end
174+
175+
# Delegates method calls to the wrapped object. For performance,
176+
# dynamically defines the given method on the delegator so that
177+
# all future calls to +method+ will not be directed here.
178+
#
179+
# @param [Symbol] method the method being called
180+
# @param [Array] args zero or more arguments to the method
181+
#
182+
# @return [IVar] the result of the method call
183+
#
184+
# @raise [NameError] the object does not respond to +method+ method
185+
# @raise [ArgumentError] the given +args+ do not match the arity of +method+
186+
def method_missing(method, *args, &block)
187+
super unless @delegate.respond_to?(method)
188+
Async::validate_argc(@delegate, method, *args)
189+
190+
self.define_singleton_method(method) do |*args|
191+
Async::validate_argc(@delegate, method, *args)
192+
Concurrent::Future.execute do
193+
mutex.synchronize do
194+
@delegate.send(method, *args, &block)
195+
end
196+
end
197+
end
198+
199+
self.send(method, *args)
200+
end
201+
202+
private
203+
204+
# The lock used when delegating methods to the wrapped object.
205+
#
206+
# @!visibility private
207+
attr_reader :mutex # :nodoc:
208+
end
209+
210+
# Causes the chained method call to be performed asynchronously on the
211+
# global thread pool. The method called by this method will return a
212+
# +Future+ object in the +:pending+ state and the method call will have
213+
# been scheduled on the global thread pool. The final disposition of the
214+
# method call can be obtained by inspecting the returned +Future+.
215+
#
216+
# Before scheduling the method on the global thread pool a best-effort
217+
# attempt will be made to validate that the method exists on the object
218+
# and that the given arguments match the arity of the requested function.
219+
# Due to the dynamic nature of Ruby and limitations of its reflection
220+
# library, some edge cases will be missed. For more information see
221+
# the documentation for the +validate_argc+ method.
222+
#
223+
# @note The method call is guaranteed to be thread safe with respect to
224+
# all other method calls against the same object that are called with
225+
# either +async+ or +await+. The mutable nature of Ruby references
226+
# (and object orientation in general) prevent any other thread safety
227+
# guarantees. Do NOT mix non-protected method calls with protected
228+
# method call. Use ONLY protected method calls when sharing the object
229+
# between threads.
230+
#
231+
# @return [Concurrent::Future] the pending result of the asynchronous operation
232+
#
233+
# @raise [NameError] the object does not respond to +method+ method
234+
# @raise [ArgumentError] the given +args+ do not match the arity of +method+
235+
#
236+
# @see Concurrent::Future
237+
def async
238+
@__async_delegator__ ||= AsyncDelegator.new(self, await.mutex)
239+
end
240+
alias_method :future, :async
241+
242+
# Causes the chained method call to be performed synchronously on the
243+
# current thread. The method called by this method will return an
244+
# +IVar+ object in either the +:fulfilled+ or +rejected+ state and the
245+
# method call will have completed. The final disposition of the
246+
# method call can be obtained by inspecting the returned +IVar+.
247+
#
248+
# Before scheduling the method on the global thread pool a best-effort
249+
# attempt will be made to validate that the method exists on the object
250+
# and that the given arguments match the arity of the requested function.
251+
# Due to the dynamic nature of Ruby and limitations of its reflection
252+
# library, some edge cases will be missed. For more information see
253+
# the documentation for the +validate_argc+ method.
254+
#
255+
# @note The method call is guaranteed to be thread safe with respect to
256+
# all other method calls against the same object that are called with
257+
# either +async+ or +await+. The mutable nature of Ruby references
258+
# (and object orientation in general) prevent any other thread safety
259+
# guarantees. Do NOT mix non-protected method calls with protected
260+
# method call. Use ONLY protected method calls when sharing the object
261+
# between threads.
262+
#
263+
# @return [Concurrent::IVar] the completed result of the synchronous operation
264+
#
265+
# @raise [NameError] the object does not respond to +method+ method
266+
# @raise [ArgumentError] the given +args+ do not match the arity of +method+
267+
#
268+
# @see Concurrent::IVar
269+
def await
270+
@__await_delegator__ ||= AwaitDelegator.new(self, Mutex.new)
271+
end
272+
alias_method :defer, :await
273+
end
274+
end

lib/concurrent/ivar.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def set(value)
6060
complete(true, value, nil)
6161
end
6262

63-
def fail(reason = nil)
63+
def fail(reason = StandardError.new)
6464
complete(false, nil, reason)
6565
end
6666

@@ -73,6 +73,7 @@ def complete(success, value, reason)
7373

7474
time = Time.now
7575
@observers.notify_and_delete_observers{ [time, self.value, reason] }
76+
self
7677
end
7778
end
7879
end

0 commit comments

Comments
 (0)