File tree Expand file tree Collapse file tree 1 file changed +11
-5
lines changed
paddle/fluid/operators/reader Expand file tree Collapse file tree 1 file changed +11
-5
lines changed Original file line number Diff line number Diff line change @@ -48,20 +48,24 @@ class DoubleBufferReader : public framework::DecoratedReader {
48
48
49
49
void start_thread () {
50
50
buffer_ = framework::MakeChannel<Item>(kDoubleBufferSize );
51
- std::thread prefetch ([this ] { PrefetchThreadFunc (); });
52
- prefetch.detach ();
51
+ prefetcher_ = std::thread ([this ] { PrefetchThreadFunc (); });
53
52
}
54
53
55
54
void ReadNext (std::vector<framework::LoDTensor>* out) override ;
56
55
void ReInit () override ;
57
56
58
- ~DoubleBufferReader () { buffer_->Close (); }
57
+ ~DoubleBufferReader () {
58
+ buffer_->Close ();
59
+ prefetcher_.join ();
60
+ delete buffer_;
61
+ }
59
62
60
63
bool HasNext () const override ;
61
64
62
65
private:
63
66
void PrefetchThreadFunc ();
64
67
68
+ std::thread prefetcher_;
65
69
framework::Channel<Item>* buffer_;
66
70
platform::Place place_;
67
71
std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_;
@@ -134,6 +138,8 @@ void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
134
138
void DoubleBufferReader::ReInit () {
135
139
reader_->ReInit ();
136
140
buffer_->Close ();
141
+ prefetcher_.join ();
142
+ delete buffer_;
137
143
start_thread ();
138
144
}
139
145
@@ -159,8 +165,8 @@ void DoubleBufferReader::PrefetchThreadFunc() {
159
165
160
166
if (!buffer_->Send (&batch)) {
161
167
VLOG (5 ) << " WARNING: The double buffer channel has been closed. The "
162
- " prefetch thread terminate." ;
163
- return ;
168
+ " prefetch thread will terminate." ;
169
+ break ;
164
170
}
165
171
}
166
172
buffer_->Close ();
You can’t perform that action at this time.
0 commit comments