# Collective Training

## Background

Sparse recommendation models such as DLRM have a large number of parameters and heavy GEMM operations. The asynchronous training paradigm of the Parameter Server (PS) makes it difficult to fully utilize the GPUs in a cluster to accelerate the whole training/inference process. Placing all parameters on the workers does not work either: the embedding parameters consume far more memory than a single GPU provides, so they must be sharded across all GPUs. Native TensorFlow does not support model-parallel (MP) training, but the community offers many excellent TensorFlow-based plugins, such as HybridBackend (hereinafter HB) and SparseOperationKit (hereinafter SOK). DeepRec provides a unified synchronous training interface, `CollectiveStrategy`, so that users can switch between these synchronous training frameworks with very little code.

## Interface Introduction

1. The interface currently supports HB and SOK. Users choose between them through the environment variable `COLLECTIVE_STRATEGY`, which can be set to `hb` or `sok` respectively. Unlike a normal TensorFlow job, a synchronous training job must be started through an additional launcher module, as follows:

```bash
CUDA_VISIBLE_DEVICES=0,1 COLLECTIVE_STRATEGY=hb python3 -m tensorflow.python.distribute.launch <python script.py>
```
If the environment variable `CUDA_VISIBLE_DEVICES` is not set, the launcher by default spawns one training sub-process per GPU in the current environment.
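
For example, the following invocation (a hypothetical variant of the command above; `train.py` stands in for the user's own script) launches the same kind of job with SOK on four explicitly listed GPUs:

```bash
# Spawn 4 training sub-processes, one per visible GPU, using SOK.
CUDA_VISIBLE_DEVICES=0,1,2,3 COLLECTIVE_STRATEGY=sok python3 -m tensorflow.python.distribute.launch train.py
```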

2. In the user script, a `CollectiveStrategy` must be initialized before constructing the model.

```python
class CollectiveStrategy:
    def scope(self, *args, **kwargs):
        pass
    def embedding_scope(self, **kwargs):
        pass
    def world_size(self):
        pass
    def rank(self):
        pass
    def estimator(self):
        pass
    def export_saved_model(self):
        pass
```

Follow the steps below to use synchronous training (a sharding sketch for `world_size()`/`rank()` follows this list):
- Wrap the entire model definition in `strategy.scope()`.
- Wrap the parts that require model parallelism (the embedding layer) in `strategy.embedding_scope()`.
- Use `export_saved_model()` when exporting the model.
- (Optional) The strategy also provides an `estimator()` interface.
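
The `world_size()` and `rank()` helpers are not exercised by the examples below. One common use, sketched here as an assumption rather than a documented pattern, is sharding the input files so that each data-parallel worker reads a distinct slice (`file_pattern` and the TFRecord format are placeholders):

```python
import tensorflow as tf
from tensorflow.python.distribute.group_embedding_collective_strategy import CollectiveStrategy

strategy = CollectiveStrategy()

def make_input_fn(file_pattern):
    '''Build an input_fn whose dataset is sharded across the workers.'''
    def input_fn():
        files = tf.data.Dataset.list_files(file_pattern, shuffle=False)
        # Each worker keeps every world_size()-th file, offset by its rank,
        # so no two workers consume the same examples.
        files = files.shard(strategy.world_size(), strategy.rank())
        dataset = files.interleave(tf.data.TFRecordDataset, cycle_length=4)
        return dataset.batch(256)
    return input_fn
```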

## Example

**MonitoredTrainingSession**

The following example shows how to construct the graph with tf.train.MonitoredTrainingSession.

```python
import tensorflow as tf
from tensorflow.python.distribute.group_embedding_collective_strategy import CollectiveStrategy

# STEP 1: initialize a collective strategy.
strategy = CollectiveStrategy()
# STEP 2: define the data parallel scope.
with strategy.scope(), tf.Graph().as_default():
    # STEP 3: define the model parallel scope.
    with strategy.embedding_scope():
        var = tf.get_variable(
            'var_1',
            shape=(1000, 3),
            initializer=tf.ones_initializer(tf.float32),
            partitioner=tf.fixed_size_partitioner(num_shards=strategy.world_size()))
        emb = tf.nn.embedding_lookup(
            var, tf.cast([0, 1, 2, 5, 6, 7], tf.int64))
    fun = tf.multiply(emb, 2.0, name='multiply')
    loss = tf.reduce_sum(fun, name='reduce_sum')
    opt = tf.train.FtrlOptimizer(
        0.1,
        l1_regularization_strength=2.0,
        l2_regularization_strength=0.00001)
    g_v = opt.compute_gradients(loss)
    train_op = opt.apply_gradients(g_v)
    with tf.train.MonitoredTrainingSession('') as sess:
        emb_result, loss_result, _ = sess.run([emb, loss, train_op])
        print(emb_result, loss_result)
```
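
As with any `CollectiveStrategy` script, this example is not run directly with `python3 script.py`; it must be started through the launcher module shown earlier, for instance `CUDA_VISIBLE_DEVICES=0,1 COLLECTIVE_STRATEGY=hb python3 -m tensorflow.python.distribute.launch demo.py`, where `demo.py` is a placeholder name for the file containing this code. Each spawned sub-process then builds the same graph on its own GPU.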

**Estimator**

The following example shows how to construct the graph with tf.estimator.Estimator.
```python
import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.python.distribute.group_embedding_collective_strategy import CollectiveStrategy

# STEP 1: initialize a collective strategy.
strategy = CollectiveStrategy()
# STEP 2: define the data parallel scope.
with strategy.scope(), tf.Graph().as_default():
    def input_fn():
        ratings = tfds.load("movie_lens/100k-ratings", split="train")
        ratings = ratings.map(
            lambda x: {
                "movie_id": tf.strings.to_number(x["movie_id"], tf.int64),
                "user_id": tf.strings.to_number(x["user_id"], tf.int64),
                "user_rating": x["user_rating"]
            })
        shuffled = ratings.shuffle(1_000_000,
                                   seed=2021,
                                   reshuffle_each_iteration=False)
        dataset = shuffled.batch(256)
        return dataset

    def input_receiver():
        r'''Prediction input receiver.
        '''
        inputs = {
            "movie_id": tf.placeholder(dtype=tf.int64, shape=[None]),
            "user_id": tf.placeholder(dtype=tf.int64, shape=[None]),
            "user_rating": tf.placeholder(dtype=tf.float32, shape=[None])
        }
        return tf.estimator.export.ServingInputReceiver(inputs, inputs)

    def model_fn(features, labels, mode, params):
        r'''Model function for estimator.
        '''
        del params
        rating = features["user_rating"]

        embedding_columns = [
            tf.feature_column.embedding_column(
                tf.feature_column.categorical_column_with_embedding(
                    "movie_id", dtype=tf.int64),
                dimension=16,
                initializer=tf.random_uniform_initializer(-1e-3, 1e-3)),
            tf.feature_column.embedding_column(
                tf.feature_column.categorical_column_with_embedding(
                    "user_id", dtype=tf.int64),
                dimension=16,
                initializer=tf.random_uniform_initializer(-1e-3, 1e-3))
        ]
        # STEP 3: define the model parallel scope.
        with strategy.embedding_scope():
            with tf.variable_scope(
                    'embedding',
                    partitioner=tf.fixed_size_partitioner(
                        strategy.world_size())):
                deep_features = [
                    tf.feature_column.input_layer(features, [c])
                    for c in embedding_columns]
        emb = tf.concat(deep_features, axis=-1)
        logits = tf.multiply(emb, 2.0, name='multiply')

        if mode == tf.estimator.ModeKeys.TRAIN:
            # The dataset carries no separate label tensor;
            # use the user rating as the training label.
            labels = tf.reshape(rating, shape=[-1, 1])
            loss = tf.reduce_mean(tf.keras.losses.binary_crossentropy(labels, logits))
            step = tf.train.get_or_create_global_step()
            opt = tf.train.AdagradOptimizer(learning_rate=0.1)
            train_op = opt.minimize(loss, global_step=step)
            return tf.estimator.EstimatorSpec(
                mode=mode,
                loss=loss,
                train_op=train_op,
                training_chief_hooks=[])

        return None

    estimator = strategy.estimator(model_fn=model_fn,
                                   model_dir="./",
                                   config=None)
    estimator.train_and_evaluate(
        tf.estimator.TrainSpec(
            input_fn=input_fn,
            max_steps=50),
        tf.estimator.EvalSpec(
            input_fn=input_fn))
    estimator.export_saved_model("./", input_receiver)
```
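
The same launcher workflow applies here. As the example shows, `strategy.estimator()` returns an Estimator-compatible object on which `train_and_evaluate` and `export_saved_model` are invoked, so the familiar `TrainSpec`/`EvalSpec` pattern carries over from standard `tf.estimator` usage.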

## Appendix

- DeepRec currently provides a corresponding GPU image for users (alideeprec/deeprec-release:deeprec2304-gpu-py38-cu116-ubuntu20.04-hybridbackend); users can also refer to the [Dockerfile](../../cibuild/dockerfiles/Dockerfile.devel-py3.8-cu116-ubuntu20.04-hybridbackend).
- We also provide more detailed demos of the two usage methods above; see the [ModelZoo](../../modelzoo/features/grouped_embedding).
- If further optimization is required, HB and SOK expose more fine-tuning parameters; please refer to [SOK](./SOK.md) and [HB](https://github.com/DeepRec-AI/HybridBackend).