|
| 1 | +# Copyright 2020 The ElasticDL Authors. All rights reserved. |
| 2 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 3 | +# you may not use this file except in compliance with the License. |
| 4 | +# You may obtain a copy of the License at |
| 5 | +# |
| 6 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 7 | +# |
| 8 | +# Unless required by applicable law or agreed to in writing, software |
| 9 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 10 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 11 | +# See the License for the specific language governing permissions and |
| 12 | +# limitations under the License. |
| 13 | + |
| 14 | +""" |
| 15 | +Download the mnist dataset from |
| 16 | +https://s3.amazonaws.com/fast-ai-imageclas/mnist_png.tgz |
| 17 | +and then untar it into ${data_store_dir}. On minikube, we can use the |
| 18 | +following command to submit a training job with this script. |
| 19 | +
|
| 20 | +elasticdl train \ |
| 21 | + --image_name=elasticdl:pt_mnist_allreduce \ |
| 22 | + --job_command="python -m model_zoo.mnist.mnist_tfv1_png \ |
| 23 | + --training_data=/local_data/mnist_png/training/" \ |
| 24 | + --num_minibatches_per_task=2 \ |
| 25 | + --num_workers=2 \ |
| 26 | + --worker_pod_priority=0.5 \ |
| 27 | + --master_resource_request="cpu=0.2,memory=1024Mi" \ |
| 28 | + --master_resource_limit="cpu=1,memory=2048Mi" \ |
| 29 | + --worker_resource_request="cpu=0.3,memory=1024Mi" \ |
| 30 | + --worker_resource_limit="cpu=1,memory=2048Mi" \ |
| 31 | + --envs="PYTHONUNBUFFERED=0,HOROVOD_ELASTIC=1" \ |
| 32 | + --job_name=test-mnist-allreduce \ |
| 33 | + --image_pull_policy=Never \ |
| 34 | + --volume="host_path=${data_store_dir},mount_path=/local_data" \ |
| 35 | + --distribution_strategy=AllreduceStrategy |
| 36 | +""" |
| 37 | + |
| 38 | +import argparse |
| 39 | +import os |
| 40 | + |
| 41 | +import cv2 |
| 42 | +import numpy as np |
| 43 | +import tensorflow as tf |
| 44 | + |
| 45 | +from elasticai_api.tensorflow.controller import create_elastic_controller |
| 46 | +from elasticai_api.tensorflow.optimizer import ( |
| 47 | + AdjustBackwardPassesPerStepHook, |
| 48 | + DistributedOptimizer, |
| 49 | +) |
| 50 | +from elasticdl.python.common.log_utils import default_logger as logger |
| 51 | + |
# Short module-level alias for ``tf.layers``; conv_model builds its
# convolution, dense and dropout layers through it.
layers = tf.layers
| 53 | + |
| 54 | + |
def get_samples_from_folder(folder_dir):
    """Collect ``(image_path, label)`` pairs from a class-per-folder tree.

    Every sub-directory of ``folder_dir`` is one category. Categories are
    numbered 0, 1, 2, ... in sorted order of their directory names, and
    entries of ``folder_dir`` that are not directories are ignored without
    consuming a label.

    Args:
        folder_dir: Path of the directory holding one sub-directory per
            category.

    Returns:
        A list of ``(image_path, category_index)`` tuples, ordered by
        category and then by file name.
    """
    # The MNIST archive referenced in the module docstring ships PNG files,
    # so PNG must be accepted; "jpg" is kept for backward compatibility.
    image_suffixes = ("jpg", "jpeg", "png")

    category_dirs = [
        path
        for path in (
            os.path.join(folder_dir, name)
            for name in sorted(os.listdir(folder_dir))
        )
        if os.path.isdir(path)
    ]

    samples = []
    for category_index, category_dir in enumerate(category_dirs):
        # Sort the file names: os.listdir order is arbitrary, and the
        # samples are later addressed by position in this list, so the
        # order should be reproducible from run to run.
        for img_file in sorted(os.listdir(category_dir)):
            img_path = os.path.join(category_dir, img_file)
            if os.path.isfile(img_path) and img_file.lower().endswith(
                image_suffixes
            ):
                samples.append((img_path, category_index))
    return samples
| 68 | + |
| 69 | + |
def get_dataset_gen(data_shard_service, samples):
    """Build a generator function yielding ``(image, label)`` pairs.

    Args:
        data_shard_service: Object exposing ``fetch_record_index()``, which
            is called once per sample to obtain the next index into
            ``samples``.
        samples: Indexable sequence of ``(image_path, label)`` tuples.

    Returns:
        A zero-argument generator function. Each yielded item is a tuple of
        the image scaled to [0, 1] as a ``float32`` array and a
        one-element array holding the label.

    Raises:
        IOError: (from the generator) if an image file cannot be read.
    """

    def gen():
        while True:
            # The data shard service will fetch the index of sample
            # from the master of ElasticDL job.
            index = data_shard_service.fetch_record_index()
            # Test for None explicitly: 0 is a valid sample index and must
            # not end the stream.
            # NOTE(review): assumes the service signals exhaustion with
            # None -- confirm against the data shard service.
            if index is None:
                # End the generator with a plain return; raising
                # StopIteration inside a generator is converted into
                # RuntimeError by PEP 479 (Python 3.7+).
                return
            image_path, label = samples[index]
            image = cv2.imread(image_path)
            if image is None:
                # cv2.imread reports unreadable files by returning None
                # rather than raising; fail with the offending path.
                raise IOError(
                    "Failed to read image: {}".format(image_path)
                )
            image = np.array(image / 255.0, np.float32)
            yield image, np.array([label])

    return gen
| 84 | + |
| 85 | + |
def create_dataset(data_shard_service, samples):
    """Wrap the sample generator in a ``tf.data.Dataset``.

    Args:
        data_shard_service: Service passed through to ``get_dataset_gen``.
        samples: Sequence of ``(image_path, label)`` tuples passed through
            to ``get_dataset_gen``.

    Returns:
        A dataset built with ``from_generator`` whose two components are
        both declared as ``tf.float32``.
    """
    sample_generator = get_dataset_gen(data_shard_service, samples)
    output_types = (tf.float32, tf.float32)
    return tf.data.Dataset.from_generator(sample_generator, output_types)
| 90 | + |
| 91 | + |
def conv_model(feature, target, mode):
    """2-layer convolution model.

    Args:
        feature: Input image tensor; it is reshaped to
            ``[-1, 28, 28, 3]`` before the first convolution.
        target: Class labels; cast to ``tf.int32`` and one-hot encoded
            with depth 10.
        mode: A ``tf.estimator.ModeKeys`` value; dropout is active only
            when it equals ``tf.estimator.ModeKeys.TRAIN``.

    Returns:
        A tuple ``(predictions, loss)`` where ``predictions`` is the argmax
        of the logits along axis 1 and ``loss`` is the softmax cross
        entropy between the one-hot target and the logits.
    """
    # Convert the target to a one-hot tensor of shape (batch_size, 10) and
    # with an on-value of 1 for each one-hot vector of length 10.
    target = tf.one_hot(tf.cast(target, tf.int32), 10, 1, 0)

    # Reshape feature to a 4d tensor with the 2nd and 3rd dimensions being
    # the image width and height, and the final dimension being the number
    # of color channels.
    feature = tf.reshape(feature, [-1, 28, 28, 3])

    # First conv layer will compute 32 features for each 5x5 patch
    with tf.variable_scope("conv_layer1"):
        h_conv1 = layers.conv2d(
            feature,
            32,
            kernel_size=[5, 5],
            activation=tf.nn.relu,
            padding="SAME",
        )
        # 2x2 max pooling with stride 2.
        h_pool1 = tf.nn.max_pool(
            h_conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME"
        )

    # Second conv layer will compute 64 features for each 5x5 patch.
    with tf.variable_scope("conv_layer2"):
        h_conv2 = layers.conv2d(
            h_pool1,
            64,
            kernel_size=[5, 5],
            activation=tf.nn.relu,
            padding="SAME",
        )
        # 2x2 max pooling with stride 2.
        h_pool2 = tf.nn.max_pool(
            h_conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME"
        )
        # reshape tensor into a batch of vectors
        h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64])

    # Densely connected layer with 1024 neurons, followed by dropout with
    # rate 0.5 that is applied only in TRAIN mode.
    h_fc1 = layers.dropout(
        layers.dense(h_pool2_flat, 1024, activation=tf.nn.relu),
        rate=0.5,
        training=mode == tf.estimator.ModeKeys.TRAIN,
    )

    # Compute logits (1 per class) and compute loss.
    logits = layers.dense(h_fc1, 10, activation=None)
    loss = tf.losses.softmax_cross_entropy(target, logits)

    return tf.argmax(logits, 1), loss
| 143 | + |
| 144 | + |
def train(args):
    """Build the training graph and run the elastic training loop.

    Args:
        args: Parsed command-line namespace; ``training_data``,
            ``batch_size`` and ``num_epochs`` are read from it.

    The loop runs until the input iterator raises
    ``tf.errors.OutOfRangeError``, at which point "end!" is printed.
    """
    training_samples = get_samples_from_folder(args.training_data)
    allreduce_controller = create_elastic_controller(
        batch_size=args.batch_size,
        num_epochs=args.num_epochs,
        dataset_size=len(training_samples),
        shuffle=True,
    )

    # Create the dataset with the elastic allreduce controller
    # which will fetch data shards from the master of ElasticDL job.
    dataset = create_dataset(
        allreduce_controller.data_shard_service, training_samples
    )
    dataset = dataset.batch(args.batch_size).prefetch(1)
    dataset_it = dataset.make_one_shot_iterator()
    batch_x, batch_y = dataset_it.get_next()
    batch_x = tf.cast(batch_x, tf.float32)

    # Flatten the labels to a 1-D tensor before they are one-hot encoded
    # inside conv_model.
    batch_y = tf.reshape(batch_y, (-1,))
    # The prediction tensor is returned by conv_model but not used below.
    predict, loss = conv_model(batch_x, batch_y, tf.estimator.ModeKeys.TRAIN)
    optimizer = tf.train.GradientDescentOptimizer(0.1)

    # Wrap the optimizer.
    optimizer = DistributedOptimizer(optimizer, fixed_global_batch_size=True)
    global_step = tf.train.get_or_create_global_step()
    train_step = optimizer.minimize(loss, global_step=global_step)

    # Use the elastic wrapper to wrap the function to train one batch
    elastic_train_one_batch = allreduce_controller.elastic_run(train_one_batch)
    # NOTE(review): presumably the hook adapts the optimizer's backward
    # passes per step as the worker count changes -- confirm against
    # elasticai_api.
    hook = AdjustBackwardPassesPerStepHook(optimizer)
    allreduce_controller.set_broadcast_variables(tf.global_variables())
    with allreduce_controller.scope():
        with tf.train.MonitoredTrainingSession(hooks=[hook]) as sess:
            allreduce_controller.set_session(sess)
            try:
                # Train batch by batch; the loop is left only through the
                # OutOfRangeError handled below (or another exception).
                while True:
                    loss_value, step, _ = elastic_train_one_batch(
                        sess, [loss, global_step, train_step]
                    )
                    logger.info(
                        "global step = {}. loss: {}".format(step, loss_value)
                    )
            except tf.errors.OutOfRangeError:
                print("end!")
| 190 | + |
| 191 | + |
def train_one_batch(sess, run_tensors):
    """Evaluate ``run_tensors`` once in ``sess`` and return the results.

    Args:
        sess: Session-like object exposing ``run``.
        run_tensors: The fetches handed to ``sess.run``.

    Returns:
        Whatever ``sess.run(run_tensors)`` returns.
    """
    fetched_values = sess.run(run_tensors)
    return fetched_values
| 194 | + |
| 195 | + |
def arg_parser():
    """Create the command-line parser for the training script.

    Returns:
        An ``argparse.ArgumentParser`` with ``--batch_size`` (int,
        default 64), ``--num_epochs`` (int, default 1) and the required
        ``--training_data`` (str) options.
    """
    option_specs = (
        ("--batch_size", {"type": int, "default": 64, "required": False}),
        ("--num_epochs", {"type": int, "default": 1, "required": False}),
        ("--training_data", {"type": str, "required": True}),
    )
    cli = argparse.ArgumentParser(description="Process training parameters")
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    return cli
| 202 | + |
| 203 | + |
if __name__ == "__main__":
    # Parse the command-line flags, echo them, then start training.
    cli_args = arg_parser().parse_args()
    print(cli_args)
    train(cli_args)
0 commit comments