|
1 | 1 | # PaddleRec 自定义数据集及Reader |
2 | 2 |
|
3 | | -用户自定义数据集及配置异步Reader,需要关注以下几个步骤: |
| 3 | +## PaddleRec数据支持方式 |
4 | 4 |
|
5 | | -* [数据集整理](#数据集整理) |
6 | | -* [在模型组网中加入输入占位符](#在模型组网中加入输入占位符) |
7 | | -* [Reader实现](#Reader的实现) |
8 | | -* [在yaml文件中配置Reader](#在yaml文件中配置reader) |
| 5 | +### 定长数据 |
9 | 6 |
|
10 | | -我们以CTR-DNN模型为例,给出了从数据整理,变量定义,Reader写法,调试的完整历程。 |
| 7 | +如下两条数据表示定长数据,4个域(label, sparse1, sparse2, dense1)的长度分别是固定的1,2,1,3 |
11 | 8 |
|
12 | | -* [数据及Reader示例-DNN](#数据及Reader示例-DNN) |
13 | | - |
14 | | - |
15 | | -## 数据集整理 |
16 | | - |
17 | | -PaddleRec支持模型自定义数据集。 |
18 | | - |
19 | | -关于数据的tips: |
20 | | -1. 数据量: |
21 | | - |
22 | | - PaddleRec面向大规模数据设计,可以轻松支持亿级的数据读取,工业级的数据读写api:`dataset`在搜索、推荐、信息流等业务得到了充分打磨。 |
23 | | -2. 文件类型: |
24 | | - |
25 | | - 支持任意直接可读的文本数据,`dataset`同时支持`.gz`格式的文本压缩数据,无需额外代码,可直接读取。数据样本应以`\n`为标志,按行组织。 |
26 | | - |
27 | | -3. 文件存放位置: |
28 | | - |
29 | | - 文件通常存放在训练节点本地,但同时,`dataset`支持使用`hadoop`远程读取数据,数据无需下载到本地,为dataset配置hadoop相关账户及地址即可。 |
30 | | -4. 数据类型 |
31 | | - |
32 | | - Reader处理的是以行为单位的`string`数据,喂入网络的数据需要转为`int`,`float`的数值数据,不支持`string`喂入网络,不建议明文保存及处理训练数据。 |
33 | | -5. Tips |
34 | | - |
35 | | - Dataset模式下,训练线程与数据读取线程的关系强相关,为了多线程充分利用,`强烈建议将文件合理的拆为多个小文件`,尤其是在分布式训练场景下,可以均衡各个节点的数据量,同时加快数据的下载速度。 |
36 | | - |
37 | | -## 在模型组网中加入输入占位符 |
38 | | - |
39 | | -Reader读取文件后,产出的数据喂入网络,需要有占位符进行接收。占位符在Paddle中使用`fluid.data`或`fluid.layers.data`进行定义。`data`的定义可以参考[fluid.data](https://www.paddlepaddle.org.cn/documentation/docs/zh/api_cn/fluid_cn/data_cn.html#data)以及[fluid.layers.data](https://www.paddlepaddle.org.cn/documentation/docs/zh/api_cn/layers_cn/data_cn.html#data)。 |
40 | | - |
41 | | -加入您希望输入三个数据,分别是维度32的数据A,维度变长的稀疏数据B,以及一个一维的标签数据C,并希望梯度可以经过该变量向前传递,则示例如下: |
42 | | - |
43 | | -数据A的定义: |
44 | | -```python |
45 | | -var_a = fluid.data(name='A', shape= [-1, 32], dtype='float32') |
46 | 9 | ``` |
47 | | - |
48 | | -数据B的定义,变长数据的使用可以参考[LoDTensor](https://www.paddlepaddle.org.cn/documentation/docs/zh/beginners_guide/basic_concept/lod_tensor.html#cn-user-guide-lod-tensor): |
49 | | -```python |
50 | | -var_b = fluid.data(name='B', shape=[-1, 1], lod_level=1, dtype='int64') |
| 10 | +line1: label:1 sparse1:2 sparse1:3 sparse2:100 dense1:2.1 dense1:5.8 dense1:8.9 |
| 11 | +line2: label:0 sparse1:78 sparse1:89 sparse2:999 dense1:0.0 dense1:8.8 dense1:7.8 |
51 | 12 | ``` |
52 | 13 |
|
53 | | -数据C的定义: |
54 | | -```python |
55 | | -var_c = fluid.data(name='C', shape=[-1, 1], dtype='int32') |
56 | | -var_c.stop_gradient = False |
57 | | -``` |
| 14 | +对于定长数据(每个特征的表示是固定长度),动态图模式和静态图模式均支持。 |
58 | 15 |
|
59 | | -当我们完成以上三个数据的定义后,在PaddleRec的模型定义中,还需将其加入model基类成员变量`self._data_var` |
| 16 | +### 变长数据 |
60 | 17 |
|
61 | | -```python |
62 | | -self._data_var.append(var_a) |
63 | | -self._data_var.append(var_b) |
64 | | -self._data_var.append(var_c) |
| 18 | +如下所示,对于sparse1的长度不是固定的,常见于sparse特征域,比如用户标签域,不同的用户的标签数量不同 |
65 | 19 | ``` |
66 | | -至此,我们完成了在组网中定义输入数据的工作。 |
67 | | - |
68 | | -## Reader的实现 |
69 | | - |
70 | | -### Reader的实现范式 |
71 | | - |
72 | | -Reader的逻辑需要一个单独的python文件进行描述。我们试写一个`test_reader.py`,实现的具体流程如下: |
73 | | -1. 首先我们需要引入Reader基类 |
74 | | - |
75 | | - ```python |
76 | | - from paddlerec.core.reader import ReaderBase |
77 | | - ``` |
78 | | -2. 创建一个子类,继承Reader的基类,训练所需Reader命名为`TrainerReader` |
79 | | - ```python |
80 | | - class Reader(ReaderBase): |
81 | | - def init(self): |
82 | | - pass |
83 | | - |
84 | | - def generator_sample(self, line): |
85 | | - pass |
86 | | - ``` |
87 | | - |
88 | | -3. 在`init(self)`函数中声明一些在数据读取中会用到的变量,必要时可以在`config.yaml`文件中配置变量,利用`env.get_global_env()`拿到。 |
89 | | - |
90 | | - 比如,我们希望从yaml文件中读取一个数据预处理变量`avg=10`,目的是将数据A的数据缩小10倍,可以这样实现: |
91 | | - |
92 | | - 首先更改yaml文件,在某个hyper_parameters下加入该变量 |
93 | | - |
94 | | - ```yaml |
95 | | - ... |
96 | | - hyper_parameters: |
97 | | - reader: |
98 | | - avg: 10 |
99 | | - ... |
100 | | - ``` |
101 | | - |
102 | | - |
103 | | - 再更改Reader的init函数 |
104 | | - |
105 | | - ```python |
106 | | - from paddlerec.core.utils import envs |
107 | | - class Reader(ReaderBase): |
108 | | - def init(self): |
109 | | - self.avg = envs.get_global_env("avg", None, "hyper_parameters.reader") |
110 | | - |
111 | | - def generator_sample(self, line): |
112 | | - pass |
113 | | - ``` |
114 | | - |
115 | | -4. 继承并实现基类中的`generate_sample(self, line)`函数,逐行读取数据。 |
116 | | - - 该函数应返回一个可以迭代的reader方法(带有yield的函数不再是一个普通的函数,而是一个生成器generator,成为了可以迭代的对象,等价于一个数组、链表、文件、字符串etc.) |
117 | | - - 在这个可以迭代的函数中,如示例代码中的`def reader()`,我们定义数据读取的逻辑。以行为单位的数据进行截取,转换及预处理。 |
118 | | - - 最后,我们需要将数据整理为特定的格式,才能够被PaddleRec的Reader正确读取,并灌入的训练的网络中。简单来说,数据的输出顺序与我们在网络中创建的`inputs`必须是严格一一对应的,并转换为类似字典的形式。 |
119 | | - |
120 | | - 示例: 假设数据ABC在文本数据中,每行以这样的形式存储: |
121 | | - ```shell |
122 | | - 0.1,0.2,0.3...3.0,3.1,3.2 \t 99999,99998,99997 \t 1 \n |
123 | | - ``` |
124 | | - |
125 | | - 则示例代码如下: |
126 | | - ```python |
127 | | - from paddlerec.core.utils import envs |
128 | | - class Reader(ReaderBase): |
129 | | - def init(self): |
130 | | - self.avg = envs.get_global_env("avg", None, "hyper_parameters.reader") |
131 | | - |
132 | | - def generator_sample(self, line): |
133 | | - |
134 | | - def reader(self, line): |
135 | | - # 先分割 '\n', 再以 '\t'为标志分割为list |
136 | | - variables = (line.strip('\n')).split('\t') |
137 | | - |
138 | | - # A是第一个元素,并且每个数据之间使用','分割 |
139 | | - var_a = variables[0].split(',') # list |
140 | | - var_a = [float(i) / self.avg for i in var_a] # 将str数据转换为float |
141 | | - |
142 | | - |
143 | | - # B是第二个元素,同样以 ',' 分割 |
144 | | - var_b = variables[1].split(',') # list |
145 | | - var_b = [int(i) for i in var_b] # 将str数据转换为int |
146 | | - |
147 | | - # C是第三个元素, 只有一个元素,没有分割符 |
148 | | - var_c = variables[2] |
149 | | - var_c = int(var_c) # 将str数据转换为int |
150 | | - var_c = [var_c] # 将单独的数据元素置入list中 |
151 | | - |
152 | | - # 将数据与数据名结合,组织为dict的形式 |
153 | | - # 如下,output形式为{ A: var_a, B: var_b, C: var_c} |
154 | | - variable_name = ['A', 'B', 'C'] |
155 | | - output = zip(variable_name, [var_a] + [var_b] + [var_c]) |
156 | | - |
157 | | - # 将数据输出,使用yield方法,将该函数变为了一个可迭代的对象 |
158 | | - yield output |
159 | | - |
160 | | - ``` |
161 | | - |
162 | | - 至此,我们完成了Reader的实现。 |
163 | | - |
164 | | - |
165 | | -### 在yaml文件中配置Reader |
166 | | - |
167 | | -在模型的yaml配置文件中,主要的修改是三个,如下 |
168 | | - |
169 | | -```yaml |
170 | | -reader: |
171 | | - batch_size: 2 |
172 | | - class: "{workspace}/criteo_reader.py" |
173 | | - train_data_path: "{workspace}/data/train_data" |
| 20 | +line1: label:1 sparse1:2 sparse1:3 sparse2:100 dense1:2.1 dense1:5.8 dense1:8.9 |
| 21 | +line2: label:0 sparse1:78 sparse2:999 dense1:0.0 dense1:8.8 dense1:7.8 |
174 | 22 | ``` |
175 | 23 |
|
176 | | -batch_size: 顾名思义,是小批量训练时的样本大小 |
177 | | -class: 运行改模型所需reader的路径 |
178 | | -train_data_path: 训练数据所在文件夹 |
179 | | -reader_debug_mode: 测试reader语法,及输出是否符合预期的debug模式的开关 |
| 24 | +对于变长数据,一种方法是是通过padding的方式补齐成定长,这样动态图静态图均可支持。 |
| 25 | + |
| 26 | +由于推荐系统中的变长数据很常见,padding的方式会导致精度和性能。Paddle静态图支持直接读取数据,常见于sparse域,处理方式是embedding后通过sequence_pool转成一个定长的特征。 |
180 | 27 |
|
| 28 | +## 自定义Reader实现 |
181 | 29 |
|
182 | | -## 数据及Reader示例-DNN |
| 30 | +我们提供了两种Reader来读取自定义的数据方式,DataLoader和QueueDataset。 |
183 | 31 |
|
| 32 | +默认是DataLoader模式,可以在runner.reader_type定义两种模式:"DataLoader"或者"QueueDataset" |
184 | 33 |
|
185 | | -### Criteo数据集格式 |
| 34 | +### DataLoader |
186 | 35 |
|
187 | | -CTR-DNN训练及测试数据集选用[Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge/)所用的Criteo数据集。该数据集包括两部分:训练集和测试集。训练集包含一段时间内Criteo的部分流量,测试集则对应训练数据后一天的广告点击流量。 |
188 | | -每一行数据格式如下所示: |
189 | | -```bash |
190 | | -<label> <integer feature 1> ... <integer feature 13> <categorical feature 1> ... <categorical feature 26> |
| 36 | +我们以下面10条样本组成的简单数据集data/test.txt为例,介绍如何自定义DataLoader,支持动态图和静态图。 |
| 37 | + |
| 38 | +``` |
| 39 | +line1: label:1 sparse1:2 sparse1:3 sparse2:100 dense1:2.1 dense1:5.8 dense1:8.9 |
| 40 | +line2: label:0 sparse1:78 sparse1:89 sparse2:999 dense1:0.0 dense1:8.8 dense1:7.8 |
| 41 | +line3: label:1 sparse1:2 sparse1:3 sparse2:100 dense1:2.1 dense1:5.8 dense1:8.9 |
| 42 | +line4: label:0 sparse1:78 sparse1:89 sparse2:999 dense1:0.0 dense1:8.8 dense1:7.8 |
| 43 | +... |
| 44 | +line10: label:0 sparse1:78 sparse1:89 sparse2:999 dense1:0.0 dense1:8.8 dense1:7.8 |
191 | 45 | ``` |
192 | | -其中```<label>```表示广告是否被点击,点击用1表示,未点击用0表示。```<integer feature>```代表数值特征(连续特征),共有13个连续特征。```<categorical feature>```代表分类特征(离散特征),共有26个离散特征。相邻两个特征用```\t```分隔,缺失特征用空格表示。测试集中```<label>```特征已被移除。 |
193 | 46 |
|
194 | | -### Criteo数据集的预处理 |
| 47 | +参照models/rank/dnn 目录下的criteo_reader.py的实现方式 |
195 | 48 |
|
196 | | -数据预处理共包括两步: |
197 | | -- 将原始训练集按9:1划分为训练集和验证集 |
198 | | -- 数值特征(连续特征)需进行归一化处理,但需要注意的是,对每一个特征```<integer feature i>```,归一化时用到的最大值并不是用全局最大值,而是取排序后95%位置处的特征值作为最大值,同时保留极值。 |
| 49 | +#### 修改xx_reader.py |
199 | 50 |
|
200 | | -### CTR网络输入的定义 |
| 51 | +用户只需要修改class RecDataset中的__iter__函数, 通过python自带的yield方式输出每条数据,目前推荐使用numpy格式输出。 |
201 | 52 |
|
202 | | -正如前所述,Criteo数据集中,分为连续数据与离散(稀疏)数据,所以整体而言,CTR-DNN模型的数据输入层包括三个,分别是:`dense_input`用于输入连续数据,维度由超参数`dense_feature_dim`指定,数据类型是归一化后的浮点型数据。`sparse_input_ids`用于记录离散数据,在Criteo数据集中,共有26个slot,所以我们创建了名为`C1~C26`的26个稀疏参数输入,并设置`lod_level=1`,代表其为变长数据,数据类型为整数;最后是每条样本的`label`,代表了是否被点击,数据类型是整数,0代表负样例,1代表正样例。 |
| 53 | +以line1为例 根据自定义函数, 实现对4个特征域的分别输出, yield的格式支持list。 |
| 54 | +``` |
| 55 | +yield [numpy.array([1]), numpy.array([2, 3]), numpy.array([100]), numpy.array([2.1,5.8,8.9])] |
| 56 | +``` |
| 57 | +Tips1: 目前的class必须命名为RecDataset, 用户只需要修改__iter__函数 |
203 | 58 |
|
204 | | -在Paddle中数据输入的声明使用`paddle.fluid.layers.data()`,会创建指定类型的占位符,数据IO会依据此定义进行数据的输入。 |
| 59 | +Tips2: 调试过程中可以直接print, 快速调研 |
205 | 60 |
|
206 | | -稀疏参数输入的定义: |
207 | | -```python |
208 | | -def sparse_inputs(): |
209 | | - ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None) |
| 61 | +#### 修改config.yaml |
210 | 62 |
|
211 | | - sparse_input_ids = [ |
212 | | - fluid.layers.data(name="S" + str(i), |
213 | | - shape=[1], |
214 | | - lod_level=1, |
215 | | - dtype="int64") for i in range(1, ids) |
216 | | - ] |
217 | | - return sparse_input_ids |
218 | | -``` |
| 63 | +详细的yaml格式可以参考进阶教程的yaml文档 |
219 | 64 |
|
220 | | -稠密参数输入的定义: |
221 | | -```python |
222 | | -def dense_input(): |
223 | | - dim = envs.get_global_env("hyper_parameters.dense_input_dim", None) |
| 65 | +yaml中的runner.train_reader_path 为训练阶段的reader路径 |
224 | 66 |
|
225 | | - dense_input_var = fluid.layers.data(name="D", |
226 | | - shape=[dim], |
227 | | - dtype="float32") |
228 | | - return dense_input_var |
229 | | -``` |
| 67 | +Tips: importlib格式, 如test_reader.py,则写为train_reader_path: "test_reader" |
230 | 68 |
|
231 | | -标签的定义: |
232 | | -```python |
233 | | -def label_input(): |
234 | | - label = fluid.layers.data(name="click", shape=[1], dtype="int64") |
235 | | - return label |
236 | | -``` |
| 69 | +### QueueDataset |
237 | 70 |
|
238 | | -组合起来,正确的声明他们: |
239 | | -```python |
240 | | -self.sparse_inputs = sparse_inputs() |
241 | | -self.dense_input = dense_input() |
242 | | -self.label_input = label_input() |
| 71 | +QueueDataset适用于静态图对性能要求特别高的任务,面向大规模数据设计,可以轻松支持亿级的数据读取 |
243 | 72 |
|
244 | | -self._data_var.append(self.dense_input) |
| 73 | +#### 修改xx_reader.py |
245 | 74 |
|
246 | | -for input in self.sparse_inputs: |
247 | | - self._data_var.append(input) |
| 75 | +参照models/rank/dnn 目录下的queuedataset_reader.py, 用户需要修改函数generate_sample |
248 | 76 |
|
249 | | -self._data_var.append(self.label_input) |
| 77 | +Tips: yield返回的dict的序列需要和static_model.py中定义的create_feeds返回的占位符序列保持一致,dict的key值无作用。 |
250 | 78 |
|
251 | | -``` |
| 79 | +#### 修改config.yaml |
252 | 80 |
|
| 81 | +参照models/rank/config_queuedataset.yaml, 需要将runner.reader_type修改为"QueueDataset", 同时pipe_command修改为"python xx_reader.py" |
253 | 82 |
|
254 | | -### Criteo Reader写法 |
255 | | - |
256 | | -```python |
257 | | -# 引入PaddleRec的Reader基类 |
258 | | -from paddlerec.core.reader import ReaderBase |
259 | | -# 引入PaddleRec的读取yaml配置文件的方法 |
260 | | -from paddlerec.core.utils import envs |
261 | | - |
262 | | -# 定义TrainReader,需要继承 paddlerec.core.reader.Reader |
263 | | -class Reader(ReaderBase): |
264 | | - |
265 | | - # 数据预处理逻辑,继承自基类 |
266 | | - # 如果无需处理, 使用pass跳过该函数的执行 |
267 | | - def init(self): |
268 | | - self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] |
269 | | - self.cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] |
270 | | - self.cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] |
271 | | - self.hash_dim_ = envs.get_global_env("hyper_parameters.sparse_feature_number", None) |
272 | | - self.continuous_range_ = range(1, 14) |
273 | | - self.categorical_range_ = range(14, 40) |
274 | | - |
275 | | - # 读取数据方法,继承自基类 |
276 | | - # 实现可以迭代的reader函数,逐行处理数据 |
277 | | - def generate_sample(self, line): |
278 | | - """ |
279 | | - Read the data line by line and process it as a dictionary |
280 | | - """ |
281 | | - |
282 | | - def reader(): |
283 | | - """ |
284 | | - This function needs to be implemented by the user, based on data format |
285 | | - """ |
286 | | - features = line.rstrip('\n').split('\t') |
287 | | - |
288 | | - dense_feature = [] |
289 | | - sparse_feature = [] |
290 | | - for idx in self.continuous_range_: |
291 | | - if features[idx] == "": |
292 | | - dense_feature.append(0.0) |
293 | | - else: |
294 | | - dense_feature.append( |
295 | | - (float(features[idx]) - self.cont_min_[idx - 1]) / |
296 | | - self.cont_diff_[idx - 1]) |
297 | | - |
298 | | - for idx in self.categorical_range_: |
299 | | - sparse_feature.append( |
300 | | - [hash(str(idx) + features[idx]) % self.hash_dim_]) |
301 | | - label = [int(features[0])] |
302 | | - feature_name = ["D"] |
303 | | - for idx in self.categorical_range_: |
304 | | - feature_name.append("S" + str(idx - 13)) |
305 | | - feature_name.append("label") |
306 | | - yield zip(feature_name, [dense_feature] + sparse_feature + [label]) |
307 | | - |
308 | | - return reader |
309 | | -``` |
| 83 | +Tips: pipe_command的执行命令默认是在config.yaml对应的目录下执行 |
0 commit comments