Skip to content

Commit 38ab132

Browse files
committed
Update throttle, add conversion methods
1 parent ef570a7 commit 38ab132

File tree

3 files changed

+94
-38
lines changed

3 files changed

+94
-38
lines changed

lib/concurrent/edge/promises.rb

Lines changed: 86 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
require 'concurrent/synchronization'
22
require 'concurrent/atomic/atomic_boolean'
33
require 'concurrent/atomic/atomic_fixnum'
4-
require 'concurrent/lock_free_stack'
4+
require 'concurrent/edge/lock_free_stack'
55
require 'concurrent/errors'
66

77
module Concurrent
@@ -781,7 +781,7 @@ class Event < AbstractEventFuture
781781
#
782782
# @return [Future, Event]
783783
def zip(other)
784-
if other.is?(Future)
784+
if other.is_a?(Future)
785785
ZipFutureEventPromise.new(other, self, @DefaultExecutor).future
786786
else
787787
ZipEventEventPromise.new(self, other, @DefaultExecutor).event
@@ -825,7 +825,20 @@ def schedule(intended_time)
825825
end.flat_event
826826
end
827827

828-
# TODO (pitr-ch 12-Jun-2016): add to_event, to_future
828+
# Converts event to a future. The future is fulfilled when the event is resolved, the future may never fail.
829+
#
830+
# @return [Future]
831+
def to_future
832+
future = Promises.resolvable_future
833+
ensure
834+
chain_resolvable(future)
835+
end
836+
837+
# Returns self, since this is event
838+
# @return [Event]
839+
def to_event
840+
self
841+
end
829842

830843
# @!macro promises.method.with_default_executor
831844
# @return [Event]
@@ -1111,6 +1124,21 @@ def apply(args, block)
11111124
internal_state.apply args, block
11121125
end
11131126

1127+
# Converts future to event which is resolved when future is resolved by fulfillment or rejection.
1128+
#
1129+
# @return [Event]
1130+
def to_event
1131+
event = Promises.resolvable_event
1132+
ensure
1133+
chain_resolvable(event)
1134+
end
1135+
1136+
# Returns self, since this is a future
1137+
# @return [Future]
1138+
def to_future
1139+
self
1140+
end
1141+
11141142
private
11151143

11161144
def rejected_resolution(raise_on_reassign, state)
@@ -1195,7 +1223,8 @@ class ResolvableFuture < Future
11951223
# which triggers all dependent futures.
11961224
#
11971225
# @!macro promise.param.raise_on_reassign
1198-
def resolve(fulfilled, value, reason, raise_on_reassign = true)
1226+
def resolve(fulfilled = true, value = nil, reason = nil, raise_on_reassign = true)
1227+
# TODO (pitr-ch 25-Sep-2016): should the defaults be kept to match event resolve api?
11991228
resolve_with(fulfilled ? Fulfilled.new(value) : Rejected.new(reason), raise_on_reassign)
12001229
end
12011230

@@ -1288,8 +1317,8 @@ def resolve_with(new_state, raise_on_reassign = true)
12881317
# @return [Future]
12891318
def evaluate_to(*args, block)
12901319
resolve_with Fulfilled.new(block.call(*args))
1291-
# TODO (pitr-ch 30-Jul-2016): figure out what should be rescued, there is an issue about it
12921320
rescue Exception => error
1321+
# TODO (pitr-ch 30-Jul-2016): figure out what should be rescued, there is an issue about it
12931322
resolve_with Rejected.new(error)
12941323
end
12951324
end
@@ -1358,7 +1387,7 @@ def on_resolution(future)
13581387

13591388
# @!visibility private
13601389
def touch
1361-
# TODO (pitr-ch 13-Jun-2016): on construction pass down references of delays to be touched, avoids extra casses
1390+
# TODO (pitr-ch 13-Jun-2016): on construction pass down references of delays to be touched, avoids extra CASses
13621391
blocked_by.each(&:touch)
13631392
end
13641393

@@ -1506,6 +1535,11 @@ def clear_blocked_by!
15061535
nil
15071536
end
15081537

1538+
def blocked_by_add(future)
1539+
@BlockedBy.push future
1540+
future.touch if self.future.touched?
1541+
end
1542+
15091543
def resolvable?(countdown, future)
15101544
!@Future.internal_state.resolved? && super(countdown, future)
15111545
end
@@ -1532,7 +1566,7 @@ def process_on_resolution(future)
15321566
value = internal_state.value
15331567
case value
15341568
when Future, Event
1535-
@BlockedBy.push value
1569+
blocked_by_add value
15361570
value.add_callback :callback_notify_blocked, self
15371571
@Countdown.value
15381572
else
@@ -1566,7 +1600,7 @@ def process_on_resolution(future)
15661600
value = internal_state.value
15671601
case value
15681602
when Future
1569-
@BlockedBy.push value
1603+
blocked_by_add value
15701604
value.add_callback :callback_notify_blocked, self
15711605
@Countdown.value
15721606
when Event
@@ -1599,7 +1633,8 @@ def process_on_resolution(future)
15991633
value = internal_state.value
16001634
case value
16011635
when Future
1602-
# @BlockedBy.push value
1636+
# FIXME (pitr-ch 08-Dec-2016): will accumulate the completed futures
1637+
blocked_by_add value
16031638
value.add_callback :callback_notify_blocked, self
16041639
else
16051640
resolve_with internal_state
@@ -1871,7 +1906,7 @@ module FactoryMethods
18711906
# only proof of concept
18721907
# @return [Future]
18731908
def select(*channels)
1874-
# TODO (pitr-ch 26-Mar-2016): redo, has to be non-blocking
1909+
# TODO (pitr-ch 26-Mar-2016): re-do, has to be non-blocking
18751910
future do
18761911
# noinspection RubyArgCount
18771912
Channel.select do |s|
@@ -1924,12 +1959,14 @@ def each_body(value, &block)
19241959
end
19251960
end
19261961

1962+
# TODO example: parallel jobs, cancell them all when one fails, clean-up in zip
19271963
# inspired by https://msdn.microsoft.com/en-us/library/dd537607(v=vs.110).aspx
19281964
class Cancellation < Synchronization::Object
19291965
safe_initialization!
19301966

19311967
def self.create(future_or_event = Promises.resolvable_event, *resolve_args)
1932-
[(i = new(future_or_event, *resolve_args)), i.token]
1968+
cancellation = new(future_or_event, *resolve_args)
1969+
[cancellation, cancellation.token]
19331970
end
19341971

19351972
private_class_method :new
@@ -1960,20 +1997,18 @@ def initialize(cancel)
19601997
@Cancel = cancel
19611998
end
19621999

1963-
def event
1964-
@Cancel
2000+
def to_event
2001+
@Cancel.to_event
19652002
end
19662003

1967-
alias_method :future, :event
2004+
def to_future
2005+
@Cancel.to_future
2006+
end
19682007

19692008
def on_cancellation(*args, &block)
19702009
@Cancel.on_resolution *args, &block
19712010
end
19722011

1973-
def then(*args, &block)
1974-
@Cancel.chain *args, &block
1975-
end
1976-
19772012
def canceled?
19782013
@Cancel.resolved?
19792014
end
@@ -1985,13 +2020,14 @@ def loop_until_canceled(&block)
19852020
result
19862021
end
19872022

1988-
def raise_if_canceled
1989-
raise CancelledOperationError if canceled?
2023+
def raise_if_canceled(error = CancelledOperationError)
2024+
raise error if canceled?
19902025
self
19912026
end
19922027

1993-
def join(*tokens)
1994-
Token.new Promises.any_event(@Cancel, *tokens.map(&:event))
2028+
def join(*tokens, &block)
2029+
block ||= -> tokens { Promises.any_event(*tokens.map(&:to_event)) }
2030+
self.class.new block.call([@Cancel, *tokens])
19952031
end
19962032

19972033
end
@@ -2002,7 +2038,7 @@ def join(*tokens)
20022038
# TODO (pitr-ch 27-Mar-2016): examples (scheduled to be cancelled in 10 sec)
20032039
end
20042040

2005-
class Throttle < Synchronization::Object
2041+
class Promises::Throttle < Synchronization::Object
20062042

20072043
safe_initialization!
20082044
private *attr_atomic(:can_run)
@@ -2015,16 +2051,23 @@ def initialize(max)
20152051
end
20162052

20172053
def limit(future = nil, &block)
2018-
# TODO (pitr-ch 11-Jun-2016): triggers should allocate resources when they are to be required
2019-
trigger = future ? future & get_event : get_event
2020-
2021-
if block_given?
2022-
block.call(trigger).on_resolution! { done }
2054+
if future
2055+
# future.chain { block.call(new_trigger & future).on_resolution! { done } }.flat
2056+
block.call(new_trigger & future).on_resolution! { done }
20232057
else
2024-
get_event
2058+
if block_given?
2059+
block.call(new_trigger).on_resolution! { done }
2060+
else
2061+
new_trigger
2062+
end
20252063
end
20262064
end
20272065

2066+
# TODO (pitr-ch 10-Oct-2016): maybe just then?
2067+
def then_limit(&block)
2068+
limit { |trigger| trigger.then &block }
2069+
end
2070+
20282071
def done
20292072
while true
20302073
current_can_run = can_run
@@ -2037,7 +2080,7 @@ def done
20372080

20382081
private
20392082

2040-
def get_event
2083+
def new_trigger
20412084
while true
20422085
current_can_run = can_run
20432086
if compare_and_set_can_run current_can_run, current_can_run - 1
@@ -2059,5 +2102,18 @@ def throttle(throttle, &throttled_future)
20592102
throttle.limit(self, &throttled_future)
20602103
end
20612104

2105+
def then_throttle(throttle, &block)
2106+
throttle(throttle) { |trigger| trigger.then &block }
2107+
end
2108+
2109+
end
2110+
2111+
module Promises::FactoryMethods
2112+
2113+
# @!visibility private
2114+
2115+
def throttle(count)
2116+
Promises::Throttle.new count
2117+
end
20622118
end
20632119
end

lib/concurrent/synchronization/object.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ def self.safe_initialization?
7070
# any instance variables with CamelCase names and isn't {.safe_initialization?}.
7171
def self.ensure_safe_initialization_when_final_fields_are_present
7272
Object.class_eval do
73-
def self.new(*)
74-
object = super
73+
def self.new(*args, &block)
74+
object = super(*args, &block)
7575
ensure
7676
has_final_field = object.instance_variables.any? { |v| v.to_s =~ /^@[A-Z]/ }
7777
if has_final_field && !safe_initialization?

spec/concurrent/edge/promises_spec.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ def behaves_as_delay(delay, value)
395395
end
396396

397397
it 'propagates requests for values to delayed futures' do
398-
expect(Concurrent.future { Concurrent.delay { 1 } }.flat.value!(0.1)).to eq 1
398+
expect(future { delay { 1 } }.flat.value!(0.1)).to eq 1
399399
end
400400
end
401401

@@ -467,29 +467,29 @@ def behaves_as_delay(delay, value)
467467
specify do
468468
source, token = Concurrent::Cancellation.create
469469
source.cancel
470-
expect(token.event.resolved?).to be_truthy
470+
expect(token.canceled?).to be_truthy
471471

472472
cancellable_branch = Concurrent::Promises.delay { 1 }
473-
expect((cancellable_branch | token.event).value).to be_nil
473+
expect((cancellable_branch | token.to_event).value).to be_nil
474474
expect(cancellable_branch.resolved?).to be_falsey
475475
end
476476

477477
specify do
478478
source, token = Concurrent::Cancellation.create
479479

480480
cancellable_branch = Concurrent::Promises.delay { 1 }
481-
expect(any_resolved_future(cancellable_branch, token.event).value).to eq 1
481+
expect(any_resolved_future(cancellable_branch, token.to_event).value).to eq 1
482482
expect(cancellable_branch.resolved?).to be_truthy
483483
end
484484

485485
specify do
486486
source, token = Concurrent::Cancellation.create(
487487
Concurrent::Promises.resolvable_future, false, nil, err = StandardError.new('Cancelled'))
488488
source.cancel
489-
expect(token.future.resolved?).to be_truthy
489+
expect(token.canceled?).to be_truthy
490490

491491
cancellable_branch = Concurrent::Promises.delay { 1 }
492-
expect((cancellable_branch | token.future).reason).to eq err
492+
expect((cancellable_branch | token.to_future).reason).to eq err
493493
expect(cancellable_branch.resolved?).to be_falsey
494494
end
495495
end

0 commit comments

Comments
 (0)