Skip to content

Commit 54ada94

Browse files
committed
Add demo for recordio train/test and parallel executor
1 parent d67b9ce commit 54ada94

File tree

7 files changed

+220
-25
lines changed

7 files changed

+220
-25
lines changed

paddle/fluid/framework/details/threaded_ssa_graph_executor.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,9 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
140140

141141
if (timeout) {
142142
if (exception_) {
143-
throw * exception_;
143+
auto exp = *exception_;
144+
exception_.reset();
145+
throw exp;
144146
} else {
145147
continue;
146148
}

paddle/fluid/framework/parallel_executor.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ ParallelExecutor::ParallelExecutor(
7474
member_->own_local_scope = false;
7575
PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size());
7676
for (size_t i = 0; i < member_->places_.size(); ++i) {
77-
member_->local_scopes_.emplace_back(local_scopes[i]);
77+
member_->local_scopes_.emplace_back(&local_scopes[i]->NewScope());
7878
}
7979
}
8080

paddle/fluid/operators/reader/create_threaded_reader_op.cc

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,16 @@ namespace reader {
2121

2222
class ThreadedReader : public framework::DecoratedReader {
2323
public:
24-
ThreadedReader(ReaderBase* reader, bool safe_mode)
25-
: DecoratedReader(reader), safe_mode_(safe_mode) {}
24+
explicit ThreadedReader(ReaderBase* reader) : DecoratedReader(reader) {}
2625

2726
void ReadNext(std::vector<framework::LoDTensor>* out) override {
2827
std::lock_guard<std::mutex> lock(mutex_);
2928
reader_->ReadNext(out);
3029
}
3130

32-
void ReInit() override {
33-
if (safe_mode_) {
34-
PADDLE_THROW(
35-
"ThreadedReader::ReInit() is disabled when 'safe_mode' is true.");
36-
}
37-
VLOG(5) << "ThreadedReader::ReInit() is invoked! It might be buggy in "
38-
"multi-thread environment.";
39-
reader_->ReInit();
40-
}
31+
void ReInit() override { reader_->ReInit(); }
4132

4233
private:
43-
bool safe_mode_;
4434
std::mutex mutex_;
4535
};
4636

@@ -58,19 +48,14 @@ class CreateThreadedReaderOp : public framework::OperatorBase {
5848
}
5949
const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader"))
6050
->Get<framework::ReaderHolder>();
61-
bool safe_mode = Attr<bool>("safe_mode");
62-
out->Reset(new ThreadedReader(underlying_reader.Get(), safe_mode));
51+
out->Reset(new ThreadedReader(underlying_reader.Get()));
6352
}
6453
};
6554

6655
class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase {
6756
public:
6857
CreateThreadedReaderOpMaker(OpProto* op_proto, OpAttrChecker* op_checker)
6958
: DecoratedReaderMakerBase(op_proto, op_checker) {
70-
AddAttr<bool>("safe_mode",
71-
"When 'safe_mode' is true, 'ReInit()' is disabled to avoid "
72-
"unexpected bugs in multi-thread environment.")
73-
.SetDefault(true);
7459
AddComment(R"DOC(
7560
CreateThreadedReader Operator
7661

python/paddle/fluid/layers/io.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -457,8 +457,8 @@ def __create_shared_decorated_reader__(op_type, reader, attrs):
457457
return monkey_patch_reader_methods(main_prog_var)
458458

459459

460-
def __create_unshared_decorated_reader__(op_type, reader, attrs):
461-
new_reader_name = unique_name(op_type)
460+
def __create_unshared_decorated_reader__(op_type, reader, attrs, name=None):
461+
new_reader_name = name if name is not None else unique_name(op_type)
462462
main_blk = default_main_program().current_block()
463463
new_reader = main_blk.create_var(name=new_reader_name)
464464
main_blk.append_op(
@@ -481,12 +481,12 @@ def batch(reader, batch_size):
481481
'create_batch_reader', reader, {'batch_size': int(batch_size)})
482482

483483

484-
def double_buffer(reader, place=None):
484+
def double_buffer(reader, place=None, name=None):
485485
attrs = dict()
486486
if place is not None:
487487
attrs['place'] = str(place).upper()
488-
return __create_unshared_decorated_reader__('create_double_buffer_reader',
489-
reader, attrs)
488+
return __create_unshared_decorated_reader__(
489+
'create_double_buffer_reader', reader, attrs, name=name)
490490

491491

492492
def multi_pass(reader, pass_num):
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.recordio
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import sys
16+
import paddle.fluid as fluid
17+
import paddle.v2 as paddle
18+
19+
20+
def load_vocab(filename):
    """Load a newline-delimited vocabulary file.

    Each line of the file is one token; a token's id is its zero-based
    line index.  If a token appears more than once, the id of its last
    occurrence wins (same behavior as the original counter loop).

    Args:
        filename (str): path to the vocabulary file.

    Returns:
        dict: mapping from token (str) to integer word id.
    """
    vocab = {}
    with open(filename) as f:
        # enumerate replaces the hand-rolled `wid` counter.
        for wid, line in enumerate(f):
            vocab[line.strip()] = wid
    return vocab
31+
32+
33+
# load word dict with paddle inner function
34+
word_dict = load_vocab(sys.argv[1])
35+
word_dict["<unk>"] = len(word_dict)
36+
print "Dict dim = ", len(word_dict)
37+
38+
# input text data
39+
data = fluid.layers.data(name="words", shape=[1], dtype="int64", lod_level=1)
40+
41+
# label data
42+
label = fluid.layers.data(name="label", shape=[1], dtype="int64")
43+
# like placeholder
44+
feeder = fluid.DataFeeder(feed_list=[data, label], place=fluid.CPUPlace())
45+
46+
# train data set
47+
BATCH_SIZE = 128
48+
train_reader = paddle.batch(
49+
paddle.reader.shuffle(
50+
paddle.dataset.imdb.train(word_dict), buf_size=10000),
51+
batch_size=BATCH_SIZE)
52+
53+
test_reader = paddle.batch(
54+
paddle.dataset.imdb.test(word_dict), batch_size=BATCH_SIZE)
55+
56+
fluid.recordio_writer.convert_reader_to_recordio_file(
57+
"train.recordio", feeder=feeder, reader_creator=train_reader)
58+
fluid.recordio_writer.convert_reader_to_recordio_file(
59+
"test.recordio", feeder=feeder, reader_creator=test_reader)
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import paddle.fluid as fluid
16+
import numpy
17+
import sys
18+
19+
# Recordio files produced by the conversion demo script.
TRAIN_FILES = ['train.recordio']
TEST_FILES = ['test.recordio']

# Vocabulary size — presumably len(word_dict) printed by the conversion
# script for the IMDB data set; confirm if the vocab file changes.
DICT_DIM = 89528

# Model hyper-parameters.
emb_dim = 128   # word embedding dimension
hid_dim = 128   # number of sequence-conv filters
hid_dim2 = 96   # width of the fc layer after the conv block
class_dim = 2   # binary sentiment classification
35+
36+
37+
def network_cfg(is_train, pass_num=100):
38+
with fluid.unique_name.guard():
39+
train_file_obj = fluid.layers.open_files(
40+
filenames=TRAIN_FILES,
41+
pass_num=pass_num,
42+
shapes=[[-1, 1], [-1, 1]],
43+
lod_levels=[1, 0],
44+
dtypes=['int64', 'int64'],
45+
thread_num=1)
46+
47+
test_file_obj = fluid.layers.open_files(
48+
filenames=TEST_FILES,
49+
pass_num=1,
50+
shapes=[[-1, 1], [-1, 1]],
51+
lod_levels=[1, 0],
52+
dtypes=['int64', 'int64'],
53+
thread_num=1)
54+
55+
if is_train:
56+
file_obj = fluid.layers.shuffle(train_file_obj, buffer_size=1000)
57+
else:
58+
file_obj = test_file_obj
59+
60+
file_obj = fluid.layers.double_buffer(
61+
file_obj,
62+
name="train_double_buffer" if is_train else 'test_double_buffer')
63+
64+
data, label = fluid.layers.read_file(file_obj)
65+
66+
emb = fluid.layers.embedding(input=data, size=[DICT_DIM, emb_dim])
67+
68+
# sequence conv with window size = 3
69+
win_size = 3
70+
conv_3 = fluid.nets.sequence_conv_pool(
71+
input=emb,
72+
num_filters=hid_dim,
73+
filter_size=win_size,
74+
act="tanh",
75+
pool_type="max")
76+
77+
# fc layer after conv
78+
fc_1 = fluid.layers.fc(input=[conv_3], size=hid_dim2)
79+
80+
# probability of each class
81+
prediction = fluid.layers.fc(input=[fc_1],
82+
size=class_dim,
83+
act="softmax")
84+
# cross entropy loss
85+
cost = fluid.layers.cross_entropy(input=prediction, label=label)
86+
87+
# mean loss
88+
avg_cost = fluid.layers.mean(x=cost)
89+
acc = fluid.layers.accuracy(input=prediction, label=label)
90+
91+
if is_train:
92+
# SGD optimizer
93+
sgd_optimizer = fluid.optimizer.Adagrad(learning_rate=0.01)
94+
sgd_optimizer.minimize(avg_cost)
95+
96+
return {
97+
'loss': avg_cost,
98+
'log': [avg_cost, acc],
99+
'file': train_file_obj if is_train else test_file_obj
100+
}
101+
102+
103+
def main():
104+
train = fluid.Program()
105+
startup = fluid.Program()
106+
107+
with fluid.program_guard(train, startup):
108+
train_args = network_cfg(is_train=True)
109+
110+
test = fluid.Program()
111+
112+
with fluid.program_guard(test, fluid.Program()):
113+
test_args = network_cfg(is_train=False)
114+
115+
# startup
116+
place = fluid.CUDAPlace(0)
117+
exe = fluid.Executor(place=place)
118+
exe.run(startup)
119+
120+
train_exe = fluid.ParallelExecutor(
121+
use_cuda=True, loss_name=train_args['loss'].name, main_program=train)
122+
123+
fetch_var_list = [var.name for var in train_args['log']]
124+
for i in xrange(sys.maxint):
125+
result = map(numpy.array,
126+
train_exe.run(fetch_list=fetch_var_list
127+
if i % 1000 == 0 else []))
128+
if len(result) != 0:
129+
print 'Train: ', result
130+
131+
if i % 1000 == 0:
132+
test_exe = fluid.ParallelExecutor(
133+
use_cuda=True, main_program=test, share_vars_from=train_exe)
134+
loss = []
135+
acc = []
136+
try:
137+
while True:
138+
loss_np, acc_np = map(
139+
numpy.array, test_exe.run(fetch_list=fetch_var_list))
140+
loss.append(loss_np[0])
141+
acc.append(acc_np[0])
142+
except:
143+
test_args['file'].reset()
144+
print 'TEST: ', numpy.mean(loss), numpy.mean(acc)
145+
146+
147+
if __name__ == '__main__':
148+
main()

0 commit comments

Comments
 (0)