@@ -5,43 +5,77 @@ module Concurrent
5
5
describe 'dataflow' do
6
6
7
7
let ( :executor ) { ImmediateExecutor . new }
8
-
9
- before ( :each ) do
10
- Concurrent . configure do |config |
11
- config . global_task_pool = Concurrent ::PerThreadExecutor . new
12
- end
13
- end
8
+ let ( :root_executor ) { PerThreadExecutor . new }
14
9
15
10
it 'raises an exception when no block given' do
16
11
expect { Concurrent ::dataflow } . to raise_error ( ArgumentError )
12
+ expect { Concurrent ::dataflow_with ( root_executor ) } . to raise_error ( ArgumentError )
13
+ end
14
+
15
+ specify '#dataflow uses the global task pool' do
16
+ input = Future . execute { 0 }
17
+ Concurrent . should_receive ( :dataflow_with ) . once .
18
+ with ( Concurrent . configuration . global_task_pool , input )
19
+ Concurrent ::dataflow ( input ) { 0 }
20
+ end
21
+
22
+ specify '#dataflow_with uses the given executor' do
23
+ input = Future . execute { 0 }
24
+ result = Future . new { 0 }
25
+
26
+ Future . should_receive ( :new ) . with ( executor : root_executor ) . and_return ( result )
27
+ Concurrent ::dataflow_with ( root_executor , input ) { 0 }
28
+ end
29
+
30
+ specify '#dataflow_with raises an exception when no executor given' do
31
+ expect {
32
+ Concurrent ::dataflow_with ( nil ) { nil }
33
+ } . to raise_error ( ArgumentError )
17
34
end
18
35
19
36
it 'accepts zero or more dependencies' do
20
37
Concurrent ::dataflow ( ) { 0 }
21
38
Concurrent ::dataflow ( Future . execute { 0 } ) { 0 }
22
39
Concurrent ::dataflow ( Future . execute { 0 } , Future . execute { 0 } ) { 0 }
40
+
41
+ Concurrent ::dataflow_with ( root_executor , ) { 0 }
42
+ Concurrent ::dataflow_with ( root_executor , Future . execute { 0 } ) { 0 }
43
+ Concurrent ::dataflow_with ( root_executor , Future . execute { 0 } , Future . execute { 0 } ) { 0 }
23
44
end
24
45
25
46
it 'accepts uncompleted dependencies' do
26
47
d = Future . new ( executor : executor ) { 0 }
27
48
Concurrent ::dataflow ( d ) { 0 }
28
49
d . execute
50
+
51
+ d = Future . new ( executor : executor ) { 0 }
52
+ Concurrent ::dataflow_with ( root_executor , d ) { 0 }
53
+ d . execute
29
54
end
30
55
31
56
it 'accepts completed dependencies' do
32
57
d = Future . new ( executor : executor ) { 0 }
33
58
d . execute
34
59
Concurrent ::dataflow ( d ) { 0 }
60
+
61
+ d = Future . new ( executor : executor ) { 0 }
62
+ d . execute
63
+ Concurrent ::dataflow_with ( root_executor , d ) { 0 }
35
64
end
36
65
37
66
it 'raises an exception if any dependencies are not IVars' do
38
67
expect { Concurrent ::dataflow ( nil ) } . to raise_error ( ArgumentError )
39
68
expect { Concurrent ::dataflow ( Future . execute { 0 } , nil ) } . to raise_error ( ArgumentError )
40
69
expect { Concurrent ::dataflow ( nil , Future . execute { 0 } ) } . to raise_error ( ArgumentError )
70
+
71
+ expect { Concurrent ::dataflow_with ( root_executor , nil ) } . to raise_error ( ArgumentError )
72
+ expect { Concurrent ::dataflow_with ( root_executor , Future . execute { 0 } , nil ) } . to raise_error ( ArgumentError )
73
+ expect { Concurrent ::dataflow_with ( root_executor , nil , Future . execute { 0 } ) } . to raise_error ( ArgumentError )
41
74
end
42
75
43
76
it 'returns a Future' do
44
77
Concurrent ::dataflow { 0 } . should be_a ( Future )
78
+ Concurrent ::dataflow { 0 } . should be_a ( Future )
45
79
end
46
80
47
81
context 'does not schedule the Future' do
@@ -51,6 +85,11 @@ module Concurrent
51
85
f = Concurrent ::dataflow ( d ) { 0 }
52
86
f . should be_unscheduled
53
87
d . execute
88
+
89
+ d = Future . new ( executor : executor ) { 0 }
90
+ f = Concurrent ::dataflow_with ( root_executor , d ) { 0 }
91
+ f . should be_unscheduled
92
+ d . execute
54
93
end
55
94
56
95
specify 'if one dependency of two is completed' do
@@ -60,6 +99,13 @@ module Concurrent
60
99
d1 . execute
61
100
f . should be_unscheduled
62
101
d2 . execute
102
+
103
+ d1 = Future . new ( executor : executor ) { 0 }
104
+ d2 = Future . new ( executor : executor ) { 0 }
105
+ f = Concurrent ::dataflow_with ( root_executor , d1 , d2 ) { 0 }
106
+ d1 . execute
107
+ f . should be_unscheduled
108
+ d2 . execute
63
109
end
64
110
end
65
111
@@ -70,6 +116,11 @@ module Concurrent
70
116
f = Concurrent ::dataflow ( d ) { 0 }
71
117
d . execute
72
118
f . value . should eq 0
119
+
120
+ d = Future . new ( executor : executor ) { 0 }
121
+ f = Concurrent ::dataflow_with ( root_executor , d ) { 0 }
122
+ d . execute
123
+ f . value . should eq 0
73
124
end
74
125
75
126
specify 'if there is more than one' do
@@ -79,6 +130,13 @@ module Concurrent
79
130
d1 . execute
80
131
d2 . execute
81
132
f . value . should eq 0
133
+
134
+ d1 = Future . new ( executor : executor ) { 0 }
135
+ d2 = Future . new ( executor : executor ) { 0 }
136
+ f = Concurrent ::dataflow_with ( root_executor , d1 , d2 ) { 0 }
137
+ d1 . execute
138
+ d2 . execute
139
+ f . value . should eq 0
82
140
end
83
141
end
84
142
@@ -89,6 +147,11 @@ module Concurrent
89
147
d . execute
90
148
f = Concurrent ::dataflow ( d ) { 0 }
91
149
f . value . should eq 0
150
+
151
+ d = Future . new ( executor : executor ) { 0 }
152
+ d . execute
153
+ f = Concurrent ::dataflow_with ( root_executor , d ) { 0 }
154
+ f . value . should eq 0
92
155
end
93
156
94
157
specify 'if there is more than one' do
@@ -98,26 +161,41 @@ module Concurrent
98
161
d2 . execute
99
162
f = Concurrent ::dataflow ( d1 , d2 ) { 0 }
100
163
f . value . should eq 0
164
+
165
+ d1 = Future . new ( executor : executor ) { 0 }
166
+ d2 = Future . new ( executor : executor ) { 0 }
167
+ d1 . execute
168
+ d2 . execute
169
+ f = Concurrent ::dataflow_with ( root_executor , d1 , d2 ) { 0 }
170
+ f . value . should eq 0
101
171
end
102
172
end
103
173
104
174
context 'passes the values of dependencies into the block' do
105
175
106
176
specify 'if there is just one' do
107
177
d = Future . new ( executor : executor ) { 14 }
108
- f = Concurrent ::dataflow ( d ) do |v |
109
- v
110
- end
178
+ f = Concurrent ::dataflow ( d ) { |v | v }
179
+ d . execute
180
+ f . value . should eq 14
181
+
182
+ d = Future . new ( executor : executor ) { 14 }
183
+ f = Concurrent ::dataflow_with ( root_executor , d ) { |v | v }
111
184
d . execute
112
185
f . value . should eq 14
113
186
end
114
187
115
188
specify 'if there is more than one' do
116
189
d1 = Future . new ( executor : executor ) { 14 }
117
190
d2 = Future . new ( executor : executor ) { 2 }
118
- f = Concurrent ::dataflow ( d1 , d2 ) do |v1 , v2 |
119
- v1 + v2
120
- end
191
+ f = Concurrent ::dataflow ( d1 , d2 ) { |v1 , v2 | v1 + v2 }
192
+ d1 . execute
193
+ d2 . execute
194
+ f . value . should eq 16
195
+
196
+ d1 = Future . new ( executor : executor ) { 14 }
197
+ d2 = Future . new ( executor : executor ) { 2 }
198
+ f = Concurrent ::dataflow_with ( root_executor , d1 , d2 ) { |v1 , v2 | v1 + v2 }
121
199
d1 . execute
122
200
d2 . execute
123
201
f . value . should eq 16
@@ -126,15 +204,15 @@ module Concurrent
126
204
127
205
context 'module function' do
128
206
129
- it 'can be called as Concurrent.dataflow' do
207
+ it 'can be called as Concurrent.dataflow and Concurrent.dataflow_with ' do
130
208
131
209
def fib_with_dot ( n )
132
210
if n < 2
133
211
Concurrent . dataflow { n }
134
212
else
135
213
n1 = fib_with_dot ( n - 1 )
136
214
n2 = fib_with_dot ( n - 2 )
137
- Concurrent . dataflow ( n1 , n2 ) { n1 . value + n2 . value }
215
+ Concurrent . dataflow_with ( root_executor , n1 , n2 ) { n1 . value + n2 . value }
138
216
end
139
217
end
140
218
@@ -143,15 +221,15 @@ def fib_with_dot(n)
143
221
expected . value . should eq 377
144
222
end
145
223
146
- it 'can be called as Concurrent::dataflow' do
224
+ it 'can be called as Concurrent::dataflow and Concurrent::dataflow_with ' do
147
225
148
226
def fib_with_colons ( n )
149
227
if n < 2
150
228
Concurrent ::dataflow { n }
151
229
else
152
230
n1 = fib_with_colons ( n - 1 )
153
231
n2 = fib_with_colons ( n - 2 )
154
- Concurrent ::dataflow ( n1 , n2 ) { n1 . value + n2 . value }
232
+ Concurrent ::dataflow_with ( root_executor , n1 , n2 ) { n1 . value + n2 . value }
155
233
end
156
234
end
157
235
0 commit comments