Skip to content

Commit b304036

Browse files
committed
[Do not merge, RFC]Add dataflow!
With futures, you can call ```value!``` if you want exceptions to bubble up. It'd be rather nice if you could do the same thing with dataflow. This is the simplest possible solution, if you collectively think this is a worthwhile feature I'll polish it up, add tests, etc.
1 parent 572a377 commit b304036

File tree

1 file changed

+29
-0
lines changed

1 file changed

+29
-0
lines changed

lib/concurrent/dataflow.rb

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,33 @@ def dataflow_with(executor, *inputs, &block)
8888
result
8989
end
9090
module_function :dataflow_with
91+
92+
def dataflow!(*inputs, &block)
93+
dataflow_with(Concurrent.configuration.global_task_pool, *inputs, &block)
94+
end
95+
module_function :dataflow!
96+
97+
def dataflow_with!(executor, *inputs, &block)
98+
raise ArgumentError.new('an executor must be provided') if executor.nil?
99+
raise ArgumentError.new('no block given') unless block_given?
100+
raise ArgumentError.new('not all dependencies are IVars') unless inputs.all? { |input| input.is_a? IVar }
101+
102+
result = Future.new(executor: executor) do
103+
values = inputs.map { |input| input.value! }
104+
block.call(*values)
105+
end
106+
107+
if inputs.empty?
108+
result.execute
109+
else
110+
counter = DependencyCounter.new(inputs.size) { result.execute }
111+
112+
inputs.each do |input|
113+
input.add_observer counter
114+
end
115+
end
116+
117+
result
118+
end
119+
module_function :dataflow_with!
91120
end

0 commit comments

Comments
 (0)