1
1
### Simple asynchronous task
2
2
3
3
future = Concurrent . future { sleep 0.1 ; 1 + 1 } # evaluation starts immediately
4
- # => <#Concurrent::Edge::Future:0x7fcc038ca588 pending blocks:[]>
4
+ # => <#Concurrent::Edge::Future:0x7fd41a956d70 pending blocks:[]>
5
5
future . completed? # => false
6
6
# block until evaluated
7
7
future . value # => 2
11
11
### Failing asynchronous task
12
12
13
13
future = Concurrent . future { raise 'Boom' }
14
- # => <#Concurrent::Edge::Future:0x7fcc038c05b0 pending blocks:[]>
14
+ # => <#Concurrent::Edge::Future:0x7fd41a946c18 failed blocks:[]>
15
15
future . value # => nil
16
16
future . value! rescue $! # => #<RuntimeError: Boom>
17
17
future . reason # => #<RuntimeError: Boom>
21
21
22
22
### Chaining
23
23
24
- head = Concurrent . future { 1 }
24
+ head = Concurrent . completed_future 1
25
25
branch1 = head . then ( &:succ )
26
26
branch2 = head . then ( &:succ ) . then ( &:succ )
27
27
branch1 . zip ( branch2 ) . value! # => [2, 3]
32
32
# pick only first completed
33
33
( branch1 | branch2 ) . value! # => 2
34
34
35
- # auto splat arrays for blocks
36
- Concurrent . future { [ 1 , 2 ] } . then ( &:+ ) . value! # => 3
37
-
38
-
39
35
### Error handling
40
36
41
37
Concurrent . future { Object . new } . then ( &:succ ) . then ( &:succ ) . rescue { |e | e . class } . value # error propagates
50
46
51
47
# will not evaluate until asked by #value or other method requiring completion
52
48
future = Concurrent . delay { 'lazy' }
53
- # => <#Concurrent::Edge::Future:0x7fcc022afaa0 pending blocks:[]>
49
+ # => <#Concurrent::Edge::Future:0x7fd41a8f6ec0 pending blocks:[]>
54
50
sleep 0.1
55
51
future . completed? # => false
56
52
future . value # => "lazy"
57
53
58
54
# propagates trough chain allowing whole or partial lazy chains
59
55
60
56
head = Concurrent . delay { 1 }
61
- # => <#Concurrent::Edge::Future:0x7fcc022a5640 pending blocks:[]>
57
+ # => <#Concurrent::Edge::Future:0x7fd41a8ee1a8 pending blocks:[]>
62
58
branch1 = head . then ( &:succ )
63
- # => <#Concurrent::Edge::Future:0x7fcc0229c478 pending blocks:[]>
59
+ # => <#Concurrent::Edge::Future:0x7fd41a8ed438 pending blocks:[]>
64
60
branch2 = head . delay . then ( &:succ )
65
- # => <#Concurrent::Edge::Future:0x7fcc0228f318 pending blocks:[]>
61
+ # => <#Concurrent::Edge::Future:0x7fd41a8e7588 pending blocks:[]>
66
62
join = branch1 & branch2
67
- # => <#Concurrent::Edge::Future:0x7fcc0228de78 pending blocks:[]>
63
+ # => <#Concurrent::Edge::Future:0x7fd41a8e4ec8 pending blocks:[]>
68
64
69
65
sleep 0.1 # nothing will complete # => 0
70
66
[ head , branch1 , branch2 , join ] . map ( &:completed? ) # => [false, false, false, false]
71
67
72
68
branch1 . value # => 2
73
69
sleep 0.1 # forces only head to complete, branch 2 stays incomplete
74
- # => 0
70
+ # => 1
75
71
[ head , branch1 , branch2 , join ] . map ( &:completed? ) # => [true, true, false, false]
76
72
77
73
join . value # => [2, 2]
92
88
### Schedule
93
89
94
90
scheduled = Concurrent . schedule ( 0.1 ) { 1 }
95
- # => <#Concurrent::Edge::Future:0x7fcc0385b368 pending blocks:[]>
91
+ # => <#Concurrent::Edge::Future:0x7fd41a8a4468 pending blocks:[]>
96
92
97
93
scheduled . completed? # => false
98
94
scheduled . value # available after 0.1sec # => 1
99
95
100
96
# and in chain
101
97
scheduled = Concurrent . delay { 1 } . schedule ( 0.1 ) . then ( &:succ )
102
- # => <#Concurrent::Edge::Future:0x7fcc03843948 pending blocks:[]>
98
+ # => <#Concurrent::Edge::Future:0x7fd41a895828 pending blocks:[]>
103
99
# will not be scheduled until value is requested
104
100
sleep 0.1
105
101
scheduled . value # returns after another 0.1sec # => 2
108
104
### Completable Future and Event
109
105
110
106
future = Concurrent . future
111
- # => <#Concurrent::Edge::CompletableFuture:0x7fcc0304fdb8 pending blocks:[]>
107
+ # => <#Concurrent::Edge::CompletableFuture:0x7fd41a87c648 pending blocks:[]>
112
108
event = Concurrent . event
113
- # => <#Concurrent::Edge::CompletableEvent:0x7fcc0304e210 pending blocks:[]>
109
+ # => <#Concurrent::Edge::CompletableEvent:0x7fd41a8770d0 pending blocks:[]>
110
+ # Don't forget to keep the reference, `Concurrent.future.then { |v| v }` is incompletable
114
111
115
112
# will be blocked until completed
116
113
t1 = Thread . new { future . value }
117
114
t2 = Thread . new { event . wait }
118
115
119
116
future . success 1
120
- # => <#Concurrent::Edge::CompletableFuture:0x7fcc0304fdb8 success blocks:[]>
117
+ # => <#Concurrent::Edge::CompletableFuture:0x7fd41a87c648 success blocks:[]>
121
118
future . success 1 rescue $!
122
- # => #<Concurrent::MultipleAssignmentError: multiple assignment >
119
+ # => #<Concurrent::MultipleAssignmentError: Future can be completed only once. Current result is [true, 1, nil], trying to set [true, 1, nil] >
123
120
future . try_success 2 # => false
124
121
event . complete
125
- # => <#Concurrent::Edge::CompletableEvent:0x7fcc0304e210 completed blocks:[]>
122
+ # => <#Concurrent::Edge::CompletableEvent:0x7fd41a8770d0 completed blocks:[]>
126
123
127
124
[ t1 , t2 ] . each &:join
128
125
129
126
130
127
### Callbacks
131
128
132
- queue = Queue . new # => #<Thread::Queue:0x007fcc03101720 >
129
+ queue = Queue . new # => #<Thread::Queue:0x007fd41b0543e8 >
133
130
future = Concurrent . delay { 1 + 1 }
134
- # => <#Concurrent::Edge::Future:0x7fcc030fbaf0 pending blocks:[]>
131
+ # => <#Concurrent::Edge::Future:0x7fd41b04f028 pending blocks:[]>
135
132
136
133
future . on_success { queue << 1 } # evaluated asynchronously
137
- # => <#Concurrent::Edge::Future:0x7fcc030fbaf0 pending blocks:[]>
134
+ # => <#Concurrent::Edge::Future:0x7fd41b04f028 pending blocks:[]>
138
135
future . on_success! { queue << 2 } # evaluated on completing thread
139
- # => <#Concurrent::Edge::Future:0x7fcc030fbaf0 pending blocks:[]>
136
+ # => <#Concurrent::Edge::Future:0x7fd41b04f028 pending blocks:[]>
140
137
141
138
queue . empty? # => true
142
139
future . value # => 2
147
144
### Thread-pools
148
145
149
146
Concurrent . future ( :fast ) { 2 } . then ( :io ) { File . read __FILE__ } . wait
150
- # => <#Concurrent::Edge::Future:0x7fcc030e8bd0 success blocks:[]>
147
+ # => <#Concurrent::Edge::Future:0x7fd41a857550 success blocks:[]>
151
148
152
149
153
150
### Interoperability with actors
154
151
155
152
actor = Concurrent ::Actor ::Utils ::AdHoc . spawn :square do
156
153
-> v { v ** 2 }
157
154
end
158
- # => #<Concurrent::Actor::Reference:0x7fcc0223f020 /square (Concurrent::Actor::Utils::AdHoc)>
155
+ # => #<Concurrent::Actor::Reference:0x7fd41c03b5b8 /square (Concurrent::Actor::Utils::AdHoc)>
159
156
160
157
Concurrent .
161
158
future { 2 } .
168
165
169
166
### Interoperability with channels
170
167
171
- ch1 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fcc021ec6b8 >
172
- ch2 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fcc021e7b18 >
168
+ ch1 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fd41a297458 >
169
+ ch2 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fd41a296940 >
173
170
174
171
result = Concurrent . select ( ch1 , ch2 )
175
- # => <#Concurrent::Edge::CompletableFuture:0x7fcc021e6060 pending blocks:[]>
172
+ # => <#Concurrent::Edge::CompletableFuture:0x7fd41a295c98 pending blocks:[]>
176
173
ch1 . push 1 # => nil
177
174
result . value!
178
- # => [1, #<Concurrent::Edge::Channel:0x007fcc021ec6b8 >]
175
+ # => [1, #<Concurrent::Edge::Channel:0x007fd41a297458 >]
179
176
180
177
Concurrent .
181
178
future { 1 +1 } .
182
179
then_push ( ch1 )
183
- # => <#Concurrent::Edge::Future:0x7fcc021dc3d0 pending blocks:[]>
180
+ # => <#Concurrent::Edge::Future:0x7fd41a284010 pending blocks:[]>
184
181
result = Concurrent .
185
182
future { '%02d' } .
186
183
then_select ( ch1 , ch2 ) .
187
184
then { |format , ( value , channel ) | format format , value }
188
- # => <#Concurrent::Edge::Future:0x7fcc021cd9e8 pending blocks:[]>
185
+ # => <#Concurrent::Edge::Future:0x7fd41a25d938 pending blocks:[]>
189
186
result . value! # => "02"
190
187
191
188
192
189
### Common use-cases Examples
193
190
194
191
# simple background processing
195
192
Concurrent . future { do_stuff }
196
- # => <#Concurrent::Edge::Future:0x7fcc021b7530 pending blocks:[]>
193
+ # => <#Concurrent::Edge::Future:0x7fd41a23d520 pending blocks:[]>
197
194
198
195
# parallel background processing
199
196
jobs = 10 . times . map { |i | Concurrent . future { i } }
@@ -210,7 +207,7 @@ def schedule_job
210
207
end # => :schedule_job
211
208
212
209
schedule_job
213
- # => <#Concurrent::Edge::Future:0x7fcc0218faf8 pending blocks:[]>
210
+ # => <#Concurrent::Edge::Future:0x7fd41a2245e8 pending blocks:[]>
214
211
@end = true # => true
215
212
216
213
@@ -223,7 +220,7 @@ def schedule_job
223
220
data [ message ]
224
221
end
225
222
end
226
- # => #<Concurrent::Actor::Reference:0x7fcc0214f458 /db (Concurrent::Actor::Utils::AdHoc)>
223
+ # => #<Concurrent::Actor::Reference:0x7fd41a206228 /db (Concurrent::Actor::Utils::AdHoc)>
227
224
228
225
concurrent_jobs = 11 . times . map do |v |
229
226
Concurrent .
@@ -253,7 +250,7 @@ def schedule_job
253
250
end
254
251
end
255
252
end
256
- # => #<Concurrent::Actor::Reference:0x7fcc02930398 /DB-pool (Concurrent::Actor::Utils::Pool)>
253
+ # => #<Concurrent::Actor::Reference:0x7fd41a0530c0 /DB-pool (Concurrent::Actor::Utils::Pool)>
257
254
258
255
concurrent_jobs = 11 . times . map do |v |
259
256
Concurrent .
0 commit comments