
Commit c9dd4e5

Unittests concurrency (#8666)

Python Unit Tests for CSP

* Simple Channel Send and Receive test
* Daisy Chain test with 100 channels/Go ops

1 parent 9e1ec8c commit c9dd4e5

File tree

3 files changed: +143 additions, -67 deletions

python/paddle/fluid/concurrency.py

Lines changed: 43 additions & 30 deletions
@@ -12,11 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# TODO: Variables: make_channel
-# TODO: Operators: send, close_channel, recv, go, select
 from layers.control_flow import BlockGuard
-from layer_helper import LayerHelper
+from layer_helper import LayerHelper, unique_name
+from layers import fill_constant
 import core
+
 __all__ = [
     'Go',
     'make_channel',
@@ -46,27 +46,35 @@ def construct_go_op(self):
         parent_block = main_program.block(main_program.current_block()
                                           .parent_idx)
 
+        inner_outputs = set()
         x_name_list = set()
-        out_vars = []
         for op in go_block.ops:
             # Iterate over all operators, get all the inputs
             # and add as input to the Go operator.
             for iname in op.input_names:
                 for in_var_name in op.input(iname):
-                    x_name_list.add(in_var_name)
+                    if in_var_name not in inner_outputs:
+                        x_name_list.add(in_var_name)
 
-            # Iterate over all operators , get all the outputs
-            # add to the output list of Go operator only if
-            # they exist in the parent block.
             for oname in op.output_names:
                 for out_var_name in op.output(oname):
-                    if out_var_name in parent_block.vars:
-                        out_vars.add(parent_block.var(out_var_name))
+                    inner_outputs.add(out_var_name)
+
+        # Iterate over all operators , get all the outputs
+        # add to the output list of Go operator only if
+        # they exist in the parent block.
+        out_vars = []
+        for inner_out_name in inner_outputs:
+            if inner_out_name in parent_block.vars:
+                out_vars.append(parent_block.var(inner_out_name))
 
         parent_block.append_op(
             type='go',
-            inputs={'X': [parent_block.var(x_name) for x_name in x_name_list]},
-            outputs={'Out': out_vars},
+            inputs={
+                'X':
+                [parent_block.var_recursive(x_name) for x_name in x_name_list]
+            },
+            outputs={},
             attrs={'sub_block': go_block})
 
 
@@ -88,8 +96,8 @@ def make_channel(dtype, capacity=0):
     `channel_close`, and `Go` to design a concurrent Paddle program.
 
     Args:
-        dtype (ParamAttr|int): Data type of the data sent in the channel.
-            This data type should be one of the Paddle supported data types.
+        dtype (ParamAttr|string): Data type of the data sent in the channel.
+            This data type should be the string name of a numpy data type.
         capacity (ParamAttr|int): Size of the channel. Defaults to 0 for
             to create an unbuffered channel.
 
@@ -106,22 +114,24 @@ def make_channel(dtype, capacity=0):
         fluid.channel_send(ch, 100)
         fluid.channel_close(ch)
     """
-    helper = LayerHelper('make_channel', **locals())
+    helper = LayerHelper('channel_create', **locals())
     main_program = helper.main_program
     make_channel_block = main_program.current_block()
 
     # Make a channel variable (using the channel data type) and make sure it
     # persists into the global scope.
     channel = helper.create_variable(
-        dtype=core.VarDesc.VarType.CHANNEL, persistable=True)
+        name=unique_name.generate('channel'),
+        type=core.VarDesc.VarType.CHANNEL,
+        persistable=True)
 
     create_channel_op = make_channel_block.append_op(
         type="channel_create",
         outputs={"Out": channel},
         attrs={"data_type": dtype,
                "capacity": capacity})
 
-    return create_channel_op
+    return channel
 
 
 def channel_send(channel, value):
@@ -133,7 +143,7 @@ def channel_send(channel, value):
     Args:
         channel (Variable|Channel): Channel variable created using
             `make_channel`.
-
+        value (Variable): Value to send to channel
     Returns:
         Variable: The boolean status on whether or not the channel
             successfully sent the passed value.
@@ -149,7 +159,11 @@ def channel_send(channel, value):
     helper = LayerHelper('channel_send', **locals())
     main_program = helper.main_program
     channel_send_block = main_program.current_block()
-    status = helper.create_variable(dtype=core.VarDesc.VarType.TENSOR)
+
+    status = helper.create_variable(
+        name=unique_name.generate('status'),
+        type=core.VarDesc.VarType.LOD_TENSOR,
+        dtype=core.VarDesc.VarType.BOOL)
 
     channel_send_op = channel_send_block.append_op(
         type="channel_send",
@@ -159,10 +173,10 @@ def channel_send(channel, value):
         },
         outputs={"Status": status})
 
-    return channel_send_op
+    return status
 
 
-def channel_recv(channel, dtype):
+def channel_recv(channel, return_value):
     """
     Receives a value through a channel variable. Used by an unbuffered or
     buffered channel within a concurrent Go block to get data from originally
@@ -172,11 +186,10 @@ def channel_recv(channel, dtype):
     Args:
         channel (Variable|Channel): Channel variable created using
             `make_channel`.
-        dtype (Variable|int): Data type of the data expected to be read in the
-            channel. This data type should be one of the Paddle supported data
-            types.
+        return_value (Variable): Variable to set as a result of running channel_recv_op
 
     Returns:
+        Variable: The received value from the channel.
         Variable: The boolean status on whether or not the channel
             successfully received the passed value.
 
@@ -185,24 +198,26 @@ def channel_recv(channel, dtype):
 
         ch = fluid.make_channel(dtype='int32', capacity=10)
         with fluid.Go():
-            fluid.channel_recv(ch, 'int32')
+            returned_value = fluid.channel_recv(ch, 'int32')
 
         # Code to send data through the channel.
     """
     helper = LayerHelper('channel_recv', **locals())
     main_program = helper.main_program
     channel_recv_block = main_program.current_block()
 
-    return_value = helper.create_variable(dtype=dtype)
-    status = helper.create_variable(dtype=core.VarDesc.VarType.TENSOR)
+    status = helper.create_variable(
+        name=unique_name.generate('status'),
+        type=core.VarDesc.VarType.LOD_TENSOR,
+        dtype=core.VarDesc.VarType.BOOL)
 
     channel_recv_op = channel_recv_block.append_op(
         type="channel_recv",
        inputs={"Channel": channel},
        outputs={"Out": return_value,
                 "Status": status})
 
-    return channel_recv_op
+    return return_value, status
 
 
 def channel_close(channel):
@@ -228,5 +243,3 @@ def channel_close(channel):
 
     channel_close_op = channel_close_block.append_op(
         type="channel_close", inputs={"Channel": channel})
-
-    return channel_close_op
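Taken together, these changes make the channel layers return usable variables instead of operators: `make_channel` now returns the channel itself, `channel_send` returns its status variable, and `channel_recv` fills a caller-supplied placeholder and returns a (value, status) pair. A minimal sketch of the updated call pattern, mirroring `test_simple_routine` from the new test file below (variable names are illustrative, and the snippet assumes the Fluid API as of this commit):

import paddle.fluid as fluid
import paddle.fluid.core as core
from paddle.fluid import framework, unique_name
from paddle.fluid.executor import Executor
from paddle.fluid.layers import fill_constant

# make_channel now returns the channel variable rather than the
# channel_create op, so it can be passed straight to send/recv/close.
ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)

# channel_recv no longer creates its own output from a dtype argument;
# the caller provides a placeholder variable for the op to fill in.
result = framework.default_main_program().current_block().create_var(
    name=unique_name.generate('return_value'),
    type=core.VarDesc.VarType.LOD_TENSOR,
    dtype=core.VarDesc.VarType.INT64)

with fluid.Go():
    value = fill_constant(
        shape=[1], dtype=core.VarDesc.VarType.INT64, value=7)
    fluid.channel_send(ch, value)  # returns the status variable

result, status = fluid.channel_recv(ch, result)  # returns (value, status)
fluid.channel_close(ch)

exe = Executor(core.CPUPlace())
print(exe.run(fetch_list=[result]))  # expected to print [array([7])]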

python/paddle/fluid/tests/notest_csp.py

Lines changed: 0 additions & 37 deletions
This file was deleted.

Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import paddle.fluid as fluid
+import paddle.fluid.core as core
+from paddle.fluid import framework, unique_name
+from paddle.fluid.executor import Executor
+from paddle.fluid.layers import fill_constant
+
+
+class TestRoutineOp(unittest.TestCase):
+    def test_simple_routine(self):
+        ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
+
+        # Create LOD_TENSOR<INT64> and put it into the scope. This placeholder
+        # variable will be filled in and returned by fluid.channel_recv
+        result = self._create_tensor('return_value',
+                                     core.VarDesc.VarType.LOD_TENSOR,
+                                     core.VarDesc.VarType.INT64)
+
+        with fluid.Go():
+            input_value = fill_constant(
+                shape=[1], dtype=core.VarDesc.VarType.FP64, value=1234)
+            fluid.channel_send(ch, input_value)
+
+        result, status = fluid.channel_recv(ch, result)
+        fluid.channel_close(ch)
+
+        cpu = core.CPUPlace()
+        exe = Executor(cpu)
+
+        outs = exe.run(fetch_list=[result])
+        self.assertEqual(outs[0], 1234)
+
+    def test_daisy_chain(self):
+        '''
+        Mimics classic Daisy-chain test: https://talks.golang.org/2012/concurrency.slide#39
+        '''
+        n = 100
+
+        leftmost = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
+        left = leftmost
+
+        # TODO(thuan): Use fluid.While() after scope capture is implemented.
+        # https://github.com/PaddlePaddle/Paddle/issues/8502
+        for i in range(n):
+            right = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
+            with fluid.Go():
+                one_tensor = self._create_one_dim_tensor(1)
+                result = self._create_tensor('return_value',
+                                             core.VarDesc.VarType.LOD_TENSOR,
+                                             core.VarDesc.VarType.INT64)
+
+                result, status = fluid.channel_recv(right, result)
+                one_added = fluid.layers.elementwise_add(x=one_tensor, y=result)
+                fluid.channel_send(left, one_added)
+            left = right
+
+        # Trigger the channel propagation by sending a "1" to rightmost channel
+        with fluid.Go():
+            one_tensor = self._create_one_dim_tensor(1)
+            fluid.channel_send(right, one_tensor)
+
+        leftmost_result = self._create_tensor('return_value',
+                                              core.VarDesc.VarType.LOD_TENSOR,
+                                              core.VarDesc.VarType.INT64)
+        leftmost_result, status = fluid.channel_recv(leftmost, leftmost_result)
+
+        cpu = core.CPUPlace()
+        exe = Executor(cpu)
+        leftmost_data = exe.run(fetch_list=[leftmost_result])
+
+        # The leftmost_data should be equal to the number of channels + 1
+        self.assertEqual(leftmost_data[0][0], n + 1)
+
+    def _create_one_dim_tensor(self, value):
+        one_dim_tensor = fill_constant(
+            shape=[1], dtype=core.VarDesc.VarType.INT64, value=value)
+        one_dim_tensor.stop_gradient = True
+        return one_dim_tensor
+
+    def _create_tensor(self, name, type, dtype):
+        return framework.default_main_program().current_block().create_var(
+            name=unique_name.generate(name), type=type, dtype=dtype)
+
+
+if __name__ == '__main__':
+    unittest.main()
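For reference, the daisy-chain test builds the same pipeline as the Go talk it cites: each of the n Go blocks receives from its right-hand channel, adds one, and sends the sum leftward, so seeding the rightmost channel with 1 yields n + 1 at the leftmost. A rough plain-Python analogue using threads and queues (illustrative only, not part of this commit):

import threading
from queue import Queue

def daisy_chain(n=100):
    leftmost = Queue()
    left = leftmost
    for _ in range(n):
        right = Queue()
        # Each link blocks on its right-hand queue, adds one, and
        # forwards the result leftward, like each fluid.Go() block above.
        threading.Thread(
            target=lambda l, r: l.put(r.get() + 1),
            args=(left, right)).start()
        left = right
    left.put(1)  # seed the rightmost end of the chain
    return leftmost.get()

assert daisy_chain(100) == 101  # matches the n + 1 assertion in the test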
