|
| 1 | +# ElasticDL: 像写单机程序一样写分布式深度学习程序 |
| 2 | + |
| 3 | +## 分布式深度学习程序难写 |
| 4 | + |
| 5 | +一个深度学习的训练任务往往需要较多的训练数据,和较长的训练时间。一个通常的做法是把单机程序给分布式化,利用集群的资源,启动多个 worker,来共同完成一个训练任务。 |
| 6 | + |
| 7 | +分布式深度学习程序的编写是相对困难的,编程者既要了解深度学习,也要了解分布式系统开发。 |
| 8 | +在一个分布式深度学习系统中,需要启动和监控若干个 workers 进程,对数据和计算任务进行拆分,并且分发给 workers。 |
| 9 | +此外,还需要考虑 workers 之间的通信(communication)和 同步(synchronization)。 |
| 10 | +随着计算规模的增加,workers 进程数目也会增加。当计算规模很大时,包含数十个进程的作业在执行过程中一个进程都不挂的概率几乎是0。 |
| 11 | +如果一个进程挂掉,则整个作业重启,那么这个作业会陷入永不停歇的重启,无法结束。 |
| 12 | +此时,需要结合深度学习训练算法的数学性质,设计容错机制。 |
| 13 | +这要求编程者必须同时是深度学习和分布式系统的专家。 |
| 14 | + |
| 15 | +我们为此设计和开发了 ElasticDL 分布式计算框架,让编程者只需了解深度学习,不需要了解分布式系统开发。 |
| 16 | + |
| 17 | +就像 MapReduce 框架中只需要用户完形填空两个函数:map 和 reduce,ElasticDL 只需要用户填写 forward、cost、feed 三个函数。 |
| 18 | +其中 forward 定义深度学习的前向计算过程, |
| 19 | +ElasticDL 会调用 TensorFlow eager mode 中提供的 Gradient Tape 接口, |
| 20 | +来自动推导对应的后向计算过程(backward pass); |
| 21 | +cost 指定模型训练时使用的 cost 函数; |
| 22 | +feed 用来定制化训练数据到 TensorFlow 的 tensor的转换过程。 |
| 23 | + |
| 24 | +所有的这些函数的编程只需要了解 TensorFlow API,不需要对分布式训练有任何背景知识。 |
| 25 | +这些函数也可以在单机上用小数据做调试验证,然后就可以放心地交给 ElasticDL 做分布式的容错的大规模训练了。 |
| 26 | + |
| 27 | +ElasticDL 要求分布式计算平台是 Kubernetes。 |
| 28 | +一方面 Kubernetes 是目前最先进的分布式操作系统,是公有云和私有云的事实工业标准; |
| 29 | +另一方面,ElasticDL 一改 Kubeflow 通过增加 Kubernetes operator 的方式定制 Kubernetes 的思路, |
| 30 | +为每个作业引入一个 master 进程(类似 Google MapReduce)。 |
| 31 | +这个 master 进程作为作业的一部分,而不是 Kubernetes 的一部分, |
| 32 | +不仅了解集群情况,更了解深度学习作业本身,所以有充分的信息来做更优的调度。 |
| 33 | +比如 master 进程可以请 Kubernetes 把两个 workers 启动在同一台物理机上,共用一个 GPU。 |
| 34 | +这样,一个进程读数据的时候,请另外一个进程来做计算,从而让 GPU 的利用率总是很高。 |
| 35 | + |
| 36 | +TensorFlow 是当今最受欢迎的深度学习框架。在蚂蚁金服内部,TensorFlow 在诸多业务场景中被广泛使用。 |
| 37 | +我们发现 AllReduce 和 Parameter Server 是分布式训练程序中常用两种梯度聚合策略。 |
| 38 | +在图像语音模型中,AllReduce 策略被广泛的使用。 |
| 39 | +在搜索广告推荐模型中,我们更倾向于使用 Parameter Server 策略。 |
| 40 | + |
| 41 | +我们调研了目前在 Kubernetes 上运行 TensorFlow 分布式训练程序的一些开源解决方案。 |
| 42 | + |
| 43 | +| 分布式策略 | 模型定义 | 任务提交工具 | |
| 44 | +| --- | --- | --- | |
| 45 | +| ParameterServer | TensorFlow Estimator | Kubeflow TF-operator | |
| 46 | +| AllReduce | Keras + Horovod | Kubeflow MPI-operator | |
| 47 | + |
| 48 | +我们发现 TensorFlow Estimator 仅支持 graph execution,不支持 eager execution,调试代码和网络各层输出比较麻烦。并且,用户需要组合使用不同的工具,来编写不同分布式策略的训练程序。 |
| 49 | + |
| 50 | +TensorFlow 2.x 默认支持 eager execution,并且推荐使用更加精简的 Keras API 来定义模型。 |
| 51 | +TensorFlow Keras API 提高开发效率,降低使用门槛,与 eager execution 配合之后,使得程序更为直观,也更易调试。 |
| 52 | +目前 TensorFlow 2.x 的 ParameterServer 和 AllReduce 分布式策略对 Keras API 的支持还不完善。 |
| 53 | + |
| 54 | +而 ElasticDL 从易用性的角度出发,直接支持了 TensorFlow 2.x 的 Keras API。 |
| 55 | +ElasticDL 同时提供统一的 ElasticDL client 命令行工具来提交作业。 |
| 56 | + |
| 57 | +| 分布式策略 | 模型定义 | 任务提交工具 | |
| 58 | +| --- | --- | --- | |
| 59 | +| ParameterServer | TensorFlow Keras API | ElasticDL client | |
| 60 | +| AllReduce | TensorFlow Keras API | ElasticDL client | |
| 61 | + |
| 62 | +统一的模型定义接口和统一的任务提交工具,极大地减少了用户的心智负担,提高了工作效率。 |
| 63 | + |
| 64 | +## ElasticDL 是如何解决问题的 |
| 65 | + |
| 66 | +在 ElasticDL 中,用户专注于使用 TensorFlow Keras API 描述单机程序,而不需要关心分布式程序的写法。ElasticDL 会自动把单机程序转为分布式训练程序。下面我们用一个mnist的训练例子来详细说明。 |
| 67 | + |
| 68 | +### 使用 Keras API 定义模型 |
| 69 | + |
| 70 | +用户使用 Keras API 定义模型结构: |
| 71 | + |
| 72 | +```python |
| 73 | +def custom_model(): |
| 74 | + inputs = tf.keras.Input(shape=(28, 28), name="image") |
| 75 | + x = tf.keras.layers.Reshape((28, 28, 1))(inputs) |
| 76 | + x = tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu")(x) |
| 77 | + x = tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu")(x) |
| 78 | + x = tf.keras.layers.BatchNormalization()(x) |
| 79 | + x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(x) |
| 80 | + x = tf.keras.layers.Dropout(0.25)(x) |
| 81 | + x = tf.keras.layers.Flatten()(x) |
| 82 | + outputs = tf.keras.layers.Dense(10)(x) |
| 83 | + |
| 84 | + return tf.keras.Model(inputs=inputs, outputs=outputs, name="mnist_model") |
| 85 | +``` |
| 86 | + |
| 87 | +除了模型定义之外,用户还需要指定 dataset, loss,optimizer 以及 evaluation函数 |
| 88 | + |
| 89 | +```python |
| 90 | +def loss(labels, predictions): |
| 91 | + labels = tf.reshape(labels, [-1]) |
| 92 | + return tf.reduce_mean( |
| 93 | + input_tensor=tf.nn.sparse_softmax_cross_entropy_with_logits( |
| 94 | + logits=predictions, labels=labels |
| 95 | + ) |
| 96 | + ) |
| 97 | + |
| 98 | +def optimizer(lr=0.1): |
| 99 | + return tf.optimizers.SGD(lr) |
| 100 | + |
| 101 | +def eval_metrics_fn(): |
| 102 | + return { |
| 103 | + "accuracy": lambda labels, predictions: tf.equal( |
| 104 | + tf.argmax(predictions, 1, output_type=tf.int32), |
| 105 | + tf.cast(tf.reshape(labels, [-1]), tf.int32), |
| 106 | + ) |
| 107 | + } |
| 108 | + |
| 109 | +def dataset_fn(dataset, mode, _): |
| 110 | + def _parse_data(record): |
| 111 | + if mode == Mode.PREDICTION: |
| 112 | + feature_description = { |
| 113 | + "image": tf.io.FixedLenFeature([28, 28], tf.float32) |
| 114 | + } |
| 115 | + else: |
| 116 | + feature_description = { |
| 117 | + "image": tf.io.FixedLenFeature([28, 28], tf.float32), |
| 118 | + "label": tf.io.FixedLenFeature([1], tf.int64), |
| 119 | + } |
| 120 | + r = tf.io.parse_single_example(record, feature_description) |
| 121 | + features = { |
| 122 | + "image": tf.math.divide(tf.cast(r["image"], tf.float32), 255.0) |
| 123 | + } |
| 124 | + if mode == Mode.PREDICTION: |
| 125 | + return features |
| 126 | + else: |
| 127 | + return features, tf.cast(r["label"], tf.int32) |
| 128 | + |
| 129 | + dataset = dataset.map(_parse_data) |
| 130 | + |
| 131 | + if mode == Mode.TRAINING: |
| 132 | + dataset = dataset.shuffle(buffer_size=1024) |
| 133 | + return dataset |
| 134 | +``` |
| 135 | + |
| 136 | +在 TensorFlow 2.x 中,上述定义的每个接口都可以单独测试,我们可以很方便的在本地调试模型定义。 |
| 137 | + |
| 138 | +同时,TensorFlow 2.x 默认支持 eager execution,ElasticDL worker 可以直接调用模型定义,进行前向计算。 |
| 139 | +在反向计算中,worker 可以通过 TensorFlow 2.x 暴露的 Gradient Tape接口 来计算得到梯度。 |
| 140 | + |
| 141 | +### ElasticDL master 提供 training loop |
| 142 | + |
| 143 | +我们通常使用 mini-batch SGD 的方法来训练深度学习模型。ElasticDL worker 会进行如下步骤来完成对一个 mini-batch 的训练: |
| 144 | + |
| 145 | +1. 读取一个 mini-batch 的训练数据 |
| 146 | +2. 进行 forward 计算 |
| 147 | +3. 进行 backward 计算,得到梯度 |
| 148 | +4. 把梯度以某种方式进行聚合,并更新模型 |
| 149 | + |
| 150 | +我们需要一个更大的 training loop 来包含这四个步骤,确保 worker 可以持续的读取下一个 mini-batch 的数据,继续训练,直到满足终止条件。 |
| 151 | + |
| 152 | +ElasticDL master 中实现了这样的training loop,其关键点是通过动态数据分发,解决分布式训练中的数据读取问题。 |
| 153 | + |
| 154 | +首先 master 会根据数据索引将数据分片,然后为每个分片的索引创建一个 task。 |
| 155 | +ElasticDL worker 会向 master 请求拿到 task。拿到 task 之后,worker 可以数据的索引找到对应的数据分片。 |
| 156 | + |
| 157 | +ElasticDL master 中还为这些 task 维护了三个队列,todo/doing/done 队列。 |
| 158 | +任务开始时,master 会将所有 task 放入 todo 队列。每分发一个 task 给 worker, |
| 159 | +都会把这个 task 从 todo 队列挪到 doing 队列。 |
| 160 | +如果一个 worker 被抢占或者因为其他原因失败,master 可以通过监控 doing 队列 task 的 timeout, |
| 161 | +把这个 task 挪回到 todo 队列中。 |
| 162 | +如果 worker 顺利完成一个 task,master 则会收到通知,把这个 task 从 doing 队列挪到 done 队列。 |
| 163 | + |
| 164 | +由于ElasticDL master 负责把数据索引分发给所有的 worker,所以我们只需要给 master 配置数据源即可。 |
| 165 | +目前 ElasticDL 支持 [RecordIO](https://github.com/wangkuiyi/recordio) 文件和 |
| 166 | + [MaxCompute](https://www.alibabacloud.com/zh/product/maxcompute) 表两种数据源。 |
| 167 | +用户只需配置训练数据集的 RecordIO 文件路径或者 MaxCompute 表名。 |
| 168 | + |
| 169 | +同时使用动态数据分发机制之后,worker 数目也可以动态变化。 |
| 170 | +新加入的 worker 可以直接向 master 请求分配数据分片,从而更方便地支持弹性调度 worker 的数量。 |
| 171 | + |
| 172 | +### ElasticDL client 命令行工具提交作业 |
| 173 | + |
| 174 | +在本地完成对模型的调试之后,我们可以借助 ElasticDL client 提供的命令行工具向 Kubernetes 集群提交分布式训练作业。我们只需要指定模型定义文件和一些额外的参数,包括资源配置等。 |
| 175 | + |
| 176 | +```bash |
| 177 | +elasticdl train \ |
| 178 | + --image_name=elasticdl:ci \ |
| 179 | + --model_zoo=model_zoo \ |
| 180 | + --model_def=mnist_functional_api.mnist_functional_api.custom_model \ |
| 181 | + --training_data=/data/mnist/train \ |
| 182 | + --num_epochs=1 \ |
| 183 | + --master_resource_request="cpu=1,memory=4096Mi,ephemeral-storage=1024Mi" \ |
| 184 | + --worker_resource_request="cpu=1,memory=4096Mi,ephemeral-storage=1024Mi" \ |
| 185 | + --ps_resource_request="cpu=1,memory=4096Mi,ephemeral-storage=1024Mi" \ |
| 186 | + --minibatch_size=64 \ |
| 187 | + --num_ps_pods=1 \ |
| 188 | + --num_workers=2 \ |
| 189 | + --job_name=test-train \ |
| 190 | + --distribution_strategy=ParameterServerStrategy \ |
| 191 | + --output=model_output |
| 192 | + ``` |
| 193 | + |
| 194 | + 在上述例子中,我们指定了 Parameter Server 的分布式策略,由一个parameter server 和 两个 worker 共同完成训练任务。 |
| 195 | + ElasticDL 的 master pod 将会被首先创建,然后由 master 负责启动 worker pod,以及 parameter server pod,并且建立通信。 |
| 196 | + ElasticDL master 可以监控每个 pod 的状态,当有 pod 挂掉时,master 会重新拉起新的 pod。 |
| 197 | + |
| 198 | +## Parameter Server 的改进 |
| 199 | + |
| 200 | +在搜索广告等场景,模型中可能包含较大的 embedding table,其内存会超过单机内存。我们通常使用 Parameter Server (PS) 分布式策略来训练此类模型。 |
| 201 | +在 PS 策略下,PS 上存储着模型参数,worker 从 PS 上请求参数。 |
| 202 | +worker 在本地使用训练数据计算梯度之后,把梯度再发送到 PS 上,PS 使用 worker 传来的梯度来迭代更新模型参数。 |
| 203 | + |
| 204 | +ElasticDL 用 Go 实现了 Parameter Server,具有良好的吞吐能力和可扩展性。并且,我们针对 embedding table 做了一些额外的优化。 |
| 205 | + |
| 206 | +- embedding vector 惰性初始化,用户无需提前指定 embedding table 的大小 |
| 207 | +- 把一个 embedding table 拆分到多个 PS 上存储与更新,均衡存储与通信的负载 |
| 208 | +- worker 从 PS 请求参数时,先滤除重复 ID ,只请求不同的参数,减少通信量 |
| 209 | +- worker 向 PS 发送梯度时,本地先把相同 ID 的梯度进行合并,减少通信量 |
| 210 | + |
| 211 | +通过上述设计与实现,ElasticDL 可以很高效的完成搜索推荐广告模型的训练。 |
| 212 | + |
| 213 | +ElasticDL 自去年9月份开源以来,我们对 Parameter Server 持续迭代开发,不断提升性能。 |
| 214 | +我们以一个推荐中常用的 deepFM 模型来进行测试,测试中使用 frappe 数据集。 |
| 215 | +在每次实验中,我们启动一个 parameter server 进程和四个 worker 进程,训练10个 epoch。 |
| 216 | + |
| 217 | +| Parameter Server 实现 | 训练时间(秒) | |
| 218 | +| --- | --- | |
| 219 | +| By Redis (2019.9) | 1350 | |
| 220 | +| By Go (2020.2) | 106 | |
| 221 | + |
| 222 | +从上表中我们可以看出 Go Parameter Server 相比于之前实现有10倍以上的提升。 |
| 223 | + |
| 224 | +## 使用 ElasticDL 进行 Kaggle 实战 |
| 225 | + |
| 226 | +在本小节中,我们将使用 ElasticDL 进行一次 Kaggle 实战。 |
| 227 | +本例中使用的是 Kaggle 上 Display Advertising Challenge 中的 criteo 数据集,这是一个关于广告点击率预估的比赛。 |
| 228 | +我们将使用 xDeepFM 模型来进行建模,所有的实例代码都放在了 ElasticDL 的 [model zoo](https://github.com/sql-machine-learning/elasticdl/tree/develop/model_zoo/dac_ctr)中。 |
| 229 | + |
| 230 | +### 数据预处理 |
| 231 | + |
| 232 | +1. 我们首先从官方 |
| 233 | +[链接](https://labs.criteo.com/2014/02/download-kaggle-display-advertising-challenge-dataset) |
| 234 | +下载 criteo 数据集。 |
| 235 | + |
| 236 | +1. 然后我们需要把原始数据转换为 RecordIO 文件格式。 |
| 237 | +我们提供了如下的转换脚本: |
| 238 | + |
| 239 | +```bash |
| 240 | +python convert_to_recordio.py \ |
| 241 | + --records_per_shard 400000 \ |
| 242 | + --output_dir ./dac_records \ |
| 243 | + --data_path train.txt |
| 244 | +``` |
| 245 | + |
| 246 | +原始数据会被按照 19:1 的比例,拆分为训练集和验证集, |
| 247 | +转换后的数据放在 dac_records 目录中。 |
| 248 | + |
| 249 | +1. 对原始数据进行特征统计。对于连续的特征,我们统计得出均值和方差; |
| 250 | +对于离散的特征,我们得出特征值个数。我们把统计后的数据放在一个文件中,供后续使用。 |
| 251 | + |
| 252 | +### 模型定义 |
| 253 | + |
| 254 | +xDeepFM 模型由三部分组成,分别是 linear logits,dnn logits 和 xfm logits。 |
| 255 | +借助 Keras API,我们可以很清晰的描述模型结构。 |
| 256 | +这里贴出 dnn logits 部分的描述代码,完整的模型定义可以参见 model zoo。 |
| 257 | + |
| 258 | +```python |
| 259 | +deep_embeddings = lookup_embedding_func( |
| 260 | + id_tensors, max_ids, embedding_dim=deep_embedding_dim, |
| 261 | +) |
| 262 | +dnn_input = tf.reshape( |
| 263 | + deep_embeddings, shape=(-1, len(deep_embeddings) * deep_embedding_dim) |
| 264 | +) |
| 265 | +if dense_tensor is not None: |
| 266 | + dnn_input = tf.keras.layers.Concatenate()([dense_tensor, dnn_input]) |
| 267 | + |
| 268 | +dnn_output = DNN(hidden_units=[16, 4], activation="relu")(dnn_input) |
| 269 | + |
| 270 | +dnn_logit = tf.keras.layers.Dense(1, use_bias=False, activation=None)( |
| 271 | + dnn_output |
| 272 | +) |
| 273 | +``` |
| 274 | + |
| 275 | +### 提交训练任务 |
| 276 | + |
| 277 | +我们首先在 Google Cloud 上创建一个 GKE 集群,并且把转换好的 RecordIO 训练数据上传到集群上。 |
| 278 | +详细的过程可以参考 ElasticDL 的 [gcloud教程](https://github.com/sql-machine-learning/elasticdl/blob/develop/docs/tutorials/elasticdl_cloud.md)。 |
| 279 | + |
| 280 | +然后,我们在本地制作一个镜像,该镜像包含了 xDeepFM 模型定义,以及相关依赖包。 |
| 281 | + |
| 282 | +```bash |
| 283 | +FROM tensorflow |
| 284 | +RUN pip install elasticdl |
| 285 | +COPY model_zoo /model_zoo |
| 286 | +``` |
| 287 | + |
| 288 | +我们需要把该镜像推送到 GKE 集群能够访问到的仓库中,比如说 docker hub 的仓库中。 |
| 289 | + |
| 290 | +最后,我们通过 ElasticDL client 工具向 GKE 集群提交训练作业。 |
| 291 | +我们使用 ParameterServer 分布式策略进行训练,有 2 个 parameter serve pods 和 5个 worker pods共同参与训练。 |
| 292 | + |
| 293 | +```bash |
| 294 | +elasticdl train \ |
| 295 | + --image_name=${your_docker_hub_repo}/elasticdl:ci \ |
| 296 | + --model_zoo=model_zoo \ |
| 297 | + --model_def=dac_ctr.elasticdl_train.custom_model \ |
| 298 | + --volume="mount_path=/data,claim_name=fileserver-claim" \ |
| 299 | + --minibatch_size=512 \ |
| 300 | + --num_minibatches_per_task=50 \ |
| 301 | + --num_epochs=20 \ |
| 302 | + --num_workers=5 \ |
| 303 | + --num_ps_pods=2 \ |
| 304 | + --use_async=True \ |
| 305 | + --use_go_ps=True \ |
| 306 | + --training_data=/data/dac_records/train \ |
| 307 | + --validation_data=/data/dac_records/val \ |
| 308 | + --master_resource_request="cpu=1,memory=1024Mi,ephemeral-storage=1024Mi" \ |
| 309 | + --worker_resource_request="cpu=4,memory=2048Mi,ephemeral-storage=1024Mi" \ |
| 310 | + --ps_resource_request="cpu=8,memory=6000Mi,ephemeral-storage=1024Mi" \ |
| 311 | + --evaluation_steps=10000 \ |
| 312 | + --job_name=test-edl \ |
| 313 | + --log_level=INFO \ |
| 314 | + --image_pull_policy=Always \ |
| 315 | + --distribution_strategy=ParameterServerStrategy |
| 316 | +``` |
| 317 | + |
| 318 | +约迭代8万个 step 后模型收敛,AUC 可以达到 0.8002 左右。 |
0 commit comments