Skip to content

Commit 9da9b19

Browse files
authored
[1.1] fix graph num hang (#14072)
* fix graph num hang; test=develop
* re-enable tests; test=develop
* re-enable graph num check; test=develop
* fix multi device pass role check; test=develop
1 parent 8c166b6 commit 9da9b19

File tree

7 files changed

+26
-17
lines changed

7 files changed

+26
-17
lines changed

paddle/fluid/framework/ir/graph_helper.cc

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,19 +120,25 @@ size_t GraphNum(const Graph &graph) {
120120
std::deque<ir::Node *> q_nodes;
121121
std::vector<std::unordered_set<ir::Node *>> graph_nodes;
122122
std::unordered_set<ir::Node *> g_nodes;
123+
// q_set is used to record which nodes are currently in the queue.
124+
std::unordered_set<ir::Node *> q_set;
123125
size_t graph_count = 0;
124126

125-
auto traverse_nodes = [&visited_nodes,
126-
&q_nodes](const std::vector<ir::Node *> &nodes) {
127-
std::copy_if(
128-
nodes.begin(), nodes.end(), std::back_inserter(q_nodes),
129-
[&visited_nodes](Node *node) { return !visited_nodes.count(node); });
127+
auto traverse_nodes = [&visited_nodes, &q_nodes,
128+
&q_set](const std::vector<ir::Node *> &nodes) {
129+
for (auto n : nodes) {
130+
if (visited_nodes.count(n) == 0 && q_set.count(n) == 0) {
131+
q_nodes.push_back(n);
132+
q_set.insert(n);
133+
}
134+
}
130135
};
131136

132137
while (visited_nodes.size() != nodes.size()) {
133138
if (!q_nodes.empty()) {
134139
auto cur_node = q_nodes.front();
135140
q_nodes.pop_front();
141+
q_set.erase(cur_node);
136142
visited_nodes.insert(cur_node);
137143
g_nodes.insert(cur_node);
138144
traverse_nodes(cur_node->inputs);
@@ -146,6 +152,7 @@ size_t GraphNum(const Graph &graph) {
146152
for (auto &n : nodes) {
147153
if (visited_nodes.count(n) == 0) {
148154
q_nodes.push_back(n);
155+
q_set.insert(n);
149156
break;
150157
}
151158
}

paddle/fluid/framework/op_proto_maker.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ enum class OpRole {
2828
kBackward = 0x0001,
2929
kOptimize = 0x0002,
3030
// RPC role is for send/recv related op
31-
kRPC = 0x0003,
31+
kRPC = 0x0004,
3232
// Dist role is for split_byref/split_selected_rows/concat
3333
// used for distributed training.
34-
kDist = 0x0004,
34+
kDist = 0x0008,
3535
// Tag all learning rate scheduler operators.
36-
kLRSched = 0x0005,
36+
kLRSched = 0x0016,
3737

3838
kLoss = 0x0100,
3939
// The default value of op's role. This should be only used for unittests and

paddle/fluid/framework/parallel_executor.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,12 @@ ParallelExecutor::ParallelExecutor(
156156
params, member_->local_scopes_, member_->use_cuda_);
157157
#endif
158158

159+
// If the loss_var_name is given, the number of graphs should be only one.
160+
if (loss_var_name.size()) {
161+
PADDLE_ENFORCE_EQ(ir::GraphNum(*graph), 1,
162+
"The number of graph should be only one");
163+
}
164+
159165
if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
160166
member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
161167
exec_strategy, member_->local_scopes_, places, std::move(graph)));

python/paddle/fluid/tests/unittests/test_dist_ctr.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@ def _setup_config(self):
2323
self._sync_mode = True
2424
self._enforce_place = "CPU"
2525

26-
27-
def test_dist_ctr(self):
28-
self.check_with_place("dist_ctr.py", delta=1e-7, check_error_log=False)
26+
def test_dist_ctr(self):
27+
self.check_with_place("dist_ctr.py", delta=1e-7, check_error_log=False)
2928

3029

3130
if __name__ == "__main__":

python/paddle/fluid/tests/unittests/test_dist_mnist.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ def _setup_config(self):
4040
self._sync_mode = False
4141
self._use_reduce = False
4242

43-
# FIXME(typhoonzero): fix async mode test later
44-
def no_test_dist_train(self):
43+
def test_dist_train(self):
4544
self.check_with_place("dist_mnist.py", delta=200)
4645

4746

python/paddle/fluid/tests/unittests/test_dist_se_resnext.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ def _setup_config(self):
4040
self._sync_mode = False
4141
self._use_reader_alloc = False
4242

43-
#FIXME(typhoonzero): fix async mode later
44-
def no_test_dist_train(self):
43+
def test_dist_train(self):
4544
self.check_with_place("dist_se_resnext.py", delta=100)
4645

4746

python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,7 @@ def _setup_config(self):
7979
self._sync_mode = False
8080
self._enforce_place = "CPU"
8181

82-
#FIXME(typhoonzero): fix async tests later
83-
def no_test_simnet_bow(self):
82+
def test_simnet_bow(self):
8483
need_envs = {
8584
"IS_DISTRIBUTED": '0',
8685
"IS_SPARSE": '1',

0 commit comments

Comments
 (0)