Skip to content

Commit 0f88dc1

Browse files
committed
Usage examples of new Futures
1 parent 4519f94 commit 0f88dc1

File tree

5 files changed

+441
-11
lines changed

5 files changed

+441
-11
lines changed

examples/benchmark_new_futures.rb

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,38 @@
77
warmup = 2 * scale
88
warmup *= 10 if Concurrent.on_jruby?
99

10-
11-
Benchmark.ips(time, warmup) do |x|
12-
of = Concurrent::Promise.execute { 1 }
13-
nf = Concurrent.future { 1 }
14-
x.report('value-old') { of.value! }
15-
x.report('value-new') { nf.value! }
16-
x.compare!
17-
end
18-
1910
Benchmark.ips(time, warmup) do |x|
2011
x.report('graph-old') do
2112
head = Concurrent::Promise.execute { 1 }
2213
branch1 = head.then(&:succ)
2314
branch2 = head.then(&:succ).then(&:succ)
24-
Concurrent::Promise.zip(branch1, branch2).then { |(a, b)| a + b }.value!
15+
branch3 = head.then(&:succ).then(&:succ).then(&:succ)
16+
Concurrent::Promise.zip(branch1, branch2, branch3).then { |(a, b, c)| a + b + c }.value!
2517
end
2618
x.report('graph-new') do
2719
head = Concurrent.future { 1 }
2820
branch1 = head.then(&:succ)
2921
branch2 = head.then(&:succ).then(&:succ)
30-
(branch1 + branch2).then { |(a, b)| a + b }.value!
22+
branch3 = head.then(&:succ).then(&:succ).then(&:succ)
23+
Concurrent.zip(branch1, branch2, branch3).then { |(a, b, c)| a + b + c }.value!
3124
end
3225
x.compare!
3326
end
3427

28+
Benchmark.ips(time, warmup) do |x|
29+
x.report('status-old') { f = Concurrent::Promise.execute { nil }; 10.times { f.complete? } }
30+
x.report('status-new') { f = Concurrent.future { nil }; 10.times { f.completed? } }
31+
x.compare!
32+
end
33+
34+
Benchmark.ips(time, warmup) do |x|
35+
of = Concurrent::Promise.execute { 1 }
36+
nf = Concurrent.future { 1 }
37+
x.report('value-old') { of.value! }
38+
x.report('value-new') { nf.value! }
39+
x.compare!
40+
end
41+
3542
Benchmark.ips(time, warmup) do |x|
3643
x.report('immediate-old') { Concurrent::Promise.execute { nil }.value! }
3744
x.report('immediate-new') { Concurrent.future { nil }.value! }

examples/edge_futures.in.rb

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
### Simple asynchronous task
2+
3+
future = Concurrent.future { sleep 0.1; 1 + 1 } # evaluation starts immediately
4+
future.completed?
5+
# block until evaluated
6+
future.value
7+
future.completed?
8+
9+
10+
### Failing asynchronous task
11+
12+
future = Concurrent.future { raise 'Boom' }
13+
future.value
14+
future.value! rescue $!
15+
future.reason
16+
# re-raising
17+
raise future rescue $!
18+
19+
20+
### Chaining
21+
22+
head = Concurrent.future { 1 } #
23+
branch1 = head.then(&:succ) #
24+
branch2 = head.then(&:succ).then(&:succ) #
25+
branch1.zip(branch2).value
26+
(branch1 & branch2).then { |(a, b)| a + b }.value
27+
# pick only first completed
28+
(branch1 | branch2).value
29+
30+
31+
### Error handling
32+
33+
Concurrent.future { Object.new }.then(&:succ).then(&:succ).rescue { |e| e.class }.value # error propagates
34+
Concurrent.future { Object.new }.then(&:succ).rescue { 1 }.then(&:succ).value
35+
Concurrent.future { 1 }.then(&:succ).rescue { |e| e.message }.then(&:succ).value
36+
37+
38+
### Delay
39+
40+
# will not evaluate until asked by #value or other method requiring completion
41+
scheduledfuture = Concurrent.delay { 'lazy' }
42+
sleep 0.1 #
43+
future.completed?
44+
future.value
45+
46+
# propagates trough chain allowing whole or partial lazy chains
47+
48+
head = Concurrent.delay { 1 }
49+
branch1 = head.then(&:succ)
50+
branch2 = head.delay.then(&:succ)
51+
join = branch1 & branch2
52+
53+
sleep 0.1 # nothing will complete
54+
[head, branch1, branch2, join].map(&:completed?)
55+
56+
branch1.value
57+
sleep 0.1 # forces only head to complete, branch 2 stays incomplete
58+
[head, branch1, branch2, join].map(&:completed?)
59+
60+
join.value
61+
62+
63+
### Flatting
64+
65+
Concurrent.future { Concurrent.future { 1+1 } }.flat.value # waits for inner future
66+
67+
# more complicated example
68+
Concurrent.future { Concurrent.future { Concurrent.future { 1 + 1 } } }.
69+
flat(1).
70+
then { |f| f.then(&:succ) }.
71+
flat(1).value
72+
73+
74+
### Schedule
75+
76+
scheduled = Concurrent.schedule(0.1) { 1 }
77+
78+
scheduled.completed?
79+
scheduled.value # available after 0.1sec
80+
81+
# and in chain
82+
scheduled = Concurrent.delay { 1 }.schedule(0.1).then(&:succ)
83+
# will not be scheduled until value is requested
84+
sleep 0.1 #
85+
scheduled.value # returns after another 0.1sec
86+
87+
88+
### Completable Future and Event
89+
90+
future = Concurrent.future
91+
event = Concurrent.event
92+
93+
# will be blocked until completed
94+
t1 = Thread.new { future.value } #
95+
t2 = Thread.new { event.wait } #
96+
97+
future.success 1
98+
future.success 1 rescue $!
99+
future.try_success 2
100+
event.complete
101+
102+
[t1, t2].each &:join #
103+
104+
105+
### Callbacks
106+
107+
queue = Queue.new
108+
future = Concurrent.delay { 1 + 1 }
109+
110+
future.on_success { queue << 1 } # evaluated asynchronously
111+
future.on_success! { queue << 2 } # evaluated on completing thread
112+
113+
queue.empty?
114+
future.value
115+
queue.pop
116+
queue.pop
117+
118+
119+
### Thread-pools
120+
121+
Concurrent.future(:fast) { 2 }.then(:io) { File.read __FILE__ }.wait
122+
123+
124+
### Interoperability with actors
125+
126+
actor = Concurrent::Actor::Utils::AdHoc.spawn :square do
127+
-> v { v ** 2 }
128+
end
129+
130+
Concurrent.
131+
future { 2 }.
132+
then_ask(actor).
133+
then { |v| v + 2 }.
134+
value
135+
136+
actor.ask(2).then(&:succ).value
137+
138+
139+
### Common use-cases Examples
140+
141+
# simple background processing
142+
Concurrent.future { do_stuff }
143+
144+
# parallel background processing
145+
jobs = 10.times.map { |i| Concurrent.future { i } } #
146+
Concurrent.zip(*jobs).value
147+
148+
149+
# periodic task
150+
def schedule_job
151+
Concurrent.schedule(1) { do_stuff }.
152+
rescue { |e| report_error e }.
153+
then { schedule_job }
154+
end
155+
156+
schedule_job
157+
158+

examples/edge_futures.out.rb

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
### Simple asynchronous task
2+
3+
future = Concurrent.future { sleep 0.1; 1 + 1 } # evaluation starts immediately
4+
# => <#Concurrent::Edge::Future:0x7fa08385da60 pending blocks:[]>
5+
future.completed? # => false
6+
# block until evaluated
7+
future.value # => 2
8+
future.completed? # => true
9+
10+
11+
### Failing asynchronous task
12+
13+
future = Concurrent.future { raise 'Boom' }
14+
# => <#Concurrent::Edge::Future:0x7fa083834638 failed blocks:[]>
15+
future.value # => nil
16+
future.value! rescue $! # => #<RuntimeError: Boom>
17+
future.reason # => #<RuntimeError: Boom>
18+
# re-raising
19+
raise future rescue $! # => #<RuntimeError: Boom>
20+
21+
22+
### Chaining
23+
24+
head = Concurrent.future { 1 }
25+
branch1 = head.then(&:succ)
26+
branch2 = head.then(&:succ).then(&:succ)
27+
branch1.zip(branch2).value # => [2, 3]
28+
(branch1 & branch2).then { |(a, b)| a + b }.value
29+
# => 5
30+
# pick only first completed
31+
(branch1 | branch2).value # => 2
32+
33+
34+
### Error handling
35+
36+
Concurrent.future { Object.new }.then(&:succ).then(&:succ).rescue { |e| e.class }.value # error propagates
37+
# => NoMethodError
38+
Concurrent.future { Object.new }.then(&:succ).rescue { 1 }.then(&:succ).value
39+
# => 2
40+
Concurrent.future { 1 }.then(&:succ).rescue { |e| e.message }.then(&:succ).value
41+
# => 3
42+
43+
44+
### Delay
45+
46+
# will not evaluate until asked by #value or other method requiring completion
47+
scheduledfuture = Concurrent.delay { 'lazy' }
48+
# => <#Concurrent::Edge::Future:0x7fa0831917b8 pending blocks:[]>
49+
sleep 0.1
50+
future.completed? # => true
51+
future.value # => nil
52+
53+
# propagates trough chain allowing whole or partial lazy chains
54+
55+
head = Concurrent.delay { 1 }
56+
# => <#Concurrent::Edge::Future:0x7fa083172ef8 pending blocks:[]>
57+
branch1 = head.then(&:succ)
58+
# => <#Concurrent::Edge::Future:0x7fa083171c88 pending blocks:[]>
59+
branch2 = head.delay.then(&:succ)
60+
# => <#Concurrent::Edge::Future:0x7fa08294f528 pending blocks:[]>
61+
join = branch1 & branch2
62+
# => <#Concurrent::Edge::Future:0x7fa08294e218 pending blocks:[]>
63+
64+
sleep 0.1 # nothing will complete # => 0
65+
[head, branch1, branch2, join].map(&:completed?) # => [false, false, false, false]
66+
67+
branch1.value # => 2
68+
sleep 0.1 # forces only head to complete, branch 2 stays incomplete
69+
# => 0
70+
[head, branch1, branch2, join].map(&:completed?) # => [true, true, false, false]
71+
72+
join.value # => [2, 2]
73+
74+
75+
### Flatting
76+
77+
Concurrent.future { Concurrent.future { 1+1 } }.flat.value # waits for inner future
78+
# => 2
79+
80+
# more complicated example
81+
Concurrent.future { Concurrent.future { Concurrent.future { 1 + 1 } } }.
82+
flat(1).
83+
then { |f| f.then(&:succ) }.
84+
flat(1).value # => 3
85+
86+
87+
### Schedule
88+
89+
scheduled = Concurrent.schedule(0.1) { 1 }
90+
# => <#Concurrent::Edge::Future:0x7fa08224edf0 pending blocks:[]>
91+
92+
scheduled.completed? # => false
93+
scheduled.value # available after 0.1sec # => 1
94+
95+
# and in chain
96+
scheduled = Concurrent.delay { 1 }.schedule(0.1).then(&:succ)
97+
# => <#Concurrent::Edge::Future:0x7fa0831f3d50 pending blocks:[]>
98+
# will not be scheduled until value is requested
99+
sleep 0.1
100+
scheduled.value # returns after another 0.1sec # => 2
101+
102+
103+
### Completable Future and Event
104+
105+
future = Concurrent.future
106+
# => <#Concurrent::Edge::CompletableFuture:0x7fa0831e8090 pending blocks:[]>
107+
event = Concurrent.event
108+
# => <#Concurrent::Edge::CompletableEvent:0x7fa0831dae68 pending blocks:[]>
109+
110+
# will be blocked until completed
111+
t1 = Thread.new { future.value }
112+
t2 = Thread.new { event.wait }
113+
114+
future.success 1
115+
# => <#Concurrent::Edge::CompletableFuture:0x7fa0831e8090 success blocks:[]>
116+
future.success 1 rescue $!
117+
# => #<Concurrent::MultipleAssignmentError: multiple assignment>
118+
future.try_success 2 # => false
119+
event.complete
120+
# => <#Concurrent::Edge::CompletableEvent:0x7fa0831dae68 completed blocks:[]>
121+
122+
[t1, t2].each &:join
123+
124+
125+
### Callbacks
126+
127+
queue = Queue.new # => #<Thread::Queue:0x007fa0831bac30>
128+
future = Concurrent.delay { 1 + 1 }
129+
# => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]>
130+
131+
future.on_success { queue << 1 } # evaluated asynchronously
132+
# => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]>
133+
future.on_success! { queue << 2 } # evaluated on completing thread
134+
# => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]>
135+
136+
queue.empty? # => true
137+
future.value # => 2
138+
queue.pop # => 2
139+
queue.pop # => 1
140+
141+
142+
### Thread-pools
143+
144+
Concurrent.future(:fast) { 2 }.then(:io) { File.read __FILE__ }.wait
145+
# => <#Concurrent::Edge::Future:0x7fa08318b070 success blocks:[]>
146+
147+
148+
### Interoperability with actors
149+
150+
actor = Concurrent::Actor::Utils::AdHoc.spawn :square do
151+
-> v { v ** 2 }
152+
end
153+
# => #<Concurrent::Actor::Reference /square (Concurrent::Actor::Utils::AdHoc)>
154+
155+
Concurrent.
156+
future { 2 }.
157+
then_ask(actor).
158+
then { |v| v + 2 }.
159+
value # => 6
160+
161+
actor.ask(2).then(&:succ).value # => 5
162+
163+
164+
### Common use-cases Examples
165+
166+
# simple background processing
167+
Concurrent.future { do_stuff }
168+
# => <#Concurrent::Edge::Future:0x7fa0839ee8e8 pending blocks:[]>
169+
170+
# parallel background processing
171+
jobs = 10.times.map { |i| Concurrent.future { i } }
172+
Concurrent.zip(*jobs).value # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
173+
174+
175+
# periodic task
176+
def schedule_job
177+
Concurrent.schedule(1) { do_stuff }.
178+
rescue { |e| report_error e }.
179+
then { schedule_job }
180+
end # => :schedule_job
181+
182+
schedule_job
183+
# => <#Concurrent::Edge::Future:0x7fa082904f78 pending blocks:[]>
184+
185+

0 commit comments

Comments
 (0)