1
1
### Simple asynchronous task
2
2
3
3
future = Concurrent . future { sleep 0.1 ; 1 + 1 } # evaluation starts immediately
4
- # => <#Concurrent::Edge::Future:0x7fc6218f2318 pending blocks:[]>
4
+ # => <#Concurrent::Edge::Future:0x7fcc73208180 pending blocks:[]>
5
5
future . completed? # => false
6
6
# block until evaluated
7
7
future . value # => 2
11
11
### Failing asynchronous task
12
12
13
13
future = Concurrent . future { raise 'Boom' }
14
- # => <#Concurrent::Edge::Future:0x7fc6218eae38 pending blocks:[]>
14
+ # => <#Concurrent::Edge::Future:0x7fcc731fa0a8 pending blocks:[]>
15
15
future . value # => nil
16
16
future . value! rescue $! # => #<RuntimeError: Boom>
17
17
future . reason # => #<RuntimeError: Boom>
21
21
22
22
### Chaining
23
23
24
- head = Concurrent . completed_future 1
24
+ head = Concurrent . succeeded_future 1
25
25
branch1 = head . then ( &:succ )
26
26
branch2 = head . then ( &:succ ) . then ( &:succ )
27
27
branch1 . zip ( branch2 ) . value! # => [2, 3]
41
41
Concurrent . future { 1 } . then ( &:succ ) . rescue { |e | e . message } . then ( &:succ ) . value
42
42
# => 3
43
43
44
- failing_zip = Concurrent . completed_future ( 1 ) & Concurrent . future { raise 'boom' }
45
- # => <#Concurrent::Edge::Future:0x7fc6218b0f08 pending blocks:[]>
46
- failing_zip . result # => [false, [1, nil], [nil, #<RuntimeError : boom>]]
47
- failing_zip . then { |v | 'never happens' } . result # => [false, [1, nil], [nil, #<RuntimeError : boom>]]
44
+ failing_zip = Concurrent . succeeded_future ( 1 ) & Concurrent . failed_future ( StandardError . new ( 'boom' ) )
45
+ # => <#Concurrent::Edge::Future:0x7fcc731c00b0 failed blocks:[]>
46
+ failing_zip . result # => [false, [1, nil], [nil, #<StandardError : boom>]]
47
+ failing_zip . then { |v | 'never happens' } . result # => [false, [1, nil], [nil, #<StandardError : boom>]]
48
48
failing_zip . rescue { |a , b | ( a || b ) . message } . value
49
49
# => "boom"
50
50
failing_zip . chain { |success , values , reasons | [ success , values . compact , reasons . compactß ] } . value
54
54
55
55
# will not evaluate until asked by #value or other method requiring completion
56
56
future = Concurrent . delay { 'lazy' }
57
- # => <#Concurrent::Edge::Future:0x7fc6218a37e0 pending blocks:[]>
57
+ # => <#Concurrent::Edge::Future:0x7fcc731a1840 pending blocks:[]>
58
58
sleep 0.1
59
59
future . completed? # => false
60
60
future . value # => "lazy"
61
61
62
62
# propagates trough chain allowing whole or partial lazy chains
63
63
64
64
head = Concurrent . delay { 1 }
65
- # => <#Concurrent::Edge::Future:0x7fc6218a0720 pending blocks:[]>
65
+ # => <#Concurrent::Edge::Future:0x7fcc73193b28 pending blocks:[]>
66
66
branch1 = head . then ( &:succ )
67
- # => <#Concurrent::Edge::Future:0x7fc6212c7b50 pending blocks:[]>
67
+ # => <#Concurrent::Edge::Future:0x7fcc73190900 pending blocks:[]>
68
68
branch2 = head . delay . then ( &:succ )
69
- # => <#Concurrent::Edge::Future:0x7fc6212c6098 pending blocks:[]>
69
+ # => <#Concurrent::Edge::Future:0x7fcc7318b400 pending blocks:[]>
70
70
join = branch1 & branch2
71
- # => <#Concurrent::Edge::Future:0x7fc6212c4f40 pending blocks:[]>
71
+ # => <#Concurrent::Edge::Future:0x7fcc73180af0 pending blocks:[]>
72
72
73
73
sleep 0.1 # nothing will complete # => 0
74
74
[ head , branch1 , branch2 , join ] . map ( &:completed? ) # => [false, false, false, false]
75
75
76
76
branch1 . value # => 2
77
77
sleep 0.1 # forces only head to complete, branch 2 stays incomplete
78
- # => 1
78
+ # => 0
79
79
[ head , branch1 , branch2 , join ] . map ( &:completed? ) # => [true, true, false, false]
80
80
81
81
join . value # => [2, 2]
96
96
### Schedule
97
97
98
98
scheduled = Concurrent . schedule ( 0.1 ) { 1 }
99
- # => <#Concurrent::Edge::Future:0x7fc62128c550 pending blocks:[]>
99
+ # => <#Concurrent::Edge::Future:0x7fcc73143e48 pending blocks:[]>
100
100
101
101
scheduled . completed? # => false
102
102
scheduled . value # available after 0.1sec # => 1
103
103
104
104
# and in chain
105
105
scheduled = Concurrent . delay { 1 } . schedule ( 0.1 ) . then ( &:succ )
106
- # => <#Concurrent::Edge::Future:0x7fc6228bcdc0 pending blocks:[]>
106
+ # => <#Concurrent::Edge::Future:0x7fcc7313a758 pending blocks:[]>
107
107
# will not be scheduled until value is requested
108
108
sleep 0.1
109
109
scheduled . value # returns after another 0.1sec # => 2
112
112
### Completable Future and Event
113
113
114
114
future = Concurrent . future
115
- # => <#Concurrent::Edge::CompletableFuture:0x7fc623083720 pending blocks:[]>
115
+ # => <#Concurrent::Edge::CompletableFuture:0x7fcc731286e8 pending blocks:[]>
116
116
event = Concurrent . event
117
- # => <#Concurrent::Edge::CompletableEvent:0x7fc623081100 pending blocks:[]>
117
+ # => <#Concurrent::Edge::CompletableEvent:0x7fcc73123058 pending blocks:[]>
118
118
# Don't forget to keep the reference, `Concurrent.future.then { |v| v }` is incompletable
119
119
120
120
# will be blocked until completed
121
121
t1 = Thread . new { future . value }
122
122
t2 = Thread . new { event . wait }
123
123
124
124
future . success 1
125
- # => <#Concurrent::Edge::CompletableFuture:0x7fc623083720 success blocks:[]>
125
+ # => <#Concurrent::Edge::CompletableFuture:0x7fcc731286e8 success blocks:[]>
126
126
future . success 1 rescue $!
127
127
# => #<Concurrent::MultipleAssignmentError: Future can be completed only once. Current result is [true, 1, nil], trying to set [true, 1, nil]>
128
128
future . try_success 2 # => false
129
129
event . complete
130
- # => <#Concurrent::Edge::CompletableEvent:0x7fc623081100 completed blocks:[]>
130
+ # => <#Concurrent::Edge::CompletableEvent:0x7fcc73123058 completed blocks:[]>
131
131
132
132
[ t1 , t2 ] . each &:join
133
133
134
134
135
135
### Callbacks
136
136
137
- queue = Queue . new # => #<Thread::Queue:0x007fc62127df00 >
137
+ queue = Queue . new # => #<Thread::Queue:0x007fcc73110638 >
138
138
future = Concurrent . delay { 1 + 1 }
139
- # => <#Concurrent::Edge::Future:0x7fc62127c060 pending blocks:[]>
139
+ # => <#Concurrent::Edge::Future:0x7fcc7310ab98 pending blocks:[]>
140
140
141
141
future . on_success { queue << 1 } # evaluated asynchronously
142
- # => <#Concurrent::Edge::Future:0x7fc62127c060 pending blocks:[]>
142
+ # => <#Concurrent::Edge::Future:0x7fcc7310ab98 pending blocks:[]>
143
143
future . on_success! { queue << 2 } # evaluated on completing thread
144
- # => <#Concurrent::Edge::Future:0x7fc62127c060 pending blocks:[]>
144
+ # => <#Concurrent::Edge::Future:0x7fcc7310ab98 pending blocks:[]>
145
145
146
146
queue . empty? # => true
147
147
future . value # => 2
152
152
### Thread-pools
153
153
154
154
Concurrent . future ( :fast ) { 2 } . then ( :io ) { File . read __FILE__ } . wait
155
- # => <#Concurrent::Edge::Future:0x7fc62125d9a8 success blocks:[]>
155
+ # => <#Concurrent::Edge::Future:0x7fcc730f98e8 success blocks:[]>
156
156
157
157
158
158
### Interoperability with actors
159
159
160
160
actor = Concurrent ::Actor ::Utils ::AdHoc . spawn :square do
161
161
-> v { v ** 2 }
162
162
end
163
- # => #<Concurrent::Actor::Reference:0x7fc621234a30 /square (Concurrent::Actor::Utils::AdHoc)>
163
+ # => #<Concurrent::Actor::Reference:0x7fcc730c36f8 /square (Concurrent::Actor::Utils::AdHoc)>
164
164
165
165
Concurrent .
166
166
future { 2 } .
173
173
174
174
### Interoperability with channels
175
175
176
- ch1 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fc621205460 >
177
- ch2 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fc6212041c8 >
176
+ ch1 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fcc73043188 >
177
+ ch2 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fcc730425f8 >
178
178
179
179
result = Concurrent . select ( ch1 , ch2 )
180
- # => <#Concurrent::Edge::CompletableFuture:0x7fc6211fe5c0 pending blocks:[]>
180
+ # => <#Concurrent::Edge::CompletableFuture:0x7fcc730411a8 pending blocks:[]>
181
181
ch1 . push 1 # => nil
182
182
result . value!
183
- # => [1, #<Concurrent::Edge::Channel:0x007fc621205460 >]
183
+ # => [1, #<Concurrent::Edge::Channel:0x007fcc73043188 >]
184
184
185
185
Concurrent .
186
186
future { 1 +1 } .
187
187
then_push ( ch1 )
188
- # => <#Concurrent::Edge::Future:0x7fc6211f7338 pending blocks:[]>
188
+ # => <#Concurrent::Edge::Future:0x7fcc73032c98 pending blocks:[]>
189
189
result = Concurrent .
190
190
future { '%02d' } .
191
191
then_select ( ch1 , ch2 ) .
192
192
then { |format , ( value , channel ) | format format , value }
193
- # => <#Concurrent::Edge::Future:0x7fc6211ec668 pending blocks:[]>
193
+ # => <#Concurrent::Edge::Future:0x7fcc7302a4f8 pending blocks:[]>
194
194
result . value! # => "02"
195
195
196
196
197
197
### Common use-cases Examples
198
198
199
199
# simple background processing
200
200
Concurrent . future { do_stuff }
201
- # => <#Concurrent::Edge::Future:0x7fc6211df170 pending blocks:[]>
201
+ # => <#Concurrent::Edge::Future:0x7fcc72123c48 pending blocks:[]>
202
202
203
203
# parallel background processing
204
204
jobs = 10 . times . map { |i | Concurrent . future { i } }
@@ -215,7 +215,7 @@ def schedule_job
215
215
end # => :schedule_job
216
216
217
217
schedule_job
218
- # => <#Concurrent::Edge::Future:0x7fc62119c140 pending blocks:[]>
218
+ # => <#Concurrent::Edge::Future:0x7fcc75011370 pending blocks:[]>
219
219
@end = true # => true
220
220
221
221
@@ -228,7 +228,7 @@ def schedule_job
228
228
data [ message ]
229
229
end
230
230
end
231
- # => #<Concurrent::Actor::Reference:0x7fc62117f568 /db (Concurrent::Actor::Utils::AdHoc)>
231
+ # => #<Concurrent::Actor::Reference:0x7fcc71832a08 /db (Concurrent::Actor::Utils::AdHoc)>
232
232
233
233
concurrent_jobs = 11 . times . map do |v |
234
234
Concurrent .
@@ -258,7 +258,7 @@ def schedule_job
258
258
end
259
259
end
260
260
end
261
- # => #<Concurrent::Actor::Reference:0x7fc6218969f0 /DB-pool (Concurrent::Actor::Utils::Pool)>
261
+ # => #<Concurrent::Actor::Reference:0x7fcc72320118 /DB-pool (Concurrent::Actor::Utils::Pool)>
262
262
263
263
concurrent_jobs = 11 . times . map do |v |
264
264
Concurrent .
0 commit comments