Skip to content

Commit 6b11145

Browse files
committed
Eliminate some array allocations
1 parent 6bfd19a commit 6b11145

File tree

1 file changed

+70
-35
lines changed

1 file changed

+70
-35
lines changed

lib/concurrent/edge/promises.rb

Lines changed: 70 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ def chain(*args, &task)
577577
# @overload a_future.chain_on(executor, *args, &task)
578578
# @yield [fulfilled?, value, reason, *args] to the task.
579579
def chain_on(executor, *args, &task)
580-
ChainPromise.new_blocked([self], @DefaultExecutor, executor, args, &task).future
580+
ChainPromise.new_blocked1(self, @DefaultExecutor, executor, args, &task).future
581581
end
582582

583583
# Short string representation.
@@ -664,7 +664,7 @@ def resolve_with(state, raise_on_reassign = true)
664664
# @return [Array<AbstractPromise>]
665665
def blocks
666666
# TODO (pitr-ch 18-Dec-2016): add macro noting that debug methods may change api without warning
667-
@Callbacks.each_with_object([]) do |(method, *args), promises|
667+
@Callbacks.each_with_object([]) do |(method, args), promises|
668668
promises.push(args[0]) if method == :callback_notify_blocked
669669
end
670670
end
@@ -697,9 +697,9 @@ def waiting_threads
697697
def add_callback(method, *args)
698698
state = internal_state
699699
if resolved?(state)
700-
call_callback method, state, *args
700+
call_callback method, state, args
701701
else
702-
@Callbacks.push [method, *args]
702+
@Callbacks.push [method, args]
703703
state = internal_state
704704
# take back if it was resolved in the meanwhile
705705
call_callbacks state if resolved?(state)
@@ -729,15 +729,15 @@ def wait_until_resolved(timeout)
729729
resolved?
730730
end
731731

732-
def call_callback(method, state, *args)
732+
def call_callback(method, state, args)
733733
self.send method, state, *args
734734
end
735735

736736
def call_callbacks(state)
737-
method, *args = @Callbacks.pop
737+
method, args = @Callbacks.pop
738738
while method
739-
call_callback method, state, *args
740-
method, *args = @Callbacks.pop
739+
call_callback method, state, args
740+
method, args = @Callbacks.pop
741741
end
742742
end
743743

@@ -774,9 +774,9 @@ class Event < AbstractEventFuture
774774
# @return [Future, Event]
775775
def zip(other)
776776
if other.is_a?(Future)
777-
ZipFutureEventPromise.new_blocked([other, self], @DefaultExecutor).future
777+
ZipFutureEventPromise.new_blocked2(other, self, @DefaultExecutor).future
778778
else
779-
ZipEventEventPromise.new_blocked([self, other], @DefaultExecutor).event
779+
ZipEventEventPromise.new_blocked2(self, other, @DefaultExecutor).event
780780
end
781781
end
782782

@@ -787,7 +787,7 @@ def zip(other)
787787
#
788788
# @return [Event]
789789
def any(event_or_future)
790-
AnyResolvedEventPromise.new_blocked([self, event_or_future], @DefaultExecutor).event
790+
AnyResolvedEventPromise.new_blocked2(self, event_or_future, @DefaultExecutor).event
791791
end
792792

793793
alias_method :|, :any
@@ -798,7 +798,7 @@ def any(event_or_future)
798798
# @return [Event]
799799
def delay
800800
event = DelayPromise.new(@DefaultExecutor).event
801-
ZipEventEventPromise.new_blocked([self, event], @DefaultExecutor).event
801+
ZipEventEventPromise.new_blocked2(self, event, @DefaultExecutor).event
802802
end
803803

804804
# @!macro [new] promise.method.schedule
@@ -811,7 +811,7 @@ def delay
811811
def schedule(intended_time)
812812
chain do
813813
event = ScheduledPromise.new(@DefaultExecutor, intended_time).event
814-
ZipEventEventPromise.new_blocked([self, event], @DefaultExecutor).event
814+
ZipEventEventPromise.new_blocked2(self, event, @DefaultExecutor).event
815815
end.flat_event
816816
end
817817

@@ -833,7 +833,7 @@ def to_event
833833
# @!macro promises.method.with_default_executor
834834
# @return [Event]
835835
def with_default_executor(executor)
836-
EventWrapperPromise.new_blocked([self], executor).event
836+
EventWrapperPromise.new_blocked1(self, executor).event
837837
end
838838

839839
private
@@ -946,7 +946,7 @@ def then(*args, &task)
946946
# @return [Future]
947947
# @yield [value, *args] to the task.
948948
def then_on(executor, *args, &task)
949-
ThenPromise.new_blocked([self], @DefaultExecutor, executor, args, &task).future
949+
ThenPromise.new_blocked1(self, @DefaultExecutor, executor, args, &task).future
950950
end
951951

952952
# @!macro promises.shortcut.on
@@ -964,16 +964,16 @@ def rescue(*args, &task)
964964
# @return [Future]
965965
# @yield [reason, *args] to the task.
966966
def rescue_on(executor, *args, &task)
967-
RescuePromise.new_blocked([self], @DefaultExecutor, executor, args, &task).future
967+
RescuePromise.new_blocked1(self, @DefaultExecutor, executor, args, &task).future
968968
end
969969

970970
# @!macro promises.method.zip
971971
# @return [Future]
972972
def zip(other)
973973
if other.is_a?(Future)
974-
ZipFuturesPromise.new_blocked([self, other], @DefaultExecutor).future
974+
ZipFuturesPromise.new_blocked2(self, other, @DefaultExecutor).future
975975
else
976-
ZipFutureEventPromise.new_blocked([self, other], @DefaultExecutor).future
976+
ZipFutureEventPromise.new_blocked2(self, other, @DefaultExecutor).future
977977
end
978978
end
979979

@@ -985,7 +985,7 @@ def zip(other)
985985
#
986986
# @return [Future]
987987
def any(event_or_future)
988-
AnyResolvedFuturePromise.new_blocked([self, event_or_future], @DefaultExecutor).future
988+
AnyResolvedFuturePromise.new_blocked2(self, event_or_future, @DefaultExecutor).future
989989
end
990990

991991
alias_method :|, :any
@@ -996,22 +996,22 @@ def any(event_or_future)
996996
# @return [Future]
997997
def delay
998998
event = DelayPromise.new(@DefaultExecutor).event
999-
ZipFutureEventPromise.new_blocked([self, event], @DefaultExecutor).future
999+
ZipFutureEventPromise.new_blocked2(self, event, @DefaultExecutor).future
10001000
end
10011001

10021002
# @!macro promise.method.schedule
10031003
# @return [Future]
10041004
def schedule(intended_time)
10051005
chain do
10061006
event = ScheduledPromise.new(@DefaultExecutor, intended_time).event
1007-
ZipFutureEventPromise.new_blocked([self, event], @DefaultExecutor).future
1007+
ZipFutureEventPromise.new_blocked2(self, event, @DefaultExecutor).future
10081008
end.flat
10091009
end
10101010

10111011
# @!macro promises.method.with_default_executor
10121012
# @return [Future]
10131013
def with_default_executor(executor)
1014-
FutureWrapperPromise.new_blocked([self], executor).future
1014+
FutureWrapperPromise.new_blocked1(self, executor).future
10151015
end
10161016

10171017
# Creates new future which will have result of the future returned by receiver. If receiver
@@ -1020,7 +1020,7 @@ def with_default_executor(executor)
10201020
# @param [Integer] level how many levels of futures should flatten
10211021
# @return [Future]
10221022
def flat_future(level = 1)
1023-
FlatFuturePromise.new_blocked([self], level, @DefaultExecutor).future
1023+
FlatFuturePromise.new_blocked1(self, level, @DefaultExecutor).future
10241024
end
10251025

10261026
alias_method :flat, :flat_future
@@ -1030,7 +1030,7 @@ def flat_future(level = 1)
10301030
#
10311031
# @return [Event]
10321032
def flat_event
1033-
FlatEventPromise.new_blocked([self], @DefaultExecutor).event
1033+
FlatEventPromise.new_blocked1(self, @DefaultExecutor).event
10341034
end
10351035

10361036
# @!macro promises.shortcut.using
@@ -1104,7 +1104,7 @@ def on_rejection_using(executor, *args, &callback)
11041104
# end
11051105
# future(0, &body).run.value! # => 5
11061106
def run
1107-
RunFuturePromise.new_blocked([self], @DefaultExecutor).future
1107+
RunFuturePromise.new_blocked1(self, @DefaultExecutor).future
11081108
end
11091109

11101110
# @!visibility private
@@ -1199,7 +1199,7 @@ def resolve(raise_on_reassign = true)
11991199
#
12001200
# @return [Event]
12011201
def with_hidden_resolvable
1202-
@with_hidden_resolvable ||= EventWrapperPromise.new_blocked([self], @DefaultExecutor).event
1202+
@with_hidden_resolvable ||= EventWrapperPromise.new_blocked1(self, @DefaultExecutor).event
12031203
end
12041204
end
12051205

@@ -1255,7 +1255,7 @@ def evaluate_to!(*args, &block)
12551255
#
12561256
# @return [Future]
12571257
def with_hidden_resolvable
1258-
@with_hidden_resolvable ||= FutureWrapperPromise.new_blocked([self], @DefaultExecutor).future
1258+
@with_hidden_resolvable ||= FutureWrapperPromise.new_blocked1(self, @DefaultExecutor).future
12591259
end
12601260
end
12611261

@@ -1354,16 +1354,49 @@ class BlockedPromise < InnerPromise
13541354

13551355
private_class_method :new
13561356

1357+
def self.new_blocked1(blocker, *args, &block)
1358+
blocker_delayed = blocker.promise.delayed
1359+
delayed = blocker_delayed ? LockFreeStack.new.push(blocker_delayed) : nil
1360+
promise = new(delayed, 1, *args, &block)
1361+
ensure
1362+
blocker.add_callback :callback_notify_blocked, promise, 0
1363+
end
1364+
1365+
def self.new_blocked2(blocker1, blocker2, *args, &block)
1366+
blocker_delayed1 = blocker1.promise.delayed
1367+
blocker_delayed2 = blocker2.promise.delayed
1368+
delayed = if blocker_delayed1
1369+
if blocker_delayed2
1370+
LockFreeStack.new2(blocker_delayed1, blocker_delayed2)
1371+
else
1372+
LockFreeStack.new1(blocker_delayed1)
1373+
end
1374+
else
1375+
blocker_delayed2 ? LockFreeStack.new1(blocker_delayed2) : nil
1376+
end
1377+
promise = new(delayed, 2, *args, &block)
1378+
ensure
1379+
blocker1.add_callback :callback_notify_blocked, promise, 0
1380+
blocker2.add_callback :callback_notify_blocked, promise, 1
1381+
end
1382+
13571383
def self.new_blocked(blockers, *args, &block)
1358-
delayed = blockers.each_with_object(LockFreeStack.new, &method(:add_delayed))
1384+
delayed = blockers.reduce(nil, &method(:add_delayed))
13591385
promise = new(delayed, blockers.size, *args, &block)
13601386
ensure
13611387
blockers.each_with_index { |f, i| f.add_callback :callback_notify_blocked, promise, i }
13621388
end
13631389

1364-
def self.add_delayed(blocker, delayed)
1365-
d = blocker.promise.delayed
1366-
delayed.push(d) if d
1390+
def self.add_delayed(delayed, blocker)
1391+
blocker_delayed = blocker.promise.delayed
1392+
if blocker_delayed
1393+
delayed = unless delayed
1394+
LockFreeStack.new1(blocker_delayed)
1395+
else
1396+
delayed.push(blocker_delayed)
1397+
end
1398+
end
1399+
delayed
13671400
end
13681401

13691402
def initialize(delayed, blockers_count, future)
@@ -1390,7 +1423,7 @@ def touch
13901423
end
13911424

13921425
def clear_propagate_touch
1393-
@Delayed.clear_each { |o| propagate_touch o }
1426+
@Delayed.clear_each { |o| propagate_touch o } if @Delayed
13941427
end
13951428

13961429
# @!visibility private
@@ -1535,7 +1568,7 @@ def add_delayed_of(future)
15351568
if touched?
15361569
propagate_touch future.promise.delayed
15371570
else
1538-
BlockedPromise.add_delayed future, @Delayed
1571+
BlockedPromise.add_delayed @Delayed, future
15391572
clear_propagate_touch if touched?
15401573
end
15411574
end
@@ -1581,7 +1614,9 @@ class FlatFuturePromise < AbstractFlatPromise
15811614

15821615
def initialize(delayed, blockers_count, levels, default_executor)
15831616
raise ArgumentError, 'levels has to be higher than 0' if levels < 1
1584-
super delayed, 1 + levels, Future.new(self, default_executor)
1617+
# flat promise may result to a future having delayed futures, therefore we have to have empty stack
1618+
# to be able to add new delayed futures
1619+
super delayed || LockFreeStack.new, 1 + levels, Future.new(self, default_executor)
15851620
end
15861621

15871622
def process_on_blocker_resolution(future, index)
@@ -1915,7 +1950,7 @@ class Future < AbstractEventFuture
19151950
# @return [Future]
19161951
def then_select(*channels)
19171952
future = Concurrent::Promises.select(*channels)
1918-
ZipFuturesPromise.new_blocked([self, future], @DefaultExecutor).future
1953+
ZipFuturesPromise.new_blocked2(self, future, @DefaultExecutor).future
19191954
end
19201955

19211956
# @note may block

0 commit comments

Comments
 (0)