Skip to content

Commit 0a85d02

Browse files
committed
Remove dependency in zip promise on blocked_by
1 parent afe15b6 commit 0a85d02

File tree

1 file changed

+69
-78
lines changed

1 file changed

+69
-78
lines changed

lib/concurrent/edge/promises.rb

Lines changed: 69 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -672,8 +672,9 @@ def resolve_with(state, raise_on_reassign = true)
672672
# @!visibility private
673673
# @return [Array<AbstractPromise>]
674674
def blocks
675-
@Callbacks.each_with_object([]) do |callback, promises|
676-
promises.push(*(callback.select { |v| v.is_a? AbstractPromise }))
675+
# TODO (pitr-ch 18-Dec-2016): add macro noting that debug methods may change api without warning
676+
@Callbacks.each_with_object([]) do |(method, *args), promises|
677+
promises.push(args[0]) if method == :callback_notify_blocked
677678
end
678679
end
679680

@@ -759,8 +760,8 @@ def async_callback_on_resolution(state, executor, args, callback)
759760
end
760761
end
761762

762-
def callback_notify_blocked(state, promise)
763-
promise.on_resolution self
763+
def callback_notify_blocked(state, promise, index)
764+
promise.on_resolution self, index
764765
end
765766
end
766767

@@ -985,7 +986,7 @@ def rescue_on(executor, *args, &task)
985986
# @return [Future]
986987
def zip(other)
987988
if other.is_a?(Future)
988-
ZipFutureFuturePromise.new_blocked([self, other], self, other, @DefaultExecutor).future
989+
ZipFuturesPromise.new_blocked([self, other], [self, other], @DefaultExecutor).future
989990
else
990991
ZipFutureEventPromise.new_blocked([self, other], self, other, @DefaultExecutor).future
991992
end
@@ -1009,10 +1010,10 @@ def any(event_or_future)
10091010
#
10101011
# @return [Future]
10111012
def delay
1012-
future = DelayPromise.new(@DefaultExecutor).future
1013-
ZipFutureEventPromise.new_blocked([self, future],
1013+
event = DelayPromise.new(@DefaultExecutor).event
1014+
ZipFutureEventPromise.new_blocked([self, event],
10141015
self,
1015-
future,
1016+
event,
10161017
@DefaultExecutor).future
10171018
end
10181019

@@ -1373,7 +1374,7 @@ class BlockedPromise < InnerPromise
13731374
def self.new_blocked(blockers, *args, &block)
13741375
promise = new(*args, &block)
13751376
ensure
1376-
blockers.each { |f| f.add_callback :callback_notify_blocked, promise }
1377+
blockers.each_with_index { |f, i| f.add_callback :callback_notify_blocked, promise, i }
13771378
end
13781379

13791380
def initialize(future, blocked_by_futures, countdown)
@@ -1383,12 +1384,13 @@ def initialize(future, blocked_by_futures, countdown)
13831384
end
13841385

13851386
# @!visibility private
1386-
def on_resolution(future)
1387-
countdown = process_on_resolution(future)
1388-
resolvable = resolvable?(countdown, future)
1387+
def on_resolution(future, index)
1388+
# TODO (pitr-ch 18-Dec-2016): rename to on_blocker_resolution
1389+
countdown = process_on_resolution(future, index)
1390+
resolvable = resolvable?(countdown, future, index)
13891391

13901392
if resolvable
1391-
on_resolvable(future)
1393+
on_resolvable(future, index)
13921394
# futures could be deleted from blocked_by one by one here, but that would be too expensive,
13931395
# it's done once when all are resolved to free their references
13941396
clear_blocked_by!
@@ -1428,15 +1430,15 @@ def clear_blocked_by!
14281430
end
14291431

14301432
# @return [true,false] if resolvable
1431-
def resolvable?(countdown, future)
1433+
def resolvable?(countdown, future, index)
14321434
countdown.zero?
14331435
end
14341436

1435-
def process_on_resolution(future)
1437+
def process_on_resolution(future, index)
14361438
@Countdown.decrement
14371439
end
14381440

1439-
def on_resolvable(resolved_future)
1441+
def on_resolvable(resolved_future, index)
14401442
raise NotImplementedError
14411443
end
14421444
end
@@ -1465,7 +1467,7 @@ def initialize(blocked_by_future, default_executor, executor, args, &task)
14651467
super blocked_by_future, default_executor, executor, args, &task
14661468
end
14671469

1468-
def on_resolvable(resolved_future)
1470+
def on_resolvable(resolved_future, index)
14691471
if resolved_future.fulfilled?
14701472
Concurrent.executor(@Executor).post(resolved_future, @Args, @Task) do |future, args, task|
14711473
evaluate_to lambda { future.apply args, task }
@@ -1483,7 +1485,7 @@ def initialize(blocked_by_future, default_executor, executor, args, &task)
14831485
super blocked_by_future, default_executor, executor, args, &task
14841486
end
14851487

1486-
def on_resolvable(resolved_future)
1488+
def on_resolvable(resolved_future, index)
14871489
if resolved_future.rejected?
14881490
Concurrent.executor(@Executor).post(resolved_future, @Args, @Task) do |future, args, task|
14891491
evaluate_to lambda { future.apply args, task }
@@ -1497,7 +1499,7 @@ def on_resolvable(resolved_future)
14971499
class ChainPromise < BlockedTaskPromise
14981500
private
14991501

1500-
def on_resolvable(resolved_future)
1502+
def on_resolvable(resolved_future, index)
15011503
if Future === resolved_future
15021504
Concurrent.executor(@Executor).post(resolved_future, @Args, @Task) do |future, args, task|
15031505
evaluate_to(*future.result, *args, task)
@@ -1536,7 +1538,7 @@ def initialize_blocked_by(blocked_by_future)
15361538
@BlockedBy = LockFreeStack.new.push(blocked_by_future)
15371539
end
15381540

1539-
def on_resolvable(resolved_future)
1541+
def on_resolvable(resolved_future, index)
15401542
resolve_with resolved_future.internal_state
15411543
end
15421544

@@ -1550,8 +1552,8 @@ def blocked_by_add(future)
15501552
future.touch if self.future.touched?
15511553
end
15521554

1553-
def resolvable?(countdown, future)
1554-
!@Future.internal_state.resolved? && super(countdown, future)
1555+
def resolvable?(countdown, future, index)
1556+
!@Future.internal_state.resolved? && super(countdown, future, index)
15551557
end
15561558
end
15571559

@@ -1563,8 +1565,8 @@ def initialize(blocked_by_future, default_executor)
15631565
super Event.new(self, default_executor), blocked_by_future, 2
15641566
end
15651567

1566-
def process_on_resolution(future)
1567-
countdown = super(future)
1568+
def process_on_resolution(future, index)
1569+
countdown = super(future, index)
15681570
if countdown.nonzero?
15691571
internal_state = future.internal_state
15701572

@@ -1577,8 +1579,8 @@ def process_on_resolution(future)
15771579
case value
15781580
when Future, Event
15791581
blocked_by_add value
1580-
value.add_callback :callback_notify_blocked, self
1581-
@Countdown.value
1582+
value.add_callback :callback_notify_blocked, self, nil
1583+
countdown
15821584
else
15831585
resolve_with RESOLVED
15841586
end
@@ -1597,8 +1599,8 @@ def initialize(blocked_by_future, levels, default_executor)
15971599
super Future.new(self, default_executor), blocked_by_future, 1 + levels
15981600
end
15991601

1600-
def process_on_resolution(future)
1601-
countdown = super(future)
1602+
def process_on_resolution(future, index)
1603+
countdown = super(future, index)
16021604
if countdown.nonzero?
16031605
internal_state = future.internal_state
16041606

@@ -1611,8 +1613,8 @@ def process_on_resolution(future)
16111613
case value
16121614
when Future
16131615
blocked_by_add value
1614-
value.add_callback :callback_notify_blocked, self
1615-
@Countdown.value
1616+
value.add_callback :callback_notify_blocked, self, nil
1617+
countdown
16161618
when Event
16171619
evaluate_to(lambda { raise TypeError, 'cannot flatten to Event' })
16181620
else
@@ -1632,7 +1634,7 @@ def initialize(blocked_by_future, default_executor)
16321634
super Future.new(self, default_executor), blocked_by_future, 1
16331635
end
16341636

1635-
def process_on_resolution(future)
1637+
def process_on_resolution(future, index)
16361638
internal_state = future.internal_state
16371639

16381640
unless internal_state.fulfilled?
@@ -1645,7 +1647,7 @@ def process_on_resolution(future)
16451647
when Future
16461648
# FIXME (pitr-ch 08-Dec-2016): will accumulate the completed futures
16471649
blocked_by_add value
1648-
value.add_callback :callback_notify_blocked, self
1650+
value.add_callback :callback_notify_blocked, self, nil
16491651
else
16501652
resolve_with internal_state
16511653
end
@@ -1661,43 +1663,28 @@ def initialize(event1, event2, default_executor)
16611663

16621664
private
16631665

1664-
def on_resolvable(resolved_future)
1666+
def on_resolvable(resolved_future, index)
16651667
resolve_with RESOLVED
16661668
end
16671669
end
16681670

16691671
class ZipFutureEventPromise < BlockedPromise
16701672
def initialize(future, event, default_executor)
16711673
super Future.new(self, default_executor), [future, event], 2
1672-
@FutureResult = future
1674+
@result = nil
16731675
end
16741676

16751677
private
16761678

1677-
def on_resolvable(resolved_future)
1678-
resolve_with @FutureResult.internal_state
1679-
end
1680-
end
1681-
1682-
class ZipFutureFuturePromise < BlockedPromise
1683-
def initialize(future1, future2, default_executor)
1684-
super Future.new(self, default_executor), [future1, future2], 2
1685-
@Future1Result = future1
1686-
@Future2Result = future2
1679+
def process_on_resolution(future, index)
1680+
# first blocking is future, take its result
1681+
@result = future.internal_state if index == 0
1682+
# super has to be called after above to piggyback on volatile @Countdown
1683+
super future, index
16871684
end
16881685

1689-
private
1690-
1691-
def on_resolvable(resolved_future)
1692-
fulfilled1, value1, reason1 = @Future1Result.result
1693-
fulfilled2, value2, reason2 = @Future2Result.result
1694-
fulfilled = fulfilled1 && fulfilled2
1695-
new_state = if fulfilled
1696-
FulfilledArray.new([value1, value2])
1697-
else
1698-
PartiallyRejected.new([value1, value2], [reason1, reason2])
1699-
end
1700-
resolve_with new_state
1686+
def on_resolvable(resolved_future, index)
1687+
resolve_with @result
17011688
end
17021689
end
17031690

@@ -1708,7 +1695,7 @@ def initialize(event, default_executor)
17081695

17091696
private
17101697

1711-
def on_resolvable(resolved_future)
1698+
def on_resolvable(resolved_future, index)
17121699
resolve_with RESOLVED
17131700
end
17141701
end
@@ -1720,7 +1707,7 @@ def initialize(future, default_executor)
17201707

17211708
private
17221709

1723-
def on_resolvable(resolved_future)
1710+
def on_resolvable(resolved_future, index)
17241711
resolve_with resolved_future.internal_state
17251712
end
17261713
end
@@ -1730,23 +1717,28 @@ class ZipFuturesPromise < BlockedPromise
17301717
private
17311718

17321719
def initialize(blocked_by_futures, default_executor)
1733-
super(Future.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
1720+
size = blocked_by_futures.size
1721+
super(Future.new(self, default_executor), blocked_by_futures, size)
1722+
@Resolutions = ::Array.new(size)
1723+
1724+
on_resolvable nil, -1 if blocked_by_futures.empty?
1725+
end
17341726

1735-
on_resolvable nil if blocked_by_futures.empty?
1727+
def process_on_resolution(future, index)
1728+
countdown = super future, index
1729+
# TODO (pitr-ch 18-Dec-2016): Can we assume that array will never break under parallel access when never resized?
1730+
@Resolutions[index] = future.internal_state
1731+
countdown
17361732
end
17371733

1738-
def on_resolvable(resolved_future)
1734+
def on_resolvable(resolved_future, index)
17391735
all_fulfilled = true
1740-
values = Array.new(blocked_by.size)
1741-
reasons = Array.new(blocked_by.size)
1736+
values = Array.new(@Resolutions.size)
1737+
reasons = Array.new(@Resolutions.size)
17421738

1743-
blocked_by.each_with_index do |future, i|
1744-
if future.is_a?(Future)
1745-
fulfilled, values[i], reasons[i] = future.result
1746-
all_fulfilled &&= fulfilled
1747-
else
1748-
values[i] = reasons[i] = nil
1749-
end
1739+
@Resolutions.each_with_index do |internal_state, i|
1740+
fulfilled, values[i], reasons[i] = internal_state.result
1741+
all_fulfilled &&= fulfilled
17501742
end
17511743

17521744
if all_fulfilled
@@ -1764,10 +1756,10 @@ class ZipEventsPromise < BlockedPromise
17641756
def initialize(blocked_by_futures, default_executor)
17651757
super(Event.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
17661758

1767-
on_resolvable nil if blocked_by_futures.empty?
1759+
on_resolvable nil, -1 if blocked_by_futures.empty?
17681760
end
17691761

1770-
def on_resolvable(resolved_future)
1762+
def on_resolvable(resolved_future, index)
17711763
resolve_with RESOLVED
17721764
end
17731765
end
@@ -1784,11 +1776,11 @@ def initialize(blocked_by_futures, default_executor)
17841776
super(Future.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
17851777
end
17861778

1787-
def resolvable?(countdown, future)
1779+
def resolvable?(countdown, future, index)
17881780
true
17891781
end
17901782

1791-
def on_resolvable(resolved_future)
1783+
def on_resolvable(resolved_future, index)
17921784
resolve_with resolved_future.internal_state, false
17931785
end
17941786
end
@@ -1801,11 +1793,11 @@ def initialize(blocked_by_futures, default_executor)
18011793
super(Event.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
18021794
end
18031795

1804-
def resolvable?(countdown, future)
1796+
def resolvable?(countdown, future, index)
18051797
true
18061798
end
18071799

1808-
def on_resolvable(resolved_future)
1800+
def on_resolvable(resolved_future, index)
18091801
resolve_with RESOLVED, false
18101802
end
18111803
end
@@ -1814,7 +1806,7 @@ class AnyFulfilledFuturePromise < AnyResolvedFuturePromise
18141806

18151807
private
18161808

1817-
def resolvable?(countdown, future)
1809+
def resolvable?(countdown, future, index)
18181810
future.fulfilled? ||
18191811
# inlined super from BlockedPromise
18201812
countdown.zero?
@@ -1887,7 +1879,6 @@ def initialize(default_executor, intended_time)
18871879
:RunFuturePromise,
18881880
:ZipEventEventPromise,
18891881
:ZipFutureEventPromise,
1890-
:ZipFutureFuturePromise,
18911882
:EventWrapperPromise,
18921883
:FutureWrapperPromise,
18931884
:ZipFuturesPromise,

0 commit comments

Comments
 (0)