Skip to content

Commit 809530f

Browse files
committed
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into dev_MultiEpochReader
2 parents a944d57 + 7c041e4 commit 809530f

23 files changed

+1208
-263
lines changed
99.1 KB
Loading
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
# select_op Design
2+
3+
## Introduction
4+
5+
In golang, the [**select**](https://golang.org/ref/spec#Select_statements)
6+
statement lets a goroutine wait on multiple communication operations at the
7+
same time. The **select** blocks until one of its cases can run, then
8+
executes the case. If multiple cases are ready to run, then one case is
9+
choosen at random to be executed.
10+
11+
With the introduction of CSP for Paddle, we mimic this behavior by
12+
creating a ***select_op***.
13+
14+
## How to use it
15+
16+
The **select_op** is available as a c++ operator. However most users
17+
will prefer to use the much simplier Python API.
18+
19+
- **fluid.Select()**: Creates a select operator and adds it to the current
20+
block within the main program. Also creates a sub block and adds it to the
21+
main program. This sub block is used to hold all variables and operators
22+
used by the case statements.
23+
24+
Within the select block, users can add cases by
25+
calling **select.case** or **select.default** method.
26+
27+
- **fluid.Select.case(channel_action, channel, result_variable)**: Represents
28+
a fluid channel send/recv case. This method creates a SelectCase block
29+
guard and adds it to the Select block. The arguments into this method tells
30+
the select which channel operation to listen to.
31+
32+
- **fluid.Select.default()**: Represents the fluid default case. This default
33+
case is executed if none of the channel send/recv cases are available to
34+
execute.
35+
36+
**Example:**
37+
```
38+
ch1 = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
39+
quit_ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
40+
41+
x = fill_constant(shape=[1], dtype=core.VarDesc.VarType.INT32, value=0)
42+
y = fill_constant(shape=[1], dtype=core.VarDesc.VarType.INT32, value=1)
43+
44+
while_cond = fill_constant(shape=[1], dtype=core.VarDesc.VarType.BOOL, value=True)
45+
while_op = While(cond=while_cond)
46+
47+
with while_op.block():
48+
with fluid.Select() as select:
49+
with select.case(fluid.channel_send, channel, x):
50+
# Send x, then perform Fibonacci calculation on x and y
51+
x_tmp = fill_constant(shape=[1], dtype=core.VarDesc.VarType.INT32, value=0)
52+
assign(input=x, output=x_tmp)
53+
assign(input=y, output=x)
54+
assign(elementwise_add(x=x_tmp, y=y), output=y)
55+
with select.case(fluid.channel_recv, quit_channel, result2):
56+
# Exit out of While loop
57+
while_false = fill_constant(shape=[1], dtype=core.VarDesc.VarType.BOOL, value=False)
58+
helper = layer_helper.LayerHelper('assign')
59+
helper.append_op(
60+
type='assign',
61+
inputs={'X': [while_false]},
62+
outputs={'Out': [while_cond]})
63+
```
64+
65+
## How it Works
66+
67+
### Program Description
68+
69+
```
70+
blocks {
71+
idx: 0
72+
...
73+
// Create "case_to_execute" variable
74+
ops {
75+
outputs {
76+
parameter: "Out"
77+
arguments: "fill_constant_110.tmp_0"
78+
}
79+
type: "fill_constant"
80+
attrs {
81+
name: "force_cpu"
82+
type: BOOLEAN
83+
b: false
84+
}
85+
attrs {
86+
name: "value"
87+
type: FLOAT
88+
f: -1.0
89+
}
90+
attrs {
91+
name: "shape"
92+
type: INTS
93+
ints: 1
94+
}
95+
attrs {
96+
name: "dtype"
97+
type: INT
98+
i: 2
99+
}
100+
}
101+
// Create "select" operator.
102+
// inputs:
103+
// X: All input variables used by operators within the select block
104+
// case_to_execute: Variable filled in by select_op when it determines
105+
// which case to execute.
106+
//
107+
// outputs:
108+
// Out: All output variables referenced by operators within select block.
109+
//
110+
// attrs:
111+
// sub_block: The block id containing the select "cases"
112+
// cases: Serialized list of all cases in the select op.
113+
// Each case is serialized as: '<index>,<type>,<channel>,<value>'
114+
// where type is 0 for default, 1 for send, and 2 for receive.
115+
// No channel and values are needed for default cases.
116+
ops {
117+
inputs {
118+
parameter: "X"
119+
arguments: "fill_constant_103.tmp_0"
120+
arguments: "fill_constant_104.tmp_0"
121+
}
122+
inputs {
123+
parameter: "case_to_execute"
124+
arguments: "fill_constant_110.tmp_0"
125+
}
126+
outputs {
127+
parameter: "Out"
128+
arguments: "fill_constant_110.tmp_0"
129+
}
130+
type: "select"
131+
attrs {
132+
name: "sub_block"
133+
type: BLOCK
134+
block_idx: 1
135+
}
136+
attrs {
137+
name: "cases"
138+
type: STRINGS
139+
strings: "0,1,channel_101,fill_constant_109.tmp_0"
140+
strings: "1,2,channel_102,fill_constant_108.tmp_0"
141+
}
142+
}
143+
...
144+
}
145+
```
146+
147+
The python select API will add the **select_op** to the current block. In addition, it will
148+
iterate through all it's case statements and add any input variables required by case statements
149+
into **X**. It will also create a temp variable called **case_to_execute**. This variable is
150+
filled in by the select_op after it has completed processing the case statements.
151+
152+
If there are no available cases to execute (ie: all cases are blocked on channel operations, and
153+
there is no default statement), then the select_op will block the current thread. The thread will
154+
unblock once there is a channel operation affecting one of the case statements, at which point, the
155+
**select_op** will set the **case_to_execute** variable to the index of the case to execute.
156+
157+
Finally the select_op will call executor.run on the **sub_block**.
158+
159+
```
160+
blocks {
161+
idx: 1
162+
parent_idx: 0
163+
...
164+
// Fill a tensor with the case index (ie: 0,1,2,3,ect.)
165+
ops {
166+
outputs {
167+
parameter: "Out"
168+
arguments: "fill_constant_111.tmp_0"
169+
}
170+
type: "fill_constant"
171+
attrs {
172+
name: "force_cpu"
173+
type: BOOLEAN
174+
b: false
175+
}
176+
attrs {
177+
name: "value"
178+
type: FLOAT
179+
f: 0.0
180+
}
181+
attrs {
182+
name: "shape"
183+
type: INTS
184+
ints: 1
185+
}
186+
attrs {
187+
name: "dtype"
188+
type: INT
189+
i: 2
190+
}
191+
}
192+
// Create an "equal" operator to compare the case index with the "case_to_execute"
193+
// tensor (which was filled in by the select op).
194+
ops {
195+
inputs {
196+
parameter: "X"
197+
arguments: "fill_constant_111.tmp_0" // case 0
198+
}
199+
inputs {
200+
parameter: "Y"
201+
arguments: "fill_constant_110.tmp_0" // case_to_execute
202+
}
203+
outputs {
204+
parameter: "Out"
205+
arguments: "equal_0.tmp_0"
206+
}
207+
type: "equal"
208+
attrs {
209+
name: "axis"
210+
type: INT
211+
i: -1
212+
}
213+
}
214+
// Use the output of the "equal" operator as a condition for the "conditional_block".
215+
// If the condition evaluates to true, then execute the "sub_block" (which represents
216+
// the select case's body)
217+
ops {
218+
inputs {
219+
parameter: "Params"
220+
}
221+
inputs {
222+
parameter: "X"
223+
arguments: "equal_0.tmp_0"
224+
}
225+
outputs {
226+
parameter: "Out"
227+
}
228+
outputs {
229+
parameter: "Scope"
230+
arguments: "_generated_var_0"
231+
}
232+
type: "conditional_block"
233+
attrs {
234+
name: "is_scalar_condition"
235+
type: BOOLEAN
236+
b: true
237+
}
238+
attrs {
239+
name: "sub_block"
240+
type: BLOCK
241+
block_idx: 4
242+
}
243+
}
244+
...
245+
// Repeat the above operators for each case statements inside the select body
246+
}
247+
248+
```
249+
250+
Cases are represented by a **conditional_block operator**, whose's condition is set as the output of
251+
equal(**case_to_execute**, **case_index**). Since each case index is unique in this sub-block,
252+
only one case will be executed.
253+
254+
### select_op flow
255+
256+
<p align="center">
257+
<img src="./images/select_op_workflow.png"/><br/>
258+
</p>
259+
260+
The select algorithm is inspired by golang's select routine. Please refer to
261+
http://www.tapirgames.com/blog/golang-concurrent-select-implementation for more information.
262+
263+
## Backward Pass
264+
265+
TODO

doc/v2/getstarted/index_en.rst

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,19 @@
11
GET STARTED
22
============
33

4+
If you want to quickly know how to use PaddlePaddle, please refer to the following guide:
5+
46
.. toctree::
57
:maxdepth: 1
68

79
quickstart_en.rst
10+
11+
12+
While using PaddlePaddle to build applications, please understand some basic concepts.
13+
14+
Here is an example of linear regression. It introduces workflow of PaddlePaddle, including data format, model configuration and training, etc.
15+
16+
.. toctree::
17+
:maxdepth: 1
18+
819
concepts/use_concepts_en.rst

doc/v2/howto/cluster/index_en.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ Distributed Training
22
====================
33

44
The effectiveness of the deep learning model is often directly related to the scale of the data: it can generally achieve better results after increasing the size of the dataset on the same model. However, it can not fit in one single computer when the amount of data increases to a certain extent. At this point, using multiple computers for distributed training is a natural solution. In distributed training, the training data is divided into multiple copies (sharding), and multiple machines participating in the training read their own data for training and collaboratively update the parameters of the overall model.
5+
6+
Distributed training generally has framwork as shown below:
7+
58
.. image:: src/ps_en.png
69
:width: 500
710

paddle/fluid/framework/executor.cc

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,8 @@ limitations under the License. */
1414

1515
#include "paddle/fluid/framework/executor.h"
1616

17-
#include <set>
18-
19-
#include "gflags/gflags.h"
2017
#include "paddle/fluid/framework/channel.h"
2118
#include "paddle/fluid/framework/feed_fetch_method.h"
22-
#include "paddle/fluid/framework/feed_fetch_type.h"
2319
#include "paddle/fluid/framework/lod_rank_table.h"
2420
#include "paddle/fluid/framework/lod_tensor_array.h"
2521
#include "paddle/fluid/framework/op_registry.h"
@@ -40,14 +36,13 @@ namespace {
4036
int kProgramId = -1;
4137
} // namespace
4238

43-
struct ExecutorPrepareContext {
44-
ExecutorPrepareContext(const framework::ProgramDesc& prog, size_t block_id)
45-
: prog_(prog), block_id_(block_id) {}
39+
ExecutorPrepareContext::ExecutorPrepareContext(
40+
const framework::ProgramDesc& prog, size_t block_id)
41+
: prog_(prog), block_id_(block_id) {}
4642

47-
const framework::ProgramDesc& prog_;
48-
size_t block_id_;
49-
std::vector<std::unique_ptr<OperatorBase>> ops_;
50-
};
43+
ExecutorPrepareContext::~ExecutorPrepareContext() {
44+
VLOG(5) << "destroy ExecutorPrepareContext";
45+
}
5146

5247
Executor::Executor(const platform::Place& place) : place_(place) {}
5348

@@ -101,9 +96,8 @@ static void CheckTensorNANOrInf(const std::string& name,
10196
void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
10297
bool create_local_scope, bool create_vars) {
10398
platform::RecordBlock b(block_id);
104-
auto* ctx = Prepare(pdesc, block_id);
105-
RunPreparedContext(ctx, scope, create_local_scope, create_vars);
106-
delete ctx;
99+
auto ctx = Prepare(pdesc, block_id);
100+
RunPreparedContext(ctx.get(), scope, create_local_scope, create_vars);
107101
}
108102

109103
// Check whether the block already has feed operators and feed_holder.
@@ -274,15 +268,15 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
274268
}
275269
}
276270

277-
ExecutorPrepareContext* Executor::Prepare(const ProgramDesc& program,
278-
int block_id) {
271+
std::unique_ptr<ExecutorPrepareContext> Executor::Prepare(
272+
const ProgramDesc& program, int block_id) {
279273
auto* ctx = new ExecutorPrepareContext(program, block_id);
280274
PADDLE_ENFORCE_LT(static_cast<size_t>(block_id), program.Size());
281275
auto& block = program.Block(block_id);
282276
for (auto& op_desc : block.AllOps()) {
283277
ctx->ops_.push_back(OpRegistry::CreateOp(*op_desc));
284278
}
285-
return ctx;
279+
return std::unique_ptr<ExecutorPrepareContext>(ctx);
286280
}
287281

288282
void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,

paddle/fluid/framework/executor.h

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,16 @@ limitations under the License. */
2222

2323
namespace paddle {
2424
namespace framework {
25-
struct ExecutorPrepareContext;
25+
26+
struct ExecutorPrepareContext {
27+
ExecutorPrepareContext(const framework::ProgramDesc& prog, size_t block_id);
28+
~ExecutorPrepareContext();
29+
30+
const framework::ProgramDesc& prog_;
31+
size_t block_id_;
32+
std::vector<std::unique_ptr<OperatorBase>> ops_;
33+
};
34+
2635
class Executor {
2736
public:
2837
// TODO(dzhwinter) : Do not rely on this function, it will be removed
@@ -47,8 +56,8 @@ class Executor {
4756
const std::string& feed_holder_name = "feed",
4857
const std::string& fetch_holder_name = "fetch");
4958

50-
static ExecutorPrepareContext* Prepare(const ProgramDesc& program,
51-
int block_id);
59+
static std::unique_ptr<ExecutorPrepareContext> Prepare(
60+
const ProgramDesc& program, int block_id);
5261

5362
void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
5463
bool create_local_scope = true,

0 commit comments

Comments
 (0)