Skip to content

Commit d31bf98

Browse files
author
Petr Chalupa
committed
Merge pull request #363 from pitr-ch/futures
Future, Actor improvements
2 parents 4ce3c14 + ced9823 commit d31bf98

File tree

11 files changed

+99
-116
lines changed

11 files changed

+99
-116
lines changed

examples/benchmark_new_futures.rb

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,6 @@
44
require 'concurrent'
55
require 'concurrent-edge'
66

7-
# require 'ruby-prof'
8-
#
9-
# result = RubyProf.profile do
10-
# 1000.times do
11-
# head = Concurrent.future { 1 }
12-
# branch1 = head.then(&:succ)
13-
# branch2 = head.then(&:succ).then(&:succ)
14-
# branch3 = head.then(&:succ).then(&:succ).then(&:succ)
15-
# Concurrent.join(branch1, branch2, branch3).then { |(a, b, c)| a + b + c }.value!
16-
# end
17-
# end
18-
#
19-
# printer = RubyProf::FlatPrinter.new(result)
20-
# printer.print(STDOUT)
21-
#
22-
# printer = RubyProf::GraphPrinter.new(result)
23-
# printer.print(STDOUT, {})
24-
#
25-
# exit
267

278
scale = 1
289
time = 10 * scale
@@ -43,7 +24,7 @@
4324

4425
Benchmark.ips(time, warmup) do |x|
4526
of = Concurrent::Promise.execute { 1 }
46-
nf = Concurrent.future(:fast) { 1 }
27+
nf = Concurrent.succeeded_future(1, :fast)
4728
x.report('value-old') { of.value! }
4829
x.report('value-new') { nf.value! }
4930
x.compare!
@@ -60,7 +41,7 @@
6041
head.value!
6142
end
6243
x.report('graph-new') do
63-
head = Concurrent.future(:fast) { 1 }
44+
head = Concurrent.succeeded_future(1, :fast)
6445
10.times do
6546
branch1 = head.then(&:succ)
6647
branch2 = head.then(&:succ).then(&:succ)
@@ -73,13 +54,13 @@
7354

7455
Benchmark.ips(time, warmup) do |x|
7556
x.report('immediate-old') { Concurrent::Promise.execute { nil }.value! }
76-
x.report('immediate-new') { Concurrent.future(:fast) { nil }.value! }
57+
x.report('immediate-new') { Concurrent.succeeded_future(nil, :fast).value! }
7758
x.compare!
7859
end
7960

8061
Benchmark.ips(time, warmup) do |x|
8162
of = Concurrent::Promise.execute { 1 }
82-
nf = Concurrent.future(:fast) { 1 }
63+
nf = Concurrent.succeeded_future(1, :fast)
8364
x.report('then-old') { 100.times.reduce(nf) { |nf, _| nf.then(&:succ) }.value! }
8465
x.report('then-new') { 100.times.reduce(nf) { |nf, _| nf.then(&:succ) }.value! }
8566
x.compare!

examples/edge_futures.in.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
### Chaining
2121

22-
head = Concurrent.completed_future 1 #
22+
head = Concurrent.succeeded_future 1 #
2323
branch1 = head.then(&:succ) #
2424
branch2 = head.then(&:succ).then(&:succ) #
2525
branch1.zip(branch2).value!
@@ -35,7 +35,7 @@
3535
Concurrent.future { Object.new }.then(&:succ).rescue { 1 }.then(&:succ).value
3636
Concurrent.future { 1 }.then(&:succ).rescue { |e| e.message }.then(&:succ).value
3737

38-
failing_zip = Concurrent.completed_future(1) & Concurrent.future { raise 'boom' }
38+
failing_zip = Concurrent.succeeded_future(1) & Concurrent.failed_future(StandardError.new('boom'))
3939
failing_zip.result
4040
failing_zip.then { |v| 'never happens' }.result
4141
failing_zip.rescue { |a, b| (a || b).message }.value

examples/edge_futures.out.rb

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
### Simple asynchronous task
22

33
future = Concurrent.future { sleep 0.1; 1 + 1 } # evaluation starts immediately
4-
# => <#Concurrent::Edge::Future:0x7fc6218f2318 pending blocks:[]>
4+
# => <#Concurrent::Edge::Future:0x7fcc73208180 pending blocks:[]>
55
future.completed? # => false
66
# block until evaluated
77
future.value # => 2
@@ -11,7 +11,7 @@
1111
### Failing asynchronous task
1212

1313
future = Concurrent.future { raise 'Boom' }
14-
# => <#Concurrent::Edge::Future:0x7fc6218eae38 pending blocks:[]>
14+
# => <#Concurrent::Edge::Future:0x7fcc731fa0a8 pending blocks:[]>
1515
future.value # => nil
1616
future.value! rescue $! # => #<RuntimeError: Boom>
1717
future.reason # => #<RuntimeError: Boom>
@@ -21,7 +21,7 @@
2121

2222
### Chaining
2323

24-
head = Concurrent.completed_future 1
24+
head = Concurrent.succeeded_future 1
2525
branch1 = head.then(&:succ)
2626
branch2 = head.then(&:succ).then(&:succ)
2727
branch1.zip(branch2).value! # => [2, 3]
@@ -41,10 +41,10 @@
4141
Concurrent.future { 1 }.then(&:succ).rescue { |e| e.message }.then(&:succ).value
4242
# => 3
4343

44-
failing_zip = Concurrent.completed_future(1) & Concurrent.future { raise 'boom' }
45-
# => <#Concurrent::Edge::Future:0x7fc6218b0f08 pending blocks:[]>
46-
failing_zip.result # => [false, [1, nil], [nil, #<RuntimeError: boom>]]
47-
failing_zip.then { |v| 'never happens' }.result # => [false, [1, nil], [nil, #<RuntimeError: boom>]]
44+
failing_zip = Concurrent.succeeded_future(1) & Concurrent.failed_future(StandardError.new('boom'))
45+
# => <#Concurrent::Edge::Future:0x7fcc731c00b0 failed blocks:[]>
46+
failing_zip.result # => [false, [1, nil], [nil, #<StandardError: boom>]]
47+
failing_zip.then { |v| 'never happens' }.result # => [false, [1, nil], [nil, #<StandardError: boom>]]
4848
failing_zip.rescue { |a, b| (a || b).message }.value
4949
# => "boom"
5050
failing_zip.chain { |success, values, reasons| [success, values.compact, reasons.compactß] }.value
@@ -54,28 +54,28 @@
5454

5555
# will not evaluate until asked by #value or other method requiring completion
5656
future = Concurrent.delay { 'lazy' }
57-
# => <#Concurrent::Edge::Future:0x7fc6218a37e0 pending blocks:[]>
57+
# => <#Concurrent::Edge::Future:0x7fcc731a1840 pending blocks:[]>
5858
sleep 0.1
5959
future.completed? # => false
6060
future.value # => "lazy"
6161

6262
# propagates trough chain allowing whole or partial lazy chains
6363

6464
head = Concurrent.delay { 1 }
65-
# => <#Concurrent::Edge::Future:0x7fc6218a0720 pending blocks:[]>
65+
# => <#Concurrent::Edge::Future:0x7fcc73193b28 pending blocks:[]>
6666
branch1 = head.then(&:succ)
67-
# => <#Concurrent::Edge::Future:0x7fc6212c7b50 pending blocks:[]>
67+
# => <#Concurrent::Edge::Future:0x7fcc73190900 pending blocks:[]>
6868
branch2 = head.delay.then(&:succ)
69-
# => <#Concurrent::Edge::Future:0x7fc6212c6098 pending blocks:[]>
69+
# => <#Concurrent::Edge::Future:0x7fcc7318b400 pending blocks:[]>
7070
join = branch1 & branch2
71-
# => <#Concurrent::Edge::Future:0x7fc6212c4f40 pending blocks:[]>
71+
# => <#Concurrent::Edge::Future:0x7fcc73180af0 pending blocks:[]>
7272

7373
sleep 0.1 # nothing will complete # => 0
7474
[head, branch1, branch2, join].map(&:completed?) # => [false, false, false, false]
7575

7676
branch1.value # => 2
7777
sleep 0.1 # forces only head to complete, branch 2 stays incomplete
78-
# => 1
78+
# => 0
7979
[head, branch1, branch2, join].map(&:completed?) # => [true, true, false, false]
8080

8181
join.value # => [2, 2]
@@ -96,14 +96,14 @@
9696
### Schedule
9797

9898
scheduled = Concurrent.schedule(0.1) { 1 }
99-
# => <#Concurrent::Edge::Future:0x7fc62128c550 pending blocks:[]>
99+
# => <#Concurrent::Edge::Future:0x7fcc73143e48 pending blocks:[]>
100100

101101
scheduled.completed? # => false
102102
scheduled.value # available after 0.1sec # => 1
103103

104104
# and in chain
105105
scheduled = Concurrent.delay { 1 }.schedule(0.1).then(&:succ)
106-
# => <#Concurrent::Edge::Future:0x7fc6228bcdc0 pending blocks:[]>
106+
# => <#Concurrent::Edge::Future:0x7fcc7313a758 pending blocks:[]>
107107
# will not be scheduled until value is requested
108108
sleep 0.1
109109
scheduled.value # returns after another 0.1sec # => 2
@@ -112,36 +112,36 @@
112112
### Completable Future and Event
113113

114114
future = Concurrent.future
115-
# => <#Concurrent::Edge::CompletableFuture:0x7fc623083720 pending blocks:[]>
115+
# => <#Concurrent::Edge::CompletableFuture:0x7fcc731286e8 pending blocks:[]>
116116
event = Concurrent.event
117-
# => <#Concurrent::Edge::CompletableEvent:0x7fc623081100 pending blocks:[]>
117+
# => <#Concurrent::Edge::CompletableEvent:0x7fcc73123058 pending blocks:[]>
118118
# Don't forget to keep the reference, `Concurrent.future.then { |v| v }` is incompletable
119119

120120
# will be blocked until completed
121121
t1 = Thread.new { future.value }
122122
t2 = Thread.new { event.wait }
123123

124124
future.success 1
125-
# => <#Concurrent::Edge::CompletableFuture:0x7fc623083720 success blocks:[]>
125+
# => <#Concurrent::Edge::CompletableFuture:0x7fcc731286e8 success blocks:[]>
126126
future.success 1 rescue $!
127127
# => #<Concurrent::MultipleAssignmentError: Future can be completed only once. Current result is [true, 1, nil], trying to set [true, 1, nil]>
128128
future.try_success 2 # => false
129129
event.complete
130-
# => <#Concurrent::Edge::CompletableEvent:0x7fc623081100 completed blocks:[]>
130+
# => <#Concurrent::Edge::CompletableEvent:0x7fcc73123058 completed blocks:[]>
131131

132132
[t1, t2].each &:join
133133

134134

135135
### Callbacks
136136

137-
queue = Queue.new # => #<Thread::Queue:0x007fc62127df00>
137+
queue = Queue.new # => #<Thread::Queue:0x007fcc73110638>
138138
future = Concurrent.delay { 1 + 1 }
139-
# => <#Concurrent::Edge::Future:0x7fc62127c060 pending blocks:[]>
139+
# => <#Concurrent::Edge::Future:0x7fcc7310ab98 pending blocks:[]>
140140

141141
future.on_success { queue << 1 } # evaluated asynchronously
142-
# => <#Concurrent::Edge::Future:0x7fc62127c060 pending blocks:[]>
142+
# => <#Concurrent::Edge::Future:0x7fcc7310ab98 pending blocks:[]>
143143
future.on_success! { queue << 2 } # evaluated on completing thread
144-
# => <#Concurrent::Edge::Future:0x7fc62127c060 pending blocks:[]>
144+
# => <#Concurrent::Edge::Future:0x7fcc7310ab98 pending blocks:[]>
145145

146146
queue.empty? # => true
147147
future.value # => 2
@@ -152,15 +152,15 @@
152152
### Thread-pools
153153

154154
Concurrent.future(:fast) { 2 }.then(:io) { File.read __FILE__ }.wait
155-
# => <#Concurrent::Edge::Future:0x7fc62125d9a8 success blocks:[]>
155+
# => <#Concurrent::Edge::Future:0x7fcc730f98e8 success blocks:[]>
156156

157157

158158
### Interoperability with actors
159159

160160
actor = Concurrent::Actor::Utils::AdHoc.spawn :square do
161161
-> v { v ** 2 }
162162
end
163-
# => #<Concurrent::Actor::Reference:0x7fc621234a30 /square (Concurrent::Actor::Utils::AdHoc)>
163+
# => #<Concurrent::Actor::Reference:0x7fcc730c36f8 /square (Concurrent::Actor::Utils::AdHoc)>
164164

165165
Concurrent.
166166
future { 2 }.
@@ -173,32 +173,32 @@
173173

174174
### Interoperability with channels
175175

176-
ch1 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fc621205460>
177-
ch2 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fc6212041c8>
176+
ch1 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fcc73043188>
177+
ch2 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fcc730425f8>
178178

179179
result = Concurrent.select(ch1, ch2)
180-
# => <#Concurrent::Edge::CompletableFuture:0x7fc6211fe5c0 pending blocks:[]>
180+
# => <#Concurrent::Edge::CompletableFuture:0x7fcc730411a8 pending blocks:[]>
181181
ch1.push 1 # => nil
182182
result.value!
183-
# => [1, #<Concurrent::Edge::Channel:0x007fc621205460>]
183+
# => [1, #<Concurrent::Edge::Channel:0x007fcc73043188>]
184184

185185
Concurrent.
186186
future { 1+1 }.
187187
then_push(ch1)
188-
# => <#Concurrent::Edge::Future:0x7fc6211f7338 pending blocks:[]>
188+
# => <#Concurrent::Edge::Future:0x7fcc73032c98 pending blocks:[]>
189189
result = Concurrent.
190190
future { '%02d' }.
191191
then_select(ch1, ch2).
192192
then { |format, (value, channel)| format format, value }
193-
# => <#Concurrent::Edge::Future:0x7fc6211ec668 pending blocks:[]>
193+
# => <#Concurrent::Edge::Future:0x7fcc7302a4f8 pending blocks:[]>
194194
result.value! # => "02"
195195

196196

197197
### Common use-cases Examples
198198

199199
# simple background processing
200200
Concurrent.future { do_stuff }
201-
# => <#Concurrent::Edge::Future:0x7fc6211df170 pending blocks:[]>
201+
# => <#Concurrent::Edge::Future:0x7fcc72123c48 pending blocks:[]>
202202

203203
# parallel background processing
204204
jobs = 10.times.map { |i| Concurrent.future { i } }
@@ -215,7 +215,7 @@ def schedule_job
215215
end # => :schedule_job
216216

217217
schedule_job
218-
# => <#Concurrent::Edge::Future:0x7fc62119c140 pending blocks:[]>
218+
# => <#Concurrent::Edge::Future:0x7fcc75011370 pending blocks:[]>
219219
@end = true # => true
220220

221221

@@ -228,7 +228,7 @@ def schedule_job
228228
data[message]
229229
end
230230
end
231-
# => #<Concurrent::Actor::Reference:0x7fc62117f568 /db (Concurrent::Actor::Utils::AdHoc)>
231+
# => #<Concurrent::Actor::Reference:0x7fcc71832a08 /db (Concurrent::Actor::Utils::AdHoc)>
232232

233233
concurrent_jobs = 11.times.map do |v|
234234
Concurrent.
@@ -258,7 +258,7 @@ def schedule_job
258258
end
259259
end
260260
end
261-
# => #<Concurrent::Actor::Reference:0x7fc6218969f0 /DB-pool (Concurrent::Actor::Utils::Pool)>
261+
# => #<Concurrent::Actor::Reference:0x7fcc72320118 /DB-pool (Concurrent::Actor::Utils::Pool)>
262262

263263
concurrent_jobs = 11.times.map do |v|
264264
Concurrent.

lib/concurrent/actor/behaviour.rb

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,6 @@ module Actor
3737
#
3838
# > {include:Actor::Behaviour::Termination}
3939
#
40-
# - {Behaviour::TerminatesChildren}:
41-
#
42-
# > {include:Actor::Behaviour::TerminatesChildren}
43-
#
4440
# - {Behaviour::RemovesChild}:
4541
#
4642
# > {include:Actor::Behaviour::RemovesChild}
@@ -66,14 +62,12 @@ module Behaviour
6662
require 'concurrent/actor/behaviour/sets_results'
6763
require 'concurrent/actor/behaviour/supervising'
6864
require 'concurrent/actor/behaviour/termination'
69-
require 'concurrent/actor/behaviour/terminates_children'
7065

7166
# Array of behaviours and their construction parameters.
7267
#
7368
# [[Behaviour::SetResults, :terminate!],
7469
# [Behaviour::RemovesChild],
7570
# [Behaviour::Termination],
76-
# [Behaviour::TerminatesChildren],
7771
# [Behaviour::Linking],
7872
# [Behaviour::Awaits],
7973
# [Behaviour::ExecutesContext],
@@ -91,7 +85,6 @@ def self.basic_behaviour_definition
9185
# [[Behaviour::SetResults, :pause!],
9286
# [Behaviour::RemovesChild],
9387
# [Behaviour::Termination],
94-
# [Behaviour::TerminatesChildren],
9588
# [Behaviour::Linking],
9689
# [Behaviour::Pausing],
9790
# [Behaviour::Supervising, :reset!, :one_for_one],
@@ -113,8 +106,7 @@ def self.base(on_error)
113106
[[SetResults, on_error],
114107
# has to be before Termination to be able to remove children from terminated actor
115108
RemovesChild,
116-
Termination,
117-
TerminatesChildren]
109+
Termination]
118110
end
119111

120112
# @see '' its source code

lib/concurrent/actor/behaviour/terminates_children.rb

Lines changed: 0 additions & 14 deletions
This file was deleted.

0 commit comments

Comments
 (0)