Skip to content

Commit 89c9f79

Browse files
committed
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into adadelta
2 parents 7c59ac4 + 832deee commit 89c9f79

File tree

10 files changed

+103
-13
lines changed

10 files changed

+103
-13
lines changed

doc/design/distributed_lookup_table_design.md renamed to doc/fluid/design/dist_train/distributed_lookup_table_design.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ lookup of rows.
2626
The following figure illustrates the multiplication of x with two
2727
non-zero elements, or say, two symbols, and a lookup table W:
2828

29-
![lookup table](./lookup_table.png)
29+
![lookup table](./src/lookup_table.png)
3030

3131
### The Backward Algorithm
3232

@@ -42,7 +42,7 @@ or some more sophisticated algorithms that rely on both W' and W:
4242
$$W = f(W, W')$$
4343

4444
The following figure illustrates the backward pass of the lookup
45-
operator: ![lookup table training](./lookup_table_training.png)
45+
operator: ![lookup table training](./src/lookup_table_training.png)
4646

4747
## Distributed Storage Service
4848

doc/fluid/design/motivation/fluid.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ In computability theory, a system of data-manipulation rules, such as a programm
103103

104104
There are two ways to execute a Fluid program. When a program is executed, it creates a protobuf message [`ProgramDesc`](https://github.com/PaddlePaddle/Paddle/blob/a91efdde6910ce92a78e3aa7157412c4c88d9ee8/paddle/framework/framework.proto#L145) that describes the process and is conceptually like an [abstract syntax tree](https://en.wikipedia.org/wiki/Abstract_syntax_tree).
105105

106-
There is a C++ class [`Executor`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/executor.h), which runs a `ProgramDesc`, similar to how an interpreter runs a Python program.
106+
There is a C++ class [`Executor`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/fluid/framework/executor.h), which runs a `ProgramDesc`, similar to how an interpreter runs a Python program.
107107

108108
Fluid is moving towards the direction of a compiler, which is explain in [fluid_compiler.md](fluid_compiler.md).
109109

paddle/fluid/framework/channel_test.cc

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -871,3 +871,67 @@ TEST(ChannelHolder, ChannelHolderDestroyUnblocksSendersTest) {
871871
ch->Reset<int>(0);
872872
ChannelHolderDestroyUnblockSenders(ch, false);
873873
}
874+
875+
// This tests that closing a channelholder many times.
876+
void ChannelHolderManyTimesClose(ChannelHolder *ch) {
877+
const int num_threads = 15;
878+
std::thread t[num_threads];
879+
bool thread_ended[num_threads];
880+
881+
// Launches threads that try to send data to channel.
882+
for (size_t i = 0; i < num_threads / 3; i++) {
883+
thread_ended[i] = false;
884+
t[i] = std::thread(
885+
[&](bool *ended) {
886+
int data = 10;
887+
ch->Send(&data);
888+
*ended = true;
889+
},
890+
&thread_ended[i]);
891+
}
892+
893+
// Launches threads that try to receive data to channel.
894+
for (size_t i = num_threads / 3; i < 2 * num_threads / 3; i++) {
895+
thread_ended[i] = false;
896+
t[i] = std::thread(
897+
[&](bool *p) {
898+
int data;
899+
if (ch->Receive(&data)) {
900+
EXPECT_EQ(data, 10);
901+
}
902+
*p = true;
903+
},
904+
&thread_ended[i]);
905+
}
906+
907+
// Launches threads that try to close the channel.
908+
for (size_t i = 2 * num_threads / 3; i < num_threads; i++) {
909+
thread_ended[i] = false;
910+
t[i] = std::thread(
911+
[&](bool *p) {
912+
if (!ch->IsClosed()) {
913+
ch->close();
914+
}
915+
*p = true;
916+
},
917+
&thread_ended[i]);
918+
}
919+
920+
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
921+
922+
// Verify that all threads are unblocked
923+
for (size_t i = 0; i < num_threads; i++) {
924+
EXPECT_EQ(thread_ended[i], true);
925+
}
926+
EXPECT_TRUE(ch->IsClosed());
927+
// delete the channel
928+
delete ch;
929+
for (size_t i = 0; i < num_threads; i++) t[i].join();
930+
}
931+
932+
// Exercises repeated concurrent close on a buffered ChannelHolder.
TEST(ChannelHolder, ChannelHolderManyTimesCloseTest) {
  // Buffered channel: Reset with a capacity of 10 ints.
  ChannelHolder *holder = new ChannelHolder();
  holder->Reset<int>(10);
  ChannelHolderManyTimesClose(holder);
}

paddle/fluid/operators/math/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ math_library(sequence2batch)
4343
math_library(sequence_padding)
4444
math_library(sequence_pooling DEPS math_function)
4545
math_library(sequence_scale)
46-
math_library(softmax)
46+
math_library(softmax DEPS math_function)
4747
math_library(unpooling)
4848
math_library(vol2col)
4949

paddle/fluid/operators/math/concat.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class ConcatFunctor<platform::CPUDeviceContext, T> {
4444
out_cols += t_cols;
4545
input_cols[i] = t_cols;
4646
}
47-
auto& cpu_place = boost::get<platform::CPUPlace>(context.GetPlace());
47+
auto cpu_place = boost::get<platform::CPUPlace>(context.GetPlace());
4848

4949
// computation
5050
for (int k = 0; k < out_rows; ++k) {
@@ -87,7 +87,7 @@ class ConcatGradFunctor<platform::CPUDeviceContext, T> {
8787
input_cols += t_cols;
8888
output_cols[i] = t_cols;
8989
}
90-
auto& cpu_place = boost::get<platform::CPUPlace>(context.GetPlace());
90+
auto cpu_place = boost::get<platform::CPUPlace>(context.GetPlace());
9191

9292
// computation
9393
for (int k = 0; k < input_rows; ++k) {

paddle/fluid/operators/reader/create_double_buffer_reader_op.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,20 +48,24 @@ class DoubleBufferReader : public framework::DecoratedReader {
4848

4949
void start_thread() {
5050
buffer_ = framework::MakeChannel<Item>(kDoubleBufferSize);
51-
std::thread prefetch([this] { PrefetchThreadFunc(); });
52-
prefetch.detach();
51+
prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
5352
}
5453

5554
void ReadNext(std::vector<framework::LoDTensor>* out) override;
5655
void ReInit() override;
5756

58-
~DoubleBufferReader() { buffer_->Close(); }
57+
~DoubleBufferReader() {
58+
buffer_->Close();
59+
prefetcher_.join();
60+
delete buffer_;
61+
}
5962

6063
bool HasNext() const override;
6164

6265
private:
6366
void PrefetchThreadFunc();
6467

68+
std::thread prefetcher_;
6569
framework::Channel<Item>* buffer_;
6670
platform::Place place_;
6771
std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_;
@@ -134,6 +138,8 @@ void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
134138
void DoubleBufferReader::ReInit() {
135139
reader_->ReInit();
136140
buffer_->Close();
141+
prefetcher_.join();
142+
delete buffer_;
137143
start_thread();
138144
}
139145

@@ -159,11 +165,12 @@ void DoubleBufferReader::PrefetchThreadFunc() {
159165

160166
if (!buffer_->Send(&batch)) {
161167
VLOG(5) << "WARNING: The double buffer channel has been closed. The "
162-
"prefetch thread terminates.";
168+
"prefetch thread will terminate.";
163169
break;
164170
}
165171
}
166172
buffer_->Close();
173+
VLOG(5) << "Prefetch thread terminates.";
167174
}
168175

169176
bool DoubleBufferReader::HasNext() const {

paddle/fluid/operators/reader/create_shuffle_reader_op.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ class ShuffleReader : public framework::DecoratedReader {
3434
}
3535

3636
void ReadNext(std::vector<framework::LoDTensor>* out) override {
37+
if (!HasNext()) {
38+
PADDLE_THROW("There is no next data!");
39+
}
3740
if (iteration_pos_ >= buffer_.size()) {
3841
VLOG(10) << "Resetting shuffle buffer";
3942
ReadIntoBuffers();
@@ -50,7 +53,6 @@ class ShuffleReader : public framework::DecoratedReader {
5053
buffer_.clear();
5154
buffer_.reserve(buffer_size_);
5255
iteration_pos_ = 0;
53-
PADDLE_ENFORCE(reader_->HasNext());
5456
for (size_t i = 0; i < buffer_size_; ++i) {
5557
if (!reader_->HasNext()) {
5658
break;

python/paddle/fluid/concurrency.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ def make_channel(dtype, capacity=0):
131131
return channel
132132

133133

134-
def channel_send(channel, value):
134+
def channel_send(channel, value, copy=False):
135135
"""
136136
Sends a value through a channel variable. Used by an unbuffered or buffered
137137
channel to pass data from within or to a concurrent Go block, where
@@ -141,6 +141,8 @@ def channel_send(channel, value):
141141
channel (Variable|Channel): Channel variable created using
142142
`make_channel`.
143143
value (Variable): Value to send to channel
144+
copy (bool): Copy data while channel send. If False, then data
145+
is moved. The input cannot be used after move.
144146
Returns:
145147
Variable: The boolean status on whether or not the channel
146148
successfully sent the passed value.
@@ -162,11 +164,26 @@ def channel_send(channel, value):
162164
type=core.VarDesc.VarType.LOD_TENSOR,
163165
dtype=core.VarDesc.VarType.BOOL)
164166

167+
X = value
168+
169+
if copy is True:
170+
copied_X = helper.create_variable(
171+
name=unique_name.generate(value.name + '_copy'),
172+
type=value.type,
173+
dtype=value.dtype,
174+
shape=value.shape,
175+
lod_level=value.lod_level,
176+
capacity=value.capacity)
177+
178+
assign_op = channel_send_block.append_op(
179+
type="assign_op", inputs={"X": value}, outputs={"Out": copied_X})
180+
X = copied_X
181+
165182
channel_send_op = channel_send_block.append_op(
166183
type="channel_send",
167184
inputs={
168185
"Channel": channel,
169-
"X": value,
186+
"X": X,
170187
},
171188
outputs={"Status": status})
172189

0 commit comments

Comments
 (0)