@@ -15,7 +15,7 @@ limitations under the License. */
15
15
#pragma once
16
16
#include < stddef.h> // for size_t
17
17
#include < atomic>
18
- #include < condition_variable>
18
+ #include < condition_variable> // NOLINT
19
19
#include < deque>
20
20
#include " paddle/fluid/framework/channel.h"
21
21
#include " paddle/fluid/platform/enforce.h"
@@ -38,7 +38,7 @@ class ChannelImpl : public paddle::framework::Channel<T> {
38
38
virtual void Unlock ();
39
39
virtual bool IsClosed ();
40
40
virtual void Close ();
41
- ChannelImpl (size_t );
41
+ explicit ChannelImpl (size_t );
42
42
virtual ~ChannelImpl ();
43
43
44
44
virtual void AddToSendQ (const void *referrer, T *data,
@@ -60,7 +60,7 @@ class ChannelImpl : public paddle::framework::Channel<T> {
60
60
const void *referrer; // TODO(thuan): figure out better way to do this
61
61
std::function<bool (ChannelAction)> callback;
62
62
63
- QueueMessage (T *item)
63
+ explicit QueueMessage (T *item)
64
64
: data(item), cond(std::make_shared<std::condition_variable_any>()) {}
65
65
66
66
QueueMessage (T *item, std::shared_ptr<std::condition_variable_any> cond)
@@ -88,15 +88,15 @@ class ChannelImpl : public paddle::framework::Channel<T> {
88
88
}
89
89
90
90
std::shared_ptr<QueueMessage> get_first_message (
91
- std::deque<std::shared_ptr<QueueMessage>> & queue, ChannelAction action) {
92
- while (!queue. empty ()) {
91
+ std::deque<std::shared_ptr<QueueMessage>> * queue, ChannelAction action) {
92
+ while (!queue-> empty ()) {
93
93
// Check whether this message was added by Select
94
94
// If this was added by Select then execute the callback
95
95
// to check if you can execute this message. The callback
96
96
// can return false if some other case was executed in Select.
97
97
// In that case just discard this QueueMessage and process next.
98
- std::shared_ptr<QueueMessage> m = queue. front ();
99
- queue. pop_front ();
98
+ std::shared_ptr<QueueMessage> m = queue-> front ();
99
+ queue-> pop_front ();
100
100
if (m->callback == nullptr || m->callback (action)) return m;
101
101
}
102
102
return nullptr ;
@@ -147,7 +147,7 @@ void ChannelImpl<T>::Send(T *item) {
147
147
// to send to the receiver, bypassing the channel buffer if any
148
148
if (!recvq.empty ()) {
149
149
std::shared_ptr<QueueMessage> m =
150
- get_first_message (recvq, ChannelAction::SEND);
150
+ get_first_message (& recvq, ChannelAction::SEND);
151
151
152
152
if (m != nullptr ) {
153
153
*(m->data ) = std::move (*item);
@@ -198,7 +198,7 @@ bool ChannelImpl<T>::Receive(T *item) {
198
198
// buffer and move front of send queue to the buffer
199
199
if (!sendq.empty ()) {
200
200
std::shared_ptr<QueueMessage> m =
201
- get_first_message (sendq, ChannelAction::RECEIVE);
201
+ get_first_message (& sendq, ChannelAction::RECEIVE);
202
202
if (buf_.size () > 0 ) {
203
203
// Case 1 : Channel is Buffered
204
204
// Do Data transfer from front of buffer
@@ -219,8 +219,9 @@ bool ChannelImpl<T>::Receive(T *item) {
219
219
if (m != nullptr ) {
220
220
*item = std::move (*(m->data ));
221
221
m->Notify ();
222
- } else
222
+ } else {
223
223
return recv_return (Receive (item));
224
+ }
224
225
}
225
226
return recv_return (true );
226
227
}
0 commit comments