Skip to content

Commit ffcc760

Browse files
abhinavarorawangkuiyi
authored andcommitted
Fix deadlock in channel_test (#9544)
1 parent 0ee4565 commit ffcc760

File tree

2 files changed

+5
-14
lines changed

2 files changed

+5
-14
lines changed

paddle/fluid/framework/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ cc_test(init_test SRCS init_test.cc DEPS init)
104104
cc_test(op_kernel_type_test SRCS op_kernel_type_test.cc DEPS place device_context framework_proto)
105105
cc_test(cow_ptr_tests SRCS details/cow_ptr_test.cc)
106106

107-
# cc_test(channel_test SRCS channel_test.cc)
107+
cc_test(channel_test SRCS channel_test.cc)
108108
cc_test(tuple_test SRCS tuple_test.cc )
109109
cc_test(concurrency_test SRCS concurrency_test.cc DEPS go_op channel_close_op channel_create_op
110110
channel_send_op channel_recv_op sum_op select_op elementwise_add_op compare_op

paddle/fluid/framework/channel_impl.h

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ void ChannelImpl<T>::Send(T *item) {
138138

139139
// If channel is closed, throw exception
140140
if (closed_) {
141-
lock.unlock();
142141
send_return();
142+
lock.unlock();
143143
PADDLE_THROW("Cannot send on closed channel");
144144
}
145145

@@ -152,11 +152,9 @@ void ChannelImpl<T>::Send(T *item) {
152152
if (m != nullptr) {
153153
*(m->data) = std::move(*item);
154154
m->Notify();
155-
lock.unlock();
156155
send_return();
157156
return;
158157
} else {
159-
lock.unlock();
160158
Send(item);
161159
send_return();
162160
return;
@@ -169,8 +167,6 @@ void ChannelImpl<T>::Send(T *item) {
169167
if (buf_.size() < cap_) {
170168
// Copy to buffer
171169
buf_.push_back(std::move(*item));
172-
// Release lock and return true
173-
lock.unlock();
174170
send_return();
175171
return;
176172
}
@@ -181,8 +177,8 @@ void ChannelImpl<T>::Send(T *item) {
181177
sendq.push_back(m);
182178
m->Wait(lock);
183179
if (m->chan_closed) {
184-
lock.unlock();
185180
send_return();
181+
lock.unlock();
186182
PADDLE_THROW("Cannot send on closed channel");
187183
}
188184
send_return();
@@ -195,10 +191,7 @@ bool ChannelImpl<T>::Receive(T *item) {
195191

196192
// If channel is closed and buffer is empty or
197193
// channel is unbuffered
198-
if (closed_ && buf_.empty()) {
199-
lock.unlock();
200-
return recv_return(false);
201-
}
194+
if (closed_ && buf_.empty()) return recv_return(false);
202195

203196
// If there is a sender, directly receive the value we want
204197
// from the sender. In case of a buffered channel, read from
@@ -229,7 +222,6 @@ bool ChannelImpl<T>::Receive(T *item) {
229222
} else
230223
return recv_return(Receive(item));
231224
}
232-
lock.unlock();
233225
return recv_return(true);
234226
}
235227

@@ -238,8 +230,7 @@ bool ChannelImpl<T>::Receive(T *item) {
238230
// Directly read from buffer
239231
*item = std::move(buf_.front());
240232
buf_.pop_front();
241-
// Release lock and return true
242-
lock.unlock();
233+
// return true
243234
return recv_return(true);
244235
}
245236

0 commit comments

Comments
 (0)