Commit 304b6b7

1 parent 4cb63d8 commit 304b6b7

3 files changed: +23 −43 lines changed

paddle/fluid/operators/reader/blocking_queue.h

Lines changed: 17 additions & 20 deletions
@@ -25,6 +25,11 @@ namespace reader {
 
 template <typename T>
 class BlockingQueue {
+  // BlockingQueue is for buffered reading and is supposed to be used only
+  // within the reader package. Ideally we would use framework::Channel, but
+  // it currently has a deadlock bug. BlockingQueue is a workaround: a
+  // simplified version of framework::Channel that doesn't support GPU and
+  // implements only a buffered blocking queue.
  public:
   explicit BlockingQueue(size_t capacity)
       : capacity_(capacity), closed_(false) {
@@ -37,26 +42,28 @@ class BlockingQueue {
     std::unique_lock<std::mutex> lock(mutex_);
     send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
     if (closed_) {
+      VLOG(5)
+          << "WARNING: Sending an element to a closed reader::BlockingQueue.";
       return false;
-    } else {
-      PADDLE_ENFORCE_LT(queue_.size(), capacity_);
-      queue_.push_back(elem);
-      receive_cv_.notify_one();
-      return true;
     }
+    PADDLE_ENFORCE_LT(queue_.size(), capacity_);
+    queue_.push_back(elem);
+    receive_cv_.notify_one();
+    return true;
   }

   bool Send(T&& elem) {
     std::unique_lock<std::mutex> lock(mutex_);
     send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
     if (closed_) {
+      VLOG(5)
+          << "WARNING: Sending an element to a closed reader::BlockingQueue.";
       return false;
-    } else {
-      PADDLE_ENFORCE_LT(queue_.size(), capacity_);
-      queue_.emplace_back(std::move(elem));
-      receive_cv_.notify_one();
-      return true;
     }
+    PADDLE_ENFORCE_LT(queue_.size(), capacity_);
+    queue_.emplace_back(std::move(elem));
+    receive_cv_.notify_one();
+    return true;
   }

   bool Receive(T* elem) {
@@ -86,16 +93,6 @@
     return closed_;
   }

-  bool CanSend() {
-    std::lock_guard<std::mutex> lock(mutex_);
-    return !closed_ && queue_.size() < capacity_;
-  }
-
-  bool CanReceive() {
-    std::lock_guard<std::mutex> lock(mutex_);
-    return !queue_.empty();
-  }
-
   size_t Cap() {
     std::lock_guard<std::mutex> lock(mutex_);
     return capacity_;
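
A minimal usage sketch (not part of the commit) of the queue's close-aware semantics: Receive() sleeps on a condition variable until an element arrives, and returns false only once the queue has been closed and drained, so callers need no polling loop. It assumes a PaddlePaddle build where this header and its Close() method are available on the include path.

#include <thread>

#include "paddle/fluid/operators/reader/blocking_queue.h"

int main() {
  paddle::operators::reader::BlockingQueue<int> q(2 /* capacity */);

  std::thread producer([&q] {
    for (int i = 0; i < 8; ++i) {
      if (!q.Send(i)) break;  // Send() returns false once the queue is closed.
    }
    q.Close();  // Wakes any receiver blocked on an empty queue.
  });

  int v;
  while (q.Receive(&v)) {  // Blocks; false means closed *and* empty.
    // ... consume v ...
  }
  producer.join();
  return 0;
}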

paddle/fluid/operators/reader/create_double_buffer_reader_op.cc

Lines changed: 4 additions & 13 deletions
@@ -55,8 +55,6 @@ class DoubleBufferReader : public framework::DecoratedReader {
   ~DoubleBufferReader() { EndPrefetcher(); }

  private:
-  bool HasNext() const;
-
   void StartPrefetcher() {
     channel_ = new reader::BlockingQueue<size_t>(kChannelSize);
     prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
@@ -139,17 +137,16 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
 };

 void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
-  out->clear();
-  if (HasNext()) {
-    size_t cached_tensor_id;
-    channel_->Receive(&cached_tensor_id);
+  size_t cached_tensor_id;
+  if (channel_->Receive(&cached_tensor_id)) {
     if (platform::is_gpu_place(place_)) {
       *out = gpu_tensor_cache_[cached_tensor_id];
-      ctxs_[cached_tensor_id]->Wait();
     } else {
       // CPU place
       *out = cpu_tensor_cache_[cached_tensor_id];
     }
+  } else {
+    out->clear();
   }
 }

@@ -159,12 +156,6 @@ void DoubleBufferReader::ReInit() {
   StartPrefetcher();
 }

-bool DoubleBufferReader::HasNext() const {
-  while (!channel_->IsClosed() && !channel_->CanReceive()) {
-  }
-  return channel_->CanReceive();
-}
-
 void DoubleBufferReader::PrefetchThreadFunc() {
   VLOG(5) << "A new prefetch thread starts.";
   size_t cached_tensor_id = 0;
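
The removed HasNext() spun in an empty while loop over CanReceive(), burning a CPU core and racing its check against the later Receive(). The new code folds check-and-take into the single blocking Receive() and signals end-of-data by leaving *out empty. A hedged caller-side sketch of that contract (DrainReader is a hypothetical helper, not from this commit):

#include <vector>

#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/reader.h"

void DrainReader(paddle::framework::ReaderBase* reader) {
  std::vector<paddle::framework::LoDTensor> batch;
  // After this change, an empty batch means the prefetch channel was closed
  // and fully drained; before it, ReadNext() could spin-wait inside HasNext().
  for (reader->ReadNext(&batch); !batch.empty(); reader->ReadNext(&batch)) {
    // ... run one step on `batch` ...
  }
}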

paddle/fluid/operators/reader/open_files_op.cc

Lines changed: 2 additions & 10 deletions
@@ -37,7 +37,6 @@ class MultiFileReader : public framework::ReaderBase {
   ~MultiFileReader() { EndScheduler(); }

  private:
-  bool HasNext();
   void StartNewScheduler();
   void EndScheduler();
   void ScheduleThreadFunc();
@@ -54,9 +53,8 @@ class MultiFileReader : public framework::ReaderBase {
 };

 void MultiFileReader::ReadNext(std::vector<framework::LoDTensor>* out) {
-  out->clear();
-  if (HasNext()) {
-    buffer_->Receive(out);
+  if (!buffer_->Receive(out)) {
+    out->clear();
   }
 }

@@ -65,12 +63,6 @@ void MultiFileReader::ReInit() {
   StartNewScheduler();
 }

-bool MultiFileReader::HasNext() {
-  while (!buffer_->IsClosed() && !buffer_->CanReceive()) {
-  }
-  return buffer_->CanReceive();
-}
-
 void MultiFileReader::StartNewScheduler() {
   size_t thread_num = prefetchers_.size();
   waiting_file_idx_ = new reader::BlockingQueue<size_t>(file_names_.size());
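
The last hunk hints at how MultiFileReader distributes work: a BlockingQueue<size_t> of file indices feeds the prefetch threads, and closing it shuts them down. A hedged standalone sketch of that pattern (num_files, num_threads, and the worker loop are illustrative assumptions, not from the source):

#include <thread>
#include <vector>

#include "paddle/fluid/operators/reader/blocking_queue.h"

int main() {
  const size_t num_files = 4, num_threads = 2;
  paddle::operators::reader::BlockingQueue<size_t> waiting(num_files);

  std::vector<std::thread> workers;
  for (size_t t = 0; t < num_threads; ++t) {
    workers.emplace_back([&waiting] {
      size_t idx;
      while (waiting.Receive(&idx)) {  // Each worker takes the next file index.
        // ... open and prefetch from file `idx` ...
      }
    });
  }

  for (size_t i = 0; i < num_files; ++i) waiting.Send(i);
  waiting.Close();  // No more files: let the workers drain and exit.
  for (auto& w : workers) w.join();
  return 0;
}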
