Skip to content

Commit e5281b3

Browse files
committed
Clean code & add execution strategy
1 parent 9923be5 commit e5281b3

File tree

9 files changed

+154
-104
lines changed

9 files changed

+154
-104
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once

namespace paddle {
namespace framework {
namespace details {

// Tunable knobs controlling how a ParallelExecutor runs an SSA graph.
// Plain aggregate: all members are public and carry in-class defaults so a
// default-constructed strategy is immediately usable.
struct ExecutionStrategy {
  // Worker threads for op execution; 0 means "run ops inline / no pool"
  // (the executor only builds a ThreadPool when this is >= 2).
  size_t num_threads_{0};
  // Whether op handles run with CUDA-event-based dependency tracking.
  bool use_event_{true};
  // When true, multi-device transfer ops may be delayed and scheduled
  // after all currently-ready ops, to overlap communication with compute.
  bool allow_op_delay_{false};
};

}  // namespace details
}  // namespace framework
}  // namespace paddle

paddle/fluid/framework/details/threaded_ssa_graph_executor.cc

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,17 @@ namespace paddle {
1818
namespace framework {
1919
namespace details {
2020
ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor(
21-
size_t num_threads, bool use_event,
22-
const std::vector<Scope *> &local_scopes,
21+
const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
2322
const std::vector<platform::Place> &places,
24-
std::unique_ptr<SSAGraph> &&graph, bool allow_op_delay)
23+
std::unique_ptr<SSAGraph> &&graph)
2524
: SSAGraphExecutor(std::move(graph)),
26-
pool_(num_threads >= 2 ? new ::ThreadPool(num_threads) : nullptr),
25+
pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_)
26+
: nullptr),
2727
local_scopes_(local_scopes),
2828
places_(places),
2929
fetch_ctxs_(places),
30-
use_event_(use_event),
3130
running_ops_(0),
32-
allow_op_delay_(allow_op_delay) {}
31+
strategy_(strategy) {}
3332

3433
FeedFetchList ThreadedSSAGraphExecutor::Run(
3534
const std::vector<std::string> &fetch_tensors) {
@@ -86,7 +85,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
8685
//
8786
// NOTE: DelayedOps have a lower priority. It will be scheduled after all
8887
// ready_ops have been performed.
89-
if (ready_ops.empty() && allow_op_delay_ && running_ops_ == 0) {
88+
if (ready_ops.empty() && strategy_.allow_op_delay_ && running_ops_ == 0) {
9089
run_all_ops(delayed_ops);
9190
} else {
9291
run_all_ops(ready_ops);
@@ -113,7 +112,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
113112
auto &deps = pending_ops[op];
114113
--deps;
115114
if (deps == 0) {
116-
if (op->IsMultiDeviceTransfer() && allow_op_delay_) {
115+
if (op->IsMultiDeviceTransfer() && strategy_.allow_op_delay_) {
117116
delayed_ops.insert(op);
118117
} else {
119118
ready_ops.insert(op);
@@ -191,7 +190,7 @@ void ThreadedSSAGraphExecutor::RunOp(
191190
auto op_run = [ready_var_q, op, this] {
192191
try {
193192
VLOG(10) << op << " " << op->Name() << " : " << op->DebugString();
194-
op->Run(use_event_);
193+
op->Run(strategy_.use_event_);
195194
VLOG(10) << op << " " << op->Name() << " Done ";
196195
running_ops_--;
197196
ready_var_q->Extend(op->Outputs());

paddle/fluid/framework/details/threaded_ssa_graph_executor.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <functional>
2424
#include "ThreadPool.h" // ThreadPool in thrird party
2525
#include "paddle/fluid/framework/blocking_queue.h"
26+
#include "paddle/fluid/framework/details/execution_strategy.h"
2627
#include "paddle/fluid/framework/details/fetch_op_handle.h"
2728
#include "paddle/fluid/framework/details/ssa_graph_executor.h"
2829

@@ -34,11 +35,10 @@ namespace details {
3435

3536
class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
3637
public:
37-
ThreadedSSAGraphExecutor(size_t num_threads, bool use_event,
38+
ThreadedSSAGraphExecutor(const ExecutionStrategy &strategy,
3839
const std::vector<Scope *> &local_scopes,
3940
const std::vector<platform::Place> &places,
40-
std::unique_ptr<SSAGraph> &&graph,
41-
bool allow_op_delay);
41+
std::unique_ptr<SSAGraph> &&graph);
4242

4343
// Run a SSAGraph by a thread pool
4444
// Use topological sort algorithm
@@ -55,10 +55,8 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
5555
std::vector<Scope *> local_scopes_;
5656
std::vector<platform::Place> places_;
5757
platform::DeviceContextPool fetch_ctxs_;
58-
const bool use_event_;
5958
std::unique_ptr<platform::EnforceNotMet> exception_;
6059
std::atomic<int> running_ops_;
61-
bool allow_op_delay_;
6260

6361
void InsertPendingOp(std::unordered_map<OpHandleBase *, size_t> *pending_ops,
6462
OpHandleBase *op_instance) const;
@@ -74,6 +72,9 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
7472
std::unordered_map<OpHandleBase *, size_t> *pending_ops,
7573
std::unordered_set<VarHandleBase *> *pending_vars,
7674
BlockingQueue<VarHandleBase *> *ready_vars, FeedFetchList *fetch_data);
75+
76+
private:
77+
ExecutionStrategy strategy_;
7778
};
7879

7980
} // namespace details

paddle/fluid/framework/parallel_executor.cc

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
5252
}
5353

5454
ParallelExecutor::ParallelExecutor(
55-
size_t num_threads, bool use_event,
5655
const std::vector<platform::Place> &places,
5756
const std::unordered_set<std::string> &params,
5857
const std::unordered_set<std::string> &bcast_vars,
5958
const ProgramDesc &main_program, const std::string &loss_var_name,
60-
Scope *scope, const std::vector<Scope *> &local_scopes, bool allow_op_delay,
61-
bool use_default_grad_scale, bool balance_parameter_opt_between_cards)
59+
Scope *scope, const std::vector<Scope *> &local_scopes,
60+
bool use_default_grad_scale, bool balance_parameter_opt_between_cards,
61+
const ExecutionStrategy &exec_strategy)
6262
: member_(new ParallelExecutorPrivate(places)) {
6363
member_->global_scope_ = scope;
6464

@@ -103,8 +103,7 @@ ParallelExecutor::ParallelExecutor(
103103
auto graph = builder.Build(main_program);
104104

105105
member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
106-
num_threads, use_event, member_->local_scopes_, places, std::move(graph),
107-
allow_op_delay));
106+
exec_strategy, member_->local_scopes_, places, std::move(graph)));
108107

109108
// Step 3. Create vars in each scope;
110109
for (auto *var : main_program.Block(0).AllVars()) {

paddle/fluid/framework/parallel_executor.h

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,53 +17,55 @@ limitations under the License. */
1717
#include <string>
1818
#include <unordered_set>
1919
#include <vector>
20+
#include "paddle/fluid/framework/details/execution_strategy.h"
2021
#include "paddle/fluid/framework/executor.h"
2122
#include "paddle/fluid/framework/op_info.h"
2223
#include "paddle/fluid/framework/program_desc.h"
2324
#include "paddle/fluid/framework/scope.h"
2425
#include "paddle/fluid/framework/tensor.h"
2526
#include "paddle/fluid/platform/device_context.h"
26-
2727
namespace paddle {
2828
namespace framework {
2929

3030
class ParallelExecutorPrivate;
3131

32+
using details::ExecutionStrategy;
33+
3234
class ParallelExecutor {
3335
DISABLE_COPY_AND_ASSIGN(ParallelExecutor);
3436

3537
public:
36-
explicit ParallelExecutor(size_t num_threads, bool use_event,
37-
const std::vector<platform::Place>& places,
38-
const std::unordered_set<std::string>& params,
39-
const std::unordered_set<std::string>& bcast_vars,
40-
const ProgramDesc& main_program,
41-
const std::string& loss_var_name, Scope* scope,
42-
const std::vector<Scope*>& local_scopes,
43-
bool allow_op_delay, bool use_default_grad_scale,
44-
bool balance_parameter_opt_between_cards);
38+
explicit ParallelExecutor(const std::vector<platform::Place> &places,
39+
const std::unordered_set<std::string> &params,
40+
const std::unordered_set<std::string> &bcast_vars,
41+
const ProgramDesc &main_program,
42+
const std::string &loss_var_name, Scope *scope,
43+
const std::vector<Scope *> &local_scopes,
44+
bool use_default_grad_scale,
45+
bool balance_parameter_opt_between_cards,
46+
const ExecutionStrategy &exec_strategy);
4547

4648
~ParallelExecutor();
4749

48-
std::vector<Scope*>& GetLocalScopes();
50+
std::vector<Scope *> &GetLocalScopes();
4951

5052
/**
5153
* Feed tensors to local scopes. The size of tensors should be equal to the
5254
* size of local scopes.
5355
*/
5456
void FeedTensorsIntoLocalScopes(
55-
const std::vector<std::unordered_map<std::string, LoDTensor>>& tensors);
57+
const std::vector<std::unordered_map<std::string, LoDTensor>> &tensors);
5658

5759
void FeedAndSplitTensorIntoLocalScopes(
58-
const std::unordered_map<std::string, LoDTensor>& tensors);
60+
const std::unordered_map<std::string, LoDTensor> &tensors);
5961

60-
void Run(const std::vector<std::string>& fetch_tensors,
61-
const std::string& fetched_var_name);
62+
void Run(const std::vector<std::string> &fetch_tensors,
63+
const std::string &fetched_var_name);
6264

63-
void BCastParamsToGPUs(const std::unordered_set<std::string>& vars) const;
65+
void BCastParamsToGPUs(const std::unordered_set<std::string> &vars) const;
6466

6567
private:
66-
ParallelExecutorPrivate* member_;
68+
ParallelExecutorPrivate *member_;
6769
};
6870

6971
} // namespace framework

paddle/fluid/pybind/pybind.cc

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -494,22 +494,33 @@ All parameter, weight, gradient are variables in Paddle.
494494
m.def("disable_profiler", platform::DisableProfiler);
495495
m.def("reset_profiler", platform::ResetProfiler);
496496

497-
py::class_<ParallelExecutor>(m, "ParallelExecutor")
498-
.def("__init__",
499-
[](ParallelExecutor &self, size_t num_threads, bool use_event,
500-
const std::vector<platform::Place> &places,
501-
const std::unordered_set<std::string> &params,
502-
const std::unordered_set<std::string> &bcast_vars,
503-
const ProgramDesc &main_program, const std::string &loss_var_name,
504-
Scope *scope, std::vector<Scope *> &local_scopes,
505-
bool allow_op_delay, bool use_default_grad_scale,
506-
bool balance_parameter_opt_between_cards) {
507-
new (&self) ParallelExecutor(
508-
num_threads, use_event, places, params, bcast_vars,
509-
main_program, loss_var_name, scope, local_scopes,
510-
allow_op_delay, use_default_grad_scale,
511-
balance_parameter_opt_between_cards);
512-
})
497+
py::class_<ParallelExecutor> pe(m, "ParallelExecutor");
498+
py::class_<ExecutionStrategy>(pe, "ExecutionStrategy")
499+
.def(py::init())
500+
.def_property(
501+
"num_threads",
502+
[](const ExecutionStrategy &self) { return self.num_threads_; },
503+
[](ExecutionStrategy &self, size_t num_threads) {
504+
self.num_threads_ = num_threads;
505+
})
506+
.def_property(
507+
"use_event",
508+
[](const ExecutionStrategy &self) { return self.use_event_; },
509+
[](ExecutionStrategy &self, bool use_event) {
510+
self.use_event_ = use_event;
511+
})
512+
.def_property(
513+
"allow_op_delay",
514+
[](const ExecutionStrategy &self) { return self.allow_op_delay_; },
515+
[](ExecutionStrategy &self, bool allow_op_delay) {
516+
self.allow_op_delay_ = allow_op_delay;
517+
});
518+
519+
pe.def(py::init<const std::vector<platform::Place> &,
520+
const std::unordered_set<std::string> &,
521+
const std::unordered_set<std::string> &, const ProgramDesc &,
522+
const std::string &, Scope *, std::vector<Scope *> &, bool,
523+
bool, const ExecutionStrategy &>())
513524
.def("bcast_params", &ParallelExecutor::BCastParamsToGPUs)
514525
// NOTE: even we return a vec<Scope*>* to Python use reference policy.
515526
// We still cannot get local_scope from this vector, since the element

python/paddle/fluid/__init__.py

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,42 +44,44 @@
4444
from param_attr import ParamAttr, WeightNormParamAttr
4545
from data_feeder import DataFeeder
4646
from core import LoDTensor, CPUPlace, CUDAPlace, CUDAPinnedPlace
47-
from transpiler import DistributeTranspiler, SimpleDistributeTranspiler, InferenceTranspiler, memory_optimize, release_memory
47+
from transpiler import DistributeTranspiler, SimpleDistributeTranspiler, \
48+
InferenceTranspiler, memory_optimize, release_memory
4849
from concurrency import (Go, make_channel, channel_send, channel_recv,
4950
channel_close, Select)
5051
import clip
5152
import profiler
5253
import unique_name
5354
import recordio_writer
54-
from parallel_executor import ParallelExecutor
55+
from parallel_executor import ParallelExecutor, ExecutionStrategy
5556

5657
Tensor = LoDTensor
5758

58-
__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ +\
59+
__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + \
5960
trainer.__all__ + inferencer.__all__ + transpiler.__all__ + [
60-
'io',
61-
'initializer',
62-
'layers',
63-
'transpiler'
64-
'nets',
65-
'optimizer',
66-
'learning_rate_decay',
67-
'backward',
68-
'regularizer',
69-
'LoDTensor',
70-
'CPUPlace',
71-
'CUDAPlace',
72-
'CUDAPinnedPlace',
73-
'Tensor',
74-
'ParamAttr',
75-
'WeightNormParamAttr',
76-
'DataFeeder',
77-
'clip',
78-
'profiler',
79-
'unique_name',
80-
'recordio_writer',
81-
'ParallelExecutor',
82-
]
61+
'io',
62+
'initializer',
63+
'layers',
64+
'transpiler'
65+
'nets',
66+
'optimizer',
67+
'learning_rate_decay',
68+
'backward',
69+
'regularizer',
70+
'LoDTensor',
71+
'CPUPlace',
72+
'CUDAPlace',
73+
'CUDAPinnedPlace',
74+
'Tensor',
75+
'ParamAttr',
76+
'WeightNormParamAttr',
77+
'DataFeeder',
78+
'clip',
79+
'profiler',
80+
'unique_name',
81+
'recordio_writer',
82+
'ParallelExecutor',
83+
'ExecutionStrategy',
84+
]
8385

8486

8587
def __bootstrap__():

0 commit comments

Comments
 (0)