Skip to content

Commit 5c65eff

Browse files
committed
update test for ctr data
1 parent 044d2e2 commit 5c65eff

File tree

3 files changed

+96
-93
lines changed

3 files changed

+96
-93
lines changed

paddle/fluid/operators/reader/ctr_reader.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,10 @@ void ReadThread(const std::vector<std::string>& file_list,
168168

169169
while (reader.HasNext()) {
170170
batch_data.clear();
171+
batch_data.reserve(batch_size);
172+
171173
batch_label.clear();
174+
batch_label.reserve(batch_size);
172175

173176
// read batch_size data
174177
for (int i = 0; i < batch_size; ++i) {
@@ -205,7 +208,8 @@ void ReadThread(const std::vector<std::string>& file_list,
205208
int64_t* tensor_data = lod_tensor.mutable_data<int64_t>(
206209
framework::make_ddim({1, static_cast<int64_t>(batch_feasign.size())}),
207210
platform::CPUPlace());
208-
memcpy(tensor_data, batch_feasign.data(), batch_feasign.size());
211+
memcpy(tensor_data, batch_feasign.data(),
212+
batch_feasign.size() * sizeof(int64_t));
209213
lod_datas.push_back(lod_tensor);
210214
}
211215

@@ -214,7 +218,8 @@ void ReadThread(const std::vector<std::string>& file_list,
214218
int64_t* label_tensor_data = label_tensor.mutable_data<int64_t>(
215219
framework::make_ddim({1, static_cast<int64_t>(batch_label.size())}),
216220
platform::CPUPlace());
217-
memcpy(label_tensor_data, batch_label.data(), batch_label.size());
221+
memcpy(label_tensor_data, batch_label.data(),
222+
batch_label.size() * sizeof(int64_t));
218223
lod_datas.push_back(label_tensor);
219224

220225
queue->Push(lod_datas);

paddle/fluid/operators/reader/ctr_reader.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@ class CTRReader : public framework::FileReader {
5555
const std::vector<std::string>& slots,
5656
const std::vector<std::string>& file_list)
5757
: batch_size_(batch_size), slots_(slots), file_list_(file_list) {
58+
PADDLE_ENFORCE_GT(thread_num, 0, "thread num should be larger then 0!");
5859
PADDLE_ENFORCE(queue != nullptr, "LoDTensorBlockingQueue must not be null");
5960
PADDLE_ENFORCE_GT(file_list.size(), 0, "file list should not be empty");
6061
thread_num_ =
6162
file_list_.size() > thread_num ? thread_num : file_list_.size();
6263
queue_ = queue;
6364
SplitFiles();
64-
for (int i = 0; i < thread_num_; ++i) {
65+
for (size_t i = 0; i < thread_num_; ++i) {
6566
read_thread_status_.push_back(Stopped);
6667
}
6768
}
@@ -76,6 +77,7 @@ class CTRReader : public framework::FileReader {
7677

7778
void Shutdown() override {
7879
VLOG(3) << "Shutdown reader";
80+
// shutdown should stop all the reader thread
7981
for (auto& read_thread : read_threads_) {
8082
read_thread->join();
8183
}
@@ -108,7 +110,7 @@ class CTRReader : public framework::FileReader {
108110
}
109111

110112
private:
111-
int thread_num_;
113+
size_t thread_num_;
112114
const int batch_size_;
113115
const std::vector<std::string> slots_;
114116
const std::vector<std::string> file_list_;

paddle/fluid/operators/reader/ctr_reader_test.cc

Lines changed: 85 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,15 @@
1414

1515
#include "paddle/fluid/operators/reader/ctr_reader.h"
1616

17+
#include <gzstream.h>
1718
#include <time.h>
1819

20+
#include <math.h>
21+
#include <stdio.h>
22+
#include <cstring>
23+
#include <fstream>
24+
#include <tuple>
25+
1926
#include "gtest/gtest.h"
2027

2128
#include "paddle/fluid/framework/lod_tensor.h"
@@ -25,109 +32,98 @@ using paddle::operators::reader::LoDTensorBlockingQueue;
2532
using paddle::operators::reader::LoDTensorBlockingQueueHolder;
2633
using paddle::operators::reader::CTRReader;
2734
using paddle::framework::LoDTensor;
28-
using paddle::operators::reader::GetTimeInSec;
35+
using paddle::framework::LoD;
36+
using paddle::platform::CPUPlace;
37+
38+
static void generatedata(const std::vector<std::string>& data,
39+
const std::string& file_name) {
40+
std::ifstream in(file_name.c_str());
41+
if (in.good()) {
42+
VLOG(3) << "file " << file_name << " exist, delete it first!";
43+
remove(file_name.c_str());
44+
} else {
45+
in.close();
46+
}
47+
48+
ogzstream out(file_name.c_str());
49+
PADDLE_ENFORCE(out.good(), "open file %s failed!", file_name);
50+
for (auto& c : data) {
51+
out << c;
52+
}
53+
out.close();
54+
PADDLE_ENFORCE(out.good(), "save file %s failed!", file_name);
55+
}
2956

3057
TEST(CTR_READER, read_data) {
58+
const std::vector<std::string> ctr_data = {
59+
"aaaa 1 0 0:6002 1:6003 2:6004 3:6005 4:6006 -1\n",
60+
"bbbb 1 0 5:6003 6:6003 7:6003 8:6004 9:6004 -1\n",
61+
"cccc 1 1 10:6002 11:6002 12:6002 13:6002 14:6002 -2\n",
62+
"dddd 1 0 15:6003 16:6003 17:6003 18:6003 19:6004 -3\n",
63+
"1111 1 1 20:6001 21:6001 22:6001 23:6001 24:6001 12\n",
64+
"2222 1 1 25:6004 26:6004 27:6004 28:6005 29:6005 aa\n",
65+
"3333 1 0 30:6002 31:6003 32:6004 33:6004 34:6005 er\n",
66+
"eeee 1 1 35:6003 36:6003 37:6005 38:6005 39:6005 dd\n",
67+
"ffff 1 1 40:6002 41:6003 42:6004 43:6004 44:6005 66\n",
68+
"gggg 1 1 46:6006 45:6006 47:6003 48:6003 49:6003 ba\n",
69+
};
70+
std::string gz_file_name = "test_ctr_reader_data.gz";
71+
generatedata(ctr_data, gz_file_name);
72+
73+
std::vector<int64_t> label_value = {0, 0, 1, 0, 1, 1, 0, 1, 1, 1};
74+
75+
std::vector<std::tuple<LoD, std::vector<int64_t>>> data_slot_6002{
76+
{{{0, 1, 2}}, {0, 0}},
77+
{{{0, 5, 6}}, {10, 11, 12, 13, 14, 0}},
78+
{{{0, 1, 2}}, {0, 0}},
79+
{{{0, 1, 2}}, {30, 0}},
80+
{{{0, 1, 2}}, {40, 0}}};
81+
std::vector<std::tuple<LoD, std::vector<int64_t>>> data_slot_6003{
82+
{{{0, 1, 4}}, {1, 5, 6, 7}},
83+
{{{0, 1, 5}}, {0, 15, 16, 17, 18}},
84+
{{{0, 1, 2}}, {0, 0}},
85+
{{{0, 1, 3}}, {31, 35, 36}},
86+
{{{0, 1, 4}}, {41, 47, 48, 49}}};
87+
3188
LoDTensorBlockingQueueHolder queue_holder;
3289
int capacity = 64;
3390
queue_holder.InitOnce(capacity, {}, false);
3491

3592
std::shared_ptr<LoDTensorBlockingQueue> queue = queue_holder.GetQueue();
3693

37-
int batch_size = 10;
38-
int thread_num = 3;
39-
std::vector<std::string> slots = {
40-
"6002", "6003", "6004", "6005", "6006", "6007", "6008", "6009", "6010",
41-
"6011", "6012", "6013", "6014", "6015", "6016", "6017", "6018", "6019",
42-
"6020", "6021", "6023", "6024", "6025", "6026", "6027", "6028", "6029",
43-
"6030", "6031", "6032", "6033", "6034", "6035", "6036", "6037", "6038",
44-
"6039", "6040", "6041", "6042", "6043", "6044", "6045", "6046", "6047",
45-
"6048", "6050", "6051", "6052", "6054", "6055", "6056", "6057", "6058",
46-
"6059", "6060", "6061", "6062", "6063", "6064", "6065", "6066", "6067",
47-
"6068", "6069", "6070", "6071", "6072", "6073", "6074", "6075", "6076",
48-
"6077", "6078", "6079", "6080", "6081", "6082", "6083", "6084", "6085",
49-
"6086", "6087", "6088", "6089", "6090", "6091", "6092", "6093", "6094",
50-
"6095", "6096", "6097", "6098", "6099", "6100", "6101", "6102", "6103",
51-
"6104", "6105", "6106", "6107", "6108", "6109", "6110", "6111", "6112",
52-
"6113", "6114", "6115", "6116", "6117", "6118", "6119", "6120", "6121",
53-
"6122", "6123", "6124", "6125", "6126", "6127", "6128", "6129", "6130",
54-
"6131", "6132", "6133", "6134", "6135", "6136", "6137", "6138", "6139",
55-
"6140", "6141", "6142", "6143", "6144", "6145", "6146", "6147", "6148",
56-
"6149", "6150", "6151", "6152", "6153", "6155", "6156", "6157", "6158",
57-
"6160", "6161", "6162", "6163", "6164", "6165", "6166", "6167", "6168",
58-
"6169", "6170", "6171", "6172", "6173", "6174", "6175", "6176", "6177",
59-
"6178", "6181", "6182", "6183", "6184", "6185", "6186", "6188", "6189",
60-
"6190", "6191", "6192", "6194", "6195", "6196", "6197", "6198", "6199",
61-
"6200", "6201", "6202", "6203", "6204", "6205", "6206", "6207", "6208",
62-
"6209", "6210", "6211", "6212", "6213", "6214", "6215", "6216", "6217",
63-
"6218", "6220", "6222", "6223", "6224", "6225", "6226", "6227", "6228",
64-
"6229", "6230", "6231", "6232", "6233", "6234", "6235", "6236", "6237",
65-
"6238", "6239", "6240", "6241", "6242", "6243", "6244", "6245", "6247",
66-
"6248", "6250", "6251", "6253", "6254", "6255", "6256", "6257", "6258",
67-
"6259", "6260", "6261", "6262", "6263", "6264", "6265", "6350", "6351",
68-
"6352", "6353", "6354", "6355", "6356", "6738", "6739", "6740", "6741",
69-
"6751", "6753", "6754", "6755", "6756", "6757", "6759", "6760", "6763",
70-
"6764", "6765", "6766", "6767", "6768", "6769", "6770", "6806", "6807",
71-
"6808", "6809", "6810", "6811", "6812", "6813", "6814", "6815", "6816",
72-
"6817", "6818", "6819", "6820", "6821", "6822", "6823", "6824", "6825",
73-
"6826", "6827", "6828", "6829", "6830", "6831", "6832", "6833", "6834",
74-
"6835", "6836", "6837", "6838", "6839", "6840", "6841", "6842", "6843",
75-
"6844", "6845", "6846", "6847", "6848", "6849", "6850", "6851", "6852",
76-
"6853", "6854", "6855", "6856", "6857", "6858", "6859", "6860", "6861",
77-
"6862", "6863", "6864", "6865", "6866", "6867", "6868", "6869", "6870",
78-
"6871", "6872", "6873", "6874", "6875", "6876", "6877", "6878", "6879",
79-
"6880", "6881", "6882", "6883", "6884", "6885", "6886", "6887", "6888",
80-
"6889", "6890", "6891", "6892", "6893", "6894", "6895", "6896", "6897",
81-
"6898", "6899", "6900", "6901", "6902", "6903", "6904", "6905", "6906",
82-
"6907", "6908", "6909", "6910", "6911", "6912", "6913", "6914", "6915",
83-
"6916", "6917", "6918", "6919", "6920", "6921", "6922", "6923", "6924",
84-
"6925", "6926", "6927", "6928", "6929", "6930", "6931", "6932", "6933",
85-
"6934", "6935", "6936", "6937", "6938", "6939", "6940", "6941", "6942",
86-
"6943", "6944", "6945", "6946", "6947", "6948", "6949", "6950", "6951",
87-
"6952", "6953", "6954", "6955", "6956", "6957", "6958", "6959", "6960",
88-
"6961", "6962", "6963", "7001", "7002", "7003", "7004", "7005", "7006",
89-
"7007", "7008", "7009", "7010", "7011", "7012", "7013", "7014", "7015",
90-
"7016", "7017", "7018", "7019", "7020", "7021", "7022", "7023", "7024",
91-
"7025", "7026", "7027", "7028", "7029", "7030", "7031", "7032", "7033",
92-
"7034", "7035", "7036", "7037", "7038", "7039", "7040", "7041", "7042",
93-
"7043", "7044", "7045", "7046", "7047", "7048", "7049", "7050", "7051",
94-
"7052", "7053", "7054", "7055", "7056", "7057", "7058", "7060", "7062",
95-
"7063", "7064", "7065", "7066", "7067", "7068", "7069", "7070", "7071",
96-
"7072", "7073", "7074", "7075", "7076", "7077", "7078", "7079", "7080",
97-
"7081", "7082", "7083", "7084", "7085", "7086", "7087", "7088", "7089",
98-
"7090", "7091", "7092", "7093", "7094", "7095", "7096", "7097", "7098",
99-
"7099", "7100", "7101", "7102", "7103", "7104", "7105", "7106", "7107",
100-
"7108", "7109", "7110", "7120", "7122", "7123", "7124", "7125", "7126",
101-
"7127", "7128", "7129", "7131", "7133", "7134", "7135", "7136", "7137",
102-
"7138", "7139", "7140", "7141", "7142", "7143", "7144", "7145", "7146",
103-
"7147", "7148", "7149", "7150", "7151", "7152", "7153", "7154", "7155",
104-
"7156", "7157", "7158", "7159", "7160", "7161", "7162", "7163", "7164",
105-
"7165", "7166", "7167", "7168", "7169", "7170", "7171", "7172", "7173",
106-
"7174", "7175", "7176", "7177", "7178", "7179", "7180", "7181", "7182",
107-
"7183", "7184", "7185", "7186", "7187", "7500", "7501", "7502", "7503",
108-
"7504", "7505", "7506", "7507", "7508", "7509", "7510", "7511", "7512",
109-
"7513", "7514", "7515", "7516", "7517", "7750"};
110-
std::vector<std::string> file_list = {
111-
"/Users/qiaolongfei/project/gzip_test/part-00000-A.gz",
112-
"/Users/qiaolongfei/project/gzip_test/part-00001-A.gz",
113-
"/Users/qiaolongfei/project/gzip_test/part-00002-A.gz",
114-
"/Users/qiaolongfei/project/gzip_test/part-00003-A.gz"};
94+
int batch_size = 2;
95+
int thread_num = 1;
96+
std::vector<std::string> slots = {"6002", "6003"};
97+
std::vector<std::string> file_list;
98+
for (int i = 0; i < thread_num; ++i) {
99+
file_list.push_back(gz_file_name);
100+
}
115101

116102
CTRReader reader(queue, batch_size, thread_num, slots, file_list);
117103

118104
reader.Start();
119105

120-
std::cout << "start to reader data" << std::endl;
121-
std::vector<LoDTensor> out;
122-
int read_batch = 10000;
123-
uint64_t t0 = GetTimeInSec();
124-
for (int i = 0; i < read_batch; ++i) {
106+
size_t batch_num = std::ceil(ctr_data.size() / batch_size) * thread_num;
107+
108+
for (size_t i = 0; i < batch_num; ++i) {
109+
std::vector<LoDTensor> out;
125110
reader.ReadNext(&out);
126-
if (i != 0 && i % 100 == 0) {
127-
uint64_t t1 = GetTimeInSec();
128-
float line_per_s = 100 * batch_size * 1000000 / (t1 - t0);
129-
VLOG(3) << "line_per_second = " << line_per_s;
130-
t0 = GetTimeInSec();
111+
ASSERT_EQ(out.size(), slots.size() + 1);
112+
auto& label_tensor = out.back();
113+
ASSERT_EQ(label_tensor.dims(),
114+
paddle::framework::make_ddim({1, batch_size}));
115+
for (size_t j = 0; j < batch_size && i * batch_num + j < ctr_data.size();
116+
++j) {
117+
auto& label = label_tensor.data<int64_t>()[j];
118+
ASSERT_TRUE(label == 0 || label == 1);
119+
ASSERT_EQ(label, label_value[i * batch_size + j]);
131120
}
121+
auto& tensor_6002 = out[0];
122+
ASSERT_EQ(std::get<0>(data_slot_6002[i]), tensor_6002.lod());
123+
ASSERT_EQ(std::memcmp(std::get<1>(data_slot_6002[i]).data(),
124+
tensor_6002.data<int64_t>(),
125+
tensor_6002.dims()[1] * sizeof(int64_t)),
126+
0);
132127
}
128+
ASSERT_EQ(queue->Size(), 0);
133129
}

0 commit comments

Comments
 (0)