5
5
6
6
### Simple asynchronous task
7
7
8
- future = future { sleep 0.1 ; 1 + 1 } # evaluation starts immediately
9
- # => <#Concurrent::Promises::Future:0x7fc5cc1e5340 pending blocks:[]>
8
+ future = future ( 0.1 ) { | duration | sleep duration ; :result } # evaluation starts immediately
9
+ # => <#Concurrent::Promises::Future:0x7fa602198e30 pending blocks:[]>
10
10
future . completed? # => false
11
11
# block until evaluated
12
- future . value # => 2
12
+ future . value # => :result
13
13
future . completed? # => true
14
14
15
15
16
16
### Failing asynchronous task
17
17
18
18
future = future { raise 'Boom' }
19
- # => <#Concurrent::Promises::Future:0x7fc5cc1dc808 pending blocks:[]>
19
+ # => <#Concurrent::Promises::Future:0x7fa602188a58 pending blocks:[]>
20
20
future . value # => nil
21
21
future . value! rescue $! # => #<RuntimeError: Boom>
22
22
future . reason # => #<RuntimeError: Boom>
23
23
# re-raising
24
24
raise future rescue $! # => #<RuntimeError: Boom>
25
25
26
+
26
27
### Direct creation of completed futures
27
28
28
29
succeeded_future ( Object . new )
29
- # => <#Concurrent::Promises::Future:0x7fc5cc1c6030 success blocks:[]>
30
+ # => <#Concurrent::Promises::Future:0x7fa60217bf10 success blocks:[]>
30
31
failed_future ( StandardError . new ( "boom" ) )
31
- # => <#Concurrent::Promises::Future:0x7fc5cc1c50b8 failed blocks:[]>
32
+ # => <#Concurrent::Promises::Future:0x7fa60217aa48 failed blocks:[]>
32
33
33
34
### Chaining of futures
34
35
68
69
# => 3
69
70
70
71
failing_zip = succeeded_future ( 1 ) & failed_future ( StandardError . new ( 'boom' ) )
71
- # => <#Concurrent::Promises::Future:0x7fc5cc11ec90 failed blocks:[]>
72
+ # => <#Concurrent::Promises::Future:0x7fa602129300 failed blocks:[]>
72
73
failing_zip . result # => [false, [1, nil], [nil, #<StandardError: boom>]]
73
74
failing_zip . then { |v | 'never happens' } . result # => [false, [1, nil], [nil, #<StandardError: boom>]]
74
75
failing_zip . rescue { |a , b | ( a || b ) . message } . value
81
82
82
83
# will not evaluate until asked by #value or other method requiring completion
83
84
future = delay { 'lazy' }
84
- # => <#Concurrent::Promises::Future:0x7fc5cc0ff660 pending blocks:[]>
85
+ # => <#Concurrent::Promises::Future:0x7fa60211a490 pending blocks:[]>
85
86
sleep 0.1
86
87
future . completed? # => false
87
88
future . value # => "lazy"
88
89
89
90
# propagates trough chain allowing whole or partial lazy chains
90
91
91
92
head = delay { 1 }
92
- # => <#Concurrent::Promises::Future:0x7fc5cc0fc938 pending blocks:[]>
93
+ # => <#Concurrent::Promises::Future:0x7fa602113898 pending blocks:[]>
93
94
branch1 = head . then ( &:succ )
94
- # => <#Concurrent::Promises::Future:0x7fc5cc0df068 pending blocks:[]>
95
+ # => <#Concurrent::Promises::Future:0x7fa602112a60 pending blocks:[]>
95
96
branch2 = head . delay . then ( &:succ )
96
- # => <#Concurrent::Promises::Future:0x7fc5cc0dd178 pending blocks:[]>
97
+ # => <#Concurrent::Promises::Future:0x7fa602111188 pending blocks:[]>
97
98
join = branch1 & branch2
98
- # => <#Concurrent::Promises::Future:0x7fc5cc0dc430 pending blocks:[]>
99
+ # => <#Concurrent::Promises::Future:0x7fa6021102b0 pending blocks:[]>
99
100
100
101
sleep 0.1 # nothing will complete # => 0
101
102
[ head , branch1 , branch2 , join ] . map ( &:completed? ) # => [false, false, false, false]
102
103
103
104
branch1 . value # => 2
104
105
sleep 0.1 # forces only head to complete, branch 2 stays incomplete
105
- # => 0
106
+ # => 1
106
107
[ head , branch1 , branch2 , join ] . map ( &:completed? ) # => [true, true, false, false]
107
108
108
109
join . value # => [2, 2]
125
126
126
127
# it'll be executed after 0.1 seconds
127
128
scheduled = schedule ( 0.1 ) { 1 }
128
- # => <#Concurrent::Promises::Future:0x7fc5caaae028 pending blocks:[]>
129
+ # => <#Concurrent::Promises::Future:0x7fa60288a810 pending blocks:[]>
129
130
130
131
scheduled . completed? # => false
131
132
scheduled . value # available after 0.1sec # => 1
132
133
133
134
# and in chain
134
135
scheduled = delay { 1 } . schedule ( 0.1 ) . then ( &:succ )
135
- # => <#Concurrent::Promises::Future:0x7fc5caa9f2d0 pending blocks:[]>
136
+ # => <#Concurrent::Promises::Future:0x7fa603101638 pending blocks:[]>
136
137
# will not be scheduled until value is requested
137
138
sleep 0.1
138
139
scheduled . value # returns after another 0.1sec # => 2
141
142
### Completable Future and Event
142
143
143
144
future = completable_future
144
- # => <#Concurrent::Promises::CompletableFuture:0x7fc5caa8eae8 pending blocks:[]>
145
- event = event ( )
146
- # => <#Concurrent::Promises::CompletableEvent:0x7fc5caa8d648 pending blocks:[]>
145
+ # => <#Concurrent::Promises::CompletableFuture:0x7fa6030e0ac8 pending blocks:[]>
146
+ event = completable_event ( )
147
+ # => <#Concurrent::Promises::CompletableEvent:0x7fa6030e0a78 pending blocks:[]>
147
148
148
149
# These threads will be blocked until the future and event is completed
149
150
t1 = Thread . new { future . value }
150
151
t2 = Thread . new { event . wait }
151
152
152
153
future . success 1
153
- # => <#Concurrent::Promises::CompletableFuture:0x7fc5caa8eae8 success blocks:[]>
154
+ # => <#Concurrent::Promises::CompletableFuture:0x7fa6030e0ac8 success blocks:[]>
154
155
future . success 1 rescue $!
155
156
# => #<Concurrent::MultipleAssignmentError: Future can be completed only once. Current result is [true, 1, nil], trying to set [true, 1, nil]>
156
157
future . try_success 2 # => false
157
158
event . complete
158
- # => <#Concurrent::Promises::CompletableEvent:0x7fc5caa8d648 completed blocks:[]>
159
+ # => <#Concurrent::Promises::CompletableEvent:0x7fa6030e0a78 success blocks:[]>
159
160
160
161
# The threads can be joined now
161
162
[ t1 , t2 ] . each &:join
162
163
163
164
164
165
### Callbacks
165
166
166
- queue = Queue . new # => #<Thread::Queue:0x007fc5caa770c8 >
167
+ queue = Queue . new # => #<Thread::Queue:0x007fa6030ab5f8 >
167
168
future = delay { 1 + 1 }
168
- # => <#Concurrent::Promises::Future:0x7fc5caa754d0 pending blocks:[]>
169
+ # => <#Concurrent::Promises::Future:0x7fa6030aa888 pending blocks:[]>
169
170
170
171
future . on_success { queue << 1 } # evaluated asynchronously
171
- # => <#Concurrent::Promises::Future:0x7fc5caa754d0 pending blocks:[]>
172
+ # => <#Concurrent::Promises::Future:0x7fa6030aa888 pending blocks:[]>
172
173
future . on_success! { queue << 2 } # evaluated on completing thread
173
- # => <#Concurrent::Promises::Future:0x7fc5caa754d0 pending blocks:[]>
174
+ # => <#Concurrent::Promises::Future:0x7fa6030aa888 pending blocks:[]>
174
175
175
176
queue . empty? # => true
176
177
future . value # => 2
188
189
# executed on executor for blocking and long operations
189
190
then_on ( :io ) { File . read __FILE__ } .
190
191
wait
191
- # => <#Concurrent::Promises::Future:0x7fc5cb010b10 success blocks:[]>
192
+ # => <#Concurrent::Promises::Future:0x7fa60307abb0 success blocks:[]>
192
193
193
194
194
195
### Interoperability with actors
195
196
196
197
actor = Concurrent ::Actor ::Utils ::AdHoc . spawn :square do
197
198
-> v { v ** 2 }
198
199
end
199
- # => #<Concurrent::Actor::Reference:0x7fc5caa35268 /square (Concurrent::Actor::Utils::AdHoc)>
200
+ # => #<Concurrent::Actor::Reference:0x7fa601ab8ea8 /square (Concurrent::Actor::Utils::AdHoc)>
200
201
201
202
202
203
future { 2 } .
210
211
### Interoperability with channels
211
212
212
213
ch1 = Concurrent ::Channel . new
213
- # => #<Concurrent::Channel:0x007fc5ca9f4448 @buffer=#<Concurrent::Channel::Buffer::Unbuffered:0x007fc5ca9f41f0 @__lock__=#<Mutex:0x007fc5ca9f4128 >, @__condition__=#<Thread::ConditionVariable:0x007fc5ca9f4088 >, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#<Proc:0x007fc5cc0ce970 @/Users/pitr/Workspace/public/concurrent-ruby/lib/concurrent/channel.rb:28 (lambda)>>
214
+ # => #<Concurrent::Channel:0x007fa601a6a2d0 @buffer=#<Concurrent::Channel::Buffer::Unbuffered:0x007fa601a6a230 @__lock__=#<Mutex:0x007fa601a6a168 >, @__condition__=#<Thread::ConditionVariable:0x007fa601a6a140 >, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#<Proc:0x007fa6028b12d0 @/Users/pitr/Workspace/public/concurrent-ruby/lib/concurrent/channel.rb:28 (lambda)>>
214
215
ch2 = Concurrent ::Channel . new
215
- # => #<Concurrent::Channel:0x007fc5cc89a528 @buffer=#<Concurrent::Channel::Buffer::Unbuffered:0x007fc5cc89a2a8 @__lock__=#<Mutex:0x007fc5cc89a140 >, @__condition__=#<Thread::ConditionVariable:0x007fc5cc89a078 >, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#<Proc:0x007fc5cc0ce970 @/Users/pitr/Workspace/public/concurrent-ruby/lib/concurrent/channel.rb:28 (lambda)>>
216
+ # => #<Concurrent::Channel:0x007fa601a69380 @buffer=#<Concurrent::Channel::Buffer::Unbuffered:0x007fa601a69308 @__lock__=#<Mutex:0x007fa601a69268 >, @__condition__=#<Thread::ConditionVariable:0x007fa601a69218 >, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#<Proc:0x007fa6028b12d0 @/Users/pitr/Workspace/public/concurrent-ruby/lib/concurrent/channel.rb:28 (lambda)>>
216
217
217
218
result = select ( ch1 , ch2 )
218
- # => <#Concurrent::Promises::Future:0x7fc5cc892e40 pending blocks:[]>
219
+ # => <#Concurrent::Promises::Future:0x7fa601a62d50 pending blocks:[]>
219
220
ch1 . put 1 # => true
220
221
result . value!
221
- # => [1, #<Concurrent::Channel:0x007fc5ca9f4448 @buffer=#<Concurrent::Channel::Buffer::Unbuffered:0x007fc5ca9f41f0 @__lock__=#<Mutex:0x007fc5ca9f4128 >, @__condition__=#<Thread::ConditionVariable:0x007fc5ca9f4088 >, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#<Proc:0x007fc5cc0ce970 @/Users/pitr/Workspace/public/concurrent-ruby/lib/concurrent/channel.rb:28 (lambda)>>]
222
+ # => [1, #<Concurrent::Channel:0x007fa601a6a2d0 @buffer=#<Concurrent::Channel::Buffer::Unbuffered:0x007fa601a6a230 @__lock__=#<Mutex:0x007fa601a6a168 >, @__condition__=#<Thread::ConditionVariable:0x007fa601a6a140 >, @closed=false, @size=0, @capacity=1, @buffer=nil, @putting=[], @taking=[]>, @validator=#<Proc:0x007fa6028b12d0 @/Users/pitr/Workspace/public/concurrent-ruby/lib/concurrent/channel.rb:28 (lambda)>>]
222
223
223
224
224
225
future { 1 +1 } .
225
226
then_put ( ch1 )
226
- # => <#Concurrent::Promises::Future:0x7fc5cc87b920 pending blocks:[]>
227
+ # => <#Concurrent::Promises::Future:0x7fa601a51910 pending blocks:[]>
227
228
result = future { '%02d' } .
228
229
then_select ( ch1 , ch2 ) .
229
230
then { |format , ( value , channel ) | format format , value }
230
- # => <#Concurrent::Promises::Future:0x7fc5cc862f60 pending blocks:[]>
231
+ # => <#Concurrent::Promises::Future:0x7fa601a2b008 pending blocks:[]>
231
232
result . value! # => "02"
232
233
233
234
234
235
### Common use-cases Examples
235
236
236
237
# simple background processing
237
238
future { do_stuff }
238
- # => <#Concurrent::Promises::Future:0x7fc5cc080518 pending blocks:[]>
239
+ # => <#Concurrent::Promises::Future:0x7fa601a1b478 pending blocks:[]>
239
240
240
241
# parallel background processing
241
242
jobs = 10 . times . map { |i | future { i } }
242
243
zip ( *jobs ) . value # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
243
244
244
245
245
246
# periodic task
246
- DONE = Concurrent ::AtomicBoolean . new false # => #<Concurrent::AtomicBoolean:0x007fc5cc079a38>
247
-
248
- def schedule_job
249
- schedule ( 1 ) { do_stuff } .
250
- rescue { |e | StandardError === e ? report_error ( e ) : raise ( e ) } .
251
- then { schedule_job unless DONE . true? }
247
+ def schedule_job ( interval , &job )
248
+ # schedule the first execution and chain restart og the job
249
+ Concurrent . schedule ( interval , &job ) . chain do |success , continue , reason |
250
+ if success
251
+ schedule_job ( interval , &job ) if continue
252
+ else
253
+ # handle error
254
+ p reason
255
+ # retry
256
+ schedule_job ( interval , &job )
257
+ end
258
+ end
252
259
end # => :schedule_job
253
260
254
- schedule_job
255
- # => <#Concurrent::Promises::Future:0x7fc5ca9949d0 pending blocks:[]>
256
- DONE . make_true # => true
261
+ queue = Queue . new # => #<Thread::Queue:0x007fa6020d2cf8>
262
+ count = 0 # => 0
263
+
264
+ schedule_job 0.05 do
265
+ queue . push count
266
+ count += 1
267
+ # to continue scheduling return true, false will end the task
268
+ if count < 4
269
+ # to continue scheduling return true
270
+ true
271
+ else
272
+ queue . push nil
273
+ # to end the task return false
274
+ false
275
+ end
276
+ end
277
+ # => <#Concurrent::Promises::Future:0x7fa6020c23d0 pending blocks:[]>
257
278
279
+ # read the queue
280
+ arr , v = [ ] , nil ; arr << v while ( v = queue . pop )
281
+ arr # => [0, 1, 2, 3]
258
282
259
283
# How to limit processing where there are limited resources?
260
284
# By creating an actor managing the resource
@@ -265,7 +289,7 @@ def schedule_job
265
289
data [ message ]
266
290
end
267
291
end
268
- # => #<Concurrent::Actor::Reference:0x7fc5cc843318 /db (Concurrent::Actor::Utils::AdHoc)>
292
+ # => #<Concurrent::Actor::Reference:0x7fa6019b8788 /db (Concurrent::Actor::Utils::AdHoc)>
269
293
270
294
concurrent_jobs = 11 . times . map do |v |
271
295
@@ -295,7 +319,7 @@ def schedule_job
295
319
end
296
320
end
297
321
end
298
- # => #<Concurrent::Actor::Reference:0x7fc5ca89a160 /DB-pool (Concurrent::Actor::Utils::Pool)>
322
+ # => #<Concurrent::Actor::Reference:0x7fa60284b278 /DB-pool (Concurrent::Actor::Utils::Pool)>
299
323
300
324
concurrent_jobs = 11 . times . map do |v |
301
325
0 commit comments