2
2
3
3
## 分布式深度学习程序难写
4
4
5
- 一个深度学习的训练任务往往需要较多的训练数据,和较长的训练时间。一个通常的做法是
6
- 把单机程序给分布式化,利用集群的资源,启动多个 worker,来共同完成一个训练任务。
7
-
8
- 分布式深度学习程序的编写是相对困难的,编程者既要了解深度学习,也要了解分布式系统
9
- 开发。
10
- 在一个分布式深度学习系统中,需要启动和监控若干个 workers
11
- 进程,对数据和计算任务进行拆分,并且分发给 workers。
12
- 此外,还需要考虑 workers 之间的通信(communication)和 同步(synchronization)。
13
- 随着计算规模的增加,workers
14
- 进程数目也会增加。当计算规模很大时,包含数十个进程的作业在执行过程中一个进程都不
15
- 挂的概率几乎是0。
16
- 如果一个进程挂掉,则整个作业重启,那么这个作业会陷入永不停歇的重启,无法结束。
5
+ 为了从海量数据中学习规律,我们需要编写分布式深度学习程序来完成训练任务。这在工业场景中尤为常见。
6
+
7
+ 分布式深度学习程序的编写是相对困难的,编程者既要了解深度学习,也要了解分布式系统开发。
8
+ 在一个分布式深度学习系统中,需要启动和监控若干个 worker,对数据和计算任务进行拆分,并且分发给 workers。
9
+ 此外,还需要考虑 worker 之间的通信(communication)和 同步(synchronization)。
10
+ 随着计算规模的增加,worker
11
+ 数目也会增加。当 worker 数目很多时,作业在执行过程中有 worker 挂掉的概率也会变得很大。
12
+ 如果一个 worker 挂掉,则整个作业重启,那么重启之后可能又会有 worker 挂掉导致重启,于是作业不断陷入重启。
17
13
此时,需要结合深度学习训练算法的数学性质,设计容错机制。
18
14
这要求编程者必须同时是深度学习和分布式系统的专家。
19
15
16
+ 我们对编写分布式深度学习程序的现有开源方案进行了调研。一方面,TensorFlow 是当今最受欢迎的深度学习平台,在蚂蚁集团内部,TensorFlow
17
+ 在诸多业务场景中被广泛使用;
18
+ 另一方面,Kubernetes 是目前最先进的分布式操作系统,是公有云和私有云的事实工业标准。
19
+ 因此,本文重点讨论在 Kubernetes 上运行 TensorFlow 分布式训练程序的解决方案。调研结果参见下表。
20
+
21
+ | | 模型定义 | 分布式调度工具 |
22
+ | --- | --- | --- |
23
+ | 方案一 | TensorFlow Estimator API | Kubeflow TF-operator |
24
+ | 方案二 | TensorFlow Keras API | Kubeflow TF-operator |
25
+ | 方案三 | Horovod with TensorFlow | Kubeflow MPI-operator |
26
+
27
+ 现有开源方案的使用仍有一定门槛。在模型定义方面,TensorFlow Estimator API 仅支持 graph execution,不支持 eager
28
+ execution,调试代码和网络各层输出较为麻烦。
29
+ TensorFlow 2.x 默认支持 eager execution,并且推荐使用更加精简的 Keras API
30
+ 来定义模型。
31
+ TensorFlow Keras API 提高开发效率,降低使用门槛,与 eager execution
32
+ 配合之后,使得程序更为直观,也更易调试。
33
+ 目前 TensorFlow 2.x Keras API 还暂不支持 ParameterServer 分布式策略,对 AllReduce 分布式策略提供了实验性的支持。
34
+ 而Horovod 有一定的侵入性,用户除了熟悉 TensorFlow API之外,还需学习 Horovod API。
35
+ 另一方面,用户依赖 Kubeflow 项目提供的 Kubernetes Operator
36
+ 在 Kubernetes 集群上运行分布式训练作业,这要求用户对 Kubernetes 分布式操作系统有一定掌握。
37
+
20
38
我们为此设计和开发了 ElasticDL
21
39
分布式计算框架,让编程者只需了解深度学习,不需要了解分布式系统开发。
40
+ 同时,ElasticDL 从易用性的角度出发,直接支持了 TensorFlow 2.x 的 Keras API。
22
41
23
42
就像 MapReduce 框架中只需要用户完形填空两个函数:map 和 reduce,ElasticDL
24
- 只需要用户填写 forward、cost、 feed 三个函数 。
43
+ 需要用户填写 forward、loss、optimizer、 feed 等函数 。
25
44
其中 forward 定义深度学习的前向计算过程,
26
45
ElasticDL 会调用 TensorFlow eager mode 中提供的 Gradient Tape 接口,
27
46
来自动推导对应的后向计算过程(backward pass);
28
- cost 指定模型训练时使用的 cost 函数;
47
+ loss 指定模型训练时使用的损失函数;
48
+ optimizer 指定模型训练时使用的优化器;
29
49
feed 用来定制化训练数据到 TensorFlow 的 tensor的转换过程。
30
50
31
- 所有的这些函数的编程只需要了解 TensorFlow
51
+ 所有这些函数的编程只需要了解 TensorFlow
32
52
API,不需要对分布式训练有任何背景知识。
33
53
这些函数也可以在单机上用小数据做调试验证,然后就可以放心地交给 ElasticDL
34
54
做分布式的容错的大规模训练了。
35
55
36
- ElasticDL 要求分布式计算平台是 Kubernetes。
37
- 一方面 Kubernetes 是目前最先进的分布式操作系统,是公有云和私有云的事实工业标准;
38
- 另一方面,ElasticDL 一改 Kubeflow 通过增加 Kubernetes operator 的方式定制
39
- Kubernetes 的思路,
40
- 为每个作业引入一个 master 进程(类似 Google MapReduce)。
41
- 这个 master 进程作为作业的一部分,而不是 Kubernetes 的一部分,
56
+ 不同于 Kubeflow 通过增加 Kubernetes Operator 来定制
57
+ Kubernetes 的思路,ElasticDL 为每个作业引入一个 master(类似于 Google MapReduce)。
58
+ 这个 master 作为作业的一部分,而不是 Kubernetes 的一部分,
42
59
不仅了解集群情况,更了解深度学习作业本身,所以有充分的信息来做更优的调度。
43
- 比如 master 进程可以请 Kubernetes 把两个 workers 启动在同一台物理机上,共用一个
60
+ 比如 master 可以请 Kubernetes 把两个 worker 启动在同一台物理机上,共用一个
44
61
GPU。
45
62
这样,一个进程读数据的时候,请另外一个进程来做计算,从而让 GPU
46
63
的利用率总是很高。
47
64
48
- TensorFlow 是当今最受欢迎的深度学习框架。在蚂蚁金服内部,TensorFlow
49
- 在诸多业务场景中被广泛使用。
50
- 我们发现 AllReduce 和 Parameter Server 是分布式训练程序中常用两种梯度聚合策略。
51
- 在图像语音模型中,AllReduce 策略被广泛的使用。
52
- 在搜索广告推荐模型中,我们更倾向于使用 Parameter Server 策略。
53
-
54
- 我们调研了目前在 Kubernetes 上运行 TensorFlow 分布式训练程序的一些开源解决方案。
65
+ ## ElasticDL 简化分布式深度学习程序编写
55
66
56
- | 分布式策略 | 模型定义 | 任务提交工具 |
57
- | --- | --- | --- |
58
- | ParameterServer | TensorFlow Estimator | Kubeflow TF-operator |
59
- | AllReduce | Keras + Horovod | Kubeflow MPI-operator |
67
+ ### 用户只需提供模型定义
60
68
61
- 我们发现 TensorFlow Estimator 仅支持 graph execution,不支持 eager
62
- execution,调试代码和网络各层输出比较麻烦。并且,用户需要组合使用不同的工具,来
63
- 编写不同分布式策略的训练程序。
64
-
65
- TensorFlow 2.x 默认支持 eager execution,并且推荐使用更加精简的 Keras API
66
- 来定义模型。
67
- TensorFlow Keras API 提高开发效率,降低使用门槛,与 eager execution
68
- 配合之后,使得程序更为直观,也更易调试。
69
- 目前 TensorFlow 2.x 的 ParameterServer 和 AllReduce 分布式策略对 Keras API
70
- 的支持还不完善。
71
-
72
- 而 ElasticDL 从易用性的角度出发,直接支持了 TensorFlow 2.x 的 Keras API。
73
- ElasticDL 同时提供统一的 ElasticDL client 命令行工具来提交作业。
74
-
75
- | 分布式策略 | 模型定义 | 任务提交工具 |
76
- | --- | --- | --- |
77
- | ParameterServer | TensorFlow Keras API | ElasticDL client |
78
- | AllReduce | TensorFlow Keras API | ElasticDL client |
79
-
80
- 统一的模型定义接口和统一的任务提交工具,极大地减少了用户的心智负担,提高了工作效
81
- 率。
82
-
83
- ## ElasticDL 是如何解决问题的
84
-
85
- 在 ElasticDL 中,用户专注于使用 TensorFlow Keras API
86
- 描述单机程序,而不需要关心分布式程序的写法。ElasticDL
87
- 会自动把单机程序转为分布式训练程序。下面我们用一个mnist的训练例子来详细说明。
88
-
89
- ### 使用 Keras API 定义模型
90
-
91
- 用户使用 Keras API 定义模型结构:
69
+ 如上文所述,用户使用 Keras API 定义模型结构。这里我们用一个 MNIST 手写数字识别的例子来详细说明。
92
70
93
71
``` python
94
72
def custom_model ():
@@ -162,22 +140,21 @@ def dataset_fn(dataset, mode, _):
162
140
在反向计算中,worker 可以通过 TensorFlow 2.x 暴露的 Gradient Tape接口
163
141
来计算得到梯度。
164
142
165
- ### ElasticDL master 提供 training loop
143
+ ### ElasticDL 提供 Training Loop
166
144
167
145
我们通常使用 mini-batch SGD 的方法来训练深度学习模型。ElasticDL worker
168
146
会进行如下步骤来完成对一个 mini-batch 的训练:
169
147
170
148
1 . 读取一个 mini-batch 的训练数据
171
- 2 . 进行 forward 计算
149
+ 2 . 获取模型参数, 进行 forward 计算
172
150
3 . 进行 backward 计算,得到梯度
173
151
4 . 把梯度以某种方式进行聚合,并更新模型
174
152
175
- 我们需要一个更大的 training loop 来包含这四个步骤 ,确保 worker
153
+ 我们需要一个更大的 training loop 来包含上述的四个步骤 ,确保 worker
176
154
可以持续的读取下一个 mini-batch 的数据,继续训练,直到满足终止条件。
177
155
178
- ElasticDL master 中实现了这样的training
156
+ ElasticDL master 中实现了这样的 training
179
157
loop,其关键点是通过动态数据分发,解决分布式训练中的数据读取问题。
180
-
181
158
首先 master 会根据数据索引将数据分片,然后为每个分片的索引创建一个 task。
182
159
ElasticDL worker 会向 master 请求拿到 task。拿到 task 之后,worker
183
160
可以数据的索引找到对应的数据分片。
@@ -201,77 +178,50 @@ ElasticDL master 中还为这些 task 维护了三个队列,todo/doing/done
201
178
新加入的 worker 可以直接向 master 请求分配数据分片,从而更方便地支持弹性调度
202
179
worker 的数量。
203
180
204
- ### ElasticDL client 命令行工具提交作业
181
+ ### ElasticDL 高效完成 Training Loop
205
182
206
- 在本地完成对模型的调试之后,我们可以借助 ElasticDL client 提供的命令行工具向
207
- Kubernetes
208
- 集群提交分布式训练作业。我们只需要指定模型定义文件和一些额外的参数,包括资源配置
209
- 等 。
183
+ Training loop 中的关键一步是把来自多个 worker 的梯度进行高效聚合。一种常用的梯度聚合策略是 Parameter Server (PS) 策略。
184
+ 在 PS 策略下,模型参数被分成若干个 shard,存储在一组 PS 上。
185
+ worker 首先向 PS 请求参数,然后使用本地训练数据计算梯度,并把梯度发送给PS。
186
+ PS 使用 worker 上传来的梯度来迭代更新模型参数 。
210
187
211
- ``` bash
212
- elasticdl train \
213
- --image_name=elasticdl:ci \
214
- --model_zoo=model_zoo \
215
- --model_def=mnist_functional_api.mnist_functional_api.custom_model \
216
- --training_data=/data/mnist/train \
217
- --num_epochs=1 \
218
- --master_resource_request=" cpu=1,memory=4096Mi,ephemeral-storage=1024Mi" \
219
- --worker_resource_request=" cpu=1,memory=4096Mi,ephemeral-storage=1024Mi" \
220
- --ps_resource_request=" cpu=1,memory=4096Mi,ephemeral-storage=1024Mi" \
221
- --minibatch_size=64 \
222
- --num_ps_pods=1 \
223
- --num_workers=2 \
224
- --job_name=test-train \
225
- --distribution_strategy=ParameterServerStrategy \
226
- --output=model_output
227
- ```
228
-
229
- 在上述例子中,我们指定了 Parameter Server 的分布式策略,由一个parameter server
230
- 和 两个 worker 共同完成训练任务。
231
- ElasticDL 的 master pod 将会被首先创建,然后由 master 负责启动 worker
232
- pod,以及 parameter server pod,并且建立通信。
233
- ElasticDL master 可以监控每个 pod 的状态,当有 pod 挂掉时,master
234
- 会重新拉起新的 pod。
235
-
236
- ## Parameter Server 的改进
237
-
238
- 在搜索广告等场景,模型中可能包含较大的 embedding
239
- table,其内存会超过单机内存。我们通常使用 Parameter Server (PS)
240
- 分布式策略来训练此类模型。
241
- 在 PS 策略下,PS 上存储着模型参数,worker 从 PS 上请求参数。
242
- worker 在本地使用训练数据计算梯度之后,把梯度再发送到 PS 上,PS 使用 worker
243
- 传来的梯度来迭代更新模型参数。
244
-
245
- ElasticDL 用 Go 实现了 Parameter
246
- Server,具有良好的吞吐能力和可扩展性。并且,我们针对 embedding table
188
+ ElasticDL master 会首先进行组网,协调 worker 和 PS 之间进行通信。ElasticDL 的 master 将会被首先创建,
189
+ 然后由 master 启动 worker pod,以及 parameter server pod,并且建立通信。
190
+ ElasticDL master 可以监控每个 pod 的状态,当有 pod 挂掉时,master 会重新拉起新的 pod。
191
+
192
+ 其次,ElasticDL 使用 Go 实现了 parameter
193
+ server,具有良好的吞吐能力和可扩展性。
194
+ 在搜索推荐广告等场景中,神经网络模型通常包含较大的 embedding
195
+ table。在一些情况下,embedding table 的大小会超过单机内存。
196
+ 为此,我们针对 embedding table
247
197
做了一些额外的优化。
248
198
249
- - embedding vector 惰性初始化 ,用户无需提前指定 embedding table 的大小
250
- - 把一个 embedding table 拆分到多个 PS 上存储与更新,均衡存储与通信的负载
251
- - worker 从 PS 请求参数时,先滤除重复 ID ,只请求不同的参数 ,减少通信量
252
- - worker 向 PS 发送梯度时,本地先把相同 ID 的梯度进行合并,减少通信量
199
+ - embedding vector 在 PS 上惰性初始化 ,用户无需提前指定 embedding table 的大小
200
+ - 把一个 embedding table 拆分到多个 PS 上,均衡存储与通信负载
201
+ - worker 从 PS 请求参数时,先滤除重复 ID,只取回不同 ID 的参数 ,减少通信量
202
+ - worker 向 PS 发送梯度时,先把相同 ID 的梯度进行合并,减少通信量
253
203
254
204
通过上述设计与实现,ElasticDL 可以很高效的完成搜索推荐广告模型的训练。
255
205
256
206
ElasticDL 自去年9月份开源以来,我们对 Parameter Server
257
207
持续迭代开发,不断提升性能。
258
208
我们以一个推荐中常用的 deepFM 模型来进行测试,测试中使用 frappe 数据集。
259
- 在每次实验中,我们启动一个 parameter server 进程和四个 worker 进程 ,训练10个
209
+ 在每次实验中,我们启动一个 parameter server 和四个 worker,训练10个
260
210
epoch。
261
211
262
212
| Parameter Server 实现 | 训练时间(秒) |
263
213
| --- | --- |
264
214
| By Redis (2019.9) | 1350 |
265
215
| By Go (2020.2) | 106 |
266
216
267
- 从上表中我们可以看出 Go Parameter Server 相比于之前实现有10倍以上的提升 。
217
+ 从上表中我们可以看出 Go Parameter Server 相比于之前的实现有10倍以上的提升 。
268
218
269
219
## 使用 ElasticDL 进行 Kaggle 实战
270
220
271
221
在本小节中,我们将使用 ElasticDL 进行一次 Kaggle 实战。
272
222
本例中使用的是 Kaggle 上 Display Advertising Challenge 中的 criteo
273
223
数据集,这是一个关于广告点击率预估的比赛。
274
- 我们将使用 xDeepFM 模型来进行建模,所有的实例代码都放在了 ElasticDL 的 [ model
224
+ 我们使用 xDeepFM 模型来进行建模,所有的实例代码都放在了 ElasticDL 的 [ model
275
225
zoo] ( https://github.com/sql-machine-learning/elasticdl/tree/develop/model_zoo/dac_ctr ) 中。
276
226
277
227
### 数据预处理
@@ -333,8 +283,8 @@ COPY model_zoo /model_zoo
333
283
我们需要把该镜像推送到 GKE 集群能够访问到的仓库中,比如说 docker hub 的仓库中。
334
284
335
285
最后,我们通过 ElasticDL client 工具向 GKE 集群提交训练作业。
336
- 我们使用 ParameterServer 分布式策略进行训练,有 2 个 parameter serve pods 和
337
- 5个 worker pods共同参与训练 。
286
+ 我们使用 ParameterServer 分布式策略进行训练,本次作业中,我们启动了2个 parameter serve pods 和
287
+ 5个 worker pods 共同参与训练 。
338
288
339
289
``` bash
340
290
elasticdl train \
0 commit comments