1
- # Promises Framework
2
-
3
1
Promises is a new framework unifying former ` Concurrent::Future ` ,
4
2
` Concurrent::Promise ` , ` Concurrent::IVar ` , ` Concurrent::Event ` ,
5
3
` Concurrent.dataflow ` , ` Delay ` , and ` TimerTask ` . It extensively uses the new
@@ -8,16 +6,14 @@ with the exception of obviously blocking operations like
8
6
` #wait ` , ` #value ` , etc. As a result it lowers a danger of deadlocking and offers
9
7
better performance.
10
8
11
- ## Overview
12
-
13
9
* TODO*
14
10
15
11
- What is it?
16
12
- What is it for?
17
13
- Main classes {Future}, {Event}
18
14
- Explain pool usage : io vs : fast , and ` _on ` ` _using ` suffixes.
19
15
20
- ## Old examples follow
16
+ # Old examples
21
17
22
18
* TODO review pending*
23
19
@@ -66,16 +62,16 @@ Class.new do
66
62
resolvable_event
67
63
end
68
64
end .new .a_method
69
- # => <#Concurrent::Promises::ResolvableEvent:0x7fbb023df1e8 pending blocks:[]>
65
+ # => <#Concurrent::Promises::ResolvableEvent:0x7fc5b1b085c8 pending blocks:[]>
70
66
71
67
Module .new { extend Concurrent ::Promises ::FactoryMethods }.resolvable_event
72
- # => <#Concurrent::Promises::ResolvableEvent:0x7fbb023d7600 pending blocks:[]>
68
+ # => <#Concurrent::Promises::ResolvableEvent:0x7fc5b1b02088 pending blocks:[]>
73
69
```
74
70
The module is already extended into {Promises} for convenience.
75
71
76
72
``` ruby
77
73
Concurrent ::Promises .resolvable_event
78
- # => <#Concurrent::Promises::ResolvableEvent:0x7fbb023d5ad0 pending blocks:[]>
74
+ # => <#Concurrent::Promises::ResolvableEvent:0x7fc5b1afac48 pending blocks:[]>
79
75
```
80
76
81
77
For this guide we include the module into ` main ` so we can call the factory
@@ -84,7 +80,7 @@ methods in following examples directly.
84
80
``` ruby
85
81
include Concurrent ::Promises ::FactoryMethods
86
82
resolvable_event
87
- # => <#Concurrent::Promises::ResolvableEvent:0x7fbb023cf608 pending blocks:[]>
83
+ # => <#Concurrent::Promises::ResolvableEvent:0x7fc5b1af8830 pending blocks:[]>
88
84
```
89
85
90
86
Simple asynchronous task:
@@ -101,7 +97,7 @@ Rejecting asynchronous task:
101
97
102
98
``` ruby
103
99
future = future { raise ' Boom' }
104
- # => <#Concurrent::Promises::Future:0x7fbb023b4308 pending blocks:[]>
100
+ # => <#Concurrent::Promises::Future:0x7fc5b1ad9700 pending blocks:[]>
105
101
future.value # => nil
106
102
future.value! rescue $! # => #<RuntimeError: Boom>
107
103
future.reason # => #<RuntimeError: Boom>
@@ -113,9 +109,9 @@ Direct creation of resolved futures:
113
109
114
110
``` ruby
115
111
fulfilled_future(Object .new )
116
- # => <#Concurrent::Promises::Future:0x7fbb023a58a8 fulfilled blocks:[]>
112
+ # => <#Concurrent::Promises::Future:0x7fc5b1acaa70 fulfilled blocks:[]>
117
113
rejected_future(StandardError .new (" boom" ))
118
- # => <#Concurrent::Promises::Future:0x7fbb023a79a0 rejected blocks:[]>
114
+ # => <#Concurrent::Promises::Future:0x7fc5b1ac97b0 rejected blocks:[]>
119
115
```
120
116
121
117
Chaining of futures:
@@ -155,7 +151,7 @@ fulfilled_future(Object.new).then(&:succ).rescue { 1 }.then(&:succ).value # resc
155
151
fulfilled_future(1 ).then(& :succ ).rescue { |e | e.message }.then(& :succ ).value # no error, rescue not applied
156
152
157
153
rejected_zip = fulfilled_future(1 ) & rejected_future(StandardError .new (' boom' ))
158
- # => <#Concurrent::Promises::Future:0x7fbb023343b0 rejected blocks:[]>
154
+ # => <#Concurrent::Promises::Future:0x7fc5b3051380 rejected blocks:[]>
159
155
rejected_zip.result
160
156
# => [false, [1, nil], [nil, #<StandardError: boom>]]
161
157
rejected_zip.then { |v | ' never happens' }.result
@@ -177,13 +173,13 @@ future.value
177
173
It propagates trough chain allowing whole or partial lazy chains.
178
174
``` ruby
179
175
head = delay { 1 }
180
- # => <#Concurrent::Promises::Future:0x7fbb02304b38 pending blocks:[]>
176
+ # => <#Concurrent::Promises::Future:0x7fc5b3021450 pending blocks:[]>
181
177
branch1 = head.then(& :succ )
182
- # => <#Concurrent::Promises::Future:0x7fbb022fe328 pending blocks:[]>
178
+ # => <#Concurrent::Promises::Future:0x7fc5b301b398 pending blocks:[]>
183
179
branch2 = head.delay.then(& :succ )
184
- # => <#Concurrent::Promises::Future:0x7fbb03867d68 pending blocks:[]>
180
+ # => <#Concurrent::Promises::Future:0x7fc5b30190c0 pending blocks:[]>
185
181
join = branch1 & branch2
186
- # => <#Concurrent::Promises::Future:0x7fbb03865fe0 pending blocks:[]>
182
+ # => <#Concurrent::Promises::Future:0x7fc5b30138f0 pending blocks:[]>
187
183
188
184
sleep 0.1 # nothing will resolve
189
185
[head, branch1, branch2, join].map(& :resolved? )
@@ -217,14 +213,14 @@ Scheduling of asynchronous tasks:
217
213
218
214
# it'll be executed after 0.1 seconds
219
215
scheduled = schedule(0.1 ) { 1 }
220
- # => <#Concurrent::Promises::Future:0x7fbb022a5570 pending blocks:[]>
216
+ # => <#Concurrent::Promises::Future:0x7fc5b1a2a7f0 pending blocks:[]>
221
217
222
218
scheduled.resolved? # => false
223
219
scheduled.value # available after 0.1sec
224
220
225
221
# and in chain
226
222
scheduled = delay { 1 }.schedule(0.1 ).then(& :succ )
227
- # => <#Concurrent::Promises::Future:0x7fbb022948b0 pending blocks:[]>
223
+ # => <#Concurrent::Promises::Future:0x7fc5b1a19a18 pending blocks:[]>
228
224
# will not be scheduled until value is requested
229
225
sleep 0.1
230
226
scheduled.value # returns after another 0.1sec
@@ -235,21 +231,21 @@ Resolvable Future and Event:
235
231
``` ruby
236
232
237
233
future = resolvable_future
238
- # => <#Concurrent::Promises::ResolvableFuture:0x7fbb0223c1b0 pending blocks:[]>
234
+ # => <#Concurrent::Promises::ResolvableFuture:0x7fc5b19c17a0 pending blocks:[]>
239
235
event = resolvable_event()
240
- # => <#Concurrent::Promises::ResolvableEvent:0x7fbb021df0c8 pending blocks:[]>
236
+ # => <#Concurrent::Promises::ResolvableEvent:0x7fc5b19c0468 pending blocks:[]>
241
237
242
238
# These threads will be blocked until the future and event is resolved
243
239
t1 = Thread .new { future.value }
244
240
t2 = Thread .new { event.wait }
245
241
246
242
future.fulfill 1
247
- # => <#Concurrent::Promises::ResolvableFuture:0x7fbb0223c1b0 fulfilled blocks:[]>
243
+ # => <#Concurrent::Promises::ResolvableFuture:0x7fc5b19c17a0 fulfilled blocks:[]>
248
244
future.fulfill 1 rescue $!
249
245
# => #<Concurrent::MultipleAssignmentError: Future can be resolved only once. Current result is [true, 1, nil], trying to set [true, 1, nil]>
250
246
future.fulfill 2 , false # => false
251
247
event.resolve
252
- # => <#Concurrent::Promises::ResolvableEvent:0x7fbb021df0c8 fulfilled blocks:[]>
248
+ # => <#Concurrent::Promises::ResolvableEvent:0x7fc5b19c0468 fulfilled blocks:[]>
253
249
254
250
# The threads can be joined now
255
251
[t1, t2].each & :join
@@ -258,9 +254,9 @@ event.resolve
258
254
Callbacks:
259
255
260
256
``` ruby
261
- queue = Queue .new # => #<Thread::Queue:0x007fbb021b63d0 >
257
+ queue = Queue .new # => #<Thread::Queue:0x007fc5b193b880 >
262
258
future = delay { 1 + 1 }
263
- # => <#Concurrent::Promises::Future:0x7fbb021b4fd0 pending blocks:[]>
259
+ # => <#Concurrent::Promises::Future:0x7fc5b193a9a8 pending blocks:[]>
264
260
265
261
future.on_fulfillment { queue << 1 } # evaluated asynchronously
266
262
future.on_fulfillment! { queue << 2 } # evaluated on resolving thread
@@ -278,7 +274,7 @@ Factory methods are taking names of the global executors
278
274
# executed on :fast executor, only short and non-blocking tasks can go there
279
275
future_on(:fast ) { 2 }.
280
276
# executed on executor for blocking and long operations
281
- then_using (:io ) { File .read __FILE__ }.
277
+ then_on (:io ) { File .read __FILE__ }.
282
278
wait
283
279
```
284
280
@@ -288,7 +284,7 @@ Interoperability with actors:
288
284
actor = Concurrent ::Actor ::Utils ::AdHoc .spawn :square do
289
285
-> v { v ** 2 }
290
286
end
291
- # => #<Concurrent::Actor::Reference:0x7fbb02104310 /square (Concurrent::Actor::Utils::AdHoc)>
287
+ # => #<Concurrent::Actor::Reference:0x7fc5b18a37b0 /square (Concurrent::Actor::Utils::AdHoc)>
292
288
293
289
294
290
future { 2 }.
@@ -299,23 +295,23 @@ future { 2 }.
299
295
actor.ask(2 ).then(& :succ ).value # => 5
300
296
```
301
297
302
- ### Common use-cases Examples
298
+ # Common use-cases Examples
303
299
304
- #### simple background processing
300
+ ## simple background processing
305
301
306
302
``` ruby
307
303
future { do_stuff }
308
- # => <#Concurrent::Promises::Future:0x7fbb020bfe40 pending blocks:[]>
304
+ # => <#Concurrent::Promises::Future:0x7fc5b186b4f0 pending blocks:[]>
309
305
```
310
306
311
- #### parallel background processing
307
+ ## parallel background processing
312
308
313
309
``` ruby
314
310
jobs = 10 .times.map { |i | future { i } }
315
311
zip(* jobs).value # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
316
312
```
317
313
318
- #### periodic task
314
+ ## periodic task
319
315
320
316
``` ruby
321
317
def schedule_job (interval , & job )
@@ -332,7 +328,7 @@ def schedule_job(interval, &job)
332
328
end
333
329
end
334
330
335
- queue = Queue .new # => #<Thread::Queue:0x007fbb01065658 >
331
+ queue = Queue .new # => #<Thread::Queue:0x007fc5b10a9730 >
336
332
count = 0 # => 0
337
333
interval = 0.05 # small just not to delay execution of this example
338
334
@@ -356,7 +352,7 @@ arr, v = [], nil; arr << v while (v = queue.pop)
356
352
# arr has the results from the executed scheduled tasks
357
353
arr # => [0, 1, 2, 3]
358
354
```
359
- #### How to limit processing where there are limited resources?
355
+ ## How to limit processing where there are limited resources?
360
356
361
357
By creating an actor managing the resource
362
358
421
417
zip(* concurrent_jobs).value!
422
418
# => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, "undefined method `size' for nil:NilClass"]
423
419
```
420
+
421
+ # Experimental
422
+
423
+ ## Cancellation
424
+
425
+ ``` ruby
426
+ source, token = Concurrent ::Cancellation .create
427
+ # => [#<Concurrent::Cancellation:0x007fc5b19c1390
428
+ # @Cancel=
429
+ # <#Concurrent::Promises::ResolvableEvent:0x7fc5b19c1688 pending blocks:[<#Concurrent::Promises::EventWrapperPromise:0x7fc5b19c1250 pending>]>,
430
+ # @ResolveArgs=[],
431
+ # @Token=
432
+ # #<Concurrent::Cancellation::Token:0x007fc5b19c0e18
433
+ # @Cancel=<#Concurrent::Promises::Event:0x7fc5b19c11d8 pending blocks:[]>>>,
434
+ # #<Concurrent::Cancellation::Token:0x007fc5b19c0e18
435
+ # @Cancel=<#Concurrent::Promises::Event:0x7fc5b19c11d8 pending blocks:[]>>]
436
+
437
+ futures = Array .new (2 ) do
438
+ future(token) do |token |
439
+ token.loop_until_canceled { Thread .pass }
440
+ :done
441
+ end
442
+ end
443
+ # => [<#Concurrent::Promises::Future:0x7fc5b1938ef0 pending blocks:[]>,
444
+ # <#Concurrent::Promises::Future:0x7fc5b0a1f860 pending blocks:[]>]
445
+
446
+ sleep 0.05 # => 0
447
+ source.cancel # => true
448
+ futures.map(& :value! ) # => [:done, :done]
449
+ ```
450
+
451
+ ## Throttling
452
+
453
+ ``` ruby
454
+ data = (0 ..10 ).to_a # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
455
+ max_tree = Concurrent ::Throttle .new 3
456
+ # => #<Concurrent::Throttle:0x007fc5b1888e10
457
+ # @AtomicCanRun=#<Concurrent::AtomicReference:0x007fc5b1888de8>,
458
+ # @Queue=#<Thread::Queue:0x007fc5b1888dc0>>
459
+
460
+ futures = data.map do |data |
461
+ future(data) do |data |
462
+ # un-throttled
463
+ data + 1
464
+ end .throttle(max_tree) do |trigger |
465
+ # throttled, imagine it uses DB connections or other limited resource
466
+ trigger.then { |v | v * 2 * 2 }
467
+ end
468
+ end
469
+
470
+ futures.map(& :value! )
471
+ # => [4, 8, 12, 16, 20, 24, 28, 32, 36, 40, 44]
472
+ ```
0 commit comments