Skip to content

Commit 7f7bb63

Browse files
committed
Improved Async implementation using global thread pool.
1 parent 1a5e3f5 commit 7f7bb63

File tree

2 files changed

+77
-32
lines changed

2 files changed

+77
-32
lines changed

examples/benchmark_async.rb

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -105,28 +105,61 @@ def foo(latch = nil)
105105
Calculating -------------------------------------
106106
celluloid 24.000 i/100ms
107107
async, thread per object
108-
30.000 i/100ms
108+
31.000 i/100ms
109109
async, global thread pool
110110
31.000 i/100ms
111111
-------------------------------------------------
112-
celluloid 242.34510.7%) i/s - 1.200k
112+
celluloid 277.834 9.4%) i/s - 1.392k
113113
async, thread per object
114-
316.3872.5%) i/s - 1.590k
114+
316.3571.9%) i/s - 1.612k
115115
async, global thread pool
116-
318.2001.6%) i/s - 1.612k
116+
318.7072.2%) i/s - 1.612k
117117

118118
Comparison:
119-
async, global thread pool: 318.2 i/s
119+
async, global thread pool: 318.7 i/s
120120
async, thread per object: 316.4 i/s - 1.01x slower
121-
celluloid: 242.3 i/s - 1.31x slower
121+
celluloid: 277.8 i/s - 1.15x slower
122+
123+
Rehearsal -------------------------------------------------------------
124+
celluloid 4.110000 0.650000 4.760000 ( 4.766239)
125+
async, thread per object 3.370000 0.100000 3.470000 ( 3.420537)
126+
async, global thread pool 3.460000 0.240000 3.700000 ( 3.598044)
127+
--------------------------------------------------- total: 11.930000sec
128+
129+
user system total real
130+
celluloid 4.000000 0.640000 4.640000 ( 4.652382)
131+
async, thread per object 3.640000 0.160000 3.800000 ( 3.751535)
132+
async, global thread pool 3.440000 0.220000 3.660000 ( 3.550602)
133+
134+
===========================================================
135+
jruby 1.7.19 (1.9.3p551) 2015-01-29 20786bd on Java HotSpot(TM) 64-Bit Server VM 1.8.0_45-b14 +jit [darwin-x86_64]
136+
===========================================================
137+
138+
Calculating -------------------------------------
139+
celluloid 2.000 i/100ms
140+
async, thread per object
141+
23.000 i/100ms
142+
async, global thread pool
143+
60.000 i/100ms
144+
-------------------------------------------------
145+
celluloid 155.480 (±38.6%) i/s - 606.000
146+
async, thread per object
147+
823.969 (±18.2%) i/s - 3.404k
148+
async, global thread pool
149+
852.728 (±14.7%) i/s - 4.140k
150+
151+
Comparison:
152+
async, global thread pool: 852.7 i/s
153+
async, thread per object: 824.0 i/s - 1.03x slower
154+
celluloid: 155.5 i/s - 5.48x slower
122155

123156
Rehearsal -------------------------------------------------------------
124-
celluloid 4.170000 0.630000 4.800000 ( 4.812120)
125-
async, thread per object 3.400000 0.110000 3.510000 ( 3.452749)
126-
async, global thread pool 3.410000 0.070000 3.480000 ( 3.455878)
127-
--------------------------------------------------- total: 11.790000sec
157+
celluloid 5.640000 1.560000 7.200000 ( 5.480000)
158+
async, thread per object 2.660000 0.240000 2.900000 ( 1.670000)
159+
async, global thread pool 2.110000 0.240000 2.350000 ( 1.360000)
160+
--------------------------------------------------- total: 12.450000sec
128161

129162
user system total real
130-
celluloid 4.080000 0.620000 4.700000 ( 4.687752)
131-
async, thread per object 3.380000 0.160000 3.540000 ( 3.469882)
132-
async, global thread pool 3.380000 0.050000 3.430000 ( 3.426759)
163+
celluloid 5.650000 1.540000 7.190000 ( 5.470000)
164+
async, thread per object 2.350000 0.230000 2.580000 ( 1.532000)
165+
async, global thread pool 1.910000 0.220000 2.130000 ( 1.272000)

lib/concurrent/async_alternate.rb

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
require 'thread'
12
require 'concurrent/configuration'
23
require 'concurrent/ivar'
3-
require 'concurrent/executor/immediate_executor'
44

55
module Concurrent
66

@@ -42,10 +42,9 @@ module Concurrent
4242
# ## Basic Usage
4343
#
4444
# When this module is mixed into a class, objects of the class become inherently
45-
# asynchronous. Each object gets its own background thread (specifically,
46-
# `SingleThreadExecutor`) on which to post asynchronous method calls.
47-
# Asynchronous method calls are executed in the background one at a time in
48-
# the order they are received.
45+
# asynchronous. Each object gets its own background thread on which to post
46+
# asynchronous method calls. Asynchronous method calls are executed in the
47+
# background one at a time in the order they are received.
4948
#
5049
# To create an asynchronous class, simply mix in the `Concurrent::Async` module:
5150
#
@@ -232,7 +231,6 @@ module Concurrent
232231
# # returns an IVar in the :complete state
233232
#
234233
# @see Concurrent::Actor
235-
# @see Concurrent::SingleThreadExecutor
236234
# @see https://en.wikipedia.org/wiki/Actor_model "Actor Model" at Wikipedia
237235
# @see http://www.erlang.org/doc/man/gen_server.html Erlang gen_server
238236
# @see http://c2.com/cgi/wiki?LetItCrash "Let It Crash" at http://c2.com/
@@ -307,13 +305,15 @@ class AsyncDelegator
307305
# given executor. Block if necessary.
308306
#
309307
# @param [Object] delegate the object to wrap and delegate method calls to
310-
# @param [Concurrent::ExecutorService] executor the executor on which to execute delegated method calls
308+
# @param [Array] job queue which guarantees serialization of method calls
309+
# @param [Mutex] mutex which synchronizes queue operations
311310
# @param [Boolean] blocking will block awaiting result when `true`
312-
def initialize(delegate, executor, serializer, blocking)
311+
def initialize(delegate, queue, mutex, blocking)
313312
@delegate = delegate
314-
@executor = executor
315-
@serializer = serializer
313+
@queue = queue
314+
@mutex = mutex
316315
@blocking = blocking
316+
@executor = Concurrent.global_io_executor
317317
end
318318

319319
# Delegates method calls to the wrapped object. For performance,
@@ -332,15 +332,28 @@ def method_missing(method, *args, &block)
332332
Async::validate_argc(@delegate, method, *args)
333333

334334
ivar = Concurrent::IVar.new
335-
@serializer.post(@executor, args) do |arguments|
335+
@mutex.synchronize do
336+
@queue.push [ivar, method, args, block]
337+
@executor.post { perform } if @queue.length == 1
338+
end
339+
340+
ivar.wait if @blocking
341+
ivar
342+
end
343+
344+
def perform
345+
loop do
346+
ivar, method, args, block = @mutex.synchronize { @queue.first }
347+
break unless ivar # queue is empty
348+
336349
begin
337-
ivar.set(@delegate.send(method, *arguments, &block))
350+
ivar.set(@delegate.send(method, *args, &block))
338351
rescue => error
339352
ivar.fail(error)
340353
end
354+
355+
@mutex.synchronize { @queue.shift }
341356
end
342-
ivar.wait if @blocking
343-
ivar
344357
end
345358
end
346359
private_constant :AsyncDelegator
@@ -387,8 +400,6 @@ def await
387400
end
388401
alias_method :call, :await
389402

390-
private
391-
392403
# Initialize the internal serializer and other stnchronization mechanisms.
393404
#
394405
# @note This method *must* be called immediately upon object construction.
@@ -398,9 +409,10 @@ def await
398409
def init_synchronization
399410
return self if @__async_initialized__
400411
@__async_initialized__ = true
401-
serializer = Concurrent::SerializedExecution.new
402-
@__await_delegator__ = AsyncDelegator.new(self, Concurrent::ImmediateExecutor.new, serializer, true)
403-
@__async_delegator__ = AsyncDelegator.new(self, Concurrent.global_io_executor, serializer, false)
412+
queue = []
413+
mutex = Mutex.new
414+
@__await_delegator__ = AsyncDelegator.new(self, queue, mutex, true)
415+
@__async_delegator__ = AsyncDelegator.new(self, queue, mutex, false)
404416
self
405417
end
406418
end

0 commit comments

Comments
 (0)