Skip to content

Commit 54f8682

Browse files
committed
Fix callback yielded arguments and delay propagation
1 parent 5c2e43c commit 54f8682

File tree

2 files changed

+137
-74
lines changed

2 files changed

+137
-74
lines changed

lib/concurrent/edge/promises.rb

Lines changed: 72 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -727,7 +727,18 @@ def waiting_threads
727727
@Waiters.each.to_a
728728
end
729729

730+
# # @!visibility private
731+
def add_callback_notify_blocked(promise, index)
732+
add_callback :callback_notify_blocked, promise, index
733+
end
734+
730735
# @!visibility private
736+
def add_callback_clear_delayed_node(node)
737+
add_callback(:callback_clear_delayed_node, node)
738+
end
739+
740+
private
741+
731742
def add_callback(method, *args)
732743
state = internal_state
733744
if resolved?(state)
@@ -741,7 +752,9 @@ def add_callback(method, *args)
741752
self
742753
end
743754

744-
private
755+
def callback_clear_delayed_node(state, node)
756+
node.value = nil
757+
end
745758

746759
# @return [Boolean]
747760
def wait_until_resolved(timeout)
@@ -1202,7 +1215,7 @@ def callback_on_rejection(state, args, callback)
12021215
end
12031216

12041217
def callback_on_resolution(state, args, callback)
1205-
callback.call state.result, *args
1218+
callback.call *state.result, *args
12061219
end
12071220

12081221
end
@@ -1326,7 +1339,7 @@ def touch
13261339

13271340
alias_method :inspect, :to_s
13281341

1329-
def delayed
1342+
def delayed_because
13301343
nil
13311344
end
13321345

@@ -1381,55 +1394,45 @@ class BlockedPromise < InnerPromise
13811394
private_class_method :new
13821395

13831396
def self.new_blocked_by1(blocker, *args, &block)
1384-
blocker_delayed = blocker.promise.delayed
1385-
delayed = blocker_delayed ? LockFreeStack.new.push(blocker_delayed) : nil
1386-
promise = new(delayed, 1, *args, &block)
1387-
blocker.add_callback :callback_notify_blocked, promise, 0
1397+
blocker_delayed = blocker.promise.delayed_because
1398+
promise = new(blocker_delayed, 1, *args, &block)
1399+
blocker.add_callback_notify_blocked promise, 0
13881400
promise
13891401
end
13901402

13911403
def self.new_blocked_by2(blocker1, blocker2, *args, &block)
1392-
blocker_delayed1 = blocker1.promise.delayed
1393-
blocker_delayed2 = blocker2.promise.delayed
1394-
# TODO (pitr-ch 23-Dec-2016): use arrays when we know it will not grow (only flat adds delay)
1395-
delayed = if blocker_delayed1
1396-
if blocker_delayed2
1397-
LockFreeStack.of2(blocker_delayed1, blocker_delayed2)
1398-
else
1399-
LockFreeStack.of1(blocker_delayed1)
1400-
end
1404+
blocker_delayed1 = blocker1.promise.delayed_because
1405+
blocker_delayed2 = blocker2.promise.delayed_because
1406+
delayed = if blocker_delayed1 && blocker_delayed2
1407+
# TODO (pitr-ch 23-Dec-2016): use arrays when we know it will not grow (only flat adds delay)
1408+
LockFreeStack.of2(blocker_delayed1, blocker_delayed2)
14011409
else
1402-
blocker_delayed2 ? LockFreeStack.of1(blocker_delayed2) : nil
1410+
blocker_delayed1 || blocker_delayed2
14031411
end
14041412
promise = new(delayed, 2, *args, &block)
1405-
blocker1.add_callback :callback_notify_blocked, promise, 0
1406-
blocker2.add_callback :callback_notify_blocked, promise, 1
1413+
blocker1.add_callback_notify_blocked promise, 0
1414+
blocker2.add_callback_notify_blocked promise, 1
14071415
promise
14081416
end
14091417

14101418
def self.new_blocked_by(blockers, *args, &block)
1411-
delayed = blockers.reduce(nil, &method(:add_delayed))
1419+
delayed = blockers.reduce(nil) { |d, f| add_delayed d, f.promise.delayed_because }
14121420
promise = new(delayed, blockers.size, *args, &block)
1413-
blockers.each_with_index { |f, i| f.add_callback :callback_notify_blocked, promise, i }
1421+
blockers.each_with_index { |f, i| f.add_callback_notify_blocked promise, i }
14141422
promise
14151423
end
14161424

1417-
def self.add_delayed(delayed, blocker)
1418-
blocker_delayed = blocker.promise.delayed
1419-
if blocker_delayed
1420-
delayed = unless delayed
1421-
LockFreeStack.of1(blocker_delayed)
1422-
else
1423-
delayed.push(blocker_delayed)
1424-
end
1425+
def self.add_delayed(delayed1, delayed2)
1426+
if delayed1 && delayed2
1427+
delayed1.push delayed2
1428+
delayed1
1429+
else
1430+
delayed1 || delayed2
14251431
end
1426-
delayed
14271432
end
14281433

14291434
def initialize(delayed, blockers_count, future)
14301435
super(future)
1431-
# noinspection RubyArgCount
1432-
@Touched = AtomicBoolean.new false
14331436
@Delayed = delayed
14341437
# noinspection RubyArgCount
14351438
@Countdown = AtomicFixnum.new blockers_count
@@ -1442,16 +1445,12 @@ def on_blocker_resolution(future, index)
14421445
on_resolvable(future, index) if resolvable
14431446
end
14441447

1445-
def delayed
1448+
def delayed_because
14461449
@Delayed
14471450
end
14481451

14491452
def touch
1450-
clear_propagate_touch if @Touched.make_true
1451-
end
1452-
1453-
def touched?
1454-
@Touched.value
1453+
clear_and_propagate_touch
14551454
end
14561455

14571456
# for inspection only
@@ -1463,13 +1462,11 @@ def blocked_by
14631462

14641463
private
14651464

1466-
def clear_propagate_touch
1467-
@Delayed.clear_each { |o| propagate_touch o } if @Delayed
1468-
end
1465+
def clear_and_propagate_touch(stack_or_element = @Delayed)
1466+
return if stack_or_element.nil?
14691467

1470-
def propagate_touch(stack_or_element = @Delayed)
14711468
if stack_or_element.is_a? LockFreeStack
1472-
stack_or_element.each { |element| propagate_touch element }
1469+
stack_or_element.clear_each { |element| clear_and_propagate_touch element }
14731470
else
14741471
stack_or_element.touch unless stack_or_element.nil? # if still present
14751472
end
@@ -1572,8 +1569,28 @@ def initialize(default_executor, fulfilled, value, reason)
15721569

15731570
class AbstractFlatPromise < BlockedPromise
15741571

1572+
def initialize(delayed_because, blockers_count, event_or_future)
1573+
delayed = LockFreeStack.of1(self)
1574+
super(delayed, blockers_count, event_or_future)
1575+
# noinspection RubyArgCount
1576+
@Touched = AtomicBoolean.new false
1577+
@DelayedBecause = delayed_because || LockFreeStack.new
1578+
1579+
event_or_future.add_callback_clear_delayed_node delayed.peek
1580+
end
1581+
1582+
def touch
1583+
if @Touched.make_true
1584+
clear_and_propagate_touch @DelayedBecause
1585+
end
1586+
end
1587+
15751588
private
15761589

1590+
def touched?
1591+
@Touched.value
1592+
end
1593+
15771594
def on_resolvable(resolved_future, index)
15781595
resolve_with resolved_future.internal_state
15791596
end
@@ -1583,11 +1600,12 @@ def resolvable?(countdown, future, index)
15831600
end
15841601

15851602
def add_delayed_of(future)
1603+
delayed = future.promise.delayed_because
15861604
if touched?
1587-
propagate_touch future.promise.delayed
1605+
clear_and_propagate_touch delayed
15881606
else
1589-
BlockedPromise.add_delayed @Delayed, future
1590-
clear_propagate_touch if touched?
1607+
BlockedPromise.add_delayed @DelayedBecause, delayed
1608+
clear_and_propagate_touch @DelayedBecause if touched?
15911609
end
15921610
end
15931611

@@ -1615,7 +1633,7 @@ def process_on_blocker_resolution(future, index)
16151633
case value
16161634
when Future, Event
16171635
add_delayed_of value
1618-
value.add_callback :callback_notify_blocked, self, nil
1636+
value.add_callback_notify_blocked self, nil
16191637
countdown
16201638
else
16211639
resolve_with RESOLVED
@@ -1651,7 +1669,7 @@ def process_on_blocker_resolution(future, index)
16511669
case value
16521670
when Future
16531671
add_delayed_of value
1654-
value.add_callback :callback_notify_blocked, self, nil
1672+
value.add_callback_notify_blocked self, nil
16551673
countdown
16561674
when Event
16571675
evaluate_to(lambda { raise TypeError, 'cannot flatten to Event' })
@@ -1684,7 +1702,7 @@ def process_on_blocker_resolution(future, index)
16841702
case value
16851703
when Future
16861704
add_delayed_of value
1687-
value.add_callback :callback_notify_blocked, self, nil
1705+
value.add_callback_notify_blocked self, nil
16881706
else
16891707
resolve_with internal_state
16901708
end
@@ -1851,17 +1869,17 @@ def resolvable?(countdown, future, index)
18511869
class DelayPromise < InnerPromise
18521870

18531871
def initialize(default_executor)
1854-
super event = Event.new(self, default_executor)
1855-
@Delayed = LockFreeStack.new.push self
1856-
# TODO (pitr-ch 20-Dec-2016): implement directly without callback?
1857-
event.on_resolution!(@Delayed.peek) { |stack_node| stack_node.value = nil }
1872+
event = Event.new(self, default_executor)
1873+
@Delayed = LockFreeStack.of1(self)
1874+
super event
1875+
event.add_callback_clear_delayed_node @Delayed.peek
18581876
end
18591877

18601878
def touch
18611879
@Future.resolve_with RESOLVED
18621880
end
18631881

1864-
def delayed
1882+
def delayed_because
18651883
@Delayed
18661884
end
18671885

spec/concurrent/edge/promises_spec.rb

Lines changed: 65 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -244,28 +244,71 @@ def behaves_as_delay(delay, value)
244244

245245
describe 'Future' do
246246
it 'has sync and async callbacks' do
247-
callbacks_tester = ->(future) do
248-
queue = Queue.new
249-
future.on_resolution_using(:io) { |result| queue.push("async on_resolution #{ result.inspect }") }
250-
future.on_resolution! { |result| queue.push("sync on_resolution #{ result.inspect }") }
251-
future.on_fulfillment_using(:io) { |value| queue.push("async on_fulfillment #{ value.inspect }") }
252-
future.on_fulfillment! { |value| queue.push("sync on_fulfillment #{ value.inspect }") }
253-
future.on_rejection_using(:io) { |reason| queue.push("async on_rejection #{ reason.inspect }") }
254-
future.on_rejection! { |reason| queue.push("sync on_rejection #{ reason.inspect }") }
255-
future.wait
256-
[queue.pop, queue.pop, queue.pop, queue.pop].sort
247+
callbacks_tester = ->(event_or_future) do
248+
queue = Queue.new
249+
push_args = -> *args { queue.push args }
250+
251+
event_or_future.on_resolution! &push_args
252+
event_or_future.on_resolution!(1, &push_args)
253+
if event_or_future.is_a? Concurrent::Promises::Future
254+
event_or_future.on_fulfillment! &push_args
255+
event_or_future.on_fulfillment!(2, &push_args)
256+
event_or_future.on_rejection! &push_args
257+
event_or_future.on_rejection!(3, &push_args)
258+
end
259+
260+
event_or_future.on_resolution &push_args
261+
event_or_future.on_resolution(4, &push_args)
262+
if event_or_future.is_a? Concurrent::Promises::Future
263+
event_or_future.on_fulfillment &push_args
264+
event_or_future.on_fulfillment(5, &push_args)
265+
event_or_future.on_rejection &push_args
266+
event_or_future.on_rejection(6, &push_args)
267+
end
268+
event_or_future.on_resolution_using(:io, &push_args)
269+
event_or_future.on_resolution_using(:io, 7, &push_args)
270+
if event_or_future.is_a? Concurrent::Promises::Future
271+
event_or_future.on_fulfillment_using(:io, &push_args)
272+
event_or_future.on_fulfillment_using(:io, 8, &push_args)
273+
event_or_future.on_rejection_using(:io, &push_args)
274+
event_or_future.on_rejection_using(:io, 9, &push_args)
275+
end
276+
277+
event_or_future.wait
278+
Array.new(event_or_future.is_a?(Concurrent::Promises::Future) ? 12 : 6) { queue.pop }
257279
end
258-
callback_results = callbacks_tester.call(future { :value })
259-
expect(callback_results).to eq ["async on_fulfillment :value",
260-
"async on_resolution [true, :value, nil]",
261-
"sync on_fulfillment :value",
262-
"sync on_resolution [true, :value, nil]"]
263280

264-
callback_results = callbacks_tester.call(future { raise 'error' })
265-
expect(callback_results).to eq ["async on_rejection #<RuntimeError: error>",
266-
"async on_resolution [false, nil, #<RuntimeError: error>]",
267-
"sync on_rejection #<RuntimeError: error>",
268-
"sync on_resolution [false, nil, #<RuntimeError: error>]"]
281+
callback_results = callbacks_tester.call(fulfilled_future(:v))
282+
expect(callback_results).to contain_exactly([true, :v, nil],
283+
[true, :v, nil, 1],
284+
[:v],
285+
[:v, 2],
286+
[true, :v, nil],
287+
[true, :v, nil, 4],
288+
[:v],
289+
[:v, 5],
290+
[true, :v, nil],
291+
[true, :v, nil, 7],
292+
[:v],
293+
[:v, 8])
294+
295+
err = StandardError.new 'boo'
296+
callback_results = callbacks_tester.call(rejected_future(err))
297+
expect(callback_results).to contain_exactly([false, nil, err],
298+
[false, nil, err, 1],
299+
[err],
300+
[err, 3],
301+
[false, nil, err],
302+
[false, nil, err, 4],
303+
[err],
304+
[err, 6],
305+
[false, nil, err],
306+
[false, nil, err, 7],
307+
[err],
308+
[err, 9])
309+
310+
callback_results = callbacks_tester.call(resolved_event)
311+
expect(callback_results).to contain_exactly([], [1], [], [4], [], [7])
269312
end
270313

271314
[:wait, :wait!, :value, :value!, :reason, :result].each do |method_with_timeout|
@@ -387,6 +430,8 @@ def behaves_as_delay(delay, value)
387430

388431
it 'propagates requests for values to delayed futures' do
389432
expect(future { delay { 1 } }.flat.value!(0.1)).to eq 1
433+
expect(Array.new(3) { |i| Concurrent::Promises.delay { i } }.
434+
inject { |a, b| a.then { b }.flat }.value!(0.2)).to eq 2
390435
end
391436
end
392437

0 commit comments

Comments
 (0)