File tree Expand file tree Collapse file tree 2 files changed +14
-5
lines changed
paddle/fluid/operators/reader Expand file tree Collapse file tree 2 files changed +14
-5
lines changed Original file line number Diff line number Diff line change @@ -48,20 +48,24 @@ class DoubleBufferReader : public framework::DecoratedReader {
48
48
49
49
void start_thread () {
50
50
buffer_ = framework::MakeChannel<Item>(kDoubleBufferSize );
51
- std::thread prefetch ([this ] { PrefetchThreadFunc (); });
52
- prefetch.detach ();
51
+ prefetcher_ = std::thread ([this ] { PrefetchThreadFunc (); });
53
52
}
54
53
55
54
void ReadNext (std::vector<framework::LoDTensor>* out) override ;
56
55
void ReInit () override ;
57
56
58
- ~DoubleBufferReader () { buffer_->Close (); }
57
+ ~DoubleBufferReader () {
58
+ buffer_->Close ();
59
+ prefetcher_.join ();
60
+ delete buffer_;
61
+ }
59
62
60
63
bool HasNext () const override ;
61
64
62
65
private:
63
66
void PrefetchThreadFunc ();
64
67
68
+ std::thread prefetcher_;
65
69
framework::Channel<Item>* buffer_;
66
70
platform::Place place_;
67
71
std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_;
@@ -134,6 +138,8 @@ void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
134
138
void DoubleBufferReader::ReInit () {
135
139
reader_->ReInit ();
136
140
buffer_->Close ();
141
+ prefetcher_.join ();
142
+ delete buffer_;
137
143
start_thread ();
138
144
}
139
145
@@ -159,11 +165,12 @@ void DoubleBufferReader::PrefetchThreadFunc() {
159
165
160
166
if (!buffer_->Send (&batch)) {
161
167
VLOG (5 ) << " WARNING: The double buffer channel has been closed. The "
162
- " prefetch thread terminates ." ;
168
+ " prefetch thread will terminate ." ;
163
169
break ;
164
170
}
165
171
}
166
172
buffer_->Close ();
173
+ VLOG (5 ) << " Prefetch thread terminates." ;
167
174
}
168
175
169
176
bool DoubleBufferReader::HasNext () const {
Original file line number Diff line number Diff line change @@ -34,6 +34,9 @@ class ShuffleReader : public framework::DecoratedReader {
34
34
}
35
35
36
36
void ReadNext (std::vector<framework::LoDTensor>* out) override {
37
+ if (!HasNext ()) {
38
+ PADDLE_THROW (" There is no next data!" );
39
+ }
37
40
if (iteration_pos_ >= buffer_.size ()) {
38
41
VLOG (10 ) << " Resetting shuffle buffer" ;
39
42
ReadIntoBuffers ();
@@ -50,7 +53,6 @@ class ShuffleReader : public framework::DecoratedReader {
50
53
buffer_.clear ();
51
54
buffer_.reserve (buffer_size_);
52
55
iteration_pos_ = 0 ;
53
- PADDLE_ENFORCE (reader_->HasNext ());
54
56
for (size_t i = 0 ; i < buffer_size_; ++i) {
55
57
if (!reader_->HasNext ()) {
56
58
break ;
You can’t perform that action at this time.
0 commit comments