|
| 1 | +# ElasticDL:基于 TensorFlow 的 Kubernetes-native 弹性分布式训练系统 |
| 2 | + |
| 3 | +TensorFlow 目前在业界得到了广泛的使用。在实际生产中, |
| 4 | +由于参数规模和训练数据非常大,很多 TensorFlow 任务需要运行在分布式集群上。 |
| 5 | +为了提升集群的运维管理效率,工程师们会共用一个集群, |
| 6 | +并使用集群管理系统调度分布式作业。 |
| 7 | +Kubernetes 是目前最先进的分布式操作系统,是公有云和私有云的事实工业标准。 |
| 8 | +ElasticDL 通过实现弹性调度,来提升 Kubernetes 集群上 TensorFlow |
| 9 | +作业的研发效率和集群利用率。 |
| 10 | + |
| 11 | +## Kubernetes 上运行分布式 TensorFlow 作业 |
| 12 | + |
| 13 | +目前,Kubernetes 集群上运行分布式 TensorFlow 作业主要依赖 Kubeflow 项目 |
| 14 | +提供的 Kubernetes controller —— [kubeflow/tf-operator](https://github.com/kubeflow/tf-operator) |
| 15 | +—— 这是因为 TensorFlow 不是一个 Kubernetes-native 的编程框架,自己不会调用 |
| 16 | +Kubernetes API 启动进程。类似的,很多诞生在 Kubernetes 之前的分布式编程框架 |
| 17 | +(比如 Spark)都需要一个 Kubernetes controller 来启动作业。 |
| 18 | +这类应用或者框架特定的 controller 被 CoreOS 公司的工程师们称为 |
| 19 | +Kubernetes Operator,后来演化成了行业术语。 |
| 20 | + |
| 21 | +在向 Kubernetes 集群提交作业时,kubeflow/tf-operator |
| 22 | +会询问 Kubernetes 计划分配哪几台机器来运行一个分布式作业中的各个进程, |
| 23 | +随后告知每个进程所有其它进程的 IP 地址和 port, |
| 24 | +从而保证一个作业里各个进程之间互相知道对方。 |
| 25 | + |
| 26 | +为什么需要让所有进程互相知道对方呢?这是 TensorFlow 1.x ps-based distribution |
| 27 | +方式要求的。TenosrFlow 1.x 原生的分布式训练功能让一个作业中所有进程都执行 |
| 28 | +TensorFlow 1.x runtime 程序。这些进程互相通信, |
| 29 | +互相协调成为一个“分布式 runtime”来解释执行表示深度学习计算过程的 graph。 |
| 30 | +在开始分布式训练之初,graph 被 TensorFlow runtime 拆解成若干子 graph; |
| 31 | +每个进程负责执行一个子 graph —— 任何一个进程被抢占(preempted) |
| 32 | +或者失败(fail),则整个大 graph 的执行就失败了。最近随着 TensorFlow runtime |
| 33 | +的改进,作业可以依赖剩下的进程继续执行, |
| 34 | +不过也不会因为随后集群里出现空闲资源而增加进程的数量。 |
| 35 | +所以使用 kubeflow/tf-operator 提交 TensorFlow |
| 36 | +分布式作业并不支持弹性调度。 |
| 37 | + |
| 38 | +Kubeflow 可以在 Kubernetes 上发挥 TensorFlow 原生的分布式计算能力, |
| 39 | +但是因为后者并不能弹性调度运行资源,所以 Kubeflow 并不能无中生有。 |
| 40 | +使用 kubeflow/tf-operator 执行分布式 TensorFlow 作业, |
| 41 | +模型迭代必须等待申请的进程全部启动后才能开始。 |
| 42 | +如果集群资源不足以启动所有进程,则当前作业只能等待其他作业释放资源。 |
| 43 | +为了缩短资源等待时间,可以给作业配置专有资源池。由于资源不共享, |
| 44 | +集群资源利用率会很低。所以 kubeflow/tf-operator |
| 45 | +很难同时兼顾研发效率和集群利用率。 |
| 46 | + |
| 47 | +ElasticDL 利用 TensorFlow eager execution 和 Kubernetes API, |
| 48 | +只要部分进程启动就可以开始模型迭代,无需等待所有进程全部启动。 |
| 49 | +当集群中有其他作业释放资源时,可以启动新的进程加入到训练作业中, |
| 50 | +加速模型迭代。这样既能缩短用户作业等待时间,也能提升集群资源利用率。 |
| 51 | + |
| 52 | +## 基于 TensorFlow Eager Execution 实现分布式训练 |
| 53 | + |
| 54 | +目前基于 TensorFlow 的分布式训练系统大致可以分为以下四类: |
| 55 | + |
| 56 | +| | TensorFlow 1.x graph mode| TensorFlow 2.x eager execution | |
| 57 | +| ---- | ------------------------ | -------------------------------- | |
| 58 | +| in TensorFlow runtime | TensorFlow's parameter server | TensorFlow distributed strategy| |
| 59 | +| above TensorFlow API | Uber Horovod | ElasticDL | |
| 60 | + |
| 61 | +如上文解释,我们没法通过修改 TensorFlow runtime |
| 62 | +实现完备的主动的容错和弹性调度。 |
| 63 | +ElasticDL 和 Uber Horovod 都是在 TensorFlow API 基础上构建。 |
| 64 | +ElasticDL 位于田字格的右下角,是为了利用 Kubernetes 来实现容错和弹性调度。 |
| 65 | + |
| 66 | +Horovod 基于 TensorFlow 1.x。 |
| 67 | +一个 Horovod 作业的每个进程调用单机版 TensorFlow 做本地计算, |
| 68 | +然后收集 gradients,并且通过 AllReduce 调用汇聚 gradients 并且更新模型。 |
| 69 | +Horovod 也是平台无关的,所以它提供的 AllReduce 操作不支持容错和弹性调度。 |
| 70 | +这一点和 ElasticDL 不一样。和 ElasticDL 一样的是, |
| 71 | +Horovod 需要从 TensorFlow 获取 gradient。 |
| 72 | +在 TensorFlow 1.x 中,深度学习计算是表示成一个计算图(graph), |
| 73 | +并且由 TensorFlow runtime 解释执行。 |
| 74 | +所以 Horovod 为了获得每个进程算的 gradients 并且执行 AllReduce 操作, |
| 75 | +就得 hack 进入图执行的过程。为此, |
| 76 | +Horovod 要求用户使用特定的 optimizer 代替 TensorFlow 提供的 optimizer, |
| 77 | +从而可以在模型训练迭代阶段透露出 gradients。 |
| 78 | + |
| 79 | +一个调用 Horovod 的用户程序的结构如下。 |
| 80 | +其中标记为 (*) 和 (**) 的部 分是 Horovod 要求用户写的, |
| 81 | +帮助 Horovod 截获 TensorFlow 计算得到的 gradients 的代码。 |
| 82 | +如果用户不慎忘记写了,那么程序执行结果就不对了。 |
| 83 | + |
| 84 | +```python |
| 85 | +hvd.init() |
| 86 | +config = tf.ConfigProto() |
| 87 | +config.gpu_options.visible_device_list = str(hvd.local_rank()) |
| 88 | + |
| 89 | +loss = ... # Build model... |
| 90 | +opt = tf.train.AdagradOptimizer(0.01 * hvd.size()) |
| 91 | +opt = hvd.DistributedOptimizer(opt) # (*) |
| 92 | +train_op = opt.minimize(loss) |
| 93 | + |
| 94 | +hooks = [hvd.BroadcastGlobalVariablesHook(0)] # (**) |
| 95 | +with tf.train.MonitoredTrainingSession(checkpoint_dir,config, hooks) as s: |
| 96 | + while not s.should_stop(): |
| 97 | + s.run(train_op) |
| 98 | +``` |
| 99 | + |
| 100 | +ElasticDL 没有这些问题,因为它依赖的是 TensorFlow 2.x eager execution。 |
| 101 | +TensorFlow 2.x 主推的 eager execution |
| 102 | +采用和解释执行图完全不同的深度学习计算方式。 |
| 103 | +前向计算过程把对基本计算单元(operator)的调用记录在一个内存数据结构 tape 里, |
| 104 | +随后反向计算过程(计算 gradients)可以回溯这个 tape, |
| 105 | +以此调用 operator 对应的 gradient operator。 |
| 106 | +我们可以调用 `tape.gradient` 方法来获取每个模型参数的gradient。 |
| 107 | + |
| 108 | +ElasticDL 通过调用 TensorFlow 2.x API 可以很直接地获取 gradients: |
| 109 | + |
| 110 | +```python |
| 111 | +with tf.GradientTape() as tape: |
| 112 | + outputs = self._model.call(features, training=True) |
| 113 | + loss = self._loss(outputs, labels) |
| 114 | + # Add regularization loss if any |
| 115 | + if self._model.losses: |
| 116 | + loss += tf.math.add_n(self._model.losses) |
| 117 | +grads = tape.gradient(loss, self.get_trainable_items()) |
| 118 | +``` |
| 119 | + |
| 120 | +ElasticDL通过 tape 获取 gradient 后, |
| 121 | +可以通过 Parameter Server 或者 AllReduce 分布式策略来更新模型参数。 |
| 122 | + |
| 123 | +## Kubernetes-native 的弹性调度 |
| 124 | + |
| 125 | +ElasticDL 通过实现一个 Kubernetes-native 的框架,调用 TensorFlow 2.x |
| 126 | +来实现弹性深度学习训练。所谓 Kubernetes-native 指的是一个程序调用 |
| 127 | +Kubernetes API 来起止进程。ElasticDL 没有选择开发 Kubernetes Operator, |
| 128 | +是因为 Operator 只能管理作业集群状态。像上面所说的, |
| 129 | +如果训练框架自身不支持容错和动态增加进程数,Operator 也无能为力。 |
| 130 | +所以 ElasticDL 通过在 Kubernetes 上创建 |
| 131 | +master 进程来控制深度学习训练作业的弹性调度。 |
| 132 | + |
| 133 | +ElasticDL 的 master 会根据数据索引将数据分片,然后为每个数据分片创建一个 task。 |
| 134 | +然后 master 会调用 Kubernetes API 启动多个 worker 进程。每个 worker 启动后, |
| 135 | +会向 master 请求 task。worker 收到来自 master 分发的 task 后, |
| 136 | +会读取 task 对应的数据分片来前向计算和梯度计算。 |
| 137 | + |
| 138 | +同时,master 会通过 Kubernetes API 观察集群中每个worker的状态。 |
| 139 | +当有 worker 被高优先级作业抢占后,master 会回收该 worker 的未完成task, |
| 140 | +然后重新分发给其他的 worker。同时 master 会尝试通过 Kubernetes API |
| 141 | +重新拉起被抢占的 worker。等到资源充足时,worker 进程会被重新启动, |
| 142 | +并加入训练作业。 |
| 143 | + |
| 144 | +Kubernetes-native 架构使得 master 进程有机会与 Kubernetes |
| 145 | +协作实现容错和弹性调度。不过,因为 ElasticDL 调用 Kubernetes API, |
| 146 | +也就意味着 ElasticDL 只能运行在 Kubernetes 上。 |
| 147 | + |
| 148 | +理论上,不调用 Kubernetes API 也是可以实现一定程度的容错的。 |
| 149 | +即使没有 Kubernetes 的通知,master 可以通过检查其他进程的心跳(heartbeat) |
| 150 | +或者检查 TCP 连接状态,判断其他进程的生死存亡。 |
| 151 | +但是,不调用 Kubernetes API(或者其他调度系统的 API), |
| 152 | +master 无法通知调度系统重启进程,也无法得知新启动的进程的信息, |
| 153 | +并且帮助它加入作业。这种“非 Kubernetes-native”的容错方式颇为被动, |
| 154 | +只能接受资源紧张时一些进程被抢占而挂掉的事实, |
| 155 | +而不能在其他作业释放资源后增加进程充分利用空闲资源。 |
| 156 | + |
| 157 | +## 弹性调度 Benchmark |
| 158 | + |
| 159 | +为了说明 ElasticDL 弹性调度可以带来用户体验和集群利用率的双丰收,我们做了三个 |
| 160 | +实验来对比弹性调度和无弹性调度的性能。 |
| 161 | + |
| 162 | +### 实验一:多个深度学习训练作业同时在集群上启动 |
| 163 | + |
| 164 | +考虑两个深度学习训练作业需要的资源总和略超过集群的情况: |
| 165 | + |
| 166 | +- 如果没有弹性调度,第二个作业需要等待第一个作业完成后才能启动。 |
| 167 | +第二个作业的发起人需要等很久 —— 用户体验不好。 |
| 168 | +并且任何时刻只有一个作业在运行 —— 集群资源用不满。 |
| 169 | +- 如果有弹性调度,则两个作业并发执行,虽然后启动的作业拿不到期待的全部资源, |
| 170 | +但是也马上就开始执行了 —— 用户体验好。因为两个作业并发 —— 集群被用满。 |
| 171 | + |
| 172 | +我们做了一个实验来验证上述好处,这个实验可以在蚂蚁金服的 Kubernetes 集群(ASI) |
| 173 | +和开源 Kubernetes 集群上复现。 |
| 174 | + |
| 175 | + |
| 176 | + |
| 177 | +上图对应的实验里,我们用 kubeflow/tf-operator 提交了两个训练作业, |
| 178 | +每个作业都需要 175 个 CPU。而集群总 CPU 数是 320,不足以同时运行两个作业, |
| 179 | +所以依次运行它们。可以看到第一个作业在 650 秒时结束。随后集群花了一点时间调度, |
| 180 | +然后开始运行第二个作业,直到 1300 秒时结束。 |
| 181 | + |
| 182 | +下图对应的实验里,我们用 ElasticDL 来执行同样的两个训练作业。 |
| 183 | +第一个作业提交之后的 300 秒,我们提交了第二个作业。 |
| 184 | +第二个作业⻢上就开始运行,用满了集群剩下的资源,而不需要等到 第一个作业结束。 |
| 185 | +在 650 秒时,第一个作业结束。随后,在 1100 秒时,第二个作业也结束了。 |
| 186 | +因为弹性调度,使得两个作业尽量同时运行,所以总结束时间比也上图要早。 |
| 187 | + |
| 188 | +总结: |
| 189 | + |
| 190 | +- 用户等待作业启动时间几乎是 0。这对于深度学习很重要, |
| 191 | +因为用户最关注的是第一个迭代能否执行成功,如果失败了,能够快速发现用户程序的 bug。 |
| 192 | +- 集群利用率高。第二个弹性调度实验执行期间,有一段时间集群利用率是 100%; |
| 193 | +其他时间也不低于第一个无弹性调度实验。 |
| 194 | +- 作业完成更快。第二个试验里,两个作业用了约 1100 秒; |
| 195 | +第一个实验里需要约 1300 秒。 |
| 196 | + |
| 197 | +### 实验二:深度学习训练作业和在线服务混布 |
| 198 | + |
| 199 | +运行各种在线服务的生产集群,通常需要留出余量资源,以应付突然增⻓的用户请求量。 |
| 200 | +我们希望利用这些“余量”来做深度学习训练,从而提升集群利用率。 |
| 201 | + |
| 202 | +下面实验验证:通过用较低优先级运行 ElasticDL 训练作业,在用户请求增加的时候, |
| 203 | +Kubernetes 自动扩容在线服务(nginx);此时 ElasticDL 作业自动释放资源, |
| 204 | +配合在线服务的扩容。当流量高峰过去之后,Kubernetes 自动缩容 nginx 服务, |
| 205 | +此时,ElasticDL 自动利用释放的资源来扩容训练任务。 |
| 206 | +真实场景中,可以在 Kubernetes 上使用 [Horizontal Pod Autoscaler](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/) |
| 207 | +根据流量的大小来对在线服务进行伸缩。在本实验中,则是通过在特定的时间点, |
| 208 | +调用 `kubectl scale` 命令直接对在线服务进行伸缩,来去模拟流量的增加或者减少。 |
| 209 | +集群中总共有 320个CPU,训练任务是 deepFM 二分类模型训练,运行时长约40分钟。 |
| 210 | + |
| 211 | + |
| 212 | + |
| 213 | +图中紫色曲线是 nginx 服务使用的 CPU 数量,随用户请求数量变化。 |
| 214 | +绿色曲线是 ElasticDL 训练作业使用的 CPU 数量,随 nginx 的资源需求自动变化。 |
| 215 | +蓝色曲线是机群的总体资源利用率 —— 保持在 90% 以上。 |
| 216 | + |
| 217 | +### 实验三:训练时调整 Worker 数量不影响收敛性 |
| 218 | + |
| 219 | +有用户担心训练过程中 worker 的数量发生变化,会导致模型不收敛。 |
| 220 | +实际情况下并未发生这类问题。使用 [Kaggle Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge) |
| 221 | +的数据集,其中训练样本 4 千万条,测试样本 600 万条。 |
| 222 | +用 ElasticDL 和用无弹性调度的 kubeflow/tf-operator 分别 |
| 223 | +训练 wide & deep 模型。模型中 deep 包含 2 层 layer,输出单元数分别为 |
| 224 | +8 和 4,激活函数才用 relu,模型收敛曲线如下: |
| 225 | + |
| 226 | + |
| 227 | + |
| 228 | +## ElasticDL 在蚂蚁金服花呗推荐场景的实践 |
| 229 | + |
| 230 | +蚂蚁金服部署了万级节点的 Kubernetes 集群, |
| 231 | +同时蚂蚁金服有许多场景使用深度学习来提升产品性能和用户体验 |
| 232 | +算法工程师们共用一个 Kubernetes集群来训深度学习模型。 |
| 233 | +ElasticDL 已经成功将蚂蚁花呗推荐场景的深度学习模型运行在 Kubernetes 集群上。 |
| 234 | + |
| 235 | +蚂蚁花呗是蚂蚁金服推出的一款消费信贷产品。用户在购买商品时, |
| 236 | +会给用户推荐花呗支付,并给予一定额度的优惠。 |
| 237 | +此推荐场景使用的是 Deep Interest Evolution Network [(DIEN)](https://arxiv.org/abs/1809.03672) |
| 238 | +来预估推荐的点击率。DIEN 模型的输入包括用户属性特征、 |
| 239 | +商品属性特征和用户行为的序列特征,其中用户和商品属性特征是高维稀疏特征。 |
| 240 | +所以模型首先需要通过 embedding 将稀疏特征转成 embedding vector, |
| 241 | +然后进行给深度学习模型进行计算。因为 embedding 的规模很大, |
| 242 | +ElasticDL 采用 Parameter Server(PS)策略来进行分布式训练, |
| 243 | +将参数分散到多个 PS 节点上,worker 负责前向计算并获取梯度, |
| 244 | +PS 负责梯度汇总和参数更新。 |
| 245 | + |
| 246 | +模型上线后,我们和使用 TensorFlow 原生分布式训练的模型进行了对比, |
| 247 | +线上点击率持平。同时在一个 Kubernetes namespace 下运行多个训练任务时, |
| 248 | +可以占满该 namespace 下的资源。 |
0 commit comments