Skip to content

Commit a50092e

Browse files
committed
Interoperability with Channel
1 parent eadb12c commit a50092e

File tree

4 files changed

+260
-37
lines changed

4 files changed

+260
-37
lines changed

examples/edge_futures.in.rb

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
### Delay
3939

4040
# will not evaluate until asked by #value or other method requiring completion
41-
scheduledfuture = Concurrent.delay { 'lazy' }
41+
future = Concurrent.delay { 'lazy' }
4242
sleep 0.1 #
4343
future.completed?
4444
future.value
@@ -136,6 +136,25 @@
136136
actor.ask(2).then(&:succ).value
137137

138138

139+
### Interoperability with channels
140+
141+
ch1 = Concurrent::Edge::Channel.new
142+
ch2 = Concurrent::Edge::Channel.new
143+
144+
result = Concurrent.select(ch1, ch2)
145+
ch1.push 1
146+
result.value!
147+
148+
Concurrent.
149+
future { 1+1 }.
150+
then_push(ch1)
151+
result = Concurrent.
152+
future { '%02d' }.
153+
then_select(ch1, ch2).
154+
then { |format, (value, channel)| format format, value }
155+
result.value!
156+
157+
139158
### Common use-cases Examples
140159

141160
# simple background processing
@@ -147,12 +166,68 @@
147166

148167

149168
# periodic task
169+
@end = false
170+
150171
def schedule_job
151172
Concurrent.schedule(1) { do_stuff }.
152173
rescue { |e| report_error e }.
153-
then { schedule_job }
174+
then { schedule_job unless @end }
154175
end
155176

156177
schedule_job
178+
@end = true
179+
180+
181+
# How to limit processing where there are limited resources?
182+
# By creating an actor managing the resource
183+
DB = Concurrent::Actor::Utils::AdHoc.spawn :db do
184+
data = Array.new(10) { |i| '*' * i }
185+
lambda do |message|
186+
# pretending that this queries a DB
187+
data[message]
188+
end
189+
end
190+
191+
concurrent_jobs = 11.times.map do |v|
192+
Concurrent.
193+
future { v }.
194+
# ask the DB with the `v`, only one at the time, rest is parallel
195+
then_ask(DB).
196+
# get size of the string, fails for 11
197+
then(&:size).
198+
rescue { |reason| reason.message } # translate error to value (exception, message)
199+
end #
200+
201+
Concurrent.zip(*concurrent_jobs).value!
202+
203+
204+
# In reality there is often a pool though:
205+
class DBConnection < Concurrent::Actor::Utils::AbstractWorker
206+
def initialize(balancer, data)
207+
super balancer
208+
@data = data
209+
end
210+
211+
def work(message)
212+
# pretending that this queries a DB
213+
@data[message]
214+
end
215+
end
216+
217+
data = Array.new(10) { |i| '*' * i }
218+
pool_size = 5
219+
220+
DB_POOL = Concurrent::Actor::Utils::Pool.spawn!('DB-pool', pool_size) do |balancer, index|
221+
DBConnection.spawn(name: "worker-#{index}", args: [balancer, data])
222+
end
157223

224+
concurrent_jobs = 11.times.map do |v|
225+
Concurrent.
226+
future { v }.
227+
# ask the DB_POOL with the `v`, only 5 at the time, rest is parallel
228+
then_ask(DB_POOL).
229+
then(&:size).
230+
rescue { |reason| reason.message }
231+
end #
158232

233+
Concurrent.zip(*concurrent_jobs).value!

examples/edge_futures.out.rb

Lines changed: 111 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
### Simple asynchronous task
22

33
future = Concurrent.future { sleep 0.1; 1 + 1 } # evaluation starts immediately
4-
# => <#Concurrent::Edge::Future:0x7fa08385da60 pending blocks:[]>
4+
# => <#Concurrent::Edge::Future:0x7fad9ca186e0 pending blocks:[]>
55
future.completed? # => false
66
# block until evaluated
77
future.value # => 2
@@ -11,7 +11,7 @@
1111
### Failing asynchronous task
1212

1313
future = Concurrent.future { raise 'Boom' }
14-
# => <#Concurrent::Edge::Future:0x7fa083834638 failed blocks:[]>
14+
# => <#Concurrent::Edge::Future:0x7fad9c9f95b0 pending blocks:[]>
1515
future.value # => nil
1616
future.value! rescue $! # => #<RuntimeError: Boom>
1717
future.reason # => #<RuntimeError: Boom>
@@ -26,7 +26,7 @@
2626
branch2 = head.then(&:succ).then(&:succ)
2727
branch1.zip(branch2).value # => [2, 3]
2828
(branch1 & branch2).then { |(a, b)| a + b }.value
29-
# => 5
29+
# => nil
3030
# pick only first completed
3131
(branch1 | branch2).value # => 2
3232

@@ -44,22 +44,22 @@
4444
### Delay
4545

4646
# will not evaluate until asked by #value or other method requiring completion
47-
scheduledfuture = Concurrent.delay { 'lazy' }
48-
# => <#Concurrent::Edge::Future:0x7fa0831917b8 pending blocks:[]>
47+
future = Concurrent.delay { 'lazy' }
48+
# => <#Concurrent::Edge::Future:0x7fad9c8fb3e8 pending blocks:[]>
4949
sleep 0.1
50-
future.completed? # => true
51-
future.value # => nil
50+
future.completed? # => false
51+
future.value # => "lazy"
5252

5353
# propagates trough chain allowing whole or partial lazy chains
5454

5555
head = Concurrent.delay { 1 }
56-
# => <#Concurrent::Edge::Future:0x7fa083172ef8 pending blocks:[]>
56+
# => <#Concurrent::Edge::Future:0x7fad9b158bf0 pending blocks:[]>
5757
branch1 = head.then(&:succ)
58-
# => <#Concurrent::Edge::Future:0x7fa083171c88 pending blocks:[]>
58+
# => <#Concurrent::Edge::Future:0x7fad9b149ba0 pending blocks:[]>
5959
branch2 = head.delay.then(&:succ)
60-
# => <#Concurrent::Edge::Future:0x7fa08294f528 pending blocks:[]>
60+
# => <#Concurrent::Edge::Future:0x7fad9b12a020 pending blocks:[]>
6161
join = branch1 & branch2
62-
# => <#Concurrent::Edge::Future:0x7fa08294e218 pending blocks:[]>
62+
# => <#Concurrent::Edge::ArrayFuture:0x7fad9b8a0778 pending blocks:[]>
6363

6464
sleep 0.1 # nothing will complete # => 0
6565
[head, branch1, branch2, join].map(&:completed?) # => [false, false, false, false]
@@ -87,14 +87,14 @@
8787
### Schedule
8888

8989
scheduled = Concurrent.schedule(0.1) { 1 }
90-
# => <#Concurrent::Edge::Future:0x7fa08224edf0 pending blocks:[]>
90+
# => <#Concurrent::Edge::Future:0x7fad9a941e08 pending blocks:[]>
9191

9292
scheduled.completed? # => false
9393
scheduled.value # available after 0.1sec # => 1
9494

9595
# and in chain
9696
scheduled = Concurrent.delay { 1 }.schedule(0.1).then(&:succ)
97-
# => <#Concurrent::Edge::Future:0x7fa0831f3d50 pending blocks:[]>
97+
# => <#Concurrent::Edge::Future:0x7fad9b0aa7d0 pending blocks:[]>
9898
# will not be scheduled until value is requested
9999
sleep 0.1
100100
scheduled.value # returns after another 0.1sec # => 2
@@ -103,35 +103,35 @@
103103
### Completable Future and Event
104104

105105
future = Concurrent.future
106-
# => <#Concurrent::Edge::CompletableFuture:0x7fa0831e8090 pending blocks:[]>
106+
# => <#Concurrent::Edge::CompletableFuture:0x7fad9a87b6e0 pending blocks:[]>
107107
event = Concurrent.event
108-
# => <#Concurrent::Edge::CompletableEvent:0x7fa0831dae68 pending blocks:[]>
108+
# => <#Concurrent::Edge::CompletableEvent:0x7fad9a86ba88 pending blocks:[]>
109109

110110
# will be blocked until completed
111111
t1 = Thread.new { future.value }
112112
t2 = Thread.new { event.wait }
113113

114114
future.success 1
115-
# => <#Concurrent::Edge::CompletableFuture:0x7fa0831e8090 success blocks:[]>
115+
# => <#Concurrent::Edge::CompletableFuture:0x7fad9a87b6e0 success blocks:[]>
116116
future.success 1 rescue $!
117117
# => #<Concurrent::MultipleAssignmentError: multiple assignment>
118118
future.try_success 2 # => false
119119
event.complete
120-
# => <#Concurrent::Edge::CompletableEvent:0x7fa0831dae68 completed blocks:[]>
120+
# => <#Concurrent::Edge::CompletableEvent:0x7fad9a86ba88 completed blocks:[]>
121121

122122
[t1, t2].each &:join
123123

124124

125125
### Callbacks
126126

127-
queue = Queue.new # => #<Thread::Queue:0x007fa0831bac30>
127+
queue = Queue.new # => #<Thread::Queue:0x007fad9a862320>
128128
future = Concurrent.delay { 1 + 1 }
129-
# => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]>
129+
# => <#Concurrent::Edge::Future:0x7fad9a853960 pending blocks:[]>
130130

131131
future.on_success { queue << 1 } # evaluated asynchronously
132-
# => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]>
132+
# => <#Concurrent::Edge::Future:0x7fad9a853960 pending blocks:[]>
133133
future.on_success! { queue << 2 } # evaluated on completing thread
134-
# => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]>
134+
# => <#Concurrent::Edge::Future:0x7fad9a853960 pending blocks:[]>
135135

136136
queue.empty? # => true
137137
future.value # => 2
@@ -142,7 +142,7 @@
142142
### Thread-pools
143143

144144
Concurrent.future(:fast) { 2 }.then(:io) { File.read __FILE__ }.wait
145-
# => <#Concurrent::Edge::Future:0x7fa08318b070 success blocks:[]>
145+
# => <#Concurrent::Edge::Future:0x7fad9a883958 success blocks:[]>
146146

147147

148148
### Interoperability with actors
@@ -161,25 +161,109 @@
161161
actor.ask(2).then(&:succ).value # => 5
162162

163163

164+
### Interoperability with channels
165+
166+
ch1 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fad9c892ac8>
167+
ch2 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fad9c8904a8>
168+
169+
result = Concurrent.select(ch1, ch2)
170+
# => <#Concurrent::Edge::CompletableFuture:0x7fad9b86aa88 pending blocks:[]>
171+
ch1.push 1 # => nil
172+
result.value!
173+
# => [1, #<Concurrent::Edge::Channel:0x007fad9c892ac8>]
174+
175+
Concurrent.
176+
future { 1+1 }.
177+
then_push(ch1)
178+
# => <#Concurrent::Edge::Future:0x7fad9c898d88 pending blocks:[]>
179+
result = Concurrent.
180+
future { '%02d' }.
181+
then_select(ch1, ch2).
182+
then { |format, (value, channel)| format format, value }
183+
# => <#Concurrent::Edge::Future:0x7fad9b88b4e0 pending blocks:[]>
184+
result.value! # => "02"
185+
186+
164187
### Common use-cases Examples
165188

166189
# simple background processing
167190
Concurrent.future { do_stuff }
168-
# => <#Concurrent::Edge::Future:0x7fa0839ee8e8 pending blocks:[]>
191+
# => <#Concurrent::Edge::Future:0x7fad9b151b98 pending blocks:[]>
169192

170193
# parallel background processing
171194
jobs = 10.times.map { |i| Concurrent.future { i } }
172195
Concurrent.zip(*jobs).value # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
173196

174197

175198
# periodic task
199+
@end = false # => false
200+
176201
def schedule_job
177202
Concurrent.schedule(1) { do_stuff }.
178203
rescue { |e| report_error e }.
179-
then { schedule_job }
204+
then { schedule_job unless @end }
180205
end # => :schedule_job
181206

182207
schedule_job
183-
# => <#Concurrent::Edge::Future:0x7fa082904f78 pending blocks:[]>
184-
185-
208+
# => <#Concurrent::Edge::Future:0x7fad9c96a6a8 pending blocks:[]>
209+
@end = true # => true
210+
211+
212+
# How to limit processing where there are limited resources?
213+
# By creating an actor managing the resource
214+
DB = Concurrent::Actor::Utils::AdHoc.spawn :db do
215+
data = Array.new(10) { |i| '*' * i }
216+
lambda do |message|
217+
# pretending that this queries a DB
218+
data[message]
219+
end
220+
end
221+
# => #<Concurrent::Actor::Reference /db (Concurrent::Actor::Utils::AdHoc)>
222+
223+
concurrent_jobs = 11.times.map do |v|
224+
Concurrent.
225+
future { v }.
226+
# ask the DB with the `v`, only one at the time, rest is parallel
227+
then_ask(DB).
228+
# get size of the string, fails for 11
229+
then(&:size).
230+
rescue { |reason| reason.message } # translate error to value (exception, message)
231+
end
232+
233+
Concurrent.zip(*concurrent_jobs).value!
234+
# => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, "undefined method `size' for nil:NilClass"]
235+
236+
237+
# In reality there is often a pool though:
238+
class DBConnection < Concurrent::Actor::Utils::AbstractWorker
239+
def initialize(balancer, data)
240+
super balancer
241+
@data = data
242+
end
243+
244+
def work(message)
245+
# pretending that this queries a DB
246+
@data[message]
247+
end
248+
end # => :work
249+
250+
data = Array.new(10) { |i| '*' * i }
251+
# => ["", "*", "**", "***", "****", "*****", "******", "*******", "********", "*********"]
252+
pool_size = 5 # => 5
253+
254+
DB_POOL = Concurrent::Actor::Utils::Pool.spawn!('DB-pool', pool_size) do |balancer, index|
255+
DBConnection.spawn(name: "worker-#{index}", args: [balancer, data])
256+
end
257+
# => #<Concurrent::Actor::Reference /DB-pool (Concurrent::Actor::Utils::Pool)>
258+
259+
concurrent_jobs = 11.times.map do |v|
260+
Concurrent.
261+
future { v }.
262+
# ask the DB_POOL with the `v`, only 5 at the time, rest is parallel
263+
then_ask(DB_POOL).
264+
then(&:size).
265+
rescue { |reason| reason.message }
266+
end
267+
268+
Concurrent.zip(*concurrent_jobs).value!
269+
# => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, "undefined method `size' for nil:NilClass"]

0 commit comments

Comments
 (0)