Skip to content

Commit 12d6aec

Browse files
author
Petr Chalupa
committed
Merge pull request #333 from pitr-ch/futures
Performance improvements
2 parents 1917c2d + 4d93bb7 commit 12d6aec

File tree

3 files changed

+32
-30
lines changed

3 files changed

+32
-30
lines changed

examples/benchmark_new_futures.rb

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
#!/usr/bin/env ruby
22

3-
$: << File.expand_path('../../lib', __FILE__)
4-
53
require 'benchmark/ips'
64
require 'concurrent'
75
require 'concurrent-edge'
@@ -33,7 +31,7 @@
3331

3432
Benchmark.ips(time, warmup) do |x|
3533
x.report('flat-old') { Concurrent::Promise.execute { 1 }.flat_map { |v| Concurrent::Promise.execute { v + 2 } }.value! }
36-
x.report('flat-new') { Concurrent.future(:fast) { 1 }.then { |v| Concurrent.future(:fast) { v+ 1 } }.flat.value! }
34+
x.report('flat-new') { Concurrent.future(:fast) { 1 }.then { |v| Concurrent.future(:fast) { v + 2 } }.flat.value! }
3735
x.compare!
3836
end
3937

lib/concurrent/edge/future.rb

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,7 @@ def post_on(executor, *args, &job)
120120
class Event < Synchronization::Object
121121
include Concern::Deprecation
122122

123-
class State < Synchronization::Object
124-
def initialize
125-
ensure_ivar_visibility!
126-
end
127-
123+
class State
128124
def completed?
129125
raise NotImplementedError
130126
end
@@ -331,19 +327,22 @@ def touched
331327
@Touched.value
332328
end
333329

330+
# only for debugging inspection
331+
def waiting_threads
332+
@Waiters.each.to_a
333+
end
334+
334335
private
335336

336337
def wait_until_complete(timeout)
337-
lock = Synchronization::Lock.new
338-
339338
while true
340339
last_waiter = @Waiters.peek # waiters' state before completion
341340
break if completed?
342341

343342
# synchronize so it cannot be signaled before it waits
344343
synchronize do
345344
# ok only if completing thread did not start signaling
346-
next unless @Waiters.compare_and_push last_waiter, lock
345+
next unless @Waiters.compare_and_push last_waiter, Thread.current
347346
ns_wait_until(timeout) { completed? }
348347
break
349348
end
@@ -407,7 +406,6 @@ def reason
407406
class Success < CompletedWithResult
408407
def initialize(value)
409408
@Value = value
410-
super()
411409
end
412410

413411
def success?
@@ -440,7 +438,6 @@ def apply(block)
440438
class Failed < CompletedWithResult
441439
def initialize(reason)
442440
@Reason = reason
443-
super()
444441
end
445442

446443
def success?
@@ -764,7 +761,6 @@ def hide_completable
764761
# @!visibility private
765762
class AbstractPromise < Synchronization::Object
766763
def initialize(future)
767-
super(&nil)
768764
@Future = future
769765
ensure_ivar_visibility!
770766
end
@@ -873,7 +869,7 @@ def initialize(future, blocked_by_futures, countdown, &block)
873869
@Countdown = AtomicFixnum.new countdown
874870

875871
super(future)
876-
blocked_by.each { |future| future.add_callback :pr_callback_notify_blocked, self }
872+
@BlockedBy.each { |future| future.add_callback :pr_callback_notify_blocked, self }
877873
end
878874

879875
# @api private
@@ -893,7 +889,7 @@ def touch
893889
blocked_by.each(&:touch)
894890
end
895891

896-
# @api private
892+
# !visibility private
897893
# for inspection only
898894
def blocked_by
899895
@BlockedBy
@@ -906,7 +902,7 @@ def inspect
906902
private
907903

908904
def initialize_blocked_by(blocked_by_futures)
909-
(@BlockedBy = Array(blocked_by_futures).freeze).size
905+
@BlockedBy = Array(blocked_by_futures)
910906
end
911907

912908
def clear_blocked_by!
@@ -999,8 +995,6 @@ def on_completable(done_future)
999995
# @!visibility private
1000996
class ImmediatePromise < InnerPromise
1001997
def initialize(default_executor, *args)
1002-
# FIXME optimize, create completed futures directly, with/without args
1003-
1004998
super(if args.empty?
1005999
Event.new(self, default_executor).complete
10061000
else
@@ -1011,6 +1005,8 @@ def initialize(default_executor, *args)
10111005

10121006
# @!visibility private
10131007
class FlattingPromise < BlockedPromise
1008+
1009+
# !visibility private
10141010
def blocked_by
10151011
@BlockedBy.each.to_a
10161012
end
@@ -1044,7 +1040,6 @@ def initialize(blocked_by_future, levels, default_executor)
10441040

10451041
def initialize_blocked_by(blocked_by_future)
10461042
@BlockedBy = LockFreeStack.new.push(blocked_by_future)
1047-
1
10481043
end
10491044

10501045
def on_completable(done_future)

lib/concurrent/edge/lock_free_stack.rb

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,26 @@ module Concurrent
22
module Edge
33
class LockFreeStack < Synchronization::Object
44

5-
Node = ImmutableStruct.new(:value, :next) do
5+
class Node
6+
attr_reader :value, :next_node
7+
8+
def initialize(value, next_node)
9+
@value = value
10+
@next_node = next_node
11+
end
12+
613
singleton_class.send :alias_method, :[], :new
714
end
815

916
class Empty < Node
10-
def next
17+
def next_node
1118
self
1219
end
1320
end
1421

1522
EMPTY = Empty[nil, nil]
1623

1724
def initialize
18-
super()
1925
@Head = AtomicReference.new EMPTY
2026
ensure_ivar_visibility!
2127
end
@@ -29,22 +35,25 @@ def compare_and_push(head, value)
2935
end
3036

3137
def push(value)
32-
@Head.update { |head| Node[value, head] }
33-
self
38+
while true
39+
head = @Head.get
40+
return self if @Head.compare_and_set head, Node[value, head]
41+
end
3442
end
3543

3644
def peek
3745
@Head.get
3846
end
3947

4048
def compare_and_pop(head)
41-
@Head.compare_and_set head, head.next
49+
@Head.compare_and_set head, head.next_node
4250
end
4351

4452
def pop
45-
popped = nil
46-
@Head.update { |head| (popped = head).next }
47-
popped.value
53+
while true
54+
head = @Head.get
55+
return head.value if @Head.compare_and_set head, head.next_node
56+
end
4857
end
4958

5059
def compare_and_clear(head)
@@ -66,7 +75,7 @@ def each
6675
it = peek
6776
until it.equal?(EMPTY)
6877
yield it.value
69-
it = it.next
78+
it = it.next_node
7079
end
7180
self
7281
end

0 commit comments

Comments
 (0)