Skip to content

Commit c585550

Browse files
authored
Merge pull request #14731 from jacquesqiao/optimize-cpp-reader
Optimize cpp reader
2 parents d54494b + 119a3d4 commit c585550

File tree

15 files changed

+533
-140
lines changed

15 files changed

+533
-140
lines changed

paddle/fluid/API.spec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ paddle.fluid.contrib.QuantizeTranspiler.__init__ ArgSpec(args=['self', 'weight_b
359359
paddle.fluid.contrib.QuantizeTranspiler.convert_to_int8 ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,))
360360
paddle.fluid.contrib.QuantizeTranspiler.freeze_program ArgSpec(args=['self', 'program', 'place', 'fuse_bn', 'scope'], varargs=None, keywords=None, defaults=(False, None))
361361
paddle.fluid.contrib.QuantizeTranspiler.training_transpile ArgSpec(args=['self', 'program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
362+
paddle.fluid.contrib.reader.ctr_reader.ctr_reader ArgSpec(args=['feed_dict', 'file_type', 'file_format', 'dense_slot_index', 'sparse_slot_index', 'capacity', 'thread_num', 'batch_size', 'file_list', 'slots', 'name'], varargs=None, keywords=None, defaults=(None,))
362363
paddle.fluid.contrib.build_compressor ArgSpec(args=['place', 'data_reader', 'data_feeder', 'scope', 'metrics', 'epoch', 'config'], varargs=None, keywords=None, defaults=(None, None, None, None, None, None, None))
363364
paddle.fluid.contrib.CompressPass.__init__ ArgSpec(args=['self', 'place', 'data_reader', 'data_feeder', 'scope', 'metrics', 'epoch', 'program_exe'], varargs=None, keywords=None, defaults=(None, None, None, None, None, None, None))
364365
paddle.fluid.contrib.CompressPass.add_strategy ArgSpec(args=['self', 'strategy'], varargs=None, keywords=None, defaults=None)

paddle/fluid/operators/reader/create_ctr_reader_op.cc

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,19 @@ class CreateCTRReaderOp : public framework::OperatorBase {
4141
auto* queue_holder =
4242
queue_holder_var->template GetMutable<LoDTensorBlockingQueueHolder>();
4343

44-
int thread_num = Attr<int>("thread_num");
45-
std::vector<std::string> slots = Attr<std::vector<std::string>>("slots");
46-
int batch_size = Attr<int>("batch_size");
47-
std::vector<std::string> file_list =
48-
Attr<std::vector<std::string>>("file_list");
49-
out->Reset(std::make_shared<CTRReader>(queue_holder->GetQueue(), batch_size,
50-
thread_num, slots, file_list));
44+
auto thread_num = Attr<int>("thread_num");
45+
auto sparse_slots = Attr<std::vector<std::string>>("sparse_slots");
46+
auto dense_slot_index = Attr<std::vector<int>>("dense_slot_index");
47+
auto sparse_slot_index = Attr<std::vector<int>>("sparse_slot_index");
48+
auto batch_size = Attr<int>("batch_size");
49+
auto file_type = Attr<std::string>("file_type");
50+
auto file_format = Attr<std::string>("file_format");
51+
auto file_list = Attr<std::vector<std::string>>("file_list");
52+
DataDesc data_desc(batch_size, file_list, file_type, file_format,
53+
dense_slot_index, sparse_slot_index, sparse_slots);
54+
VLOG(1) << data_desc;
55+
out->Reset(std::make_shared<CTRReader>(queue_holder->GetQueue(), thread_num,
56+
data_desc));
5157
}
5258
};
5359

@@ -58,10 +64,22 @@ class CreateCTRReaderOpMaker : public FileReaderMakerBase {
5864
"Name of the `LoDTensorBlockingQueueHolder` variable");
5965
AddAttr<int>("thread_num", "the thread num to read data");
6066
AddAttr<int>("batch_size", "the batch size of read data");
67+
AddAttr<std::string>("file_type", "plain or gzip").SetDefault("plain");
68+
AddAttr<std::string>("file_format", "svm or csv").SetDefault("csv");
6169
AddAttr<std::vector<std::string>>("file_list",
6270
"The list of files that need to read");
63-
AddAttr<std::vector<std::string>>(
64-
"slots", "the slots that should be extract from file");
71+
AddAttr<std::vector<int>>(
72+
"dense_slot_index",
73+
"the dense slots id that should be extract from file")
74+
.SetDefault({});
75+
AddAttr<std::vector<int>>(
76+
"sparse_slot_index",
77+
"the sparse slots id that should be extract from file")
78+
.SetDefault({});
79+
AddAttr<std::vector<std::string>>("sparse_slots",
80+
"the sparse slots id that should be "
81+
"extract from file, used when file "
82+
"format is svm");
6583

6684
AddComment(R"DOC(
6785
Create CTRReader to support read ctr data with cpp.

paddle/fluid/operators/reader/ctr_reader.cc

Lines changed: 199 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ static inline void parse_line(
7373
}
7474
}
7575

76+
// label slot1:fea_sign slot2:fea_sign slot1:fea_sign
77+
static inline void parse_svm_line(const std::string& line) {}
78+
7679
class Reader {
7780
public:
7881
virtual ~Reader() {}
@@ -95,11 +98,27 @@ class GzipReader : public Reader {
9598
igzstream gzstream_;
9699
};
97100

98-
class MultiGzipReader : public Reader {
101+
class PlainFileReader : public Reader {
99102
public:
100-
explicit MultiGzipReader(const std::vector<std::string>& file_list) {
103+
explicit PlainFileReader(const std::string& file_name)
104+
: stream_(file_name.c_str()) {}
105+
106+
~PlainFileReader() {}
107+
108+
bool HasNext() override { return stream_.peek() != EOF; }
109+
110+
void NextLine(std::string* line) override { std::getline(stream_, *line); }
111+
112+
private:
113+
std::ifstream stream_;
114+
};
115+
116+
template <typename SingleFileReader>
117+
class MultiFileReader : public Reader {
118+
public:
119+
explicit MultiFileReader(const std::vector<std::string>& file_list) {
101120
for (auto& file : file_list) {
102-
readers_.emplace_back(std::make_shared<GzipReader>(file));
121+
readers_.emplace_back(std::make_shared<SingleFileReader>(file));
103122
}
104123
}
105124

@@ -119,68 +138,53 @@ class MultiGzipReader : public Reader {
119138
}
120139

121140
private:
122-
std::vector<std::shared_ptr<GzipReader>> readers_;
141+
std::vector<std::shared_ptr<SingleFileReader>> readers_;
123142
size_t current_reader_index_ = 0;
124143
};
125144

126145
void MonitorThread(std::vector<ReaderThreadStatus>* thread_status,
127146
std::shared_ptr<LoDTensorBlockingQueue> queue) {
128-
VLOG(30) << "monitor thread in";
147+
VLOG(3) << "monitor thread in";
129148
bool reader_thread_is_running = true;
130149
while (reader_thread_is_running) {
131-
VLOG(30) << "reader_thread_is_running";
150+
VLOG(3) << "reader_thread_is_running";
132151
reader_thread_is_running = false;
133152
for (size_t i = 0; i < (*thread_status).size(); ++i) {
134153
if ((*thread_status)[i] == Running) {
135-
VLOG(30) << "reader is running!";
154+
VLOG(3) << "reader is running!";
136155
reader_thread_is_running = true;
137156
}
138157
}
139158
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
140159
}
141-
VLOG(30) << "all reader thread is stopped, push empty data into queue";
142-
queue->Push({});
143-
VLOG(30) << "monitor thread exited";
160+
VLOG(3) << "all reader thread is stopped, close the queue";
161+
queue->Close();
162+
VLOG(3) << "monitor thread exited";
144163
}
145164

146-
void ReadThread(const std::vector<std::string>& file_list,
147-
const std::vector<std::string>& slots, int batch_size,
148-
int thread_id, std::vector<ReaderThreadStatus>* thread_status,
149-
std::shared_ptr<LoDTensorBlockingQueue> queue) {
150-
VLOG(30) << "[" << thread_id << "]"
151-
<< " reader thread start! thread_id = " << thread_id;
152-
for (auto& file : file_list) {
153-
VLOG(30) << "[" << thread_id << "]"
154-
<< " file " << file;
155-
}
156-
(*thread_status)[thread_id] = Running;
157-
VLOG(30) << "set status to running";
158-
165+
void ReadSvmData(const DataDesc& data_desc, std::shared_ptr<Reader> reader,
166+
std::shared_ptr<LoDTensorBlockingQueue> queue) {
159167
std::unordered_map<std::string, size_t> slot_to_index;
160-
for (size_t i = 0; i < slots.size(); ++i) {
161-
slot_to_index[slots[i]] = i;
168+
for (size_t i = 0; i < data_desc.sparse_slot_ids_.size(); ++i) {
169+
slot_to_index[data_desc.sparse_slot_ids_[i]] = i;
162170
}
163171

164172
std::string line;
165173

166174
std::vector<std::unordered_map<std::string, std::vector<int64_t>>> batch_data;
167175
std::vector<int64_t> batch_label;
168176

169-
MultiGzipReader reader(file_list);
170-
171-
VLOG(30) << "reader inited";
172-
173-
while (reader.HasNext()) {
177+
while (reader->HasNext()) {
174178
batch_data.clear();
175-
batch_data.reserve(batch_size);
179+
batch_data.reserve(data_desc.batch_size_);
176180

177181
batch_label.clear();
178-
batch_label.reserve(batch_size);
182+
batch_label.reserve(data_desc.batch_size_);
179183

180184
// read batch_size data
181-
for (int i = 0; i < batch_size; ++i) {
182-
if (reader.HasNext()) {
183-
reader.NextLine(&line);
185+
for (int i = 0; i < data_desc.batch_size_; ++i) {
186+
if (reader->HasNext()) {
187+
reader->NextLine(&line);
184188
std::unordered_map<std::string, std::vector<int64_t>> slot_to_data;
185189
int64_t label;
186190
parse_line(line, slot_to_index, &label, &slot_to_data);
@@ -193,8 +197,8 @@ void ReadThread(const std::vector<std::string>& file_list,
193197

194198
std::vector<framework::LoDTensor> lod_datas;
195199

196-
// first insert tensor for each slots
197-
for (auto& slot : slots) {
200+
// first insert tensor for each sparse_slots
201+
for (auto& slot : data_desc.sparse_slot_ids_) {
198202
std::vector<size_t> lod_data{0};
199203
std::vector<int64_t> batch_feasign;
200204

@@ -226,11 +230,167 @@ void ReadThread(const std::vector<std::string>& file_list,
226230
lod_datas.push_back(label_tensor);
227231

228232
queue->Push(lod_datas);
229-
VLOG(40) << "push one data, queue_size=" << queue->Size();
233+
VLOG(4) << "push one data, queue_size=" << queue->Size();
234+
}
235+
}
236+
237+
// label dense_fea,dense_fea sparse_fea,sparse_fea
238+
static inline void parse_csv_line(
239+
const std::string& line, const DataDesc& data_desc, int64_t* label,
240+
std::vector<std::vector<float>>* dense_datas,
241+
std::vector<std::vector<int64_t>>* sparse_datas) {
242+
std::vector<std::string> ret;
243+
string_split(line, ' ', &ret);
244+
*label = std::stol(ret[0]);
245+
dense_datas->resize(data_desc.dense_slot_index_.size());
246+
for (size_t i = 0; i < data_desc.dense_slot_index_.size(); ++i) {
247+
int slot_idx = data_desc.dense_slot_index_[i];
248+
auto& slot_data = ret[slot_idx];
249+
std::vector<std::string> data_in_slot_str;
250+
string_split(slot_data, ',', &data_in_slot_str);
251+
std::vector<float> data_in_slot;
252+
for (auto& data_str : data_in_slot_str) {
253+
(*dense_datas)[i].push_back(std::stof(data_str));
254+
}
255+
}
256+
sparse_datas->resize(data_desc.sparse_slot_index_.size());
257+
for (size_t i = 0; i < data_desc.sparse_slot_index_.size(); ++i) {
258+
int slot_idx = data_desc.sparse_slot_index_[i];
259+
auto& slot_data = ret[slot_idx];
260+
std::vector<std::string> data_in_slot_str;
261+
string_split(slot_data, ',', &data_in_slot_str);
262+
std::vector<int64_t> data_in_slot;
263+
for (auto& data_str : data_in_slot_str) {
264+
auto id = std::stol(data_str);
265+
(*sparse_datas)[i].push_back(id);
266+
}
267+
}
268+
}
269+
270+
void ReadCsvData(const DataDesc& data_desc, std::shared_ptr<Reader> reader,
271+
std::shared_ptr<LoDTensorBlockingQueue> queue) {
272+
std::string line;
273+
while (reader->HasNext()) {
274+
std::vector<int64_t> batch_label;
275+
batch_label.reserve(data_desc.batch_size_);
276+
277+
std::vector<std::vector<std::vector<float>>> batch_dense_data;
278+
batch_dense_data.reserve(data_desc.batch_size_);
279+
280+
std::vector<std::vector<std::vector<int64_t>>> batch_sparse_data;
281+
batch_sparse_data.reserve(data_desc.batch_size_);
282+
283+
// read batch_size data
284+
for (int i = 0; i < data_desc.batch_size_; ++i) {
285+
if (reader->HasNext()) {
286+
reader->NextLine(&line);
287+
int64_t label;
288+
std::vector<std::vector<float>> dense_datas;
289+
std::vector<std::vector<int64_t>> sparse_datas;
290+
parse_csv_line(line, data_desc, &label, &dense_datas, &sparse_datas);
291+
batch_label.push_back(label);
292+
if (!batch_dense_data.empty()) {
293+
PADDLE_ENFORCE_EQ(batch_dense_data[0].size(), dense_datas.size(),
294+
"dense data should have the same shape");
295+
}
296+
batch_dense_data.push_back(dense_datas);
297+
batch_sparse_data.push_back(sparse_datas);
298+
} else {
299+
break;
300+
}
301+
}
302+
303+
// the order of output data is label, dense_datas, sparse_datas
304+
std::vector<framework::LoDTensor> lod_datas;
305+
306+
// insert label tensor
307+
framework::LoDTensor label_tensor;
308+
auto* label_tensor_data = label_tensor.mutable_data<int64_t>(
309+
framework::make_ddim({static_cast<int64_t>(batch_label.size()), 1}),
310+
platform::CPUPlace());
311+
memcpy(label_tensor_data, batch_label.data(),
312+
batch_label.size() * sizeof(int64_t));
313+
lod_datas.push_back(label_tensor);
314+
315+
// insert tensor for each dense_slots
316+
for (size_t i = 0; i < data_desc.dense_slot_index_.size(); ++i) {
317+
framework::LoDTensor lod_tensor;
318+
size_t width = batch_dense_data[0][i].size();
319+
auto* tensor_data = lod_tensor.mutable_data<float>(
320+
framework::make_ddim(
321+
{static_cast<int64_t>(batch_dense_data.size()), // batch_size
322+
static_cast<int64_t>(width)}),
323+
platform::CPUPlace());
324+
325+
for (size_t j = 0; j < batch_dense_data.size(); ++j) {
326+
auto& dense_data_row = batch_dense_data[j][i];
327+
memcpy(tensor_data + j * width, dense_data_row.data(),
328+
width * sizeof(float));
329+
}
330+
331+
lod_datas.push_back(lod_tensor);
332+
}
333+
334+
// insert tensor for each sparse_slots
335+
for (size_t i = 0; i < data_desc.sparse_slot_index_.size(); ++i) {
336+
std::vector<size_t> lod_data{0};
337+
std::vector<int64_t> batch_feasign;
338+
339+
for (size_t row_idx = 0; row_idx < batch_sparse_data.size(); ++row_idx) {
340+
auto& sparse_ids = batch_sparse_data[row_idx][i];
341+
lod_data.push_back(lod_data.back() + sparse_ids.size());
342+
batch_feasign.insert(batch_feasign.end(), sparse_ids.begin(),
343+
sparse_ids.end());
344+
}
345+
346+
framework::LoDTensor lod_tensor;
347+
framework::LoD lod{lod_data};
348+
lod_tensor.set_lod(lod);
349+
int64_t* tensor_data = lod_tensor.mutable_data<int64_t>(
350+
framework::make_ddim({static_cast<int64_t>(batch_feasign.size()), 1}),
351+
platform::CPUPlace());
352+
memcpy(tensor_data, batch_feasign.data(),
353+
batch_feasign.size() * sizeof(int64_t));
354+
lod_datas.push_back(lod_tensor);
355+
}
356+
357+
queue->Push(lod_datas);
358+
VLOG(4) << "push one data, queue_size=" << queue->Size();
359+
}
360+
}
361+
362+
void ReadThread(const std::vector<std::string>& file_list,
363+
const DataDesc& data_desc, int thread_id,
364+
std::vector<ReaderThreadStatus>* thread_status,
365+
std::shared_ptr<LoDTensorBlockingQueue> queue) {
366+
VLOG(3) << "[" << thread_id << "]"
367+
<< " reader thread start! thread_id = " << thread_id;
368+
for (auto& file : file_list) {
369+
VLOG(3) << "[" << thread_id << "]"
370+
<< " file " << file;
371+
}
372+
(*thread_status)[thread_id] = Running;
373+
VLOG(3) << "set status to running";
374+
375+
std::shared_ptr<Reader> reader;
376+
if (data_desc.file_type_ == "gzip") {
377+
reader.reset(new MultiFileReader<GzipReader>(file_list));
378+
} else if (data_desc.file_type_ == "plain") {
379+
reader.reset(new MultiFileReader<PlainFileReader>(file_list));
380+
} else {
381+
PADDLE_THROW("do not support file format %s", data_desc.file_type_);
382+
}
383+
384+
VLOG(3) << "reader inited";
385+
386+
if (data_desc.file_format_ == "svm") {
387+
ReadSvmData(data_desc, reader, queue);
388+
} else if (data_desc.file_format_ == "csv") {
389+
ReadCsvData(data_desc, reader, queue);
230390
}
231391

232392
(*thread_status)[thread_id] = Stopped;
233-
VLOG(30) << "set status to stopped, thread " << thread_id << " exited";
393+
VLOG(3) << "set status to stopped, thread " << thread_id << " exited";
234394
}
235395

236396
} // namespace reader

0 commit comments

Comments
 (0)