Commit f83fccc

Merge pull request #13 from chrisseaton/dataflow
Demonstration of dataflow.
2 parents bf2615d + 3d278c1 commit f83fccc

5 files changed, +408 -0 lines changed

README.md

Lines changed: 2 additions & 0 deletions
@@ -33,6 +33,7 @@ The design goals of this gem are:
 * Actor variant [Channel](https://github.com/jdantonio/concurrent-ruby/blob/master/md/channel.md)
 loosely based on the [MailboxProcessor](http://blogs.msdn.com/b/dsyme/archive/2010/02/15/async-and-parallel-design-patterns-in-f-part-3-agents.aspx)
 agent in [F#](http://msdn.microsoft.com/en-us/library/ee370357.aspx)
+* [Dataflow](https://github.com/jdantonio/concurrent-ruby/blob/master/md/dataflow.md) loosely based on the syntax of Akka and Habanero Java

 ### Semantic Versioning

@@ -111,6 +112,7 @@ These tools will help ease the burden, but at the end of the day it is essential
 * [Chip Miller](https://github.com/chip-miller)
 * [Jamie Hodge](https://github.com/jamiehodge)
 * [Zander Hill](https://github.com/zph)
+* [Chris Seaton](https://github.com/chrisseaton)

 ## Contributing

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@
 require 'concurrent/agent'
 require 'concurrent/contract'
 require 'concurrent/channel'
+require 'concurrent/dataflow'
 require 'concurrent/dereferenceable'
 require 'concurrent/event'
 require 'concurrent/future'

lib/concurrent/dataflow.rb

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+require 'concurrent/future'
+
+module Concurrent
+
+  module Dataflow
+
+    class AtomicCounter
+
+      def initialize(init)
+        @counter = init
+        @mutex = Mutex.new
+      end
+
+      def decrement
+        @mutex.synchronize do
+          @counter -= 1
+        end
+      end
+
+    end
+
+    class DependencyCounter
+
+      def initialize(count, &block)
+        @counter = AtomicCounter.new(count)
+        @block = block
+      end
+
+      def update(time, value, reason)
+        if @counter.decrement == 0
+          @block.call()
+        end
+      end
+
+    end
+
+    def self.dataflow(*inputs, &block)
+      raise ArgumentError.new('no block given') unless block_given?
+      raise ArgumentError.new('not all dependencies are Futures') unless inputs.all? { |input| input.is_a? Future }
+
+      result = Concurrent::Future.new do
+        values = inputs.map { |input| input.value }
+        block.call(*values)
+      end
+
+      if inputs.empty?
+        result.execute
+      else
+        counter = Dataflow::DependencyCounter.new(inputs.size) { result.execute }
+
+        inputs.each do |input|
+          input.add_observer counter
+        end
+      end
+
+      result
+    end
+
+  end
+
+  def self.dataflow(*inputs, &block)
+    Dataflow::dataflow(*inputs, &block)
+  end
+
+end
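
The `DependencyCounter` in this file relies on exactly one observer notification seeing the counter reach zero, which is what the mutex in `AtomicCounter#decrement` provides. Below is a minimal sketch of that property using only the Ruby standard library. `SketchCounter` and `zero_hits` are hypothetical names chosen for illustration and are not part of the gem.

```ruby
require 'thread'

# Hypothetical stand-in that mirrors the shape of the commit's AtomicCounter.
class SketchCounter
  def initialize(init)
    @counter = init
    @mutex = Mutex.new
  end

  # Mutex#synchronize returns the value of its block, so this returns
  # the count as it is immediately after the decrement.
  def decrement
    @mutex.synchronize { @counter -= 1 }
  end
end

counter = SketchCounter.new(100)
zero_hits = Queue.new # thread-safe collection of "reached zero" events

threads = 100.times.map do
  Thread.new do
    # Only the thread that performs the final decrement sees 0.
    zero_hits << :fired if counter.decrement == 0
  end
end
threads.each(&:join)

puts zero_hits.size # prints 1
```

Because each decrement happens under the lock, the 100 threads observe the values 99 down to 0 exactly once each, so the "all dependencies ready" action fires a single time.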

md/dataflow.md

Lines changed: 195 additions & 0 deletions
@@ -0,0 +1,195 @@
+# Dataflow
+
+Dataflow allows you to create a task that will be scheduled when all of its data
+dependencies are available. Data dependencies are `Future` values. The dataflow
+task itself is also a `Future` value, so you can build up a graph of these
+tasks, each of which is run when all the data and other tasks it depends on are
+available or completed.
+
+Our syntax is somewhat related to that of Akka's `flow` and Habanero Java's
+`DataDrivenFuture`. However, unlike Akka we don't schedule a task at all until it
+is ready to run, and unlike Habanero Java we pass the data values into the task
+instead of dereferencing them again in the task.
+
+The theory of dataflow goes back to the 1980s. In the terminology of the
+literature, our implementation is coarse-grained, in that each task can be many
+instructions, and dynamic, in that you can create more tasks within other tasks.
+
+## Example
+
+A dataflow task is created with the `dataflow` method, passing in a block.
+
+```ruby
+task = Concurrent::dataflow { 14 }
+```
+
+This produces a simple `Future` value. The task will run immediately, as it has
+no dependencies. We can also specify `Future` values that must be available
+before a task will run. When we do this we get the value of those futures passed
+to our block.
+
+```ruby
+a = Concurrent::dataflow { 1 }
+b = Concurrent::dataflow { 2 }
+c = Concurrent::dataflow(a, b) { |av, bv| av + bv }
+```
+
+Using the `dataflow` method you can build up a directed acyclic graph (DAG) of
+tasks that depend on each other, and have the tasks run as soon as their
+dependencies are ready and there is CPU capacity to schedule them. This can help
+you create a program that uses more of the CPU resources available to you.
+
+## Derivation
+
+This section describes how we could derive dataflow from other primitives in
+this library.
+
+Consider a naive Fibonacci calculator.
+
+```ruby
+def fib(n)
+  if n < 2
+    n
+  else
+    fib(n - 1) + fib(n - 2)
+  end
+end
+
+puts fib(14)
+```
+
+We could modify this to use futures.
+
+```ruby
+def fib(n)
+  if n < 2
+    Concurrent::Future.new { n }
+  else
+    n1 = fib(n - 1).execute
+    n2 = fib(n - 2).execute
+    Concurrent::Future.new { n1.value + n2.value }
+  end
+end
+
+f = fib(14)
+f.execute
+sleep(0.5)
+puts f.value
+```
+
+One of the drawbacks of this approach is that all the futures start, and then
+most of them immediately block on their dependencies. We know that there's no
+point executing those futures until their dependencies are ready, so let's
+not execute each future until all of its dependencies are ready.
+
+To do this we'll create an object that counts the number of times it observes a
+future finishing before it does something - and for us that something will be to
+execute the next future.
+
+```ruby
+class CountingObserver
+
+  def initialize(count, &block)
+    @count = count
+    @block = block
+  end
+
+  def update(time, value, reason)
+    @count -= 1
+
+    if @count <= 0
+      @block.call()
+    end
+  end
+
+end
+
+def fib(n)
+  if n < 2
+    Concurrent::Future.new { n }.execute
+  else
+    n1 = fib(n - 1)
+    n2 = fib(n - 2)
+
+    result = Concurrent::Future.new { n1.value + n2.value }
+
+    barrier = CountingObserver.new(2) { result.execute }
+    n1.add_observer barrier
+    n2.add_observer barrier
+
+    n1.execute
+    n2.execute
+
+    result
+  end
+end
+```
+
+We can wrap this up in a dataflow utility.
+
+```ruby
+f = fib(14) # uses the CountingObserver-based fib defined in the previous listing
+sleep(0.5)
+puts f.value
+
+def dataflow(*inputs, &block)
+  result = Concurrent::Future.new(&block)
+
+  if inputs.empty?
+    result.execute
+  else
+    barrier = CountingObserver.new(inputs.size) { result.execute }
+
+    inputs.each do |input|
+      input.add_observer barrier
+    end
+  end
+
+  result
+end
+
+def fib(n)
+  if n < 2
+    dataflow { n }
+  else
+    n1 = fib(n - 1)
+    n2 = fib(n - 2)
+    dataflow(n1, n2) { n1.value + n2.value }
+  end
+end
+
+f = fib(14)
+sleep(0.5)
+puts f.value
+```
+
+Since we know that the futures the dataflow computation depends on are already
+going to be available when the future is executed, we might as well pass the
+values into the block so we don't have to reference the futures inside the
+block. This allows us to write the dataflow block as straight non-concurrent
+code without reference to futures.
+
+```ruby
+def dataflow(*inputs, &block)
+  result = Concurrent::Future.new do
+    values = inputs.map { |input| input.value }
+    block.call(*values)
+  end
+
+  if inputs.empty?
+    result.execute
+  else
+    barrier = CountingObserver.new(inputs.size) { result.execute }
+
+    inputs.each do |input|
+      input.add_observer barrier
+    end
+  end
+
+  result
+end
+
+f = fib(14)
+sleep(0.5)
+puts f.value
+```
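
The last version of `dataflow` in this file passes dependency values into the block, so `fib` can be written without touching futures inside the block. The following self-contained sketch illustrates that idea using only the standard library. `SketchFuture`, `on_complete` and `sketch_dataflow` are hypothetical stand-ins, not the gem's `Concurrent::Future`, `add_observer` or `Concurrent::dataflow`. In particular, running an observer immediately when the future has already completed is an assumption of this sketch, not a statement about the gem's behaviour.

```ruby
require 'thread'

# Hypothetical minimal future: runs its block on a new thread when executed
# and calls registered observers once the value is available.
class SketchFuture
  def initialize(&block)
    @block = block
    @mutex = Mutex.new
    @done = ConditionVariable.new
    @observers = []
    @complete = false
    @value = nil
  end

  def execute
    Thread.new do
      result = @block.call
      observers = @mutex.synchronize do
        @value = result
        @complete = true
        @done.broadcast
        @observers.dup
      end
      observers.each(&:call) # notify outside the lock
    end
    self
  end

  # Blocks the caller until the value is available.
  def value
    @mutex.synchronize do
      @done.wait(@mutex) until @complete
      @value
    end
  end

  # Assumption of this sketch: a late observer is run immediately.
  def on_complete(&observer)
    run_now = @mutex.synchronize do
      @observers << observer unless @complete
      @complete
    end
    observer.call if run_now
  end
end

# Hypothetical stand-in for the dataflow utility: the task is only executed
# once every input has completed, and it receives the input values.
def sketch_dataflow(*inputs, &block)
  result = SketchFuture.new { block.call(*inputs.map(&:value)) }
  return result.execute if inputs.empty?

  remaining = inputs.size
  lock = Mutex.new
  inputs.each do |input|
    input.on_complete do
      ready = lock.synchronize { (remaining -= 1) == 0 }
      result.execute if ready
    end
  end
  result
end

def fib(n)
  return sketch_dataflow { n } if n < 2
  # The block is plain arithmetic on values, with no futures in sight.
  sketch_dataflow(fib(n - 1), fib(n - 2)) { |a, b| a + b }
end

puts fib(10).value # prints 55
```

Here `value` blocks until the result is ready, so the sketch needs no `sleep` before reading it.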
