Skip to content

Commit 725314f

Browse files
committed
Removed Dataflow class and put #dataflow method directly on Concurrent module.
1 parent f83fccc commit 725314f

File tree

4 files changed

+167
-156
lines changed

4 files changed

+167
-156
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,12 @@ These tools will help ease the burden, but at the end of the day it is essential
107107
## Contributors
108108

109109
* [Michele Della Torre](https://github.com/mighe)
110+
* [Chris Seaton](https://github.com/chrisseaton)
110111
* [Giuseppe Capizzi](https://github.com/gcapizzi)
111112
* [Brian Shirai](https://github.com/brixen)
112113
* [Chip Miller](https://github.com/chip-miller)
113114
* [Jamie Hodge](https://github.com/jamiehodge)
114115
* [Zander Hill](https://github.com/zph)
115-
* [Chris Seaton](https://github.com/chrisseaton)
116116

117117
## Contributing
118118

lib/concurrent/dataflow.rb

Lines changed: 33 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,64 +2,55 @@
22

33
module Concurrent
44

5-
module Dataflow
5+
class AtomicCounter
66

7-
class AtomicCounter
8-
9-
def initialize(init)
10-
@counter = init
11-
@mutex = Mutex.new
12-
end
7+
def initialize(init)
8+
@counter = init
9+
@mutex = Mutex.new
10+
end
1311

14-
def decrement
15-
@mutex.synchronize do
16-
@counter -= 1
17-
end
12+
def decrement
13+
@mutex.synchronize do
14+
@counter -= 1
1815
end
19-
2016
end
17+
end
2118

22-
class DependencyCounter
19+
class DependencyCounter
2320

24-
def initialize(count, &block)
25-
@counter = AtomicCounter.new(count)
26-
@block = block
27-
end
21+
def initialize(count, &block)
22+
@counter = AtomicCounter.new(count)
23+
@block = block
24+
end
2825

29-
def update(time, value, reason)
30-
if @counter.decrement == 0
31-
@block.call()
32-
end
26+
def update(time, value, reason)
27+
if @counter.decrement == 0
28+
@block.call()
3329
end
34-
3530
end
31+
end
3632

37-
def self.dataflow(*inputs, &block)
38-
raise ArgumentError.new('no block given') unless block_given?
39-
raise ArgumentError.new('not all dependencies are Futures') unless inputs.all? { |input| input.is_a? Future }
33+
def dataflow(*inputs, &block)
34+
raise ArgumentError.new('no block given') unless block_given?
35+
raise ArgumentError.new('not all dependencies are Futures') unless inputs.all? { |input| input.is_a? Future }
4036

41-
result = Concurrent::Future.new do
42-
values = inputs.map { |input| input.value }
43-
block.call(*values)
44-
end
37+
result = Future.new do
38+
values = inputs.map { |input| input.value }
39+
block.call(*values)
40+
end
4541

46-
if inputs.empty?
47-
result.execute
48-
else
49-
counter = Dataflow::DependencyCounter.new(inputs.size) { result.execute }
42+
if inputs.empty?
43+
result.execute
44+
else
45+
counter = DependencyCounter.new(inputs.size) { result.execute }
5046

51-
inputs.each do |input|
52-
input.add_observer counter
53-
end
47+
inputs.each do |input|
48+
input.add_observer counter
5449
end
55-
56-
result
5750
end
5851

52+
result
5953
end
6054

61-
def self.dataflow(*inputs, &block)
62-
Dataflow::dataflow(*inputs, &block)
63-
end
64-
55+
module_function :dataflow
6556
end

md/dataflow.md

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def fib(n)
5555
end
5656
end
5757

58-
puts fib(14)
58+
puts fib(14) #=> 377
5959
```
6060

6161
We could modify this to use futures.
@@ -71,10 +71,12 @@ def fib(n)
7171
end
7272
end
7373

74-
f = fib(14)
75-
f.execute
74+
f = fib(14) #=> #<Concurrent::Future:0x000001019ef5a0 ...
75+
f.execute #=> #<Concurrent::Future:0x000001019ef5a0 ...
76+
7677
sleep(0.5)
77-
puts f.value
78+
79+
puts f.value #=> 377
7880
```
7981

8082
One of the drawbacks of this approach is that all the futures start, and then
@@ -128,9 +130,10 @@ end
128130
We can wrap this up in a dataflow utility.
129131

130132
```ruby
131-
f = fib(14)
133+
f = fib(14) #=> #<Concurrent::Future:0x00000101fca308 ...
132134
sleep(0.5)
133-
puts f.value
135+
136+
puts f.value #=> 377
134137

135138
def dataflow(*inputs, &block)
136139
result = Concurrent::Future.new(&block)
@@ -158,9 +161,10 @@ def fib(n)
158161
end
159162
end
160163

161-
f = fib(14)
164+
f = fib(14) #=> #<Concurrent::Future:0x00000101fca308 ...
162165
sleep(0.5)
163-
puts f.value
166+
167+
puts f.value #=> 377
164168
```
165169

166170
Since we know that the futures the dataflow computation depends on are already
@@ -179,7 +183,7 @@ def dataflow(*inputs, &block)
179183
if inputs.empty?
180184
result.execute
181185
else
182-
barrier = Concurrent::CountingObserver.new(inputs.size) { result.execute }
186+
barrier = CountingObserver.new(inputs.size) { result.execute }
183187

184188
inputs.each do |input|
185189
input.add_observer barrier
@@ -189,7 +193,8 @@ def dataflow(*inputs, &block)
189193
result
190194
end
191195

192-
f = fib(14)
196+
f = fib(14) #=> #<Concurrent::Future:0x000001019a26d8 ...
193197
sleep(0.5)
194-
puts f.value
198+
199+
puts f.value #=> 377
195200
```

0 commit comments

Comments
 (0)