Skip to content

Commit 20f181c

Browse files
committed
init ctr_reader
1 parent d26e450 commit 20f181c

File tree

5 files changed

+185
-8
lines changed

5 files changed

+185
-8
lines changed

paddle/fluid/operators/reader/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ function(reader_library TARGET_NAME)
1616
endfunction()
1717

1818
cc_library(buffered_reader SRCS buffered_reader.cc DEPS reader simple_threadpool)
19-
cc_library(ctr_reader SRCS ctr_reader.cc DEPS reader simple_threadpool)
19+
cc_library(ctr_reader SRCS ctr_reader.cc DEPS reader simple_threadpool boost)
2020
reader_library(open_files_op SRCS open_files_op.cc DEPS buffered_reader)
2121
reader_library(create_ctr_reader_op SRCS create_ctr_reader_op.cc DEPS ctr_reader)
2222
reader_library(create_random_data_generator_op SRCS create_random_data_generator_op.cc)

paddle/fluid/operators/reader/create_ctr_reader_op.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,13 @@ class CreateCTRReaderOp : public framework::OperatorBase {
4141
auto* queue_holder =
4242
queue_holder_var->template GetMutable<LoDTensorBlockingQueueHolder>();
4343

44-
out->Reset(std::make_shared<CTRReader>(queue_holder->GetQueue()));
44+
int thread_num = Attr<int>("thread_num");
45+
std::vector<std::string> slots = Attr<std::vector<std::string>>("slots");
46+
int batch_size = Attr<int>("batch_size");
47+
std::vector<std::string> file_list =
48+
Attr<std::vector<std::string>>("file_list");
49+
out->Reset(std::make_shared<CTRReader>(queue_holder->GetQueue(), batch_size,
50+
thread_num, slots, file_list));
4551
}
4652
};
4753

@@ -50,6 +56,12 @@ class CreateCTRReaderOpMaker : public FileReaderMakerBase {
5056
void Apply() override {
5157
AddInput("blocking_queue",
5258
"Name of the `LoDTensorBlockingQueueHolder` variable");
59+
AddAttr<int>("thread_num", "the thread num to read data");
60+
AddAttr<int>("batch_size", "the batch size of read data");
61+
AddAttr<std::vector<std::string>>("file_list",
62+
"The list of files that need to read");
63+
AddAttr<std::vector<std::string>>(
64+
"slots", "the slots that should be extract from file");
5365

5466
AddComment(R"DOC(
5567
Create CTRReader to support read ctr data with cpp.

paddle/fluid/operators/reader/ctr_reader.cc

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,137 @@
1414

1515
#include "paddle/fluid/operators/reader/ctr_reader.h"
1616

17+
#include <cstdlib>
18+
#include <fstream>
19+
#include <iostream>
20+
#include <sstream>
21+
#include <string>
22+
#include <unordered_map>
23+
24+
#include <algorithm>
25+
#include <random>
26+
27+
#include <boost/iostreams/copy.hpp>
28+
#include <boost/iostreams/filter/gzip.hpp>
29+
#include <boost/iostreams/filtering_streambuf.hpp>
30+
1731
namespace paddle {
1832
namespace operators {
19-
namespace reader {} // namespace reader
33+
namespace reader {
34+
35+
// Splits `s` on every occurrence of `delimiter` and appends the pieces, in
// order, to `*output`.
//
// Leading, trailing and adjacent delimiters yield empty pieces, and an empty
// `s` appends exactly one empty string. Anything already stored in `*output`
// is left untouched.
static inline void string_split(const std::string& s, const char delimiter,
                                std::vector<std::string>* output) {
  size_t start = 0;
  size_t end = s.find(delimiter);

  // One piece per delimiter found.
  while (end != std::string::npos) {
    output->emplace_back(s, start, end - start);
    start = end + 1;
    end = s.find(delimiter, start);
  }

  // Whatever follows the last delimiter (the whole string when there is none).
  output->emplace_back(s, start, std::string::npos);
}
49+
50+
static inline void parse_line(
51+
const std::string& line, const std::vector<std::string>& slots,
52+
int64_t* label,
53+
std::unordered_map<std::string, std::vector<int64_t>>* slots_to_data) {
54+
std::vector<std::string> ret;
55+
string_split(line, ' ', &ret);
56+
*label = std::stoi(ret[2]) > 0;
57+
for (size_t i = 3; i < ret.size(); ++i) {
58+
const std::string& item = ret[i];
59+
std::vector<std::string> slot_and_feasign;
60+
string_split(item, ':', &slot_and_feasign);
61+
if (slot_and_feasign.size() == 2) {
62+
const std::string& slot = slot_and_feasign[1];
63+
int64_t feasign = std::strtoll(slot_and_feasign[0].c_str(), NULL, 10);
64+
(*slots_to_data)[slot_and_feasign[1]].push_back(feasign);
65+
}
66+
}
67+
}
68+
69+
// class Reader {
70+
// public:
71+
// virtual ~Reader() {}
72+
// virtual bool HasNext() = 0;
73+
// virtual void NextLine(std::string& line) = 0;
74+
//};
75+
76+
class GzipReader {
77+
public:
78+
explicit GzipReader(const std::string& file_name) : instream_(&inbuf_) {
79+
file_ = std::ifstream(file_name, std::ios_base::in | std::ios_base::binary);
80+
inbuf_.push(boost::iostreams::gzip_decompressor());
81+
inbuf_.push(file_);
82+
// Convert streambuf to istream
83+
}
84+
85+
~GzipReader() { file_.close(); }
86+
87+
bool HasNext() { return instream_.peek() != EOF; }
88+
89+
void NextLine(std::string& line) { std::getline(instream_, line); } // NOLINT
90+
91+
private:
92+
boost::iostreams::filtering_streambuf<boost::iostreams::input> inbuf_;
93+
std::ifstream file_;
94+
std::istream instream_;
95+
};
96+
97+
class MultiGzipReader {
98+
public:
99+
explicit MultiGzipReader(const std::vector<std::string>& file_list) {
100+
for (auto& file : file_list) {
101+
readers_.emplace_back(std::make_shared<GzipReader>(file));
102+
}
103+
}
104+
105+
bool HasNext() {
106+
if (current_reader_index_ >= readers_.size()) {
107+
return false;
108+
}
109+
if (!readers_[current_reader_index_]->HasNext()) {
110+
current_reader_index_++;
111+
return HasNext();
112+
}
113+
return true;
114+
}
115+
116+
void NextLine(std::string& line) { // NOLINT
117+
readers_[current_reader_index_]->NextLine(line);
118+
}
119+
120+
private:
121+
std::vector<std::shared_ptr<GzipReader>> readers_;
122+
size_t current_reader_index_ = 0;
123+
};
124+
125+
// void CTRReader::ReadThread(
126+
// const std::vector<std::string> &file_list,
127+
// const std::vector<std::string>& slots,
128+
// int batch_size,
129+
// std::shared_ptr<LoDTensorBlockingQueue>& queue) {}
130+
131+
void CTRReader::ReadThread(const std::vector<std::string>& file_list,
132+
const std::vector<std::string>& slots,
133+
int batch_size,
134+
std::shared_ptr<LoDTensorBlockingQueue>* queue) {
135+
std::string line;
136+
137+
// read all files
138+
std::vector<std::string> all_lines;
139+
MultiGzipReader reader(file_list);
140+
141+
for (int j = 0; j < all_lines.size(); ++j) {
142+
std::unordered_map<std::string, std::vector<int64_t>> slots_to_data;
143+
int64_t label;
144+
parse_line(all_lines[j], slots, &label, &slots_to_data);
145+
}
146+
}
147+
148+
} // namespace reader
20149
} // namespace operators
21150
} // namespace paddle

paddle/fluid/operators/reader/ctr_reader.h

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,20 @@
1414

1515
#pragma once
1616

17+
#include <cstdlib>
18+
#include <fstream>
19+
#include <iostream>
20+
#include <sstream>
21+
#include <string>
22+
#include <unordered_map>
1723
#include <vector>
24+
25+
#include <boost/iostreams/copy.hpp>
26+
#include <boost/iostreams/filter/gzip.hpp>
27+
#include <boost/iostreams/filtering_streambuf.hpp>
28+
1829
#include "paddle/fluid/framework/reader.h"
30+
#include "paddle/fluid/framework/threadpool.h"
1931
#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
2032

2133
namespace paddle {
@@ -24,26 +36,50 @@ namespace reader {
2436

2537
class CTRReader : public framework::FileReader {
2638
public:
27-
explicit CTRReader(const std::shared_ptr<LoDTensorBlockingQueue>& queue)
39+
explicit CTRReader(const std::shared_ptr<LoDTensorBlockingQueue>& queue,
40+
int batch_size, int thread_num,
41+
const std::vector<std::string>& slots,
42+
const std::vector<std::string>& file_list)
2843
: framework::FileReader() {
44+
thread_num_ = thread_num;
45+
batch_size_ = batch_size;
2946
PADDLE_ENFORCE(queue != nullptr, "LoDTensorBlockingQueue must not be null");
3047
queue_ = queue;
48+
slots_ = slots;
49+
file_list_ = file_list;
3150
}
3251

52+
~CTRReader() { queue_->Close(); }
53+
3354
void ReadNext(std::vector<framework::LoDTensor>* out) override {
3455
bool success;
3556
*out = queue_->Pop(&success);
3657
if (!success) out->clear();
3758
}
3859

39-
~CTRReader() { queue_->Close(); }
40-
4160
void Shutdown() override { queue_->Close(); }
4261

43-
void Start() override { queue_->ReOpen(); }
62+
void Start() override {
63+
queue_->ReOpen();
64+
for (int i = 0; i < thread_num_; i++) {
65+
read_threads_.emplace_back(
66+
new std::thread(std::bind(&CTRReader::ReadThread, this, file_list_,
67+
slots_, batch_size_, queue_)));
68+
}
69+
}
70+
71+
private:
72+
void ReadThread(const std::vector<std::string>& file_list,
73+
const std::vector<std::string>& slots, int batch_size,
74+
std::shared_ptr<LoDTensorBlockingQueue>* queue);
4475

4576
private:
4677
std::shared_ptr<LoDTensorBlockingQueue> queue_;
78+
std::vector<std::unique_ptr<std::thread>> read_threads_;
79+
int thread_num_;
80+
int batch_size_;
81+
std::vector<std::string> slots_;
82+
std::vector<std::string> file_list_;
4783
};
4884

4985
} // namespace reader

paddle/fluid/pybind/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11

2-
set(PYBIND_DEPS pybind python proto_desc memory executor prune feed_fetch_method pass_builder)
2+
set(PYBIND_DEPS pybind python proto_desc memory executor prune feed_fetch_method pass_builder boost)
33
set(PYBIND_SRCS pybind.cc exception.cc protobuf.cc const_value.cc)
44
if(NOT WIN32)
55
list(APPEND PYBIND_DEPS parallel_executor profiler)

0 commit comments

Comments
 (0)