Skip to content

Commit afe15b6

Browse files
committed
Make resolving of dependent futures independent on blocked_by internal list
1 parent b96e048 commit afe15b6

File tree

1 file changed

+48
-37
lines changed

1 file changed

+48
-37
lines changed

lib/concurrent/edge/promises.rb

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ def zip_futures(*futures_and_or_events)
214214
# @param [AbstractEventFuture] futures_and_or_events
215215
# @return [Future]
216216
def zip_futures_on(default_executor, *futures_and_or_events)
217-
ZipFuturesPromise.new(futures_and_or_events, default_executor).future
217+
ZipFuturesPromise.new_blocked(futures_and_or_events, futures_and_or_events, default_executor).future
218218
end
219219

220220
alias_method :zip, :zip_futures
@@ -232,7 +232,7 @@ def zip_events(*futures_and_or_events)
232232
# @param [AbstractEventFuture] futures_and_or_events
233233
# @return [Event]
234234
def zip_events_on(default_executor, *futures_and_or_events)
235-
ZipEventsPromise.new(futures_and_or_events, default_executor).future
235+
ZipEventsPromise.new_blocked(futures_and_or_events, futures_and_or_events, default_executor).future
236236
end
237237

238238
# @!macro promises.shortcut.on
@@ -254,7 +254,7 @@ def any_resolved_future(*futures_and_or_events)
254254
# @param [AbstractEventFuture] futures_and_or_events
255255
# @return [Future]
256256
def any_resolved_future_on(default_executor, *futures_and_or_events)
257-
AnyResolvedFuturePromise.new(futures_and_or_events, default_executor).future
257+
AnyResolvedFuturePromise.new_blocked(futures_and_or_events, futures_and_or_events, default_executor).future
258258
end
259259

260260
# @!macro promises.shortcut.on
@@ -273,7 +273,7 @@ def any_fulfilled_future(*futures_and_or_events)
273273
# @param [AbstractEventFuture] futures_and_or_events
274274
# @return [Future]
275275
def any_fulfilled_future_on(default_executor, *futures_and_or_events)
276-
AnyFulfilledFuturePromise.new(futures_and_or_events, default_executor).future
276+
AnyFulfilledFuturePromise.new_blocked(futures_and_or_events, futures_and_or_events, default_executor).future
277277
end
278278

279279
# @!macro promises.shortcut.on
@@ -580,7 +580,7 @@ def chain(*args, &task)
580580
# @overload a_future.chain_on(executor, *args, &task)
581581
# @yield [fulfilled?, value, reason, *args] to the task.
582582
def chain_on(executor, *args, &task)
583-
ChainPromise.new(self, @DefaultExecutor, executor, args, &task).future
583+
ChainPromise.new_blocked([self], self, @DefaultExecutor, executor, args, &task).future
584584
end
585585

586586
# Short string representation.
@@ -782,9 +782,9 @@ class Event < AbstractEventFuture
782782
# @return [Future, Event]
783783
def zip(other)
784784
if other.is_a?(Future)
785-
ZipFutureEventPromise.new(other, self, @DefaultExecutor).future
785+
ZipFutureEventPromise.new_blocked([other, self], other, self, @DefaultExecutor).future
786786
else
787-
ZipEventEventPromise.new(self, other, @DefaultExecutor).event
787+
ZipEventEventPromise.new_blocked([self, other], self, other, @DefaultExecutor).event
788788
end
789789
end
790790

@@ -795,7 +795,7 @@ def zip(other)
795795
#
796796
# @return [Event]
797797
def any(event_or_future)
798-
AnyResolvedEventPromise.new([self, event_or_future], @DefaultExecutor).event
798+
AnyResolvedEventPromise.new_blocked([self, event_or_future], [self, event_or_future], @DefaultExecutor).event
799799
end
800800

801801
alias_method :|, :any
@@ -805,9 +805,11 @@ def any(event_or_future)
805805
#
806806
# @return [Event]
807807
def delay
808-
ZipEventEventPromise.new(self,
809-
DelayPromise.new(@DefaultExecutor).event,
810-
@DefaultExecutor).event
808+
event = DelayPromise.new(@DefaultExecutor).event
809+
ZipEventEventPromise.new_blocked([self, event],
810+
self,
811+
event,
812+
@DefaultExecutor).event
811813
end
812814

813815
# @!macro [new] promise.method.schedule
@@ -819,9 +821,11 @@ def delay
819821
# @return [Event]
820822
def schedule(intended_time)
821823
chain do
822-
ZipEventEventPromise.new(self,
823-
ScheduledPromise.new(@DefaultExecutor, intended_time).event,
824-
@DefaultExecutor).event
824+
event = ScheduledPromise.new(@DefaultExecutor, intended_time).event
825+
ZipEventEventPromise.new_blocked([self, event],
826+
self,
827+
event,
828+
@DefaultExecutor).event
825829
end.flat_event
826830
end
827831

@@ -956,7 +960,7 @@ def then(*args, &task)
956960
# @return [Future]
957961
# @yield [value, *args] to the task.
958962
def then_on(executor, *args, &task)
959-
ThenPromise.new(self, @DefaultExecutor, executor, args, &task).future
963+
ThenPromise.new_blocked([self], self, @DefaultExecutor, executor, args, &task).future
960964
end
961965

962966
# @!macro promises.shortcut.on
@@ -974,16 +978,16 @@ def rescue(*args, &task)
974978
# @return [Future]
975979
# @yield [reason, *args] to the task.
976980
def rescue_on(executor, *args, &task)
977-
RescuePromise.new(self, @DefaultExecutor, executor, args, &task).future
981+
RescuePromise.new_blocked([self], self, @DefaultExecutor, executor, args, &task).future
978982
end
979983

980984
# @!macro promises.method.zip
981985
# @return [Future]
982986
def zip(other)
983987
if other.is_a?(Future)
984-
ZipFutureFuturePromise.new(self, other, @DefaultExecutor).future
988+
ZipFutureFuturePromise.new_blocked([self, other], self, other, @DefaultExecutor).future
985989
else
986-
ZipFutureEventPromise.new(self, other, @DefaultExecutor).future
990+
ZipFutureEventPromise.new_blocked([self, other], self, other, @DefaultExecutor).future
987991
end
988992
end
989993

@@ -995,7 +999,7 @@ def zip(other)
995999
#
9961000
# @return [Future]
9971001
def any(event_or_future)
998-
AnyResolvedFuturePromise.new([self, event_or_future], @DefaultExecutor).future
1002+
AnyResolvedFuturePromise.new_blocked([self, event_or_future], [self, event_or_future], @DefaultExecutor).future
9991003
end
10001004

10011005
alias_method :|, :any
@@ -1005,25 +1009,29 @@ def any(event_or_future)
10051009
#
10061010
# @return [Future]
10071011
def delay
1008-
ZipFutureEventPromise.new(self,
1009-
DelayPromise.new(@DefaultExecutor).future,
1010-
@DefaultExecutor).future
1012+
future = DelayPromise.new(@DefaultExecutor).future
1013+
ZipFutureEventPromise.new_blocked([self, future],
1014+
self,
1015+
future,
1016+
@DefaultExecutor).future
10111017
end
10121018

10131019
# @!macro promise.method.schedule
10141020
# @return [Future]
10151021
def schedule(intended_time)
10161022
chain do
1017-
ZipFutureEventPromise.new(self,
1018-
ScheduledPromise.new(@DefaultExecutor, intended_time).event,
1019-
@DefaultExecutor).future
1023+
event = ScheduledPromise.new(@DefaultExecutor, intended_time).event
1024+
ZipFutureEventPromise.new_blocked([self, event],
1025+
self,
1026+
event,
1027+
@DefaultExecutor).future
10201028
end.flat
10211029
end
10221030

10231031
# @!macro promises.method.with_default_executor
10241032
# @return [Future]
10251033
def with_default_executor(executor)
1026-
FutureWrapperPromise.new(self, executor).future
1034+
FutureWrapperPromise.new_blocked([self], self, executor).future
10271035
end
10281036

10291037
# Creates new future which will have result of the future returned by receiver. If receiver
@@ -1032,7 +1040,7 @@ def with_default_executor(executor)
10321040
# @param [Integer] level how many levels of futures should flatten
10331041
# @return [Future]
10341042
def flat_future(level = 1)
1035-
FlatFuturePromise.new(self, level, @DefaultExecutor).future
1043+
FlatFuturePromise.new_blocked([self], self, level, @DefaultExecutor).future
10361044
end
10371045

10381046
alias_method :flat, :flat_future
@@ -1042,7 +1050,7 @@ def flat_future(level = 1)
10421050
#
10431051
# @return [Event]
10441052
def flat_event
1045-
FlatEventPromise.new(self, @DefaultExecutor).event
1053+
FlatEventPromise.new_blocked([self], self, @DefaultExecutor).event
10461054
end
10471055

10481056
# @!macro promises.shortcut.using
@@ -1116,7 +1124,7 @@ def on_rejection_using(executor, *args, &callback)
11161124
# end
11171125
# future(0, &body).run.value! # => 5
11181126
def run
1119-
RunFuturePromise.new(self, @DefaultExecutor).future
1127+
RunFuturePromise.new_blocked([self], self, @DefaultExecutor).future
11201128
end
11211129

11221130
# @!visibility private
@@ -1211,7 +1219,7 @@ def resolve(raise_on_reassign = true)
12111219
#
12121220
# @return [Event]
12131221
def with_hidden_resolvable
1214-
@with_hidden_resolvable ||= EventWrapperPromise.new(self, @DefaultExecutor).event
1222+
@with_hidden_resolvable ||= EventWrapperPromise.new_blocked([self], self, @DefaultExecutor).event
12151223
end
12161224
end
12171225

@@ -1224,7 +1232,6 @@ class ResolvableFuture < Future
12241232
#
12251233
# @!macro promise.param.raise_on_reassign
12261234
def resolve(fulfilled = true, value = nil, reason = nil, raise_on_reassign = true)
1227-
# TODO (pitr-ch 25-Sep-2016): should the defaults be kept to match event resolve api?
12281235
resolve_with(fulfilled ? Fulfilled.new(value) : Rejected.new(reason), raise_on_reassign)
12291236
end
12301237

@@ -1268,7 +1275,7 @@ def evaluate_to!(*args, &block)
12681275
#
12691276
# @return [Future]
12701277
def with_hidden_resolvable
1271-
@with_hidden_resolvable ||= FutureWrapperPromise.new(self, @DefaultExecutor).future
1278+
@with_hidden_resolvable ||= FutureWrapperPromise.new_blocked([self], self, @DefaultExecutor).future
12721279
end
12731280
end
12741281

@@ -1360,10 +1367,13 @@ class InnerPromise < AbstractPromise
13601367
# @abstract
13611368
class BlockedPromise < InnerPromise
13621369
# @!visibility private
1363-
def self.new(*args, &block)
1364-
promise = super(*args, &block)
1365-
promise.blocked_by.each { |f| f.add_callback :callback_notify_blocked, promise }
1366-
promise
1370+
1371+
private_class_method :new
1372+
1373+
def self.new_blocked(blockers, *args, &block)
1374+
promise = new(*args, &block)
1375+
ensure
1376+
blockers.each { |f| f.add_callback :callback_notify_blocked, promise }
13671377
end
13681378

13691379
def initialize(future, blocked_by_futures, countdown)
@@ -1925,7 +1935,8 @@ class Future < AbstractEventFuture
19251935
# Zips with selected value form the suplied channels
19261936
# @return [Future]
19271937
def then_select(*channels)
1928-
ZipFuturesPromise.new([self, Concurrent::Promises.select(*channels)], @DefaultExecutor).future
1938+
future = Concurrent::Promises.select(*channels)
1939+
ZipFuturesPromise.new_blocked([self, future], [self, future], @DefaultExecutor).future
19291940
end
19301941

19311942
# @note may block

0 commit comments

Comments
 (0)