Skip to content

Commit 7d501da

Browse files
committed
add online learning
1 parent 18371de commit 7d501da

File tree

11 files changed

+863
-1
lines changed

11 files changed

+863
-1
lines changed

doc/online_learning.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,22 @@
1-
# PaddleRec 启动训练
1+
# PaddleRec 流式训练(OnlineLearning)任务启动及配置流程
22

33

4+
## 流式训练简介
5+
流式训练是按照一定顺序进行数据的接收和处理,每接收一个数据,模型会对它进行预测并对当前模型进行更新,然后处理下一个数据。 像信息流、小视频、电商等场景,每天都会新增大量的数据, 让每天(每一刻)新增的数据基于上一天(上一刻)的模型进行新的预测和模型更新。
6+
7+
在大规模流式训练场景下, 需要使用的深度学习框架有对应的能力支持, 即:
8+
* 支持大规模分布式训练的能力, 数据量巨大, 需要有良好的分布式训练及扩展能力,才能满足训练的时效要求
9+
* 支持超大规模的Embedding, 能够支持十亿甚至千亿级别的Embedding, 拥有合理的参数输出的能力,能够快速输出模型参数并和线上其他系统进行对接
10+
* Embedding的特征ID需要支持HASH映射,不要求ID的编码,能够自动增长及控制特征的准入(原先不存在的特征可以以适当的条件创建), 能够定期淘汰(能够以一定的策略进行过期的特征的清理) 并拥有准入及淘汰策略
11+
* 最后就是要基于框架开发一套完备的流式训练的 trainer.py, 能够拥有完善的流式训练流程
12+
13+
## 使用PaddleRec内置的 online learning 进行模型的训练
14+
目前,PaddleRec基于飞桨分布式训练框架的能力,实现了这套流式训练的流程。 供大家参考和使用。我们在`models/online_learning`目录下提供了一个ctr-dnn的online_training的版本,供大家更好的理解和参考。
15+
16+
**注意**
17+
1. 使用online learning 需要安装目前Paddle最新的开发者版本, 你可以从 https://www.paddlepaddle.org.cn/documentation/docs/zh/install/Tables.html#whl-dev 此处获得它,需要先卸载当前已经安装的飞桨版本,根据自己的Python环境下载相应的安装包。
18+
2. 使用流式训练及大规模稀疏的能力,需要对模型做一些微调, 因此需要你修改部分代码。
19+
3. 当前只有参数服务器的分布式训练是支持带大规模稀疏的流式训练的,因此运行时,请直接选择参数服务器本地训练或集群训练方法。
420

521
## 启动方法
622

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
# 基于DNN模型的点击率预估模型
2+
3+
## 介绍
4+
`CTR(Click Through Rate)`,即点击率,是“推荐系统/计算广告”等领域的重要指标,对其进行预估是商品推送/广告投放等决策的基础。简单来说,CTR预估对每次广告的点击情况做出预测,预测用户是点击还是不点击。CTR预估模型综合考虑各种因素、特征,在大量历史数据上训练,最终对商业决策提供帮助。本模型实现了下述论文中提出的DNN模型:
5+
6+
```text
7+
@inproceedings{guo2017deepfm,
8+
title={DeepFM: A Factorization-Machine based Neural Network for CTR Prediction},
9+
author={Huifeng Guo, Ruiming Tang, Yunming Ye, Zhenguo Li and Xiuqiang He},
10+
booktitle={the Twenty-Sixth International Joint Conference on Artificial Intelligence (IJCAI)},
11+
pages={1725--1731},
12+
year={2017}
13+
}
14+
```
15+
16+
#
17+
## 数据准备
18+
### 数据来源
19+
训练及测试数据集选用[Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge/)所用的Criteo数据集。该数据集包括两部分:训练集和测试集。训练集包含一段时间内Criteo的部分流量,测试集则对应训练数据后一天的广告点击流量。
20+
每一行数据格式如下所示:
21+
```bash
22+
<label> <integer feature 1> ... <integer feature 13> <categorical feature 1> ... <categorical feature 26>
23+
```
24+
其中```<label>```表示广告是否被点击,点击用1表示,未点击用0表示。```<integer feature>```代表数值特征(连续特征),共有13个连续特征。```<categorical feature>```代表分类特征(离散特征),共有26个离散特征。相邻两个特征用```\t```分隔,缺失特征用空格表示。测试集中```<label>```特征已被移除。
25+
26+
### 数据预处理
27+
数据预处理共包括两步:
28+
- 将原始训练集按9:1划分为训练集和验证集
29+
- 数值特征(连续特征)需进行归一化处理,但需要注意的是,对每一个特征```<integer feature i>```,归一化时用到的最大值并不是用全局最大值,而是取排序后95%位置处的特征值作为最大值,同时保留极值。
30+
31+
### 一键下载训练及测试数据
32+
```bash
33+
sh download_data.sh
34+
```
35+
执行该脚本,会从国内源的服务器上下载Criteo数据集,并解压到指定文件夹。全量训练数据放置于`./train_data_full/`,全量测试数据放置于`./test_data_full/`,用于快速验证的训练数据与测试数据放置于`./train_data/``./test_data/`
36+
37+
执行该脚本的理想输出为:
38+
```bash
39+
> sh download_data.sh
40+
--2019-11-26 06:31:33-- https://fleet.bj.bcebos.com/ctr_data.tar.gz
41+
Resolving fleet.bj.bcebos.com... 10.180.112.31
42+
Connecting to fleet.bj.bcebos.com|10.180.112.31|:443... connected.
43+
HTTP request sent, awaiting response... 200 OK
44+
Length: 4041125592 (3.8G) [application/x-gzip]
45+
Saving to: “ctr_data.tar.gz”
46+
47+
100%[==================================================================================================================>] 4,041,125,592 120M/s in 32s
48+
49+
2019-11-26 06:32:05 (120 MB/s) - “ctr_data.tar.gz” saved [4041125592/4041125592]
50+
51+
raw_data/
52+
raw_data/part-55
53+
raw_data/part-113
54+
...
55+
test_data/part-227
56+
test_data/part-222
57+
Complete data download.
58+
Full Train data stored in ./train_data_full
59+
Full Test data stored in ./test_data_full
60+
Rapid Verification train data stored in ./train_data
61+
Rapid Verification test data stored in ./test_data
62+
```
63+
至此,我们已完成数据准备的全部工作。
64+
65+
## 数据读取
66+
为了能高速运行CTR模型的训练,`PaddleRec`封装了`dataset``dataloader`API进行高性能的数据读取。
67+
68+
如何在我们的训练中引入dataset读取方式呢?无需变更数据格式,只需在我们的训练代码中加入以下内容,便可达到媲美二进制读取的高效率,以下是一个比较完整的流程:
69+
70+
### 引入dataset
71+
72+
1. 通过工厂类`fluid.DatasetFactory()`创建一个dataset对象。
73+
2. 将我们定义好的数据输入格式传给dataset,通过`dataset.set_use_var(inputs)`实现。
74+
3. 指定我们的数据读取方式,由`dataset_generator.py`实现数据读取的规则,后面将会介绍读取规则的实现。
75+
4. 指定数据读取的batch_size。
76+
5. 指定数据读取的线程数,该线程数和训练线程应保持一致,两者为耦合的关系。
77+
6. 指定dataset读取的训练文件的列表。
78+
79+
```python
80+
def get_dataset(inputs, args)
81+
dataset = fluid.DatasetFactory().create_dataset()
82+
dataset.set_use_var(inputs)
83+
dataset.set_pipe_command("python dataset_generator.py")
84+
dataset.set_batch_size(args.batch_size)
85+
dataset.set_thread(int(args.cpu_num))
86+
file_list = [
87+
str(args.train_files_path) + "/%s" % x
88+
for x in os.listdir(args.train_files_path)
89+
]
90+
logger.info("file list: {}".format(file_list))
91+
return dataset, file_list
92+
```
93+
94+
### 如何指定数据读取规则
95+
96+
在上文我们提到了由`dataset_generator.py`实现具体的数据读取规则,那么,怎样为dataset创建数据读取的规则呢?
97+
以下是`dataset_generator.py`的全部代码,具体流程如下:
98+
1. 首先我们需要引入dataset的库,位于`paddle.fluid.incubate.data_generator`
99+
2. 声明一些在数据读取中会用到的变量,如示例代码中的`cont_min_``categorical_range_`等。
100+
3. 创建一个子类,继承dataset的基类,基类有多种选择,如果是多种数据类型混合,并且需要转化为数值进行预处理的,建议使用`MultiSlotDataGenerator`;若已经完成了预处理并保存为数据文件,可以直接以`string`的方式进行读取,使用`MultiSlotStringDataGenerator`,能够进一步加速。在示例代码,我们继承并实现了名为`CriteoDataset`的dataset子类,使用`MultiSlotDataGenerator`方法。
101+
4. 继承并实现基类中的`generate_sample`函数,逐行读取数据。该函数应返回一个可以迭代的reader方法(带有yield的函数不再是一个普通的函数,而是一个生成器generator,成为了可以迭代的对象,等价于一个数组、链表、文件、字符串etc.)
102+
5. 在这个可以迭代的函数中,如示例代码中的`def reader()`,我们定义数据读取的逻辑。例如对以行为单位的数据进行截取,转换及预处理。
103+
6. 最后,我们需要将数据整理为特定的格式,才能够被dataset正确读取,并灌入的训练的网络中。简单来说,数据的输出顺序与我们在网络中创建的`inputs`必须是严格一一对应的,并转换为类似字典的形式。在示例代码中,我们使用`zip`的方法将参数名与数值构成的元组组成了一个list,并将其yield输出。如果展开来看,我们输出的数据形如`[('dense_feature',[value]),('C1',[value]),('C2',[value]),...,('C26',[value]),('label',[value])]`
104+
105+
106+
```python
107+
import paddle.fluid.incubate.data_generator as dg
108+
109+
cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
110+
cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
111+
cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
112+
hash_dim_ = 1000001
113+
continuous_range_ = range(1, 14)
114+
categorical_range_ = range(14, 40)
115+
116+
class CriteoDataset(dg.MultiSlotDataGenerator):
117+
118+
def generate_sample(self, line):
119+
120+
def reader():
121+
features = line.rstrip('\n').split('\t')
122+
dense_feature = []
123+
sparse_feature = []
124+
for idx in continuous_range_:
125+
if features[idx] == "":
126+
dense_feature.append(0.0)
127+
else:
128+
dense_feature.append(
129+
(float(features[idx]) - cont_min_[idx - 1]) /
130+
cont_diff_[idx - 1])
131+
for idx in categorical_range_:
132+
sparse_feature.append(
133+
[hash(str(idx) + features[idx]) % hash_dim_])
134+
label = [int(features[0])]
135+
process_line = dense_feature, sparse_feature, label
136+
feature_name = ["dense_feature"]
137+
for idx in categorical_range_:
138+
feature_name.append("C" + str(idx - 13))
139+
feature_name.append("label")
140+
141+
yield zip(feature_name, [dense_feature] + sparse_feature + [label])
142+
143+
return reader
144+
145+
d = CriteoDataset()
146+
d.run_from_stdin()
147+
```
148+
### 快速调试Dataset
149+
我们可以脱离组网架构,单独验证Dataset的输出是否符合我们预期。使用命令
150+
`cat 数据文件 | python dataset读取python文件`进行dataset代码的调试:
151+
```bash
152+
cat train_data/part-0 | python dataset_generator.py
153+
```
154+
输出的数据格式如下:
155+
` dense_input:size ; dense_input:value ; sparse_input:size ; sparse_input:value ; ... ; sparse_input:size ; sparse_input:value ; label:size ; label:value `
156+
157+
理想的输出为(截取了一个片段):
158+
```bash
159+
...
160+
13 0.05 0.00663349917081 0.05 0.0 0.02159375 0.008 0.15 0.04 0.362 0.1 0.2 0.0 0.04 1 715353 1 817085 1 851010 1 833725 1 286835 1 948614 1 881652 1 507110 1 27346 1 646986 1 643076 1 200960 1 18464 1 202774 1 532679 1 729573 1 342789 1 562805 1 880474 1 984402 1 666449 1 26235 1 700326 1 452909 1 884722 1 787527 1 0
161+
...
162+
```
163+
164+
#
165+
## 模型组网
166+
### 数据输入声明
167+
正如数据准备章节所介绍,Criteo数据集中,分为连续数据与离散(稀疏)数据,所以整体而言,CTR-DNN模型的数据输入层包括三个,分别是:`dense_input`用于输入连续数据,维度由超参数`dense_feature_dim`指定,数据类型是归一化后的浮点型数据。`sparse_input_ids`用于记录离散数据,在Criteo数据集中,共有26个slot,所以我们创建了名为`C1~C26`的26个稀疏参数输入,并设置`lod_level=1`,代表其为变长数据,数据类型为整数;最后是每条样本的`label`,代表了是否被点击,数据类型是整数,0代表负样例,1代表正样例。
168+
169+
在Paddle中数据输入的声明使用`paddle.fluid.data()`,会创建指定类型的占位符,数据IO会依据此定义进行数据的输入。
170+
```python
171+
dense_input = fluid.data(name="dense_input",
172+
shape=[-1, args.dense_feature_dim],
173+
dtype="float32")
174+
175+
sparse_input_ids = [
176+
fluid.data(name="C" + str(i),
177+
shape=[-1, 1],
178+
lod_level=1,
179+
dtype="int64") for i in range(1, 27)
180+
]
181+
182+
label = fluid.data(name="label", shape=[-1, 1], dtype="int64")
183+
inputs = [dense_input] + sparse_input_ids + [label]
184+
```
185+
186+
### CTR-DNN模型组网
187+
188+
CTR-DNN模型的组网比较直观,本质是一个二分类任务,代码参考`model.py`。模型主要组成是一个`Embedding`层,三个`FC`层,以及相应的分类任务的loss计算和auc计算。
189+
190+
#### Embedding层
191+
首先介绍Embedding层的搭建方式:`Embedding`层的输入是`sparse_input`,shape由超参的`sparse_feature_dim``embedding_size`定义。需要特别解释的是`is_sparse`参数,当我们指定`is_sprase=True`后,计算图会将该参数视为稀疏参数,反向更新以及分布式通信时,都以稀疏的方式进行,会极大的提升运行效率,同时保证效果一致。
192+
193+
各个稀疏的输入通过Embedding层后,将其合并起来,置于一个list内,以方便进行concat的操作。
194+
195+
```python
196+
def embedding_layer(input):
197+
return fluid.layers.embedding(
198+
input=input,
199+
is_sparse=True,
200+
size=[args.sparse_feature_dim,
201+
args.embedding_size],
202+
param_attr=fluid.ParamAttr(
203+
name="SparseFeatFactors",
204+
initializer=fluid.initializer.Uniform()),
205+
)
206+
207+
sparse_embed_seq = list(map(embedding_layer, inputs[1:-1])) # [C1~C26]
208+
```
209+
210+
#### FC层
211+
将离散数据通过embedding查表得到的值,与连续数据的输入进行`concat`操作,合为一个整体输入,作为全链接层的原始输入。我们共设计了3层FC,每层FC的输出维度都为400,每层FC都后接一个`relu`激活函数,每层FC的初始化方式为符合正态分布的随机初始化,标准差与上一层的输出维度的平方根成反比。
212+
```python
213+
concated = fluid.layers.concat(sparse_embed_seq + inputs[0:1], axis=1)
214+
215+
fc1 = fluid.layers.fc(
216+
input=concated,
217+
size=400,
218+
act="relu",
219+
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
220+
scale=1 / math.sqrt(concated.shape[1]))),
221+
)
222+
fc2 = fluid.layers.fc(
223+
input=fc1,
224+
size=400,
225+
act="relu",
226+
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
227+
scale=1 / math.sqrt(fc1.shape[1]))),
228+
)
229+
fc3 = fluid.layers.fc(
230+
input=fc2,
231+
size=400,
232+
act="relu",
233+
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
234+
scale=1 / math.sqrt(fc2.shape[1]))),
235+
)
236+
```
237+
#### Loss及Auc计算
238+
- 预测的结果通过一个输出shape为2的FC层给出,该FC层的激活函数是softmax,会给出每条样本分属于正负样本的概率。
239+
- 每条样本的损失由交叉熵给出,交叉熵的输入维度为[batch_size,2],数据类型为float,label的输入维度为[batch_size,1],数据类型为int。
240+
- 该batch的损失`avg_cost`是各条样本的损失之和
241+
- 我们同时还会计算预测的auc,auc的结果由`fluid.layers.auc()`给出,该层的返回值有三个,分别是全局auc: `auc_var`,当前batch的auc: `batch_auc_var`,以及auc_states: `auc_states`,auc_states包含了`batch_stat_pos, batch_stat_neg, stat_pos, stat_neg`信息。`batch_auc`我们取近20个batch的平均,由参数`slide_steps=20`指定,roc曲线的离散化的临界数值设置为4096,由`num_thresholds=2**12`指定。
242+
```
243+
predict = fluid.layers.fc(
244+
input=fc3,
245+
size=2,
246+
act="softmax",
247+
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
248+
scale=1 / math.sqrt(fc3.shape[1]))),
249+
)
250+
251+
cost = fluid.layers.cross_entropy(input=predict, label=inputs[-1])
252+
avg_cost = fluid.layers.reduce_sum(cost)
253+
accuracy = fluid.layers.accuracy(input=predict, label=inputs[-1])
254+
auc_var, batch_auc_var, auc_states = fluid.layers.auc(
255+
input=predict,
256+
label=inputs[-1],
257+
num_thresholds=2**12,
258+
slide_steps=20)
259+
```
260+
261+
完成上述组网后,我们最终可以通过训练拿到`avg_cost``auc`两个重要指标。
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
backend: "PaddleCloud"
15+
cluster_type: k8s # mpi 可选
16+
17+
config:
18+
fs_name: "afs://xxx.com"
19+
fs_ugi: "usr,pwd"
20+
output_path: "" # 填远程地址,如afs:/user/your/path/ 则此处填 /user/your/path
21+
22+
# for mpi
23+
train_data_path: "" # 填远程地址,如afs:/user/your/path/ 则此处填 /user/your/path
24+
test_data_path: "" # 填远程地址,如afs:/user/your/path/ 则此处填 /user/your/path
25+
thirdparty_path: "" # 填远程地址,如afs:/user/your/path/ 则此处填 /user/your/path
26+
paddle_version: "1.7.2" # 填写paddle官方版本号 >= 1.7.2
27+
28+
# for k8s
29+
afs_remote_mount_point: "" # 填远程地址,如afs:/user/your/path/ 则此处填 /user/your/path
30+
31+
# paddle分布式底层超参,无特殊需求不理不改
32+
communicator:
33+
FLAGS_communicator_is_sgd_optimizer: 0
34+
FLAGS_communicator_send_queue_size: 5
35+
FLAGS_communicator_thread_pool_size: 32
36+
FLAGS_communicator_max_merge_var_num: 5
37+
FLAGS_communicator_max_send_grad_num_before_recv: 5
38+
FLAGS_communicator_fake_rpc: 0
39+
FLAGS_rpc_retry_times: 3
40+
41+
submit:
42+
ak: ""
43+
sk: ""
44+
priority: "high"
45+
job_name: "PaddleRec_CTR"
46+
group: ""
47+
start_cmd: "python -m paddlerec.run -m ./config.yaml"
48+
files: ./*.py ./*.yaml
49+
50+
# for mpi ps-cpu
51+
nodes: 2
52+
53+
# for k8s gpu
54+
k8s_trainers: 2
55+
k8s_cpu_cores: 2
56+
k8s_gpu_card: 1
57+
58+
# for k8s ps-cpu
59+
k8s_trainers: 2
60+
k8s_cpu_cores: 4
61+
k8s_ps_num: 2
62+
k8s_ps_cores: 4
63+

0 commit comments

Comments
 (0)