
Commit 01fbcb0

Merge pull request #11695 from sneaxiy/complete_py_reader_cpp
Add Python Reader Op (CPP side)
2 parents 453aa9e + d4d946d commit 01fbcb0

File tree

6 files changed: +244 −12 lines

paddle/fluid/operators/reader/CMakeLists.txt
paddle/fluid/operators/reader/blocking_queue.h
paddle/fluid/operators/reader/create_py_reader_op.cc (new)
paddle/fluid/operators/reader/lod_tensor_blocking_queue.h (new)
paddle/fluid/pybind/pybind.cc
paddle/fluid/pybind/tensor_py.h

paddle/fluid/operators/reader/CMakeLists.txt

Lines changed: 1 addition & 0 deletions

@@ -24,6 +24,7 @@ reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_o
 reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc)
 reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc)
 reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc)
+reader_library(create_py_reader_op SRCS create_py_reader_op.cc)

 cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc)
 # Export local libraries to parent

paddle/fluid/operators/reader/blocking_queue.h

Lines changed: 10 additions & 5 deletions

@@ -88,24 +88,29 @@ class BlockingQueue {
     receive_cv_.notify_all();
   }

-  bool IsClosed() {
+  bool IsClosed() const {
     std::lock_guard<std::mutex> lock(mutex_);
     return closed_;
   }

-  size_t Cap() {
+  size_t Cap() const {
     std::lock_guard<std::mutex> lock(mutex_);
     return capacity_;
   }

+  size_t Size() const {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return queue_.size();
+  }
+
  private:
   size_t capacity_;
   bool closed_;
   std::deque<T> queue_;

-  std::mutex mutex_;
-  std::condition_variable receive_cv_;
-  std::condition_variable send_cv_;
+  mutable std::mutex mutex_;
+  mutable std::condition_variable receive_cv_;
+  mutable std::condition_variable send_cv_;
 };
 }  // namespace reader
 }  // namespace operators
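Note on the const-correctness change above: std::lock_guard locks the mutex, which is a mutating operation, so IsClosed(), Cap(), and the new Size() can only be const member functions if the synchronization members are declared mutable. A minimal standalone sketch (toy class, not the Paddle BlockingQueue):

#include <cstddef>
#include <deque>
#include <mutex>

template <typename T>
class TinyQueue {
 public:
  std::size_t Size() const {
    std::lock_guard<std::mutex> lock(mutex_);  // locking inside a const method needs a mutable mutex
    return queue_.size();
  }

 private:
  std::deque<T> queue_;
  mutable std::mutex mutex_;  // without `mutable`, Size() above would not compile
};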
paddle/fluid/operators/reader/create_py_reader_op.cc (new file)

Lines changed: 84 additions & 0 deletions

// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"

namespace paddle {
namespace operators {
namespace reader {

class PyReader : public framework::ReaderBase {
 public:
  explicit PyReader(const std::shared_ptr<LoDTensorBlockingQueue>& queue) {
    PADDLE_ENFORCE(queue != nullptr, "LoDTensorBlockingQueue must not be null");
    queue_ = queue;
  }

  void ReadNext(std::vector<framework::LoDTensor>* out) override {
    bool success;
    *out = queue_->Pop(&success);
    if (!success) out->clear();
  }

  void ReInit() override {}

 private:
  std::shared_ptr<LoDTensorBlockingQueue> queue_;
};

class CreatePyReaderOp : public framework::OperatorBase {
 public:
  using framework::OperatorBase::OperatorBase;

 private:
  void RunImpl(const framework::Scope& scope,
               const platform::Place& dev_place) const override {
    auto* out = scope.FindVar(Output("Out"))
                    ->template GetMutable<framework::ReaderHolder>();
    if (out->Get() != nullptr) return;

    const std::string& queue_name = Input("blocking_queue");
    auto* queue_holder_var = scope.FindVar(queue_name);
    PADDLE_ENFORCE(
        queue_holder_var != nullptr,
        "No LoDTensorBlockingQueueHolder variable with name %s found",
        queue_name);
    auto* queue_holder =
        queue_holder_var->template GetMutable<LoDTensorBlockingQueueHolder>();

    out->Reset(new PyReader(queue_holder->GetQueue()));
  }
};

class CreatePyReaderOpMaker : public FileReaderMakerBase {
 protected:
  void Apply() override {
    AddInput("blocking_queue",
             "Name of the `LoDTensorBlockingQueueHolder` variable");

    AddComment(R"DOC(
      Create PyReader to support LoDTensor data feeding in Python side.
      )DOC");
  }
};

}  // namespace reader
}  // namespace operators
}  // namespace paddle

namespace reader = ::paddle::operators::reader;

REGISTER_FILE_READER_OPERATOR(create_py_reader, reader::CreatePyReaderOp,
                              reader::CreatePyReaderOpMaker);
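A self-contained sketch (toy reader type, not the classes in this commit) of the end-of-data contract PyReader::ReadNext follows: once the feeding side closes the queue and it drains, Pop() reports failure, ReadNext hands back an empty batch, and the consuming loop stops.

#include <deque>
#include <iostream>
#include <vector>

using Batch = std::vector<float>;

struct ToyReader {
  std::deque<Batch> queue{{1.f, 2.f}, {3.f, 4.f}};  // pretend the feeding side pushed these...
  bool closed = true;                               // ...and then closed the queue

  void ReadNext(Batch* out) {
    if (!queue.empty()) {
      *out = queue.front();
      queue.pop_front();
    } else if (closed) {
      out->clear();  // empty batch signals end-of-data, mirroring PyReader
    }
  }
};

int main() {
  ToyReader reader;
  Batch batch;
  while (true) {
    reader.ReadNext(&batch);
    if (batch.empty()) break;  // no more data
    std::cout << "got a batch of size " << batch.size() << "\n";
  }
  return 0;
}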
paddle/fluid/operators/reader/lod_tensor_blocking_queue.h (new file)

Lines changed: 103 additions & 0 deletions

// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>
#include <vector>

#include "paddle/fluid/framework/ddim.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/operators/reader/blocking_queue.h"
#include "paddle/fluid/platform/place.h"

namespace paddle {
namespace operators {
namespace reader {

class LoDTensorBlockingQueueHolder;

class LoDTensorBlockingQueue {
  friend class LoDTensorBlockingQueueHolder;

 private:
  LoDTensorBlockingQueue(size_t capacity,
                         const std::vector<framework::DDim>& dims)
      : queue_(capacity), dims_(dims) {}

 public:
  bool Push(const std::vector<framework::LoDTensor>& lod_tensor_vec) {
    CheckDims(lod_tensor_vec);
    return queue_.Send(lod_tensor_vec);
  }

  bool Push(std::vector<framework::LoDTensor>&& lod_tensor_vec) {
    CheckDims(lod_tensor_vec);
    return queue_.Send(std::move(lod_tensor_vec));
  }

  std::vector<framework::LoDTensor> Pop(bool* ok = nullptr) {
    std::vector<framework::LoDTensor> lod_tensor_vec;
    bool success = queue_.Receive(&lod_tensor_vec);
    if (ok != nullptr) *ok = success;
    return lod_tensor_vec;
  }

  inline size_t Cap() const { return queue_.Cap(); }

  inline size_t Size() const { return queue_.Size(); }

  inline void Close() { return queue_.Close(); }

  inline bool IsClosed() const { return queue_.IsClosed(); }

 private:
  void CheckDims(const std::vector<framework::LoDTensor>& lod_tensor_vec) {
    PADDLE_ENFORCE(dims_.size() == lod_tensor_vec.size(),
                   "Expect input size is %d but found %s", dims_.size(),
                   lod_tensor_vec.size());
    for (size_t i = 0; i < dims_.size(); ++i) {
      const auto& in_dims = framework::slice_ddim(
          lod_tensor_vec[i].dims(), 1, lod_tensor_vec[i].dims().size());
      const auto& expect_dims =
          framework::slice_ddim(dims_[i], 1, dims_[i].size());
      PADDLE_ENFORCE(in_dims == expect_dims,
                     "Dims of the %d-th input tensor do not match", i);
    }
  }

  BlockingQueue<std::vector<framework::LoDTensor>> queue_;
  std::vector<framework::DDim> dims_;
};

class LoDTensorBlockingQueueHolder {
 public:
  void InitOnce(size_t capacity, const std::vector<framework::DDim>& dims) {
    PADDLE_ENFORCE(
        queue_ == nullptr,
        "LoDTensorBlockingQueueHolder::InitOnce() can only be called once");
    queue_.reset(new LoDTensorBlockingQueue(capacity, dims));
  }

  inline const std::shared_ptr<LoDTensorBlockingQueue>& GetQueue() const {
    return queue_;
  }

 private:
  std::shared_ptr<LoDTensorBlockingQueue> queue_;
};

}  // namespace reader
}  // namespace operators
}  // namespace paddle
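A standalone sketch (plain std::vector stand-ins, not framework::DDim) of the shape check CheckDims performs above: dimension 0, the batch size, is sliced off both the incoming tensor shape and the expected shape, and only the remaining dimensions must match.

#include <cassert>
#include <cstdint>
#include <vector>

using Shape = std::vector<int64_t>;

// Mirrors the slice_ddim(dims, 1, size) comparison: ignore the batch dimension.
bool NonBatchDimsMatch(const Shape& in, const Shape& expect) {
  Shape in_tail(in.begin() + 1, in.end());
  Shape expect_tail(expect.begin() + 1, expect.end());
  return in_tail == expect_tail;
}

int main() {
  const Shape expect = {-1, 3, 224, 224};  // -1: batch size left unspecified
  const Shape good = {32, 3, 224, 224};    // same non-batch dims, any batch size
  const Shape bad = {32, 1, 224, 224};     // channel count differs
  assert(NonBatchDimsMatch(good, expect));
  assert(!NonBatchDimsMatch(bad, expect));
  return 0;
}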

paddle/fluid/pybind/pybind.cc

Lines changed: 43 additions & 4 deletions

@@ -34,6 +34,7 @@ limitations under the License. */
 #include "paddle/fluid/framework/reader.h"
 #include "paddle/fluid/framework/selected_rows.h"
 #include "paddle/fluid/operators/activation_op.h"
+#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
 #include "paddle/fluid/platform/enforce.h"
 #include "paddle/fluid/platform/place.h"
 #include "paddle/fluid/platform/profiler.h"
@@ -297,6 +298,37 @@ All parameter, weight, gradient are variables in Paddle.
   py::class_<framework::ReaderHolder>(m, "Reader", "")
       .def("reset", &framework::ReaderHolder::ReInit);

+  using LoDTensorBlockingQueue =
+      ::paddle::operators::reader::LoDTensorBlockingQueue;
+  using LoDTensorBlockingQueueHolder =
+      ::paddle::operators::reader::LoDTensorBlockingQueueHolder;
+  py::class_<LoDTensorBlockingQueue>(m, "LoDTensorBlockingQueue", "")
+      .def("push",
+           [](LoDTensorBlockingQueue &self,
+              const std::vector<framework::LoDTensor> &lod_tensor_vec) {
+             pybind11::gil_scoped_release release;
+             return self.Push(lod_tensor_vec);
+           })
+      .def("size", &LoDTensorBlockingQueue::Size)
+      .def("capacity", &LoDTensorBlockingQueue::Cap)
+      .def("close", &LoDTensorBlockingQueue::Close)
+      .def("is_closed", &LoDTensorBlockingQueue::IsClosed);
+
+  m.def("init_lod_tensor_blocking_queue",
+        [](Variable &var, size_t capacity,
+           const std::vector<std::vector<int64_t>> &shapes)
+            -> LoDTensorBlockingQueue * {
+          std::vector<DDim> dims(shapes.size());
+          std::transform(shapes.begin(), shapes.end(), dims.begin(),
+                         [](const std::vector<int64_t> &shape) {
+                           return make_ddim(shape);
+                         });
+          auto *holder = var.GetMutable<LoDTensorBlockingQueueHolder>();
+          holder->InitOnce(capacity, dims);
+          return holder->GetQueue().get();
+        },
+        py::return_value_policy::reference);
+
   py::class_<Scope>(m, "Scope", "")
       .def("var",
            [](Scope &self, const std::string &name) -> Variable * {
@@ -463,9 +495,11 @@ All parameter, weight, gradient are variables in Paddle.
 #ifdef PADDLE_WITH_DISTRIBUTE
       .def("complete", &Executor::Complete)
 #endif
-      .def("run",
-           (void (Executor::*)(const ProgramDesc &, Scope *, int, bool, bool)) &
-               Executor::Run);
+      .def("run", [](Executor &self, const ProgramDesc &prog, Scope *scope,
+                     int block_id, bool create_local_scope, bool create_vars) {
+        pybind11::gil_scoped_release release;
+        self.Run(prog, scope, block_id, create_local_scope, create_vars);
+      });

   m.def("init_gflags", framework::InitGflags);
   m.def("init_glog", framework::InitGLOG);
@@ -631,7 +665,12 @@ All parameter, weight, gradient are variables in Paddle.
            &ParallelExecutor::FeedTensorsIntoLocalScopes)
       .def("feed_and_split_tensor_into_local_scopes",
           &ParallelExecutor::FeedAndSplitTensorIntoLocalScopes)
-      .def("run", &ParallelExecutor::Run);
+      .def("run", [](ParallelExecutor &self,
+                     const std::vector<std::string> &fetch_tensors,
+                     const std::string &fetched_var_name) {
+        pybind11::gil_scoped_release release;
+        self.Run(fetch_tensors, fetched_var_name);
+      });

   BindRecordIOWriter(&m);
   return m.ptr();
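The lambdas above release the GIL before calling into blocking or long-running C++ (the queue's push, Executor::Run, ParallelExecutor::Run), so a Python feeding thread and the executor can make progress concurrently. A minimal pybind11 sketch of the same pattern (hypothetical module name gil_demo, not Paddle's binding code):

#include <pybind11/pybind11.h>

#include <chrono>
#include <thread>

namespace py = pybind11;

void BlockingWork() {
  // While the GIL is released, other Python threads may run, but this
  // function must not touch any Python objects.
  py::gil_scoped_release release;
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

PYBIND11_MODULE(gil_demo, m) {
  m.def("blocking_work", &BlockingWork);
}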

paddle/fluid/pybind/tensor_py.h

Lines changed: 3 additions & 3 deletions

@@ -146,7 +146,7 @@ void PyCPUTensorSetFromArray(
 template <>
 // This following specialization maps uint16_t in the parameter type to
 // platform::float16.
-void PyCPUTensorSetFromArray(
+inline void PyCPUTensorSetFromArray(
     framework::Tensor *self,
     pybind11::array_t<uint16_t,
                       pybind11::array::c_style | pybind11::array::forcecast>
@@ -185,7 +185,7 @@ void PyCUDATensorSetFromArray(
 template <>
 // This following specialization maps uint16_t in the parameter type to
 // platform::float16.
-void PyCUDATensorSetFromArray(
+inline void PyCUDATensorSetFromArray(
     framework::Tensor *self,
     pybind11::array_t<uint16_t,
                       pybind11::array::c_style | pybind11::array::forcecast>
@@ -224,7 +224,7 @@ void PyCUDAPinnedTensorSetFromArray(
 template <>
 // This following specialization maps uint16_t in the parameter type to
 // platform::float16.
-void PyCUDAPinnedTensorSetFromArray(
+inline void PyCUDAPinnedTensorSetFromArray(
     framework::Tensor *self,
     pybind11::array_t<uint16_t,
                       pybind11::array::c_style | pybind11::array::forcecast>
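Why the added inline keyword matters: unlike a primary function template, a full (explicit) specialization is an ordinary function for linkage purposes, so defining it in a header that several translation units include produces duplicate symbols at link time unless it is declared inline. A minimal sketch (hypothetical header, not tensor_py.h):

// demo.h
#pragma once

template <typename T>
void SetFromArray(T /*value*/) {}

// Full specialization: without `inline`, every .cc file that includes this
// header would emit its own definition and linking would fail with a
// multiple-definition error.
template <>
inline void SetFromArray<unsigned short>(unsigned short /*value*/) {}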
