@@ -66,7 +66,7 @@ def completed_event(default_executor = :io)
66
66
# requested by calling `#wait`, `#value`, `#value!`, etc. on it or on any of the chained futures.
67
67
# @return [Future]
68
68
def delay ( default_executor = :io , &task )
69
- Delay . new ( default_executor ) . future . then ( &task )
69
+ DelayPromise . new ( default_executor ) . future . then ( &task )
70
70
end
71
71
72
72
# Schedules the block to be executed on executor in given intended_time.
@@ -98,8 +98,18 @@ def zip_events(*futures_and_or_events)
98
98
# Constructs new {Future} which is completed after first of the futures is complete.
99
99
# @param [Event] futures
100
100
# @return [Future]
101
- def any ( *futures )
102
- AnyPromise . new ( futures , :io ) . future
101
+ def any_complete ( *futures )
102
+ AnyCompletePromise . new ( futures , :io ) . future
103
+ end
104
+
105
+ alias_method :any , :any_complete
106
+
107
+ # Constructs new {Future} which becomes succeeded after first of the futures succeedes or
108
+ # failed if all futures fail (reason is last error).
109
+ # @param [Event] futures
110
+ # @return [Future]
111
+ def any_successful ( *futures )
112
+ AnySuccessfulPromise . new ( futures , :io ) . future
103
113
end
104
114
105
115
# only proof of concept
@@ -137,13 +147,11 @@ def post_on(executor, *args, &job)
137
147
# TODO allow to to have a zip point for many futures and process them in batches by 10
138
148
end
139
149
140
- extend FutureShortcuts
141
- include FutureShortcuts
142
-
143
150
# Represents an event which will happen in future (will be completed). It has to always happen.
144
- class Event < Synchronization ::LockableObject
151
+ class Event < Synchronization ::Object
145
152
safe_initialization!
146
153
private ( *attr_atomic ( :internal_state ) )
154
+ # @!visibility private
147
155
public :internal_state
148
156
include Concern ::Deprecation
149
157
include Concern ::Logging
@@ -188,13 +196,13 @@ def to_sym
188
196
189
197
def initialize ( promise , default_executor )
190
198
super ( )
199
+ @Lock = Mutex . new
200
+ @Condition = ConditionVariable . new
191
201
@Promise = promise
192
202
@DefaultExecutor = default_executor
193
- @Touched = AtomicBoolean . new ( false )
203
+ @Touched = AtomicBoolean . new false
194
204
@Callbacks = LockFreeStack . new
195
- # TODO (pitr 12-Sep-2015): replace with AtomicFixnum, avoid aba problem
196
- # TODO (pitr 12-Sep-2015): look at java.util.concurrent solution
197
- @Waiters = LockFreeStack . new
205
+ @Waiters = AtomicFixnum . new 0
198
206
self . internal_state = PENDING
199
207
end
200
208
@@ -276,7 +284,7 @@ def zip(other)
276
284
# Inserts delay into the chain of Futures making rest of it lazy evaluated.
277
285
# @return [Event]
278
286
def delay
279
- ZipEventEventPromise . new ( self , Delay . new ( @DefaultExecutor ) . event , @DefaultExecutor ) . event
287
+ ZipEventEventPromise . new ( self , DelayPromise . new ( @DefaultExecutor ) . event , @DefaultExecutor ) . event
280
288
end
281
289
282
290
# # Schedules rest of the chain for execution with specified time or on specified time
@@ -298,13 +306,13 @@ def then_select(*channels)
298
306
# @yield [success, value, reason] executed async on `executor` when completed
299
307
# @return self
300
308
def on_completion ( executor = nil , &callback )
301
- add_callback :pr_async_callback_on_completion , executor || @DefaultExecutor , callback
309
+ add_callback :async_callback_on_completion , executor || @DefaultExecutor , callback
302
310
end
303
311
304
312
# @yield [success, value, reason] executed sync when completed
305
313
# @return self
306
314
def on_completion! ( &callback )
307
- add_callback :pr_callback_on_completion , callback
315
+ add_callback :callback_on_completion , callback
308
316
end
309
317
310
318
# Changes default executor for rest of the chain
@@ -329,9 +337,8 @@ def set(*args, &block)
329
337
# @!visibility private
330
338
def complete_with ( state , raise_on_reassign = true )
331
339
if compare_and_set_internal_state ( PENDING , state )
332
- #(state)
333
340
# go to synchronized block only if there were waiting threads
334
- synchronize { ns_broadcast } if @Waiters . clear
341
+ @Lock . synchronize { @Condition . broadcast } unless @Waiters . value == 0
335
342
call_callbacks
336
343
else
337
344
Concurrent ::MultipleAssignmentError . new ( 'Event can be completed only once' ) if raise_on_reassign
@@ -388,32 +395,31 @@ def waiting_threads
388
395
389
396
# @return [true, false]
390
397
def wait_until_complete ( timeout )
391
- while true
392
- last_waiter = @Waiters . peek # waiters' state before completion
393
- return true if completed?
394
-
395
- # synchronize so it cannot be signaled before it waits
396
- synchronize do
397
- # ok only if completing thread did not start signaling
398
- next unless @Waiters . compare_and_push last_waiter , Thread . current
399
- return ns_wait_until ( timeout ) { completed? }
398
+ return true if completed?
399
+
400
+ @Lock . synchronize do
401
+ @Waiters . increment
402
+ unless completed?
403
+ @Condition . wait @Lock , timeout
400
404
end
405
+ @Waiters . decrement
401
406
end
407
+ completed?
402
408
end
403
409
404
- def pr_with_async ( executor , *args , &block )
410
+ def with_async ( executor , *args , &block )
405
411
Concurrent . post_on ( executor , *args , &block )
406
412
end
407
413
408
- def pr_async_callback_on_completion ( executor , callback )
409
- pr_with_async ( executor ) { pr_callback_on_completion callback }
414
+ def async_callback_on_completion ( executor , callback )
415
+ with_async ( executor ) { callback_on_completion callback }
410
416
end
411
417
412
- def pr_callback_on_completion ( callback )
418
+ def callback_on_completion ( callback )
413
419
callback . call
414
420
end
415
421
416
- def pr_callback_notify_blocked ( promise )
422
+ def callback_notify_blocked ( promise )
417
423
promise . on_done self
418
424
end
419
425
@@ -660,13 +666,13 @@ def flat(level = 1)
660
666
661
667
# @return [Future] which has first completed value from futures
662
668
def any ( *futures )
663
- AnyPromise . new ( [ self , *futures ] , @DefaultExecutor ) . future
669
+ AnyCompletePromise . new ( [ self , *futures ] , @DefaultExecutor ) . future
664
670
end
665
671
666
672
# Inserts delay into the chain of Futures making rest of it lazy evaluated.
667
673
# @return [Future]
668
674
def delay
669
- ZipFutureEventPromise . new ( self , Delay . new ( @DefaultExecutor ) . future , @DefaultExecutor ) . future
675
+ ZipFutureEventPromise . new ( self , DelayPromise . new ( @DefaultExecutor ) . future , @DefaultExecutor ) . future
670
676
end
671
677
672
678
# Schedules rest of the chain for execution with specified time or on specified time
@@ -714,32 +720,32 @@ def then_put(channel)
714
720
# @yield [value] executed async on `executor` when success
715
721
# @return self
716
722
def on_success ( executor = nil , &callback )
717
- add_callback :pr_async_callback_on_success , executor || @DefaultExecutor , callback
723
+ add_callback :async_callback_on_success , executor || @DefaultExecutor , callback
718
724
end
719
725
720
726
# @yield [reason] executed async on `executor` when failed?
721
727
# @return self
722
728
def on_failure ( executor = nil , &callback )
723
- add_callback :pr_async_callback_on_failure , executor || @DefaultExecutor , callback
729
+ add_callback :async_callback_on_failure , executor || @DefaultExecutor , callback
724
730
end
725
731
726
732
# @yield [value] executed sync when success
727
733
# @return self
728
734
def on_success! ( &callback )
729
- add_callback :pr_callback_on_success , callback
735
+ add_callback :callback_on_success , callback
730
736
end
731
737
732
738
# @yield [reason] executed sync when failed?
733
739
# @return self
734
740
def on_failure! ( &callback )
735
- add_callback :pr_callback_on_failure , callback
741
+ add_callback :callback_on_failure , callback
736
742
end
737
743
738
744
# @!visibility private
739
745
def complete_with ( state , raise_on_reassign = true )
740
746
if compare_and_set_internal_state ( PENDING , state )
741
- @Waiters . clear
742
- synchronize { ns_broadcast }
747
+ # go to synchronized block only if there were waiting threads
748
+ @Lock . synchronize { @Condition . broadcast } unless @Waiters . value == 0
743
749
call_callbacks state
744
750
else
745
751
if raise_on_reassign
@@ -792,37 +798,37 @@ def call_callback(method, state, *args)
792
798
self . send method , state , *args
793
799
end
794
800
795
- def pr_async_callback_on_success ( state , executor , callback )
796
- pr_with_async ( executor , state , callback ) do |st , cb |
797
- pr_callback_on_success st , cb
801
+ def async_callback_on_success ( state , executor , callback )
802
+ with_async ( executor , state , callback ) do |st , cb |
803
+ callback_on_success st , cb
798
804
end
799
805
end
800
806
801
- def pr_async_callback_on_failure ( state , executor , callback )
802
- pr_with_async ( executor , state , callback ) do |st , cb |
803
- pr_callback_on_failure st , cb
807
+ def async_callback_on_failure ( state , executor , callback )
808
+ with_async ( executor , state , callback ) do |st , cb |
809
+ callback_on_failure st , cb
804
810
end
805
811
end
806
812
807
- def pr_callback_on_success ( state , callback )
813
+ def callback_on_success ( state , callback )
808
814
state . apply callback if state . success?
809
815
end
810
816
811
- def pr_callback_on_failure ( state , callback )
817
+ def callback_on_failure ( state , callback )
812
818
state . apply callback unless state . success?
813
819
end
814
820
815
- def pr_callback_on_completion ( state , callback )
821
+ def callback_on_completion ( state , callback )
816
822
callback . call state . result
817
823
end
818
824
819
- def pr_callback_notify_blocked ( state , promise )
825
+ def callback_notify_blocked ( state , promise )
820
826
super ( promise )
821
827
end
822
828
823
- def pr_async_callback_on_completion ( state , executor , callback )
824
- pr_with_async ( executor , state , callback ) do |st , cb |
825
- pr_callback_on_completion st , cb
829
+ def async_callback_on_completion ( state , executor , callback )
830
+ with_async ( executor , state , callback ) do |st , cb |
831
+ callback_on_completion st , cb
826
832
end
827
833
end
828
834
@@ -1001,7 +1007,7 @@ class InnerPromise < AbstractPromise
1001
1007
class BlockedPromise < InnerPromise
1002
1008
def self . new ( *args , &block )
1003
1009
promise = super ( *args , &block )
1004
- promise . blocked_by . each { |f | f . add_callback :pr_callback_notify_blocked , promise }
1010
+ promise . blocked_by . each { |f | f . add_callback :callback_notify_blocked , promise }
1005
1011
promise
1006
1012
end
1007
1013
@@ -1014,7 +1020,7 @@ def initialize(future, blocked_by_futures, countdown)
1014
1020
# @api private
1015
1021
def on_done ( future )
1016
1022
countdown = process_on_done ( future )
1017
- completable = completable? ( countdown )
1023
+ completable = completable? ( countdown , future )
1018
1024
1019
1025
if completable
1020
1026
on_completable ( future )
@@ -1051,7 +1057,7 @@ def clear_blocked_by!
1051
1057
end
1052
1058
1053
1059
# @return [true,false] if completable
1054
- def completable? ( countdown )
1060
+ def completable? ( countdown , future )
1055
1061
countdown . zero?
1056
1062
end
1057
1063
@@ -1171,7 +1177,7 @@ def process_on_done(future)
1171
1177
case value
1172
1178
when Future
1173
1179
@BlockedBy . push value
1174
- value . add_callback :pr_callback_notify_blocked , self
1180
+ value . add_callback :callback_notify_blocked , self
1175
1181
@Countdown . value
1176
1182
when Event
1177
1183
evaluate_to ( lambda { raise TypeError , 'cannot flatten to Event' } )
@@ -1200,8 +1206,8 @@ def clear_blocked_by!
1200
1206
nil
1201
1207
end
1202
1208
1203
- def completable? ( countdown )
1204
- !@Future . internal_state . completed? && super ( countdown )
1209
+ def completable? ( countdown , future )
1210
+ !@Future . internal_state . completed? && super ( countdown , future )
1205
1211
end
1206
1212
end
1207
1213
@@ -1321,7 +1327,7 @@ def on_completable(done_future)
1321
1327
end
1322
1328
1323
1329
# @!visibility private
1324
- class AnyPromise < BlockedPromise
1330
+ class AnyCompletePromise < BlockedPromise
1325
1331
1326
1332
private
1327
1333
@@ -1331,7 +1337,7 @@ def initialize(blocked_by_futures, default_executor)
1331
1337
super ( Future . new ( self , default_executor ) , blocked_by_futures , blocked_by_futures . size )
1332
1338
end
1333
1339
1334
- def completable? ( countdown )
1340
+ def completable? ( countdown , future )
1335
1341
true
1336
1342
end
1337
1343
@@ -1341,29 +1347,35 @@ def on_completable(done_future)
1341
1347
end
1342
1348
1343
1349
# @!visibility private
1344
- class Delay < InnerPromise
1345
- def touch
1346
- @Future . complete_with Event ::COMPLETED
1347
- end
1350
+ class AnySuccessfulPromise < BlockedPromise
1348
1351
1349
1352
private
1350
1353
1351
- def initialize ( default_executor )
1352
- super Event . new ( self , default_executor )
1354
+ def initialize ( blocked_by_futures , default_executor )
1355
+ blocked_by_futures . all? { |f | f . is_a? Future } or
1356
+ raise ArgumentError , 'accepts only Futures not Events'
1357
+ super ( Future . new ( self , default_executor ) , blocked_by_futures , blocked_by_futures . size )
1358
+ end
1359
+
1360
+ def completable? ( countdown , future )
1361
+ future . success? || super ( countdown , future )
1362
+ end
1363
+
1364
+ def on_completable ( done_future )
1365
+ complete_with done_future . internal_state , false
1353
1366
end
1354
1367
end
1355
1368
1356
1369
# @!visibility private
1357
- class DelayValue < InnerPromise
1370
+ class DelayPromise < InnerPromise
1358
1371
def touch
1359
- @Future . complete_with Future :: Success . new ( @Value )
1372
+ @Future . complete_with Event :: COMPLETED
1360
1373
end
1361
1374
1362
1375
private
1363
1376
1364
- def initialize ( default_executor , value )
1365
- super Future . new ( self , default_executor )
1366
- @Value = value
1377
+ def initialize ( default_executor )
1378
+ super Event . new ( self , default_executor )
1367
1379
end
1368
1380
end
1369
1381
@@ -1401,7 +1413,10 @@ def initialize(default_executor, intended_time)
1401
1413
end
1402
1414
end
1403
1415
end
1404
-
1405
- extend Edge ::FutureShortcuts
1406
- include Edge ::FutureShortcuts
1407
1416
end
1417
+
1418
+ Concurrent ::Edge . send :extend , Concurrent ::Edge ::FutureShortcuts
1419
+ Concurrent ::Edge . send :include , Concurrent ::Edge ::FutureShortcuts
1420
+
1421
+ Concurrent . send :extend , Concurrent ::Edge ::FutureShortcuts
1422
+ Concurrent . send :include , Concurrent ::Edge ::FutureShortcuts
0 commit comments