Commit 8b1918f

Author: chengduo

Merge pull request #10454 from chengduoZH/fix_fetchop

Fix fetch_op_handle

2 parents: 2bff03b + a459764; merge commit: 8b1918f

File tree: 4 files changed (+105, -22 lines)

paddle/fluid/framework/details/fetch_op_handle.cc (3 additions, 1 deletion)

@@ -49,7 +49,9 @@ void FetchOpHandle::RunImpl() {
       platform::DeviceContextPool::Instance().Get(platform::CPUPlace());
   for (auto *input : inputs_) {
     auto *var = static_cast<VarHandle *>(input);
-    var->generated_op_->Wait(cpu_ctx);
+    if (var->generated_op_) {
+      var->generated_op_->Wait(cpu_ctx);
+    }
   }
   tensors_.resize(inputs_.size());
   auto *var_handle = static_cast<VarHandle *>(inputs_[0]);
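All three handles below receive the same guard: a VarHandle that no operator in the graph produces (for example, a value fed in from outside) has a null generated_op_, so the unconditional Wait() call dereferenced a null pointer. Here is a minimal standalone sketch of the pattern; OpHandle, VarHandle, and WaitOnInputs are simplified stand-ins of ours, not the real Paddle classes:

#include <vector>

// Simplified stand-ins for the real handle types; only the members
// needed to illustrate the guard are modeled here (an assumption).
struct OpHandle {
  void Wait() { /* block until this op has finished on its device */ }
};

struct VarHandle {
  // Null when no operator generates this variable, e.g. a value that
  // was fed in from outside the graph.
  OpHandle *generated_op_ = nullptr;
};

// The commit's pattern: wait on a producer only if one exists.
void WaitOnInputs(const std::vector<VarHandle *> &inputs) {
  for (auto *var : inputs) {
    if (var->generated_op_) {  // skip leaf variables with no producer
      var->generated_op_->Wait();
    }
  }
}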

paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc (3 additions, 1 deletion)

@@ -36,7 +36,9 @@ void NCCLAllReduceOpHandle::RunImpl() {
   // Wait input done
   for (auto *in : inputs_) {
     auto &p = static_cast<VarHandle *>(in)->place_;
-    in->generated_op_->Wait(dev_ctxes_[p]);
+    if (in->generated_op_) {
+      in->generated_op_->Wait(dev_ctxes_[p]);
+    }
   }

   auto &var_name = static_cast<VarHandle *>(this->inputs_[0])->name_;

paddle/fluid/framework/details/send_op_handle.cc (3 additions, 1 deletion)

@@ -32,7 +32,9 @@ void SendOpHandle::RunImpl() {
     if (in->DebugString() == "dummy") {  // HACK
       continue;
     }
-    in->generated_op_->Wait(dev_ctxes_[p]);
+    if (in->generated_op_) {
+      in->generated_op_->Wait(dev_ctxes_[p]);
+    }
   }
   auto &tmp_scope = local_scope_->FindVar(kLocalExecScopeName)->Get<Scope *>();
   // FIXME(wuyi): can not use RunAndRecordEvent here, for it will cause dead

python/paddle/fluid/tests/unittests/test_parallel_executor.py (96 additions, 19 deletions)

@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import numpy
+import numpy as np
 import unittest

 import paddle.fluid as fluid

@@ -243,7 +243,7 @@ def run_executor(exe, feed, fetch_list, program=None):
         begin = time.time()
         first_loss, = run_executor(
             exe=exe, feed=feed_dict, fetch_list=[loss.name])
-        first_loss = numpy.array(first_loss)
+        first_loss = np.array(first_loss)

         for i in xrange(iter):
             run_executor(exe=exe, feed=feed_dict, fetch_list=[])

@@ -256,7 +256,7 @@ def run_executor(exe, feed, fetch_list, program=None):
             print "%.4f Instance per second" % (
                 (batch_size * iter + 2) / (end - begin))

-        last_loss = numpy.array(last_loss)
+        last_loss = np.array(last_loss)

         print first_loss, last_loss
         # self.assertGreater(first_loss[0], last_loss[0])

@@ -284,8 +284,8 @@ def check_simple_fc_convergence(self):
         self.check_network_convergence(simple_fc_net)
         self.check_network_convergence(simple_fc_net, allow_op_delay=True)

-        img = numpy.zeros(shape=[32, 784], dtype='float32')
-        label = numpy.ones(shape=[32, 1], dtype='int64')
+        img = np.zeros(shape=[32, 784], dtype='float32')
+        label = np.ones(shape=[32, 1], dtype='int64')
         self.check_network_convergence(
             simple_fc_net, feed_dict={"image": img,
                                       "label": label})

@@ -294,8 +294,8 @@ def test_simple_fc(self):
         self.check_simple_fc_convergence()

     def check_simple_fc_parallel_accuracy(self):
-        img = numpy.zeros(shape=[32, 784], dtype='float32')
-        label = numpy.ones(shape=[32, 1], dtype='int64')
+        img = np.zeros(shape=[32, 784], dtype='float32')
+        label = np.ones(shape=[32, 1], dtype='int64')
         single_first_loss, single_last_loss = self.check_network_convergence(
             method=simple_fc_net,
             seed=1000,

@@ -319,8 +319,8 @@ def test_simple_fc_parallel_accuracy(self):

     def check_batchnorm_fc_convergence(self):
         self.check_network_convergence(fc_with_batchnorm)
-        img = numpy.zeros(shape=[32, 784], dtype='float32')
-        label = numpy.ones(shape=[32, 1], dtype='int64')
+        img = np.zeros(shape=[32, 784], dtype='float32')
+        label = np.ones(shape=[32, 1], dtype='int64')
         self.check_network_convergence(
             fc_with_batchnorm, feed_dict={"image": img,
                                           "label": label})

@@ -404,9 +404,6 @@ class ModelHyperParams(object):
     dropout = 0.1


-import numpy as np
-
-
 def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head):
     """
     Pad the instances to the max sequence length in batch, and generate the

@@ -533,9 +530,8 @@ def check_network_convergence(self):
         opt.minimize(loss)

         batch_size = 32
-        image = numpy.random.normal(size=(batch_size,
-                                          784)).astype('float32')
-        label = numpy.random.randint(0, 10, (batch_size, 1), dtype="int64")
+        image = np.random.normal(size=(batch_size, 784)).astype('float32')
+        label = np.random.randint(0, 10, (batch_size, 1), dtype="int64")

         place = fluid.CUDAPlace(0)
         exe = fluid.Executor(place)

@@ -552,12 +548,12 @@ def check_network_convergence(self):

         for i in xrange(5):
             test_loss, = test_exe.run([loss.name], feed=feed_dict)
-            test_loss = numpy.array(test_loss)
+            test_loss = np.array(test_loss)

             train_loss, = train_exe.run([loss.name], feed=feed_dict)
-            train_loss = numpy.array(train_loss)
+            train_loss = np.array(train_loss)
             self.assertTrue(
-                numpy.allclose(
+                np.allclose(
                     train_loss, test_loss, atol=1e-8),
                 "Train loss: " + str(train_loss) + "\n Test loss:" +
                 str(test_loss))

@@ -712,7 +708,7 @@ def check_network_convergence(self, is_sparse):
         data = train_data()
         for i in xrange(10):
             cur_batch = next(data)
-            print map(numpy.array,
+            print map(np.array,
                       pe.run(feed=feeder.feed(cur_batch),
                              fetch_list=[avg_cost.name]))[0]

@@ -721,3 +717,84 @@ def test_update_sparse_parameter(self):

     def test_update_dense_parameter(self):
         self.check_network_convergence(is_sparse=False)
+
+
+# test fetch all the variables of global_block
+
+import paddle.dataset.flowers as flowers
+import math
+
+
+def Lenet(data, class_dim):
+    conv1 = fluid.layers.conv2d(data, 32, 5, 1, act=None)
+    bn1 = fluid.layers.batch_norm(conv1, act='relu')
+    pool1 = fluid.layers.pool2d(bn1, 2, 'max', 2)
+    conv2 = fluid.layers.conv2d(pool1, 50, 5, 1, act=None)
+    bn2 = fluid.layers.batch_norm(conv2, act='relu')
+    pool2 = fluid.layers.pool2d(bn2, 2, 'max', 2)
+
+    fc1 = fluid.layers.fc(pool2, size=500, act='relu')
+    fc2 = fluid.layers.fc(fc1, size=class_dim, act='softmax')
+
+    return fc2
+
+
+class TestFetchOp(unittest.TestCase):
+    def parallel_exe(self, train_inputs, seed):
+        main = fluid.Program()
+        startup = fluid.Program()
+        startup.random_seed = seed
+        with fluid.program_guard(main, startup):
+            data = fluid.layers.data(
+                name='image', shape=[3, 224, 224], dtype='float32')
+            label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+            out = Lenet(data, class_dim=102)
+            loss = fluid.layers.cross_entropy(input=out, label=label)
+            loss = fluid.layers.mean(loss)
+
+            opt = fluid.optimizer.Momentum(
+                learning_rate=0.1,
+                momentum=0.9,
+                regularization=fluid.regularizer.L2Decay(1e-4))
+
+            opt.minimize(loss)
+
+        # TODO(zcd): I found that once the memory optimizer is on,
+        # parallel_exe doesn't fetch some variables, such as
+        # conv2d_0.b_0@GRAD and conv2d_1.b_0@GRAD. Those variables
+        # should not be pruned.
+        # fluid.memory_optimize(main)
+
+        place = fluid.CUDAPlace(0)
+        exe = fluid.Executor(place)
+        exe.run(startup)
+
+        feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
+        pe = fluid.ParallelExecutor(
+            use_cuda=True, loss_name=loss.name, main_program=main)
+
+        fetch_list = []
+        all_vars = main.global_block().vars
+        for k, v in all_vars.iteritems():
+            if 'tmp' not in k and k[0] is not '_' or v.persistable:
+                fetch_list.append(k)
+
+        for data in train_inputs:
+            ret = pe.run(fetch_list, feed=feeder.feed(data))
+            for i in range(len(fetch_list)):
+                assert not math.isnan(np.sum(ret[i])) and \
+                       not math.isinf(np.sum(ret[i]))
+
+    def test_fetch_op(self):
+        tst_reader = paddle.batch(flowers.test(use_xmap=False), batch_size=16)
+        tst_reader_iter = tst_reader()
+
+        iters = 3
+        train_inputs = []
+        for i in range(iters):
+            train_inputs.append(tst_reader_iter.next())
+
+        self.parallel_exe(train_inputs, seed=1)
+
+
+if __name__ == '__main__':
+    unittest.main()
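The new test exercises the fix end to end by fetching every global-block variable. For a narrower illustration, here is a minimal, hypothetical sketch of ours (the network and variable names are not part of the commit) of the failure mode the guard addresses: fetching a variable that no operator produces, such as the fed 'image' tensor, previously hit the null generated_op_ in FetchOpHandle::RunImpl.

import numpy as np
import paddle.fluid as fluid

main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
    img = fluid.layers.data(name='image', shape=[784], dtype='float32')
    loss = fluid.layers.mean(fluid.layers.fc(img, size=10))
    fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

place = fluid.CUDAPlace(0)
fluid.Executor(place).run(startup)
pe = fluid.ParallelExecutor(
    use_cuda=True, loss_name=loss.name, main_program=main)

feed_dict = {'image': np.random.random((32, 784)).astype('float32')}
# 'image' has no generating operator in the SSA graph, so its handle's
# generated_op_ is null; with the guard in place, fetching it alongside
# the loss simply returns the fed value instead of crashing.
image_out, loss_out = pe.run(['image', loss.name], feed=feed_dict)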
