14
14
15
15
#include " paddle/fluid/operators/reader/ctr_reader.h"
16
16
17
+ #include < gzstream.h>
18
+
17
19
#include < cstdlib>
18
20
#include < fstream>
19
21
#include < iostream>
24
26
#include < algorithm>
25
27
#include < random>
26
28
27
- #include < boost/iostreams/copy.hpp>
28
- #include < boost/iostreams/filter/gzip.hpp>
29
- #include < boost/iostreams/filtering_streambuf.hpp>
30
-
31
29
namespace paddle {
32
30
namespace operators {
33
31
namespace reader {
@@ -75,23 +73,19 @@ static inline void parse_line(
75
73
76
74
class GzipReader {
77
75
public:
78
- explicit GzipReader (const std::string& file_name) : instream_(&inbuf_) {
79
- file_ = std::ifstream (file_name, std::ios_base::in | std::ios_base::binary);
80
- inbuf_.push (boost::iostreams::gzip_decompressor ());
81
- inbuf_.push (file_);
82
- // Convert streambuf to istream
83
- }
76
+ explicit GzipReader (const std::string& file_name)
77
+ : gzstream_(file_name.c_str()) {}
84
78
85
- ~GzipReader () { file_. close (); }
79
+ ~GzipReader () {}
86
80
87
- bool HasNext () { return instream_ .peek () != EOF; }
81
+ bool HasNext () { return gzstream_ .peek () != EOF; }
88
82
89
- void NextLine (std::string& line) { std::getline (instream_, line); } // NOLINT
83
+ void NextLine (std::string* line) { // NOLINT
84
+ std::getline (gzstream_, line);
85
+ }
90
86
91
87
private:
92
- boost::iostreams::filtering_streambuf<boost::iostreams::input> inbuf_;
93
- std::ifstream file_;
94
- std::istream instream_;
88
+ igzstream gzstream_;
95
89
};
96
90
97
91
class MultiGzipReader {
@@ -113,36 +107,28 @@ class MultiGzipReader {
113
107
return true ;
114
108
}
115
109
116
- void NextLine (std::string& line) { // NOLINT
117
- readers_[current_reader_index_]->NextLine (line);
110
+ void NextLine (std::string* line) {
111
+ readers_[current_reader_index_]->NextLine (* line);
118
112
}
119
113
120
114
private:
121
115
std::vector<std::shared_ptr<GzipReader>> readers_;
122
116
size_t current_reader_index_ = 0 ;
123
117
};
124
118
125
- // void CTRReader::ReadThread(
126
- // const std::vector<std::string> &file_list,
127
- // const std::vector<std::string>& slots,
128
- // int batch_size,
129
- // std::shared_ptr<LoDTensorBlockingQueue>& queue) {}
130
-
131
119
void CTRReader::ReadThread (const std::vector<std::string>& file_list,
132
120
const std::vector<std::string>& slots,
133
121
int batch_size,
134
122
std::shared_ptr<LoDTensorBlockingQueue>* queue) {
135
123
std::string line;
136
124
137
125
// read all files
138
- std::vector<std::string> all_lines;
139
126
MultiGzipReader reader (file_list);
127
+ reader.NextLine (&line);
140
128
141
- for (int j = 0 ; j < all_lines.size (); ++j) {
142
- std::unordered_map<std::string, std::vector<int64_t >> slots_to_data;
143
- int64_t label;
144
- parse_line (all_lines[j], slots, &label, &slots_to_data);
145
- }
129
+ std::unordered_map<std::string, std::vector<int64_t >> slots_to_data;
130
+ int64_t label;
131
+ parse_line (line, slots, &label, &slots_to_data);
146
132
}
147
133
148
134
} // namespace reader
0 commit comments