Skip to content

Commit 681226e

Browse files
authored
Merge pull request #13864 from jacquesqiao/py-reader-add-test-mode
reader block queue add test mode
2 parents f8874b3 + ec25a09 commit 681226e

File tree

6 files changed

+51
-9
lines changed

6 files changed

+51
-9
lines changed

paddle/fluid/operators/reader/blocking_queue.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ class BlockingQueue {
3131
// is a workaround and a simplified version of framework::Channel as it
3232
// doesn't support GPU and it implements on buffered blocking queue.
3333
public:
34-
explicit BlockingQueue(size_t capacity)
35-
: capacity_(capacity), closed_(false) {
34+
explicit BlockingQueue(size_t capacity, bool speed_test_mode = false)
35+
: capacity_(capacity), speed_test_mode_(speed_test_mode), closed_(false) {
3636
PADDLE_ENFORCE_GT(
3737
capacity_, 0,
3838
"The capacity of a reader::BlockingQueue must be greater than 0.");
@@ -72,7 +72,9 @@ class BlockingQueue {
7272
if (!queue_.empty()) {
7373
PADDLE_ENFORCE_NOT_NULL(elem);
7474
*elem = queue_.front();
75-
queue_.pop_front();
75+
if (LIKELY(!speed_test_mode_)) {
76+
queue_.pop_front();
77+
}
7678
send_cv_.notify_one();
7779
return true;
7880
} else {
@@ -114,6 +116,7 @@ class BlockingQueue {
114116

115117
private:
116118
size_t capacity_;
119+
bool speed_test_mode_;
117120
bool closed_;
118121
std::deque<T> queue_;
119122

paddle/fluid/operators/reader/lod_tensor_blocking_queue.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ class LoDTensorBlockingQueue {
3333

3434
private:
3535
LoDTensorBlockingQueue(size_t capacity,
36-
const std::vector<framework::DDim>& dims)
37-
: queue_(capacity), dims_(dims) {}
36+
const std::vector<framework::DDim>& dims,
37+
bool speed_test_mode = false)
38+
: queue_(capacity, speed_test_mode), dims_(dims) {}
3839

3940
public:
4041
bool Push(const std::vector<framework::LoDTensor>& lod_tensor_vec) {
@@ -69,11 +70,12 @@ class LoDTensorBlockingQueue {
6970

7071
class LoDTensorBlockingQueueHolder {
7172
public:
72-
void InitOnce(size_t capacity, const std::vector<framework::DDim>& dims) {
73+
void InitOnce(size_t capacity, const std::vector<framework::DDim>& dims,
74+
bool speed_test_mode = false) {
7375
PADDLE_ENFORCE(
7476
queue_ == nullptr,
7577
"LoDTensorBlockingQueueHolder::InitOnce() can only be called once");
76-
queue_.reset(new LoDTensorBlockingQueue(capacity, dims));
78+
queue_.reset(new LoDTensorBlockingQueue(capacity, dims, speed_test_mode));
7779
}
7880

7981
inline const std::shared_ptr<LoDTensorBlockingQueue>& GetQueue() const {

paddle/fluid/operators/reader/reader_blocking_queue_test.cc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,3 +217,27 @@ TEST(BlockingQueue, MyClassTest) {
217217
q.Receive(&b);
218218
EXPECT_EQ(a.val_, b.val_);
219219
}
220+
221+
TEST(BlockingQueue, speed_test_mode) {
222+
size_t queue_size = 10;
223+
BlockingQueue<size_t> q1(queue_size, false);
224+
for (size_t i = 0; i < queue_size; ++i) {
225+
q1.Send(i);
226+
}
227+
size_t b;
228+
for (size_t i = 0; i < queue_size; ++i) {
229+
q1.Receive(&b);
230+
EXPECT_EQ(b, i);
231+
}
232+
EXPECT_EQ(q1.Size(), 0);
233+
234+
BlockingQueue<size_t> q2(queue_size, true);
235+
for (size_t i = 0; i < queue_size; ++i) {
236+
q2.Send(i);
237+
}
238+
for (size_t i = 0; i < queue_size; ++i) {
239+
q2.Receive(&b);
240+
EXPECT_EQ(b, 0);
241+
}
242+
EXPECT_EQ(q2.Size(), queue_size);
243+
}

paddle/fluid/platform/enforce.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,13 @@ struct EOFException : public std::exception {
130130
#define UNLIKELY(condition) (condition == 0)
131131
#endif
132132

133+
#if !defined(_WIN32)
134+
#define LIKELY(condition) __builtin_expect(static_cast<bool>(condition), 1)
135+
#else
136+
// there is no equivalent intrinsics in msvc.
137+
#define LIKELY(condition) (condition != 0)
138+
#endif
139+
133140
template <typename... Args>
134141
inline typename std::enable_if<sizeof...(Args) != 0, void>::type throw_on_error(
135142
bool stat, const Args&... args) {

paddle/fluid/pybind/pybind.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ limitations under the License. */
5757

5858
#include "pybind11/stl.h"
5959

60+
DEFINE_bool(reader_queue_speed_test_mode, false,
61+
"If set true, the queue.pop will only get data from queue but not "
62+
"remove the data from queue for speed testing");
63+
6064
// disable auto conversion to list in Python
6165
PYBIND11_MAKE_OPAQUE(paddle::framework::LoDTensorArray);
6266

@@ -380,7 +384,8 @@ All parameter, weight, gradient are variables in Paddle.
380384
return make_ddim(shape);
381385
});
382386
auto *holder = var.GetMutable<LoDTensorBlockingQueueHolder>();
383-
holder->InitOnce(capacity, dims);
387+
holder->InitOnce(capacity, dims,
388+
FLAGS_reader_queue_speed_test_mode);
384389
return holder->GetQueue();
385390
},
386391
py::return_value_policy::copy);

python/paddle/fluid/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ def __bootstrap__():
113113
'use_pinned_memory', 'check_nan_inf', 'benchmark', 'warpctc_dir',
114114
'eager_delete_scope', 'use_mkldnn', 'initial_cpu_memory_in_mb',
115115
'init_allocated_mem', 'free_idle_memory', 'paddle_num_threads',
116-
"dist_threadpool_size", 'cpu_deterministic', 'eager_delete_tensor_gb'
116+
'dist_threadpool_size', 'cpu_deterministic', 'eager_delete_tensor_gb',
117+
'reader_queue_speed_test_mode'
117118
]
118119
if core.is_compiled_with_dist():
119120
read_env_flags.append('rpc_deadline')

0 commit comments

Comments
 (0)