Skip to content

Commit 6ff743f

Browse files
author
Petr Chalupa
committed
Merge pull request #521 from pitr-ch/futures
edge-future updates:
2 parents 5364788 + e7260a5 commit 6ff743f

File tree

3 files changed

+112
-86
lines changed

3 files changed

+112
-86
lines changed

.travis.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ rvm:
44
- 2.2.3
55
- 2.2.2
66
- 2.1.5
7-
- 2.1.4
87
- 2.0.0
98
- 1.9.3
109
- ruby-head
1110
- jruby-1.7.19
1211
- jruby-9.0.1.0
1312
- jruby-9.0.3.0
14-
- jruby-9.0.4.0
13+
- jruby-9.0.5.0
1514
- jruby-head
1615
- rbx-2
1716

lib/concurrent/edge/future.rb

Lines changed: 88 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def completed_event(default_executor = :io)
6666
# requested by calling `#wait`, `#value`, `#value!`, etc. on it or on any of the chained futures.
6767
# @return [Future]
6868
def delay(default_executor = :io, &task)
69-
Delay.new(default_executor).future.then(&task)
69+
DelayPromise.new(default_executor).future.then(&task)
7070
end
7171

7272
# Schedules the block to be executed on executor in given intended_time.
@@ -98,8 +98,18 @@ def zip_events(*futures_and_or_events)
9898
# Constructs new {Future} which is completed after first of the futures is complete.
9999
# @param [Event] futures
100100
# @return [Future]
101-
def any(*futures)
102-
AnyPromise.new(futures, :io).future
101+
def any_complete(*futures)
102+
AnyCompletePromise.new(futures, :io).future
103+
end
104+
105+
alias_method :any, :any_complete
106+
107+
# Constructs new {Future} which becomes succeeded after first of the futures succeedes or
108+
# failed if all futures fail (reason is last error).
109+
# @param [Event] futures
110+
# @return [Future]
111+
def any_successful(*futures)
112+
AnySuccessfulPromise.new(futures, :io).future
103113
end
104114

105115
# only proof of concept
@@ -137,13 +147,11 @@ def post_on(executor, *args, &job)
137147
# TODO allow to to have a zip point for many futures and process them in batches by 10
138148
end
139149

140-
extend FutureShortcuts
141-
include FutureShortcuts
142-
143150
# Represents an event which will happen in future (will be completed). It has to always happen.
144-
class Event < Synchronization::LockableObject
151+
class Event < Synchronization::Object
145152
safe_initialization!
146153
private(*attr_atomic(:internal_state))
154+
# @!visibility private
147155
public :internal_state
148156
include Concern::Deprecation
149157
include Concern::Logging
@@ -188,13 +196,13 @@ def to_sym
188196

189197
def initialize(promise, default_executor)
190198
super()
199+
@Lock = Mutex.new
200+
@Condition = ConditionVariable.new
191201
@Promise = promise
192202
@DefaultExecutor = default_executor
193-
@Touched = AtomicBoolean.new(false)
203+
@Touched = AtomicBoolean.new false
194204
@Callbacks = LockFreeStack.new
195-
# TODO (pitr 12-Sep-2015): replace with AtomicFixnum, avoid aba problem
196-
# TODO (pitr 12-Sep-2015): look at java.util.concurrent solution
197-
@Waiters = LockFreeStack.new
205+
@Waiters = AtomicFixnum.new 0
198206
self.internal_state = PENDING
199207
end
200208

@@ -276,7 +284,7 @@ def zip(other)
276284
# Inserts delay into the chain of Futures making rest of it lazy evaluated.
277285
# @return [Event]
278286
def delay
279-
ZipEventEventPromise.new(self, Delay.new(@DefaultExecutor).event, @DefaultExecutor).event
287+
ZipEventEventPromise.new(self, DelayPromise.new(@DefaultExecutor).event, @DefaultExecutor).event
280288
end
281289

282290
# # Schedules rest of the chain for execution with specified time or on specified time
@@ -298,13 +306,13 @@ def then_select(*channels)
298306
# @yield [success, value, reason] executed async on `executor` when completed
299307
# @return self
300308
def on_completion(executor = nil, &callback)
301-
add_callback :pr_async_callback_on_completion, executor || @DefaultExecutor, callback
309+
add_callback :async_callback_on_completion, executor || @DefaultExecutor, callback
302310
end
303311

304312
# @yield [success, value, reason] executed sync when completed
305313
# @return self
306314
def on_completion!(&callback)
307-
add_callback :pr_callback_on_completion, callback
315+
add_callback :callback_on_completion, callback
308316
end
309317

310318
# Changes default executor for rest of the chain
@@ -329,9 +337,8 @@ def set(*args, &block)
329337
# @!visibility private
330338
def complete_with(state, raise_on_reassign = true)
331339
if compare_and_set_internal_state(PENDING, state)
332-
#(state)
333340
# go to synchronized block only if there were waiting threads
334-
synchronize { ns_broadcast } if @Waiters.clear
341+
@Lock.synchronize { @Condition.broadcast } unless @Waiters.value == 0
335342
call_callbacks
336343
else
337344
Concurrent::MultipleAssignmentError.new('Event can be completed only once') if raise_on_reassign
@@ -388,32 +395,31 @@ def waiting_threads
388395

389396
# @return [true, false]
390397
def wait_until_complete(timeout)
391-
while true
392-
last_waiter = @Waiters.peek # waiters' state before completion
393-
return true if completed?
394-
395-
# synchronize so it cannot be signaled before it waits
396-
synchronize do
397-
# ok only if completing thread did not start signaling
398-
next unless @Waiters.compare_and_push last_waiter, Thread.current
399-
return ns_wait_until(timeout) { completed? }
398+
return true if completed?
399+
400+
@Lock.synchronize do
401+
@Waiters.increment
402+
unless completed?
403+
@Condition.wait @Lock, timeout
400404
end
405+
@Waiters.decrement
401406
end
407+
completed?
402408
end
403409

404-
def pr_with_async(executor, *args, &block)
410+
def with_async(executor, *args, &block)
405411
Concurrent.post_on(executor, *args, &block)
406412
end
407413

408-
def pr_async_callback_on_completion(executor, callback)
409-
pr_with_async(executor) { pr_callback_on_completion callback }
414+
def async_callback_on_completion(executor, callback)
415+
with_async(executor) { callback_on_completion callback }
410416
end
411417

412-
def pr_callback_on_completion(callback)
418+
def callback_on_completion(callback)
413419
callback.call
414420
end
415421

416-
def pr_callback_notify_blocked(promise)
422+
def callback_notify_blocked(promise)
417423
promise.on_done self
418424
end
419425

@@ -660,13 +666,13 @@ def flat(level = 1)
660666

661667
# @return [Future] which has first completed value from futures
662668
def any(*futures)
663-
AnyPromise.new([self, *futures], @DefaultExecutor).future
669+
AnyCompletePromise.new([self, *futures], @DefaultExecutor).future
664670
end
665671

666672
# Inserts delay into the chain of Futures making rest of it lazy evaluated.
667673
# @return [Future]
668674
def delay
669-
ZipFutureEventPromise.new(self, Delay.new(@DefaultExecutor).future, @DefaultExecutor).future
675+
ZipFutureEventPromise.new(self, DelayPromise.new(@DefaultExecutor).future, @DefaultExecutor).future
670676
end
671677

672678
# Schedules rest of the chain for execution with specified time or on specified time
@@ -714,32 +720,32 @@ def then_put(channel)
714720
# @yield [value] executed async on `executor` when success
715721
# @return self
716722
def on_success(executor = nil, &callback)
717-
add_callback :pr_async_callback_on_success, executor || @DefaultExecutor, callback
723+
add_callback :async_callback_on_success, executor || @DefaultExecutor, callback
718724
end
719725

720726
# @yield [reason] executed async on `executor` when failed?
721727
# @return self
722728
def on_failure(executor = nil, &callback)
723-
add_callback :pr_async_callback_on_failure, executor || @DefaultExecutor, callback
729+
add_callback :async_callback_on_failure, executor || @DefaultExecutor, callback
724730
end
725731

726732
# @yield [value] executed sync when success
727733
# @return self
728734
def on_success!(&callback)
729-
add_callback :pr_callback_on_success, callback
735+
add_callback :callback_on_success, callback
730736
end
731737

732738
# @yield [reason] executed sync when failed?
733739
# @return self
734740
def on_failure!(&callback)
735-
add_callback :pr_callback_on_failure, callback
741+
add_callback :callback_on_failure, callback
736742
end
737743

738744
# @!visibility private
739745
def complete_with(state, raise_on_reassign = true)
740746
if compare_and_set_internal_state(PENDING, state)
741-
@Waiters.clear
742-
synchronize { ns_broadcast }
747+
# go to synchronized block only if there were waiting threads
748+
@Lock.synchronize { @Condition.broadcast } unless @Waiters.value == 0
743749
call_callbacks state
744750
else
745751
if raise_on_reassign
@@ -792,37 +798,37 @@ def call_callback(method, state, *args)
792798
self.send method, state, *args
793799
end
794800

795-
def pr_async_callback_on_success(state, executor, callback)
796-
pr_with_async(executor, state, callback) do |st, cb|
797-
pr_callback_on_success st, cb
801+
def async_callback_on_success(state, executor, callback)
802+
with_async(executor, state, callback) do |st, cb|
803+
callback_on_success st, cb
798804
end
799805
end
800806

801-
def pr_async_callback_on_failure(state, executor, callback)
802-
pr_with_async(executor, state, callback) do |st, cb|
803-
pr_callback_on_failure st, cb
807+
def async_callback_on_failure(state, executor, callback)
808+
with_async(executor, state, callback) do |st, cb|
809+
callback_on_failure st, cb
804810
end
805811
end
806812

807-
def pr_callback_on_success(state, callback)
813+
def callback_on_success(state, callback)
808814
state.apply callback if state.success?
809815
end
810816

811-
def pr_callback_on_failure(state, callback)
817+
def callback_on_failure(state, callback)
812818
state.apply callback unless state.success?
813819
end
814820

815-
def pr_callback_on_completion(state, callback)
821+
def callback_on_completion(state, callback)
816822
callback.call state.result
817823
end
818824

819-
def pr_callback_notify_blocked(state, promise)
825+
def callback_notify_blocked(state, promise)
820826
super(promise)
821827
end
822828

823-
def pr_async_callback_on_completion(state, executor, callback)
824-
pr_with_async(executor, state, callback) do |st, cb|
825-
pr_callback_on_completion st, cb
829+
def async_callback_on_completion(state, executor, callback)
830+
with_async(executor, state, callback) do |st, cb|
831+
callback_on_completion st, cb
826832
end
827833
end
828834

@@ -1001,7 +1007,7 @@ class InnerPromise < AbstractPromise
10011007
class BlockedPromise < InnerPromise
10021008
def self.new(*args, &block)
10031009
promise = super(*args, &block)
1004-
promise.blocked_by.each { |f| f.add_callback :pr_callback_notify_blocked, promise }
1010+
promise.blocked_by.each { |f| f.add_callback :callback_notify_blocked, promise }
10051011
promise
10061012
end
10071013

@@ -1014,7 +1020,7 @@ def initialize(future, blocked_by_futures, countdown)
10141020
# @api private
10151021
def on_done(future)
10161022
countdown = process_on_done(future)
1017-
completable = completable?(countdown)
1023+
completable = completable?(countdown, future)
10181024

10191025
if completable
10201026
on_completable(future)
@@ -1051,7 +1057,7 @@ def clear_blocked_by!
10511057
end
10521058

10531059
# @return [true,false] if completable
1054-
def completable?(countdown)
1060+
def completable?(countdown, future)
10551061
countdown.zero?
10561062
end
10571063

@@ -1171,7 +1177,7 @@ def process_on_done(future)
11711177
case value
11721178
when Future
11731179
@BlockedBy.push value
1174-
value.add_callback :pr_callback_notify_blocked, self
1180+
value.add_callback :callback_notify_blocked, self
11751181
@Countdown.value
11761182
when Event
11771183
evaluate_to(lambda { raise TypeError, 'cannot flatten to Event' })
@@ -1200,8 +1206,8 @@ def clear_blocked_by!
12001206
nil
12011207
end
12021208

1203-
def completable?(countdown)
1204-
!@Future.internal_state.completed? && super(countdown)
1209+
def completable?(countdown, future)
1210+
!@Future.internal_state.completed? && super(countdown, future)
12051211
end
12061212
end
12071213

@@ -1321,7 +1327,7 @@ def on_completable(done_future)
13211327
end
13221328

13231329
# @!visibility private
1324-
class AnyPromise < BlockedPromise
1330+
class AnyCompletePromise < BlockedPromise
13251331

13261332
private
13271333

@@ -1331,7 +1337,7 @@ def initialize(blocked_by_futures, default_executor)
13311337
super(Future.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
13321338
end
13331339

1334-
def completable?(countdown)
1340+
def completable?(countdown, future)
13351341
true
13361342
end
13371343

@@ -1341,29 +1347,35 @@ def on_completable(done_future)
13411347
end
13421348

13431349
# @!visibility private
1344-
class Delay < InnerPromise
1345-
def touch
1346-
@Future.complete_with Event::COMPLETED
1347-
end
1350+
class AnySuccessfulPromise < BlockedPromise
13481351

13491352
private
13501353

1351-
def initialize(default_executor)
1352-
super Event.new(self, default_executor)
1354+
def initialize(blocked_by_futures, default_executor)
1355+
blocked_by_futures.all? { |f| f.is_a? Future } or
1356+
raise ArgumentError, 'accepts only Futures not Events'
1357+
super(Future.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
1358+
end
1359+
1360+
def completable?(countdown, future)
1361+
future.success? || super(countdown, future)
1362+
end
1363+
1364+
def on_completable(done_future)
1365+
complete_with done_future.internal_state, false
13531366
end
13541367
end
13551368

13561369
# @!visibility private
1357-
class DelayValue < InnerPromise
1370+
class DelayPromise < InnerPromise
13581371
def touch
1359-
@Future.complete_with Future::Success.new(@Value)
1372+
@Future.complete_with Event::COMPLETED
13601373
end
13611374

13621375
private
13631376

1364-
def initialize(default_executor, value)
1365-
super Future.new(self, default_executor)
1366-
@Value = value
1377+
def initialize(default_executor)
1378+
super Event.new(self, default_executor)
13671379
end
13681380
end
13691381

@@ -1401,7 +1413,10 @@ def initialize(default_executor, intended_time)
14011413
end
14021414
end
14031415
end
1404-
1405-
extend Edge::FutureShortcuts
1406-
include Edge::FutureShortcuts
14071416
end
1417+
1418+
Concurrent::Edge.send :extend, Concurrent::Edge::FutureShortcuts
1419+
Concurrent::Edge.send :include, Concurrent::Edge::FutureShortcuts
1420+
1421+
Concurrent.send :extend, Concurrent::Edge::FutureShortcuts
1422+
Concurrent.send :include, Concurrent::Edge::FutureShortcuts

0 commit comments

Comments
 (0)