@@ -31,6 +31,7 @@ def future(*args, &task)
31
31
future_on ( :io , *args , &task )
32
32
end
33
33
34
+ # As {#future} but takes default_executor as first argument
34
35
def future_on ( default_executor , *args , &task )
35
36
ImmediateEventPromise . new ( default_executor ) . future . then ( *args , &task )
36
37
end
@@ -68,6 +69,7 @@ def delay(*args, &task)
68
69
delay_on :io , *args , &task
69
70
end
70
71
72
+ # As {#delay} but takes default_executor as first argument
71
73
def delay_on ( default_executor , *args , &task )
72
74
DelayPromise . new ( default_executor ) . future . then ( *args , &task )
73
75
end
@@ -79,6 +81,7 @@ def schedule(intended_time, *args, &task)
79
81
schedule_on :io , intended_time , *args , &task
80
82
end
81
83
84
+ # As {#schedule} but takes default_executor as first argument
82
85
def schedule_on ( default_executor , intended_time , *args , &task )
83
86
ScheduledPromise . new ( default_executor , intended_time ) . future . then ( *args , &task )
84
87
end
@@ -92,6 +95,7 @@ def zip_futures(*futures_and_or_events)
92
95
zip_futures_on :io , *futures_and_or_events
93
96
end
94
97
98
+ # As {#zip_futures} but takes default_executor as first argument
95
99
def zip_futures_on ( default_executor , *futures_and_or_events )
96
100
ZipFuturesPromise . new ( futures_and_or_events , default_executor ) . future
97
101
end
@@ -106,6 +110,7 @@ def zip_events(*futures_and_or_events)
106
110
zip_events_on :io , *futures_and_or_events
107
111
end
108
112
113
+ # As {#zip_events} but takes default_executor as first argument
109
114
def zip_events_on ( default_executor , *futures_and_or_events )
110
115
ZipEventsPromise . new ( futures_and_or_events , default_executor ) . future
111
116
end
@@ -117,6 +122,7 @@ def any_complete_future(*futures)
117
122
any_complete_future_on :io , *futures
118
123
end
119
124
125
+ # As {#any_complete_future} but takes default_executor as first argument
120
126
def any_complete_future_on ( default_executor , *futures )
121
127
AnyCompleteFuturePromise . new ( futures , default_executor ) . future
122
128
end
@@ -131,14 +137,17 @@ def any_successful_future(*futures)
131
137
any_successful_future_on :io , *futures
132
138
end
133
139
140
+ # As {#any_succesful_future} but takes default_executor as first argument
134
141
def any_successful_future_on ( default_executor , *futures )
135
142
AnySuccessfulFuturePromise . new ( futures , default_executor ) . future
136
143
end
137
144
145
+ # Constructs new {Event} which becomes complete after first if the events completes.
138
146
def any_event ( *events )
139
147
any_event_on :io , *events
140
148
end
141
149
150
+ # As {#any_event} but takes default_executor as first argument
142
151
def any_event_on ( default_executor , *events )
143
152
AnyCompleteEventPromise . new ( events , default_executor ) . event
144
153
end
@@ -147,14 +156,7 @@ def any_event_on(default_executor, *events)
147
156
# TODO consider adding zip_by(slice, *futures) processing futures in slices
148
157
end
149
158
150
- # Represents an event which will happen in future (will be completed). It has to always happen.
151
- class Event < Synchronization ::Object
152
- safe_initialization!
153
- private ( *attr_atomic ( :internal_state ) )
154
- # @!visibility private
155
- public :internal_state
156
- include Concern ::Logging
157
-
159
+ module InternalStates
158
160
class State
159
161
def completed?
160
162
raise NotImplementedError
@@ -238,13 +240,17 @@ def to_sym
238
240
end
239
241
end
240
242
243
+ private_constant :Success
244
+
241
245
# @!visibility private
242
246
class SuccessArray < Success
243
247
def apply ( args , block )
244
248
block . call ( *value , *args )
245
249
end
246
250
end
247
251
252
+ private_constant :SuccessArray
253
+
248
254
# @!visibility private
249
255
class Failed < CompletedWithResult
250
256
def initialize ( reason )
@@ -272,6 +278,8 @@ def apply(args, block)
272
278
end
273
279
end
274
280
281
+ private_constant :Failed
282
+
275
283
# @!visibility private
276
284
class PartiallyFailed < CompletedWithResult
277
285
def initialize ( value , reason )
@@ -301,12 +309,26 @@ def apply(args, block)
301
309
end
302
310
end
303
311
312
+ private_constant :PartiallyFailed
304
313
305
- # @!visibility private
306
314
PENDING = Pending . new
307
- # @!visibility private
308
315
COMPLETED = Success . new ( nil )
309
316
317
+ private_constant :PENDING , :COMPLETED
318
+ end
319
+
320
+ private_constant :InternalStates
321
+
322
+ # Represents an event which will happen in future (will be completed). It has to always happen.
323
+ class Event < Synchronization ::Object
324
+ safe_initialization!
325
+ private ( *attr_atomic ( :internal_state ) )
326
+ # @!visibility private
327
+ public :internal_state
328
+
329
+ include Concern ::Logging
330
+ include InternalStates
331
+
310
332
def initialize ( promise , default_executor )
311
333
super ( )
312
334
@Lock = Mutex . new
@@ -913,6 +935,7 @@ def with_hidden_completable
913
935
# @abstract
914
936
class AbstractPromise < Synchronization ::Object
915
937
safe_initialization!
938
+ include InternalStates
916
939
include Concern ::Logging
917
940
918
941
def initialize ( future )
@@ -953,12 +976,12 @@ def complete_with(new_state, raise_on_reassign = true)
953
976
954
977
# @return [Future]
955
978
def evaluate_to ( *args , block )
956
- complete_with Future :: Success . new ( block . call ( *args ) )
979
+ complete_with Success . new ( block . call ( *args ) )
957
980
rescue StandardError => error
958
- complete_with Future :: Failed . new ( error )
981
+ complete_with Failed . new ( error )
959
982
rescue Exception => error
960
983
log ( ERROR , 'Promises::Future' , error )
961
- complete_with Future :: Failed . new ( error )
984
+ complete_with Failed . new ( error )
962
985
end
963
986
end
964
987
@@ -973,35 +996,24 @@ def initialize(default_executor)
973
996
super CompletableFuture . new ( self , default_executor )
974
997
end
975
998
976
- # Set the `Future` to a value and wake or notify all threads waiting on it.
977
- #
978
- # @param [Object] value the value to store in the `Future`
979
- # @raise [Concurrent::MultipleAssignmentError] if the `Future` has already been set or otherwise completed
980
- # @return [Future]
981
999
def success ( value )
982
- complete_with Future :: Success . new ( value )
1000
+ complete_with Success . new ( value )
983
1001
end
984
1002
985
1003
def try_success ( value )
986
- !!complete_with ( Future :: Success . new ( value ) , false )
1004
+ !!complete_with ( Success . new ( value ) , false )
987
1005
end
988
1006
989
- # Set the `Future` to failed due to some error and wake or notify all threads waiting on it.
990
- #
991
- # @param [Object] reason for the failure
992
- # @raise [Concurrent::MultipleAssignmentError] if the `Future` has already been set or otherwise completed
993
- # @return [Future]
994
1007
def fail ( reason = StandardError . new )
995
- complete_with Future :: Failed . new ( reason )
1008
+ complete_with Failed . new ( reason )
996
1009
end
997
1010
998
1011
def try_fail ( reason = StandardError . new )
999
- !!complete_with ( Future :: Failed . new ( reason ) , false )
1012
+ !!complete_with ( Failed . new ( reason ) , false )
1000
1013
end
1001
1014
1002
1015
public :evaluate_to
1003
1016
1004
- # @return [Future]
1005
1017
def evaluate_to! ( *args , block )
1006
1018
evaluate_to ( *args , block ) . wait!
1007
1019
end
@@ -1152,14 +1164,14 @@ def on_completable(done_future)
1152
1164
# will be immediately completed
1153
1165
class ImmediateEventPromise < InnerPromise
1154
1166
def initialize ( default_executor )
1155
- super Event . new ( self , default_executor ) . complete_with ( Event :: COMPLETED )
1167
+ super Event . new ( self , default_executor ) . complete_with ( COMPLETED )
1156
1168
end
1157
1169
end
1158
1170
1159
1171
class ImmediateFuturePromise < InnerPromise
1160
1172
def initialize ( default_executor , success , value , reason )
1161
1173
super Future . new ( self , default_executor ) .
1162
- complete_with ( success ? Future :: Success . new ( value ) : Future :: Failed . new ( reason ) )
1174
+ complete_with ( success ? Success . new ( value ) : Failed . new ( reason ) )
1163
1175
end
1164
1176
end
1165
1177
@@ -1226,7 +1238,7 @@ def initialize(event1, event2, default_executor)
1226
1238
end
1227
1239
1228
1240
def on_completable ( done_future )
1229
- complete_with Event :: COMPLETED
1241
+ complete_with COMPLETED
1230
1242
end
1231
1243
end
1232
1244
@@ -1253,9 +1265,9 @@ def on_completable(done_future)
1253
1265
success2 , value2 , reason2 = @Future2Result . result
1254
1266
success = success1 && success2
1255
1267
new_state = if success
1256
- Future :: SuccessArray . new ( [ value1 , value2 ] )
1268
+ SuccessArray . new ( [ value1 , value2 ] )
1257
1269
else
1258
- Future :: PartiallyFailed . new ( [ value1 , value2 ] , [ reason1 , reason2 ] )
1270
+ PartiallyFailed . new ( [ value1 , value2 ] , [ reason1 , reason2 ] )
1259
1271
end
1260
1272
complete_with new_state
1261
1273
end
@@ -1267,7 +1279,7 @@ def initialize(event, default_executor)
1267
1279
end
1268
1280
1269
1281
def on_completable ( done_future )
1270
- complete_with Event :: COMPLETED
1282
+ complete_with COMPLETED
1271
1283
end
1272
1284
end
1273
1285
@@ -1306,9 +1318,9 @@ def on_completable(done_future)
1306
1318
end
1307
1319
1308
1320
if all_success
1309
- complete_with Future :: SuccessArray . new ( values )
1321
+ complete_with SuccessArray . new ( values )
1310
1322
else
1311
- complete_with Future :: PartiallyFailed . new ( values , reasons )
1323
+ complete_with PartiallyFailed . new ( values , reasons )
1312
1324
end
1313
1325
end
1314
1326
end
@@ -1324,10 +1336,11 @@ def initialize(blocked_by_futures, default_executor)
1324
1336
end
1325
1337
1326
1338
def on_completable ( done_future )
1327
- complete_with Event :: COMPLETED
1339
+ complete_with COMPLETED
1328
1340
end
1329
1341
end
1330
1342
1343
+ # @abstract
1331
1344
class AbstractAnyPromise < BlockedPromise
1332
1345
def touch
1333
1346
blocked_by . each ( &:touch ) unless @Future . completed?
@@ -1364,7 +1377,7 @@ def completable?(countdown, future)
1364
1377
end
1365
1378
1366
1379
def on_completable ( done_future )
1367
- complete_with Event :: COMPLETED , false
1380
+ complete_with COMPLETED , false
1368
1381
end
1369
1382
end
1370
1383
@@ -1373,14 +1386,15 @@ class AnySuccessfulFuturePromise < AnyCompleteFuturePromise
1373
1386
private
1374
1387
1375
1388
def completable? ( countdown , future )
1376
- future . success? || super ( countdown , future )
1389
+ future . success? ||
1390
+ # inlined super from BlockedPromise
1377
1391
countdown . zero?
1378
1392
end
1379
1393
end
1380
1394
1381
1395
class DelayPromise < InnerPromise
1382
1396
def touch
1383
- @Future . complete_with Event :: COMPLETED
1397
+ @Future . complete_with COMPLETED
1384
1398
end
1385
1399
1386
1400
private
@@ -1390,7 +1404,6 @@ def initialize(default_executor)
1390
1404
end
1391
1405
end
1392
1406
1393
- # will be evaluated to task in intended_time
1394
1407
class ScheduledPromise < InnerPromise
1395
1408
def intended_time
1396
1409
@IntendedTime
@@ -1418,7 +1431,7 @@ def initialize(default_executor, intended_time)
1418
1431
end
1419
1432
1420
1433
Concurrent . global_timer_set . post ( in_seconds ) do
1421
- @Future . complete_with Event :: COMPLETED
1434
+ @Future . complete_with COMPLETED
1422
1435
end
1423
1436
end
1424
1437
end
0 commit comments