Skip to content

Commit f51eddb

Browse files
committed
Merge pull request #131 from mastfish/patch-1
Add dataflow! to match value!
2 parents 4704461 + dc30f01 commit f51eddb

File tree

2 files changed

+33
-19
lines changed

2 files changed

+33
-19
lines changed

lib/concurrent/dataflow.rb

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,29 @@ def dataflow(*inputs, &block)
6666
module_function :dataflow
6767

6868
def dataflow_with(executor, *inputs, &block)
69+
call_dataflow(:value, executor, *inputs, &block)
70+
end
71+
module_function :dataflow_with
72+
73+
def dataflow!(*inputs, &block)
74+
dataflow_with!(Concurrent.configuration.global_task_pool, *inputs, &block)
75+
end
76+
module_function :dataflow!
77+
78+
def dataflow_with!(executor, *inputs, &block)
79+
call_dataflow(:value!, executor, *inputs, &block)
80+
end
81+
module_function :dataflow_with!
82+
83+
private
84+
85+
def call_dataflow(method, executor, *inputs, &block)
6986
raise ArgumentError.new('an executor must be provided') if executor.nil?
7087
raise ArgumentError.new('no block given') unless block_given?
7188
raise ArgumentError.new('not all dependencies are IVars') unless inputs.all? { |input| input.is_a? IVar }
7289

7390
result = Future.new(executor: executor) do
74-
values = inputs.map { |input| input.value }
91+
values = inputs.map { |input| input.send(method) }
7592
block.call(*values)
7693
end
7794

@@ -87,5 +104,5 @@ def dataflow_with(executor, *inputs, &block)
87104

88105
result
89106
end
90-
module_function :dataflow_with
107+
module_function :call_dataflow
91108
end

spec/concurrent/dataflow_spec.rb

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,19 @@ module Concurrent
7373
expect { Concurrent::dataflow_with(root_executor, nil, Future.execute{0}) }.to raise_error(ArgumentError)
7474
end
7575

76+
it 'doesn\'t raises exceptions from dependencies, unless called with !' do
77+
78+
d1 = Concurrent::dataflow(){raise}
79+
d2 = Concurrent::dataflow(){raise}
80+
f = Concurrent::dataflow!(d1, d2){|d1v, d2v| [d1v,d2v]}
81+
expect{f.value!}.to raise_error
82+
83+
d1 = Concurrent::dataflow(){raise}
84+
d2 = Concurrent::dataflow(){raise}
85+
f = Concurrent::dataflow(d1, d2){|d1v, d2v| [d1v,d2v]}
86+
expect{f.value!}.to_not raise_error
87+
end
88+
7689
it 'returns a Future' do
7790
expect(Concurrent::dataflow{0}).to be_a(Future)
7891
expect(Concurrent::dataflow{0}).to be_a(Future)
@@ -220,23 +233,7 @@ def fib_with_dot(n)
220233
sleep(0.1)
221234
expect(expected.value).to eq 377
222235
end
223-
224-
it 'can be called as Concurrent::dataflow and Concurrent::dataflow_with' do
225-
226-
def fib_with_colons(n)
227-
if n < 2
228-
Concurrent::dataflow { n }
229-
else
230-
n1 = fib_with_colons(n - 1)
231-
n2 = fib_with_colons(n - 2)
232-
Concurrent::dataflow_with(root_executor, n1, n2) { n1.value + n2.value }
233-
end
234-
end
235-
236-
expected = fib_with_colons(14)
237-
sleep(0.1)
238-
expect(expected.value).to eq 377
239-
end
236+
240237
end
241238
end
242239
end

0 commit comments

Comments
 (0)