Skip to content

Commit 500cd3e

Browse files
committed
Edge::Future: auto splat array values when used as arguments for blocks
1 parent eb1650c commit 500cd3e

File tree

4 files changed

+232
-151
lines changed

4 files changed

+232
-151
lines changed

examples/edge_futures.in.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
head = Concurrent.future { 1 } #
2323
branch1 = head.then(&:succ) #
2424
branch2 = head.then(&:succ).then(&:succ) #
25-
branch1.zip(branch2).value
25+
branch1.zip(branch2).value!
2626
(branch1 & branch2).then { |a, b| a + b }.value!
2727
(branch1 & branch2).then(&:+).value!
2828
Concurrent.zip(branch1, branch2, branch1).then { |*values| values.reduce &:+ }.value!
2929
# pick only first completed
30-
(branch1 | branch2).value
30+
(branch1 | branch2).value!
31+
32+
# auto splat arrays for blocks
33+
Concurrent.future { [1, 2] }.then(&:+).value!
3134

3235

3336
### Error handling
@@ -209,7 +212,7 @@ def schedule_job
209212

210213
DB_POOL = Concurrent::Actor::Utils::Pool.spawn!('DB-pool', pool_size) do |index|
211214
# DB connection constructor
212-
Concurrent::Actor::Utils::AdHoc.spawn(name: "worker-#{index}", args: [data]) do
215+
Concurrent::Actor::Utils::AdHoc.spawn(name: "worker-#{index}", args: [data]) do |data|
213216
lambda do |message|
214217
# pretending that this queries a DB
215218
data[message]

examples/edge_futures.out.rb

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
### Simple asynchronous task
22

33
future = Concurrent.future { sleep 0.1; 1 + 1 } # evaluation starts immediately
4-
# => <#Concurrent::Edge::Future:0x7fa1b59bbbb8 pending blocks:[]>
4+
# => <#Concurrent::Edge::Future:0x7fcc038ca588 pending blocks:[]>
55
future.completed? # => false
66
# block until evaluated
77
future.value # => 2
@@ -11,7 +11,7 @@
1111
### Failing asynchronous task
1212

1313
future = Concurrent.future { raise 'Boom' }
14-
# => <#Concurrent::Edge::Future:0x7fa1b59b2fe0 failed blocks:[]>
14+
# => <#Concurrent::Edge::Future:0x7fcc038c05b0 pending blocks:[]>
1515
future.value # => nil
1616
future.value! rescue $! # => #<RuntimeError: Boom>
1717
future.reason # => #<RuntimeError: Boom>
@@ -24,13 +24,16 @@
2424
head = Concurrent.future { 1 }
2525
branch1 = head.then(&:succ)
2626
branch2 = head.then(&:succ).then(&:succ)
27-
branch1.zip(branch2).value # => [2, 3]
27+
branch1.zip(branch2).value! # => [2, 3]
2828
(branch1 & branch2).then { |a, b| a + b }.value! # => 5
2929
(branch1 & branch2).then(&:+).value! # => 5
3030
Concurrent.zip(branch1, branch2, branch1).then { |*values| values.reduce &:+ }.value!
3131
# => 7
3232
# pick only first completed
33-
(branch1 | branch2).value # => 2
33+
(branch1 | branch2).value! # => 2
34+
35+
# auto splat arrays for blocks
36+
Concurrent.future { [1, 2] }.then(&:+).value! # => 3
3437

3538

3639
### Error handling
@@ -47,21 +50,21 @@
4750

4851
# will not evaluate until asked by #value or other method requiring completion
4952
future = Concurrent.delay { 'lazy' }
50-
# => <#Concurrent::Edge::Future:0x7fa1b5948f00 pending blocks:[]>
53+
# => <#Concurrent::Edge::Future:0x7fcc022afaa0 pending blocks:[]>
5154
sleep 0.1
5255
future.completed? # => false
5356
future.value # => "lazy"
5457

5558
# propagates trough chain allowing whole or partial lazy chains
5659

5760
head = Concurrent.delay { 1 }
58-
# => <#Concurrent::Edge::Future:0x7fa1b59401c0 pending blocks:[]>
61+
# => <#Concurrent::Edge::Future:0x7fcc022a5640 pending blocks:[]>
5962
branch1 = head.then(&:succ)
60-
# => <#Concurrent::Edge::Future:0x7fa1b5933290 pending blocks:[]>
63+
# => <#Concurrent::Edge::Future:0x7fcc0229c478 pending blocks:[]>
6164
branch2 = head.delay.then(&:succ)
62-
# => <#Concurrent::Edge::Future:0x7fa1b5930c98 pending blocks:[]>
65+
# => <#Concurrent::Edge::Future:0x7fcc0228f318 pending blocks:[]>
6366
join = branch1 & branch2
64-
# => <#Concurrent::Edge::ArrayFuture:0x7fa1b592b7c0 pending blocks:[]>
67+
# => <#Concurrent::Edge::Future:0x7fcc0228de78 pending blocks:[]>
6568

6669
sleep 0.1 # nothing will complete # => 0
6770
[head, branch1, branch2, join].map(&:completed?) # => [false, false, false, false]
@@ -89,14 +92,14 @@
8992
### Schedule
9093

9194
scheduled = Concurrent.schedule(0.1) { 1 }
92-
# => <#Concurrent::Edge::Future:0x7fa1b49df190 pending blocks:[]>
95+
# => <#Concurrent::Edge::Future:0x7fcc0385b368 pending blocks:[]>
9396

9497
scheduled.completed? # => false
9598
scheduled.value # available after 0.1sec # => 1
9699

97100
# and in chain
98101
scheduled = Concurrent.delay { 1 }.schedule(0.1).then(&:succ)
99-
# => <#Concurrent::Edge::Future:0x7fa1b49d4ec0 pending blocks:[]>
102+
# => <#Concurrent::Edge::Future:0x7fcc03843948 pending blocks:[]>
100103
# will not be scheduled until value is requested
101104
sleep 0.1
102105
scheduled.value # returns after another 0.1sec # => 2
@@ -105,35 +108,35 @@
105108
### Completable Future and Event
106109

107110
future = Concurrent.future
108-
# => <#Concurrent::Edge::CompletableFuture:0x7fa1b49bce10 pending blocks:[]>
111+
# => <#Concurrent::Edge::CompletableFuture:0x7fcc0304fdb8 pending blocks:[]>
109112
event = Concurrent.event
110-
# => <#Concurrent::Edge::CompletableEvent:0x7fa1b49bc118 pending blocks:[]>
113+
# => <#Concurrent::Edge::CompletableEvent:0x7fcc0304e210 pending blocks:[]>
111114

112115
# will be blocked until completed
113116
t1 = Thread.new { future.value }
114117
t2 = Thread.new { event.wait }
115118

116119
future.success 1
117-
# => <#Concurrent::Edge::CompletableFuture:0x7fa1b49bce10 success blocks:[]>
120+
# => <#Concurrent::Edge::CompletableFuture:0x7fcc0304fdb8 success blocks:[]>
118121
future.success 1 rescue $!
119122
# => #<Concurrent::MultipleAssignmentError: multiple assignment>
120123
future.try_success 2 # => false
121124
event.complete
122-
# => <#Concurrent::Edge::CompletableEvent:0x7fa1b49bc118 completed blocks:[]>
125+
# => <#Concurrent::Edge::CompletableEvent:0x7fcc0304e210 completed blocks:[]>
123126

124127
[t1, t2].each &:join
125128

126129

127130
### Callbacks
128131

129-
queue = Queue.new # => #<Thread::Queue:0x007fa1b49acec0>
132+
queue = Queue.new # => #<Thread::Queue:0x007fcc03101720>
130133
future = Concurrent.delay { 1 + 1 }
131-
# => <#Concurrent::Edge::Future:0x7fa1b49ac150 pending blocks:[]>
134+
# => <#Concurrent::Edge::Future:0x7fcc030fbaf0 pending blocks:[]>
132135

133136
future.on_success { queue << 1 } # evaluated asynchronously
134-
# => <#Concurrent::Edge::Future:0x7fa1b49ac150 pending blocks:[]>
137+
# => <#Concurrent::Edge::Future:0x7fcc030fbaf0 pending blocks:[]>
135138
future.on_success! { queue << 2 } # evaluated on completing thread
136-
# => <#Concurrent::Edge::Future:0x7fa1b49ac150 pending blocks:[]>
139+
# => <#Concurrent::Edge::Future:0x7fcc030fbaf0 pending blocks:[]>
137140

138141
queue.empty? # => true
139142
future.value # => 2
@@ -144,15 +147,15 @@
144147
### Thread-pools
145148

146149
Concurrent.future(:fast) { 2 }.then(:io) { File.read __FILE__ }.wait
147-
# => <#Concurrent::Edge::Future:0x7fa1b498dd18 success blocks:[]>
150+
# => <#Concurrent::Edge::Future:0x7fcc030e8bd0 success blocks:[]>
148151

149152

150153
### Interoperability with actors
151154

152155
actor = Concurrent::Actor::Utils::AdHoc.spawn :square do
153156
-> v { v ** 2 }
154157
end
155-
# => #<Concurrent::Actor::Reference /square (Concurrent::Actor::Utils::AdHoc)>
158+
# => #<Concurrent::Actor::Reference:0x7fcc0223f020 /square (Concurrent::Actor::Utils::AdHoc)>
156159

157160
Concurrent.
158161
future { 2 }.
@@ -165,32 +168,32 @@
165168

166169
### Interoperability with channels
167170

168-
ch1 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fa1b41c5e28>
169-
ch2 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fa1b41c5338>
171+
ch1 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fcc021ec6b8>
172+
ch2 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fcc021e7b18>
170173

171174
result = Concurrent.select(ch1, ch2)
172-
# => <#Concurrent::Edge::CompletableFuture:0x7fa1b41bf4d8 pending blocks:[]>
175+
# => <#Concurrent::Edge::CompletableFuture:0x7fcc021e6060 pending blocks:[]>
173176
ch1.push 1 # => nil
174177
result.value!
175-
# => [1, #<Concurrent::Edge::Channel:0x007fa1b41c5e28>]
178+
# => [1, #<Concurrent::Edge::Channel:0x007fcc021ec6b8>]
176179

177180
Concurrent.
178181
future { 1+1 }.
179182
then_push(ch1)
180-
# => <#Concurrent::Edge::Future:0x7fa1b41b6450 pending blocks:[]>
183+
# => <#Concurrent::Edge::Future:0x7fcc021dc3d0 pending blocks:[]>
181184
result = Concurrent.
182185
future { '%02d' }.
183186
then_select(ch1, ch2).
184187
then { |format, (value, channel)| format format, value }
185-
# => <#Concurrent::Edge::Future:0x7fa1b41a7f40 pending blocks:[]>
188+
# => <#Concurrent::Edge::Future:0x7fcc021cd9e8 pending blocks:[]>
186189
result.value! # => "02"
187190

188191

189192
### Common use-cases Examples
190193

191194
# simple background processing
192195
Concurrent.future { do_stuff }
193-
# => <#Concurrent::Edge::Future:0x7fa1b4196d08 pending blocks:[]>
196+
# => <#Concurrent::Edge::Future:0x7fcc021b7530 pending blocks:[]>
194197

195198
# parallel background processing
196199
jobs = 10.times.map { |i| Concurrent.future { i } }
@@ -207,7 +210,7 @@ def schedule_job
207210
end # => :schedule_job
208211

209212
schedule_job
210-
# => <#Concurrent::Edge::Future:0x7fa1b4147960 pending blocks:[]>
213+
# => <#Concurrent::Edge::Future:0x7fcc0218faf8 pending blocks:[]>
211214
@end = true # => true
212215

213216

@@ -220,7 +223,7 @@ def schedule_job
220223
data[message]
221224
end
222225
end
223-
# => #<Concurrent::Actor::Reference /db (Concurrent::Actor::Utils::AdHoc)>
226+
# => #<Concurrent::Actor::Reference:0x7fcc0214f458 /db (Concurrent::Actor::Utils::AdHoc)>
224227

225228
concurrent_jobs = 11.times.map do |v|
226229
Concurrent.
@@ -243,14 +246,14 @@ def schedule_job
243246

244247
DB_POOL = Concurrent::Actor::Utils::Pool.spawn!('DB-pool', pool_size) do |index|
245248
# DB connection constructor
246-
Concurrent::Actor::Utils::AdHoc.spawn(name: "worker-#{index}", args: [data]) do
249+
Concurrent::Actor::Utils::AdHoc.spawn(name: "worker-#{index}", args: [data]) do |data|
247250
lambda do |message|
248251
# pretending that this queries a DB
249252
data[message]
250253
end
251254
end
252255
end
253-
# => #<Concurrent::Actor::Reference /DB-pool (Concurrent::Actor::Utils::Pool)>
256+
# => #<Concurrent::Actor::Reference:0x7fcc02930398 /DB-pool (Concurrent::Actor::Utils::Pool)>
254257

255258
concurrent_jobs = 11.times.map do |v|
256259
Concurrent.

0 commit comments

Comments
 (0)