Skip to content

Commit 1d3ac97

Browse files
committed
Added Concurrent::dataflow_with for explicitly setting the executor.
1 parent 297758e commit 1d3ac97

File tree

2 files changed

+102
-19
lines changed

2 files changed

+102
-19
lines changed

lib/concurrent/dataflow.rb

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,16 @@ def update(time, value, reason)
6161
# @raise [ArgumentError] if no block is given
6262
# @raise [ArgumentError] if any of the inputs are not +IVar+s
6363
def dataflow(*inputs, &block)
64+
dataflow_with(Concurrent.configuration.global_task_pool, *inputs, &block)
65+
end
66+
module_function :dataflow
67+
68+
def dataflow_with(executor, *inputs, &block)
69+
raise ArgumentError.new('an executor must be provided') if executor.nil?
6470
raise ArgumentError.new('no block given') unless block_given?
6571
raise ArgumentError.new('not all dependencies are IVars') unless inputs.all? { |input| input.is_a? IVar }
6672

67-
result = Future.new do
73+
result = Future.new(executor: executor) do
6874
values = inputs.map { |input| input.value }
6975
block.call(*values)
7076
end
@@ -81,6 +87,5 @@ def dataflow(*inputs, &block)
8187

8288
result
8389
end
84-
85-
module_function :dataflow
90+
module_function :dataflow_with
8691
end

spec/concurrent/dataflow_spec.rb

Lines changed: 94 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,43 +5,77 @@ module Concurrent
55
describe 'dataflow' do
66

77
let(:executor) { ImmediateExecutor.new }
8-
9-
before(:each) do
10-
Concurrent.configure do |config|
11-
config.global_task_pool = Concurrent::PerThreadExecutor.new
12-
end
13-
end
8+
let(:root_executor) { PerThreadExecutor.new }
149

1510
it 'raises an exception when no block given' do
1611
expect { Concurrent::dataflow }.to raise_error(ArgumentError)
12+
expect { Concurrent::dataflow_with(root_executor) }.to raise_error(ArgumentError)
13+
end
14+
15+
specify '#dataflow uses the global task pool' do
16+
input = Future.execute{0}
17+
Concurrent.should_receive(:dataflow_with).once.
18+
with(Concurrent.configuration.global_task_pool, input)
19+
Concurrent::dataflow(input){0}
20+
end
21+
22+
specify '#dataflow_with uses the given executor' do
23+
input = Future.execute{0}
24+
result = Future.new{0}
25+
26+
Future.should_receive(:new).with(executor: root_executor).and_return(result)
27+
Concurrent::dataflow_with(root_executor, input){0}
28+
end
29+
30+
specify '#dataflow_with raises an exception when no executor given' do
31+
expect {
32+
Concurrent::dataflow_with(nil){ nil }
33+
}.to raise_error(ArgumentError)
1734
end
1835

1936
it 'accepts zero or more dependencies' do
2037
Concurrent::dataflow(){0}
2138
Concurrent::dataflow(Future.execute{0}){0}
2239
Concurrent::dataflow(Future.execute{0}, Future.execute{0}){0}
40+
41+
Concurrent::dataflow_with(root_executor, ){0}
42+
Concurrent::dataflow_with(root_executor, Future.execute{0}){0}
43+
Concurrent::dataflow_with(root_executor, Future.execute{0}, Future.execute{0}){0}
2344
end
2445

2546
it 'accepts uncompleted dependencies' do
2647
d = Future.new(executor: executor){0}
2748
Concurrent::dataflow(d){0}
2849
d.execute
50+
51+
d = Future.new(executor: executor){0}
52+
Concurrent::dataflow_with(root_executor, d){0}
53+
d.execute
2954
end
3055

3156
it 'accepts completed dependencies' do
3257
d = Future.new(executor: executor){0}
3358
d.execute
3459
Concurrent::dataflow(d){0}
60+
61+
d = Future.new(executor: executor){0}
62+
d.execute
63+
Concurrent::dataflow_with(root_executor, d){0}
3564
end
3665

3766
it 'raises an exception if any dependencies are not IVars' do
3867
expect { Concurrent::dataflow(nil) }.to raise_error(ArgumentError)
3968
expect { Concurrent::dataflow(Future.execute{0}, nil) }.to raise_error(ArgumentError)
4069
expect { Concurrent::dataflow(nil, Future.execute{0}) }.to raise_error(ArgumentError)
70+
71+
expect { Concurrent::dataflow_with(root_executor, nil) }.to raise_error(ArgumentError)
72+
expect { Concurrent::dataflow_with(root_executor, Future.execute{0}, nil) }.to raise_error(ArgumentError)
73+
expect { Concurrent::dataflow_with(root_executor, nil, Future.execute{0}) }.to raise_error(ArgumentError)
4174
end
4275

4376
it 'returns a Future' do
4477
Concurrent::dataflow{0}.should be_a(Future)
78+
Concurrent::dataflow{0}.should be_a(Future)
4579
end
4680

4781
context 'does not schedule the Future' do
@@ -51,6 +85,11 @@ module Concurrent
5185
f = Concurrent::dataflow(d){0}
5286
f.should be_unscheduled
5387
d.execute
88+
89+
d = Future.new(executor: executor){0}
90+
f = Concurrent::dataflow_with(root_executor, d){0}
91+
f.should be_unscheduled
92+
d.execute
5493
end
5594

5695
specify 'if one dependency of two is completed' do
@@ -60,6 +99,13 @@ module Concurrent
6099
d1.execute
61100
f.should be_unscheduled
62101
d2.execute
102+
103+
d1 = Future.new(executor: executor){0}
104+
d2 = Future.new(executor: executor){0}
105+
f = Concurrent::dataflow_with(root_executor, d1, d2){0}
106+
d1.execute
107+
f.should be_unscheduled
108+
d2.execute
63109
end
64110
end
65111

@@ -70,6 +116,11 @@ module Concurrent
70116
f = Concurrent::dataflow(d){0}
71117
d.execute
72118
f.value.should eq 0
119+
120+
d = Future.new(executor: executor){0}
121+
f = Concurrent::dataflow_with(root_executor, d){0}
122+
d.execute
123+
f.value.should eq 0
73124
end
74125

75126
specify 'if there is more than one' do
@@ -79,6 +130,13 @@ module Concurrent
79130
d1.execute
80131
d2.execute
81132
f.value.should eq 0
133+
134+
d1 = Future.new(executor: executor){0}
135+
d2 = Future.new(executor: executor){0}
136+
f = Concurrent::dataflow_with(root_executor, d1, d2){0}
137+
d1.execute
138+
d2.execute
139+
f.value.should eq 0
82140
end
83141
end
84142

@@ -89,6 +147,11 @@ module Concurrent
89147
d.execute
90148
f = Concurrent::dataflow(d){0}
91149
f.value.should eq 0
150+
151+
d = Future.new(executor: executor){0}
152+
d.execute
153+
f = Concurrent::dataflow_with(root_executor, d){0}
154+
f.value.should eq 0
92155
end
93156

94157
specify 'if there is more than one' do
@@ -98,26 +161,41 @@ module Concurrent
98161
d2.execute
99162
f = Concurrent::dataflow(d1, d2){0}
100163
f.value.should eq 0
164+
165+
d1 = Future.new(executor: executor){0}
166+
d2 = Future.new(executor: executor){0}
167+
d1.execute
168+
d2.execute
169+
f = Concurrent::dataflow_with(root_executor, d1, d2){0}
170+
f.value.should eq 0
101171
end
102172
end
103173

104174
context 'passes the values of dependencies into the block' do
105175

106176
specify 'if there is just one' do
107177
d = Future.new(executor: executor){14}
108-
f = Concurrent::dataflow(d) do |v|
109-
v
110-
end
178+
f = Concurrent::dataflow(d){|v| v }
179+
d.execute
180+
f.value.should eq 14
181+
182+
d = Future.new(executor: executor){14}
183+
f = Concurrent::dataflow_with(root_executor, d){|v| v }
111184
d.execute
112185
f.value.should eq 14
113186
end
114187

115188
specify 'if there is more than one' do
116189
d1 = Future.new(executor: executor){14}
117190
d2 = Future.new(executor: executor){2}
118-
f = Concurrent::dataflow(d1, d2) do |v1, v2|
119-
v1 + v2
120-
end
191+
f = Concurrent::dataflow(d1, d2) {|v1, v2| v1 + v2}
192+
d1.execute
193+
d2.execute
194+
f.value.should eq 16
195+
196+
d1 = Future.new(executor: executor){14}
197+
d2 = Future.new(executor: executor){2}
198+
f = Concurrent::dataflow_with(root_executor, d1, d2) {|v1, v2| v1 + v2}
121199
d1.execute
122200
d2.execute
123201
f.value.should eq 16
@@ -126,15 +204,15 @@ module Concurrent
126204

127205
context 'module function' do
128206

129-
it 'can be called as Concurrent.dataflow' do
207+
it 'can be called as Concurrent.dataflow and Concurrent.dataflow_with' do
130208

131209
def fib_with_dot(n)
132210
if n < 2
133211
Concurrent.dataflow { n }
134212
else
135213
n1 = fib_with_dot(n - 1)
136214
n2 = fib_with_dot(n - 2)
137-
Concurrent.dataflow(n1, n2) { n1.value + n2.value }
215+
Concurrent.dataflow_with(root_executor, n1, n2) { n1.value + n2.value }
138216
end
139217
end
140218

@@ -143,15 +221,15 @@ def fib_with_dot(n)
143221
expected.value.should eq 377
144222
end
145223

146-
it 'can be called as Concurrent::dataflow' do
224+
it 'can be called as Concurrent::dataflow and Concurrent::dataflow_with' do
147225

148226
def fib_with_colons(n)
149227
if n < 2
150228
Concurrent::dataflow { n }
151229
else
152230
n1 = fib_with_colons(n - 1)
153231
n2 = fib_with_colons(n - 2)
154-
Concurrent::dataflow(n1, n2) { n1.value + n2.value }
232+
Concurrent::dataflow_with(root_executor, n1, n2) { n1.value + n2.value }
155233
end
156234
end
157235

0 commit comments

Comments
 (0)