Skip to content

Commit 2be2f43

Browse files
committed
Edge - Future tweaks
- remove all synchronization from path when there is no thread waiting - improve AllPromise iterations - zipped futures apply as multiple arguments to block not as a single array
1 parent f8dcc9f commit 2be2f43

File tree

4 files changed

+170
-55
lines changed

4 files changed

+170
-55
lines changed

examples/benchmark_new_futures.rb

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,48 +8,51 @@
88
warmup *= 10 if Concurrent.on_jruby?
99

1010
Benchmark.ips(time, warmup) do |x|
11-
x.report('graph-old') do
12-
head = Concurrent::Promise.execute { 1 }
13-
branch1 = head.then(&:succ)
14-
branch2 = head.then(&:succ).then(&:succ)
15-
branch3 = head.then(&:succ).then(&:succ).then(&:succ)
16-
Concurrent::Promise.zip(branch1, branch2, branch3).then { |(a, b, c)| a + b + c }.value!
17-
end
18-
x.report('graph-new') do
19-
head = Concurrent.future { 1 }
20-
branch1 = head.then(&:succ)
21-
branch2 = head.then(&:succ).then(&:succ)
22-
branch3 = head.then(&:succ).then(&:succ).then(&:succ)
23-
Concurrent.zip(branch1, branch2, branch3).then { |(a, b, c)| a + b + c }.value!
24-
end
11+
x.report('status-old') { f = Concurrent::Promise.execute { nil }; 100.times { f.complete? } }
12+
x.report('status-new') { f = Concurrent.future(:fast) { nil }; 100.times { f.completed? } }
2513
x.compare!
2614
end
2715

2816
Benchmark.ips(time, warmup) do |x|
29-
x.report('status-old') { f = Concurrent::Promise.execute { nil }; 10.times { f.complete? } }
30-
x.report('status-new') { f = Concurrent.future { nil }; 10.times { f.completed? } }
17+
of = Concurrent::Promise.execute { 1 }
18+
nf = Concurrent.future(:fast) { 1 }
19+
x.report('value-old') { of.value! }
20+
x.report('value-new') { nf.value! }
3121
x.compare!
3222
end
3323

3424
Benchmark.ips(time, warmup) do |x|
35-
of = Concurrent::Promise.execute { 1 }
36-
nf = Concurrent.future { 1 }
37-
x.report('value-old') { of.value! }
38-
x.report('value-new') { nf.value! }
25+
x.report('graph-old') do
26+
head = Concurrent::Promise.execute { 1 }
27+
10.times do
28+
branch1 = head.then(&:succ)
29+
branch2 = head.then(&:succ).then(&:succ)
30+
head = Concurrent::Promise.zip(branch1, branch2).then { |a, b| a + b }
31+
end
32+
head.value!
33+
end
34+
x.report('graph-new') do
35+
head = Concurrent.future(:fast) { 1 }
36+
10.times do
37+
branch1 = head.then(&:succ)
38+
branch2 = head.then(&:succ).then(&:succ)
39+
head = (branch1 & branch2).then { |a, b| a + b }
40+
end
41+
head.value!
42+
end
3943
x.compare!
4044
end
4145

4246
Benchmark.ips(time, warmup) do |x|
4347
x.report('immediate-old') { Concurrent::Promise.execute { nil }.value! }
44-
x.report('immediate-new') { Concurrent.future { nil }.value! }
48+
x.report('immediate-new') { Concurrent.future(:fast) { nil }.value! }
4549
x.compare!
4650
end
4751

4852
Benchmark.ips(time, warmup) do |x|
4953
of = Concurrent::Promise.execute { 1 }
50-
nf = Concurrent.future { 1 }
51-
x.report('then-old') { of.then(&:succ).then(&:succ).value! }
52-
x.report('then-new') { nf.then(&:succ).then(&:succ).value! }
54+
nf = Concurrent.future(:fast) { 1 }
55+
x.report('then-old') { 100.times.reduce(nf) { |nf, _| nf.then(&:succ) }.value! }
56+
x.report('then-new') { 100.times.reduce(nf) { |nf, _| nf.then(&:succ) }.value! }
5357
x.compare!
5458
end
55-

lib/concurrent/edge/future.rb

Lines changed: 83 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def initialize(promise, default_executor = :io)
8383
@DefaultExecutor = default_executor
8484
@Touched = AtomicBoolean.new(false)
8585
@Callbacks = LockFreeStack.new
86+
@Waiters = LockFreeStack.new
8687
@State = AtomicReference.new :pending
8788
super()
8889
ensure_ivar_visibility!
@@ -174,7 +175,8 @@ def inspect
174175
# @api private
175176
def complete(raise = true)
176177
if complete_state
177-
synchronize { ns_broadcast }
178+
# go to synchronized block only if there were waiting threads
179+
synchronize { ns_broadcast } if @Waiters.clear
178180
call_callbacks
179181
else
180182
Concurrent::MultipleAssignmentError.new('multiple assignment') if raise
@@ -222,8 +224,19 @@ def touched
222224
private
223225

224226
def wait_until_complete(timeout)
225-
unless completed?
226-
synchronize { ns_wait_until(timeout) { completed? } }
227+
lock = Synchronization::Lock.new
228+
229+
while true
230+
last_waiter = @Waiters.peek # waiters' state before completion
231+
break if completed?
232+
233+
# synchronize so it cannot be signaled before it waits
234+
synchronize do
235+
# ok only if completing thread did not start signaling
236+
next unless @Waiters.compare_and_push last_waiter, lock
237+
ns_wait_until(timeout) { completed? }
238+
break
239+
end
227240
end
228241
self
229242
end
@@ -244,12 +257,11 @@ def pr_callback_on_completion(callback)
244257
callback.call
245258
end
246259

247-
def pr_notify_blocked(promise)
260+
def pr_callback_notify_blocked(promise)
248261
promise.on_done self
249262
end
250263

251264
def call_callback(method, *args)
252-
# all methods has to be pure
253265
self.send method, *args
254266
end
255267

@@ -273,7 +285,7 @@ def to_s
273285
end
274286
end
275287

276-
Failed = ImmutableStruct.new :reason do
288+
Failed = ImmutableStruct.new :reason do
277289
def value
278290
nil
279291
end
@@ -395,9 +407,15 @@ def on_failure!(&callback)
395407
add_callback :pr_callback_on_failure, callback
396408
end
397409

410+
# @api private
411+
def apply_value(value, block)
412+
block.call value
413+
end
414+
398415
# @api private
399416
def complete(success, value, reason, raise = true)
400417
if complete_state success, value, reason
418+
@Waiters.clear
401419
synchronize { ns_broadcast }
402420
call_callbacks success, value, reason
403421
else
@@ -414,6 +432,7 @@ def add_callback(method, *args)
414432
else
415433
@Callbacks.push [method, *args]
416434
state = self.state
435+
# take back if it was completed in the meanwhile
417436
call_callbacks success?(state), state.value, state.reason if completed?(state)
418437
end
419438
self
@@ -439,6 +458,10 @@ def call_callbacks(success, value, reason)
439458
end
440459
end
441460

461+
def call_callback(method, success, value, reason, *args)
462+
self.send method, success, value, reason, *args
463+
end
464+
442465
def pr_async_callback_on_success(success, value, reason, executor, callback)
443466
pr_with_async(executor, success, value, reason, callback) do |success, value, reason, callback|
444467
pr_callback_on_success success, value, reason, callback
@@ -452,7 +475,7 @@ def pr_async_callback_on_failure(success, value, reason, executor, callback)
452475
end
453476

454477
def pr_callback_on_success(success, value, reason, callback)
455-
callback.call value if success
478+
apply_value value, callback if success
456479
end
457480

458481
def pr_callback_on_failure(success, value, reason, callback)
@@ -463,7 +486,7 @@ def pr_callback_on_completion(success, value, reason, callback)
463486
callback.call success, value, reason
464487
end
465488

466-
def pr_notify_blocked(success, value, reason, promise)
489+
def pr_callback_notify_blocked(success, value, reason, promise)
467490
super(promise)
468491
end
469492

@@ -506,11 +529,11 @@ def try_fail(reason = StandardError.new)
506529
end
507530

508531
def evaluate_to(*args, &block)
509-
promise.evaluate_to(*args, &block)
532+
promise.evaluate_to(*args, block)
510533
end
511534

512535
def evaluate_to!(*args, &block)
513-
promise.evaluate_to!(*args, &block)
536+
promise.evaluate_to!(*args, block)
514537
end
515538
end
516539

@@ -609,8 +632,8 @@ def try_fail(reason = StandardError.new)
609632
public :evaluate_to
610633

611634
# @return [Future]
612-
def evaluate_to!(*args, &block)
613-
evaluate_to(*args, &block).wait!
635+
def evaluate_to!(*args, block)
636+
evaluate_to(*args, block).wait!
614637
end
615638
end
616639

@@ -625,7 +648,7 @@ def initialize(future, blocked_by_futures, countdown, &block)
625648
@Countdown = AtomicFixnum.new countdown
626649

627650
super(future)
628-
blocked_by.each { |f| f.add_callback :pr_notify_blocked, self }
651+
blocked_by.each { |future| future.add_callback :pr_callback_notify_blocked, self }
629652
end
630653

631654
# @api private
@@ -705,7 +728,9 @@ def initialize(blocked_by_future, default_executor = :io, executor = default_exe
705728

706729
def on_completable(done_future)
707730
if done_future.success?
708-
Concurrent.post_on(@Executor, done_future, @Task) { |done_future, task| evaluate_to done_future.value, &task }
731+
Concurrent.post_on(@Executor, done_future, @Task) do |done_future, task|
732+
evaluate_to { done_future.apply_value done_future.value, task }
733+
end
709734
else
710735
complete false, nil, done_future.reason
711736
end
@@ -722,7 +747,7 @@ def initialize(blocked_by_future, default_executor = :io, executor = default_exe
722747

723748
def on_completable(done_future)
724749
if done_future.failed?
725-
Concurrent.post_on(@Executor, done_future, @Task) { |done_future, task| evaluate_to done_future.reason, &task }
750+
Concurrent.post_on(@Executor, done_future.reason, @Task) { |reason, task| evaluate_to reason, &task }
726751
else
727752
complete true, done_future.value, nil
728753
end
@@ -757,12 +782,12 @@ def blocked_by
757782

758783
def process_on_done(future)
759784
countdown = super(future)
760-
value = future.value
785+
value = future.value
761786
if countdown.nonzero?
762787
case value
763788
when Future
764789
@BlockedBy.push value
765-
value.add_callback :pr_notify_blocked, self
790+
value.add_callback :pr_callback_notify_blocked, self
766791
@Countdown.value
767792
when Event
768793
raise TypeError, 'cannot flatten to Event'
@@ -797,26 +822,57 @@ def clear_blocked_by!
797822

798823
# used internally to support #with_default_executor
799824
class AllPromise < BlockedPromise
825+
826+
class ArrayFuture < Future
827+
def apply_value(value, block)
828+
block.call(*value)
829+
end
830+
end
831+
800832
private
801833

802834
def initialize(blocked_by_futures, default_executor = :io)
803-
klass = blocked_by_futures.any? { |f| f.is_a?(Future) } ? Future : Event
835+
klass = Event
836+
blocked_by_futures.each do |f|
837+
if f.is_a?(Future)
838+
if klass == Event
839+
klass = Future
840+
elsif klass == Future
841+
klass = ArrayFuture
842+
break
843+
end
844+
end
845+
end
846+
804847
# noinspection RubyArgCount
805848
super(klass.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
806849
end
807850

808851
def on_completable(done_future)
809-
results = blocked_by.select { |f| f.is_a?(Future) }.map(&:result)
810-
if results.empty?
811-
complete
812-
else
813-
if results.all? { |success, _, _| success }
814-
params = results.map { |_, value, _| value }
815-
complete(true, params.size == 1 ? params.first : params, nil)
852+
all_success = true
853+
reason = nil
854+
855+
values = blocked_by.each_with_object([]) do |future, values|
856+
next unless future.is_a?(Future)
857+
success, value, reason = future.result
858+
859+
unless success
860+
all_success = false
861+
reason = reason
862+
break
863+
end
864+
values << value
865+
end
866+
867+
if all_success
868+
if values.empty?
869+
complete
816870
else
817-
# TODO what about other reasons?
818-
complete false, nil, results.find { |success, _, _| !success }.last
871+
complete(true, values.size == 1 ? values.first : values, nil)
819872
end
873+
else
874+
# TODO what about other reasons?
875+
complete false, nil, reason
820876
end
821877
end
822878
end

lib/concurrent/edge/lock_free_stack.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,11 @@ def compare_and_clear(head)
5252
end
5353

5454
def clear
55-
@Head.update { |_| EMPTY }
56-
self
55+
while true
56+
head = @Head.get
57+
return false if head == EMPTY
58+
return true if @Head.compare_and_set head, EMPTY
59+
end
5760
end
5861

5962
include Enumerable

0 commit comments

Comments
 (0)