Promises is a new framework unifying the former `Concurrent::Future`,
`Concurrent::Promise`, `Concurrent::IVar`, `Concurrent::Event`,
`Concurrent.dataflow`, `Delay`, and `TimerTask`. It extensively uses the new
- synchronization layer to make all the features *non-blocking* and
- *lock-free*, with the exception of obviously blocking operations like
+ synchronization layer to make all the features *lock-free*,
+ with the exception of obviously blocking operations like
`#wait`, `#value`, etc. As a result it lowers the danger of deadlocking and offers
better performance.
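For orientation, here is a minimal sketch of the unified API (an editor-added example, not part of the original document); it assumes the `Concurrent::Promises` factory methods and that the library is loaded from concurrent-ruby-edge:

```ruby
require 'concurrent-edge' # Promises ships in concurrent-ruby-edge at this point

# replaces Concurrent::Future / Concurrent::Promise
future = Concurrent::Promises.future { 1 + 1 }

# replaces Concurrent.dataflow: compose results without blocking
sum = Concurrent::Promises.zip(future, Concurrent::Promises.future { 2 }).
    then { |a, b| a + b }

# replaces Delay: evaluated lazily, only when a value is required
lazy = Concurrent::Promises.delay { :computed_on_demand }

sum.value!  # => 4, blocking only here
lazy.value! # => :computed_on_demand, the delay is evaluated only now
```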
@@ -15,11 +15,11 @@ better performance.
- What is it?
- What is it for?
- Main classes {Future}, {Event}
- - Explain `_on` `_using` suffixes.
+ - Explain pool usage :io vs :fast, and the `_on` and `_using` suffixes.

## Old examples follow

- *TODO rewrite into md with examples*
+ *TODO review pending*

Constructors are not accessible; instead, there are many constructor methods in
FactoryMethods.
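As an illustration (an editor-added sketch, assuming the `Concurrent::Promises::FactoryMethods` module), the factory methods can be called on the `Concurrent::Promises` module directly or mixed into your own class:

```ruby
require 'concurrent-edge'

# calling a factory method on the module directly
Concurrent::Promises.future { :computed }.value # => :computed

# or mixing the factory methods into a class
class Worker
  include Concurrent::Promises::FactoryMethods

  def run
    future { :done } # unqualified factory call provided by the mixin
  end
end

Worker.new.run.value # => :done
```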
@@ -65,7 +65,7 @@ future.value
future.resolved?
```

- Rejecting asynchronous task
+ Rejecting asynchronous task:

```ruby
future = future { raise 'Boom' }
@@ -76,14 +76,16 @@ future.reason
raise future rescue $!
```

- Direct creation of resolved futures
+ Direct creation of resolved futures:

```ruby
fulfilled_future(Object.new)
rejected_future(StandardError.new("boom"))
+ ```

- # ## Chaining of futures
+ Chaining of futures:

+ ```ruby
head = fulfilled_future 1 #
branch1 = head.then(&:succ) #
branch2 = head.then(&:succ).then(&:succ) #
@@ -96,19 +98,19 @@ zip(branch1, branch2, branch1).then { |*values| values.reduce &:+ }.value!
# pick only first resolved
any(branch1, branch2).value!
(branch1 | branch2).value!
+ ```

+ Any supplied arguments are passed to the block; promises ensure that they are visible to the block:

- # ## Arguments
-
- # any supplied arguments are passed to the block, promises ensure that they are visible to the block
-
+ ```ruby
future('3') { |s| s.to_i }.then(2) { |a, b| a + b }.value
fulfilled_future(1).then(2, &:+).value
fulfilled_future(1).chain(2) { |fulfilled, value, reason, arg| value + arg }.value
+ ```

+ Error handling:

- # ## Error handling
-
+ ```ruby
fulfilled_future(Object.new).then(&:succ).then(&:succ).rescue { |e| e.class }.value # error propagates
fulfilled_future(Object.new).then(&:succ).rescue { 1 }.then(&:succ).value # rescued and replaced with 1
fulfilled_future(1).then(&:succ).rescue { |e| e.message }.then(&:succ).value # no error, rescue not applied
@@ -118,18 +120,18 @@ rejected_zip.result
rejected_zip.then { |v| 'never happens' }.result
rejected_zip.rescue { |a, b| (a || b).message }.value
rejected_zip.chain { |fulfilled, values, reasons| [fulfilled, values.compact, reasons.compact] }.value
+ ```

+ A delay will not evaluate until asked by `#value` or another method requiring resolution.

- # ## Delay
-
- # will not evaluate until asked by #value or other method requiring resolution
+ ```ruby
future = delay { 'lazy' }
sleep 0.1 #
future.resolved?
future.value
-
- # propagates trough chain allowing whole or partial lazy chains
-
+ ```
+ It propagates through the chain, allowing whole or partial lazy chains.
+ ```ruby
head = delay { 1 }
branch1 = head.then(&:succ)
branch2 = head.delay.then(&:succ)
@@ -144,21 +146,23 @@ sleep 0.1 # forces only head to resolve, branch 2 stays pending
join.value
[head, branch1, branch2, join].map(&:resolved?)
+ ```

+ When flattening, it waits for the inner future; only the last call to `value` blocks the thread.

- # ## Flatting
-
- # waits for inner future, only the last call to value blocks thread
+ ```ruby
future { future { 1 + 1 } }.flat.value

# more complicated example
future { future { future { 1 + 1 } } }.
    flat(1).
    then { |f| f.then(&:succ) }.
    flat(1).value
+ ```

+ Scheduling of asynchronous tasks:

- # ## Schedule
+ ```ruby

# it'll be executed after 0.1 seconds
scheduled = schedule(0.1) { 1 }
@@ -171,9 +175,11 @@ scheduled = delay { 1 }.schedule(0.1).then(&:succ)
# will not be scheduled until value is requested
sleep 0.1 #
scheduled.value # returns after another 0.1 sec
+ ```

+ Resolvable Future and Event:

- # ## Resolvable Future and Event
+ ```ruby

future = resolvable_future
event = resolvable_event()
@@ -189,10 +195,11 @@ event.resolve
# The threads can be joined now
[t1, t2].each &:join #
+ ```

+ Callbacks:

- # ## Callbacks
-
+ ```ruby
queue = Queue.new
future = delay { 1 + 1 }
@@ -203,22 +210,22 @@ queue.empty?
future.value
queue.pop
queue.pop
+ ```

+ Factory methods take names of the global executors
+ (or instances of custom executors).

- # ## Thread-pools
-
- # Factory methods are taking names of the global executors
- # (ot instances of custom executors)
-
+ ```ruby
# executed on :fast executor, only short and non-blocking tasks can go there
future_on(:fast) { 2 }.
    # executed on executor for blocking and long operations
    then_using(:io) { File.read __FILE__ }.
    wait
+ ```

+ Interoperability with actors:

- # ## Interoperability with actors
-
+ ```ruby
actor = Concurrent::Actor::Utils::AdHoc.spawn :square do
  -> v { v ** 2 }
end
@@ -230,37 +237,26 @@ future { 2 }.
    value

actor.ask(2).then(&:succ).value
-
-
- # ## Interoperability with channels
-
- ch1 = Concurrent::Channel.new
- ch2 = Concurrent::Channel.new
-
- result = select(ch1, ch2)
- ch1.put 1
- result.value!
-
-
- future { 1 + 1 }.
-     then_put(ch1)
- result = future { '%02d' }.
-     then_select(ch1, ch2).
-     then { |format, (value, channel)| format format, value }
- result.value!
-
+ ```

### Common use-case examples

- # simple background processing
+ #### simple background processing
+
+ ```ruby
future { do_stuff }
+ ```

- # parallel background processing
+ #### parallel background processing
+
+ ```ruby
jobs = 10.times.map { |i| future { i } } #
zip(*jobs).value
+ ```

+ #### periodic task

- # periodic task
+ ```ruby
def schedule_job(interval, &job)
  # schedule the first execution and chain restart of the job
  Concurrent.schedule(interval, &job).chain do |fulfilled, continue, reason|
@@ -269,23 +265,25 @@ def schedule_job(interval, &job)
    else
      # handle error
      p reason
-       # retry
-       schedule_job(interval, &job)
+       # retry sooner
+       schedule_job(interval / 10, &job)
    end
  end
end

queue = Queue.new
count = 0
+ interval = 0.05 # kept small so as not to delay execution of this example

- schedule_job 0.05 do
+ schedule_job interval do
  queue.push count
  count += 1
  # to continue scheduling return true, false will end the task
  if count < 4
    # to continue scheduling return true
    true
  else
+     # close the queue with nil to simplify reading it
    queue.push nil
    # to end the task return false
    false
@@ -294,10 +292,14 @@ end

# read the queue
arr, v = [], nil; arr << v while (v = queue.pop) #
+ # arr has the results from the executed scheduled tasks
arr
+ ```
+ #### How to limit processing where there are limited resources?
+
+ By creating an actor that manages the resource.

- # How to limit processing where there are limited resources?
- # By creating an actor managing the resource
+ ```ruby
DB = Concurrent::Actor::Utils::AdHoc.spawn :db do
  data = Array.new(10) { |i| '*' * i }
  lambda do |message|
@@ -317,9 +319,11 @@ concurrent_jobs = 11.times.map do |v|
end #

zip(*concurrent_jobs).value!
+ ```

+ In reality there is often a pool, though:

- # In reality there is often a pool though:
+ ```ruby
data = Array.new(10) { |i| '*' * i }
pool_size = 5