@@ -66,26 +66,7 @@ def dataflow(*inputs, &block)
66
66
module_function :dataflow
67
67
68
68
def dataflow_with ( executor , *inputs , &block )
69
- raise ArgumentError . new ( 'an executor must be provided' ) if executor . nil?
70
- raise ArgumentError . new ( 'no block given' ) unless block_given?
71
- raise ArgumentError . new ( 'not all dependencies are IVars' ) unless inputs . all? { |input | input . is_a? IVar }
72
-
73
- result = Future . new ( executor : executor ) do
74
- values = inputs . map { |input | input . value }
75
- block . call ( *values )
76
- end
77
-
78
- if inputs . empty?
79
- result . execute
80
- else
81
- counter = DependencyCounter . new ( inputs . size ) { result . execute }
82
-
83
- inputs . each do |input |
84
- input . add_observer counter
85
- end
86
- end
87
-
88
- result
69
+ call_dataflow ( :value , executor , *inputs , &block )
89
70
end
90
71
module_function :dataflow_with
91
72
@@ -95,12 +76,19 @@ def dataflow!(*inputs, &block)
95
76
module_function :dataflow!
96
77
97
78
def dataflow_with! ( executor , *inputs , &block )
79
+ call_dataflow ( :value! , executor , *inputs , &block )
80
+ end
81
+ module_function :dataflow_with!
82
+
83
+ private
84
+
85
+ def call_dataflow ( method , executor , *inputs , &block )
98
86
raise ArgumentError . new ( 'an executor must be provided' ) if executor . nil?
99
87
raise ArgumentError . new ( 'no block given' ) unless block_given?
100
88
raise ArgumentError . new ( 'not all dependencies are IVars' ) unless inputs . all? { |input | input . is_a? IVar }
101
89
102
90
result = Future . new ( executor : executor ) do
103
- values = inputs . map { |input | input . value! }
91
+ values = inputs . map { |input | input . send ( method ) }
104
92
block . call ( *values )
105
93
end
106
94
@@ -116,5 +104,5 @@ def dataflow_with!(executor, *inputs, &block)
116
104
117
105
result
118
106
end
119
- module_function :dataflow_with!
107
+ module_function :call_dataflow
120
108
end
0 commit comments