|
108 | 108 | "- Multiple _worker_ jobs (job name `worker`); and\n",
|
109 | 109 | "- Multiple _parameter server_ jobs (job name `ps`)\n",
|
110 | 110 | "\n",
|
111 |
| - "While the _coordinator_ creates resources, dispatches training tasks, writes checkpoints, and deals with task failures, _workers_ and _parameter servers_ run `tf.distribute.Server` that listen for requests from the coordinator." |
| 111 | + "The _coordinator_ creates resources, dispatches training tasks, writes checkpoints, and deals with task failures. The _workers_ and _parameter servers_ run `tf.distribute.Server` instances that listen for requests from the coordinator." |
112 | 112 | ]
|
113 | 113 | },
|
114 | 114 | {
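As a rough sketch of how these roles fit together (the host addresses, ports, and task layout below are placeholders; in practice they usually come from your cluster manager, for example via the `TF_CONFIG` environment variable):

```python
import tensorflow as tf

# Placeholder cluster layout; real addresses normally come from the cluster
# manager (for example, through the TF_CONFIG environment variable).
cluster_spec = tf.train.ClusterSpec({
    "worker": ["worker0.example.com:2222", "worker1.example.com:2222"],
    "ps": ["ps0.example.com:2222"],
})

# On each worker and parameter server task: start a server and block,
# waiting for requests from the coordinator.
# server = tf.distribute.Server(
#     cluster_spec, job_name="worker", task_index=0, protocol="grpc")
# server.join()

# On the coordinator task: resolve the cluster and create the strategy.
# cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
# strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
```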
|
|
348 | 348 | },
|
349 | 349 | "source": [
|
350 | 350 | "When a `variable_partitioner` is passed in and if you create a variable directly\n",
|
351 |
| - "under `strategy.scope()`, it will become a container type with a `variables`\n", |
352 |
| - "property which provides access to the list of shards. In most cases, this\n", |
| 351 | + "under `Strategy.scope`, it will become a container type with a `variables`\n", |
| 352 | + "property, which provides access to the list of shards. In most cases, this\n", |
353 | 353 | "container will be automatically converted to a Tensor by concatenating all the\n",
|
354 | 354 | "shards. As a result, it can be used as a normal variable. On the other hand,\n",
|
355 | 355 | "some TensorFlow methods such as `tf.nn.embedding_lookup` provide efficient\n",
|
356 | 356 | "implementation for this container type and in these methods automatic\n",
|
357 | 357 | "concatenation will be avoided.\n",
|
358 | 358 | "\n",
|
359 |
| - "Please see the API docs of `tf.distribute.experimental.ParameterServerStrategy` for more details." |
| 359 | + "Refer to the API docs of `tf.distribute.experimental.ParameterServerStrategy` for more details." |
360 | 360 | ]
|
361 | 361 | },
|
362 | 362 | {
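As a hedged illustration of this container behavior, the sketch below assumes a `cluster_resolver` describing your cluster already exists; the partitioner choice and variable shape are arbitrary:

```python
import tensorflow as tf

# Assumption: `cluster_resolver` describes your cluster, e.g. a
# tf.distribute.cluster_resolver.TFConfigClusterResolver().
variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(num_shards=2))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver, variable_partitioner=variable_partitioner)

with strategy.scope():
    # Created under the scope, this becomes a sharded container variable.
    embeddings = tf.Variable(tf.random.uniform(shape=(1024, 64)))

# The `variables` property exposes the individual shards.
print([shard.shape for shard in embeddings.variables])

# Used as an ordinary variable, the shards are concatenated automatically,
# whereas tf.nn.embedding_lookup works on the shards without concatenation.
rows = tf.nn.embedding_lookup(embeddings, tf.constant([3, 7, 42]))
```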
|
|
384 | 384 | "\n",
|
385 | 385 | "Note that it is recommended to shuffle and repeat the data with parameter server training, and specify `steps_per_epoch` in `fit` call so the library knows the epoch boundaries.\n",
|
386 | 386 | "\n",
|
387 |
| - "Please see the [Distributed input](https://www.tensorflow.org/tutorials/distribute/input#usage_2) tutorial for more information about the `InputContext` argument." |
| 387 | + "Refer to the [Distributed input](https://www.tensorflow.org/tutorials/distribute/input#usage_2) tutorial for more information about the `InputContext` argument." |
388 | 388 | ]
|
389 | 389 | },
|
390 | 390 | {
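A minimal sketch of such a `dataset_fn`, using synthetic data (the shapes, batch sizes, and step counts are placeholders):

```python
import tensorflow as tf

def dataset_fn(input_context):
    global_batch_size = 64
    batch_size = input_context.get_per_replica_batch_size(global_batch_size)

    # Placeholder synthetic data.
    x = tf.random.uniform((1000, 10))
    y = tf.random.uniform((1000,), maxval=2, dtype=tf.int64)
    dataset = tf.data.Dataset.from_tensor_slices((x, y))

    # Shard per input pipeline, then shuffle, repeat, and batch so that
    # `steps_per_epoch` defines the epoch boundaries.
    dataset = dataset.shard(
        input_context.num_input_pipelines, input_context.input_pipeline_id)
    return dataset.shuffle(1000).repeat().batch(batch_size).prefetch(2)

# One way to feed this to `Model.fit` is via a dataset-creator wrapper:
# dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
# model.fit(dc, epochs=5, steps_per_epoch=20)
```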
|
|
420 | 420 | "id": "v_jhF70K7zON"
|
421 | 421 | },
|
422 | 422 | "source": [
|
423 |
| - "The code in `dataset_fn` will be invoked on the input device, which is usually the CPU, on each of the worker machines.\n", |
424 |
| - "\n" |
| 423 | + "The code in `dataset_fn` will be invoked on the input device, which is usually the CPU, on each of the worker machines.\n" |
425 | 424 | ]
|
426 | 425 | },
|
427 | 426 | {
|
|
463 | 462 | "\n",
|
464 | 463 | "- `ModelCheckpoint`: to save the model weights.\n",
|
465 | 464 | "- `BackupAndRestore`: to make sure the training progress is automatically backed up, and recovered if the cluster experiences unavailability (such as abort or preemption); or\n",
|
466 |
| - "- `TensorBoard`: to save the progress reports into summary files, which get visualized in TensorBoard tool.\n", |
| 465 | + "- `TensorBoard`: to save the progress reports into summary files, which can be visualized in the TensorBoard tool.\n", |
467 | 466 | "\n",
|
468 | 467 | "Note: Due to performance consideration, custom callbacks cannot have batch level callbacks overridden when used with `ParameterServerStrategy`. Please modify your custom callbacks to make them epoch level calls, and adjust `steps_per_epoch` to a suitable value. In addition, `steps_per_epoch` is a required argument for `Model.fit` when used with `ParameterServerStrategy`."
|
469 | 468 | ]
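For example, a `Model.fit` call could combine these callbacks roughly as follows; the paths, epoch counts, and `steps_per_epoch` value are placeholders, and `model` is assumed to have been built and compiled under the strategy scope:

```python
import tensorflow as tf

working_dir = "/tmp/my_working_dir"  # placeholder path
callbacks = [
    tf.keras.callbacks.ModelCheckpoint(
        filepath=working_dir + "/ckpt-{epoch}", save_weights_only=True),
    # In older TF releases this callback lives under tf.keras.callbacks.experimental.
    tf.keras.callbacks.BackupAndRestore(backup_dir=working_dir + "/backup"),
    tf.keras.callbacks.TensorBoard(log_dir=working_dir + "/log"),
]

# `steps_per_epoch` is required when fitting with ParameterServerStrategy.
# model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
```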
|
|
530 | 529 | "\n",
|
531 | 530 | "First, write a function that creates a dataset.\n",
|
532 | 531 | "\n",
|
533 |
| - "If you would like to preprocess the data with [Keras preprocessing layers](https://www.tensorflow.org/guide/keras/preprocessing_layers) or [Tensorflow Transform layers](https://www.tensorflow.org/tfx/tutorials/transform/simple), create these layers **outside the `dataset_fn`** and **under `strategy.scope()`** like you would do for any other Keras layers. This is because the `dataset_fn` will be wrapped into a `tf.function` and then executed on each worker to generate the data pipeline. \n", |
| 532 | + "If you would like to preprocess the data with [Keras preprocessing layers](https://www.tensorflow.org/guide/keras/preprocessing_layers) or [Tensorflow Transform layers](https://www.tensorflow.org/tfx/tutorials/transform/simple), create these layers **outside the `dataset_fn`** and **under `Strategy.scope`** like you would do for any other Keras layers. This is because the `dataset_fn` will be wrapped into a `tf.function` and then executed on each worker to generate the data pipeline.\n", |
534 | 533 | "\n",
|
535 |
| - "If you don't follow the above procedure, creating the layers might create Tensorflow states which will be lifted out of the `tf.function` to the coordinator, and thus accessing them on workers would incur repetitive RPC calls between coordinator and workers and causes significant slowdown. \n", |
| 534 | + "If you don't follow the above procedure, creating the layers might create Tensorflow states which will be lifted out of the `tf.function` to the coordinator. Thus, accessing them on workers would incur repetitive RPC calls between coordinator and workers, and cause significant slowdown.\n", |
536 | 535 | "\n",
|
537 |
| - "Placing the layers under `strategy.scope()` will instead create them on all workers. Then, you will apply the transformation inside the `dataset_fn` via `tf.data.Dataset.map`. Please refer to [this tutorial](https://www.tensorflow.org/tutorials/distribute/input#data_preprocessing) for more information on data preprocessing with distributed input." |
| 536 | + "Placing the layers under `Strategy.scope` will instead create them on all workers. Then, you will apply the transformation inside the `dataset_fn` via `tf.data.Dataset.map`. Refer to _Data preprocessing_ in the [Distributed input](https://www.tensorflow.org/tutorials/distribute/input) tutorial for more information on data preprocessing with distributed input." |
538 | 537 | ]
|
539 | 538 | },
|
540 | 539 | {
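A hedged sketch of this pattern with a `tf.keras.layers.StringLookup` layer and a made-up vocabulary; `strategy` is assumed to be the `ParameterServerStrategy` created earlier:

```python
import tensorflow as tf

# Create the preprocessing layer outside `dataset_fn`, under the scope, so its
# lookup table is not lifted out onto the coordinator.
with strategy.scope():
    lookup_layer = tf.keras.layers.StringLookup(
        vocabulary=["cat", "dog", "fish"],  # made-up vocabulary
        num_oov_indices=1)

def dataset_fn(input_context):
    raw = tf.data.Dataset.from_tensor_slices(
        (["dog", "cat", "fish", "dog"], [1, 0, 0, 1]))

    # Apply the transformation inside `dataset_fn` via `Dataset.map`.
    def preprocess(text, label):
        return lookup_layer(text), label

    return raw.map(preprocess).shuffle(4).repeat().batch(2)
```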
|
|
644 | 643 | "source": [
|
645 | 644 | "### Build the model\n",
|
646 | 645 | "\n",
|
647 |
| - "Next, create the model and other objects. Make sure to create all variables under `strategy.scope`." |
| 646 | + "Next, create the model and other objects. Make sure to create all variables under `Strategy.scope`." |
648 | 647 | ]
|
649 | 648 | },
|
650 | 649 | {
|
|
655 | 654 | },
|
656 | 655 | "outputs": [],
|
657 | 656 | "source": [
|
658 |
| - "# These variables created under the `strategy.scope` will be placed on parameter\n", |
| 657 | + "# These variables created under the `Strategy.scope` will be placed on parameter\n", |
659 | 658 | "# servers in a round-robin fashion.\n",
|
660 | 659 | "with strategy.scope():\n",
|
661 | 660 | " # Create the model. The input needs to be compatible with Keras processing layers.\n",
|
|
875 | 874 | "source": [
|
876 | 875 | "### More about dataset creation\n",
|
877 | 876 | "\n",
|
878 |
| - "The dataset in the above code is created using the `ClusterCoordinator.create_per_worker_dataset` API). It creates one dataset per worker and returns a container object. You can call the `iter` method on it to create a per-worker iterator. The per-worker iterator contains one iterator per worker and the corresponding slice of a worker will be substituted in the input argument of the function passed to the `ClusterCoordinator.schedule` method before the function is executed on a particular worker.\n", |
| 877 | + "The dataset in the above code is created using the `ClusterCoordinator.create_per_worker_dataset` API. It creates one dataset per worker and returns a container object. You can call the `iter` method on it to create a per-worker iterator. The per-worker iterator contains one iterator per worker and the corresponding slice of a worker will be substituted in the input argument of the function passed to the `ClusterCoordinator.schedule` method before the function is executed on a particular worker.\n", |
879 | 878 | "\n",
|
880 |
| - "Currently, the `ClusterCoordinator.schedule` method assumes workers are equivalent and thus assumes the datasets on different workers are the same except they may be shuffled differently if they contain a `Dataset.shuffle` operation. Because of this, it is also recommended that the datasets to be repeated indefinitely and you schedule a finite number of steps instead of relying on the `OutOfRangeError` from a dataset.\n", |
| 879 | + "The `ClusterCoordinator.schedule` method assumes workers are equivalent and thus assumes the datasets on different workers are the same (except that they may be shuffled differently). Because of this, it is also recommended to repeat datasets, and schedule a finite number of steps instead of relying on the `OutOfRangeError` from a dataset.\n", |
881 | 880 | "\n",
|
882 | 881 | "Another important note is that `tf.data` datasets don’t support implicit serialization and deserialization across task boundaries. So it is important to create the whole dataset inside the function passed to `ClusterCoordinator.create_per_worker_dataset`."
|
883 | 882 | ]
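As a rough sketch of that flow, assuming `coordinator` is a `tf.distribute.experimental.coordinator.ClusterCoordinator` and that `dataset_fn` and a scheduled `per_worker_train_step` function are defined as in the custom training loop above (`num_steps` is a placeholder):

```python
# Build the whole dataset inside the function passed to
# `create_per_worker_dataset`, then create one per-worker iterator from it.
per_worker_dataset = coordinator.create_per_worker_dataset(dataset_fn)
per_worker_iterator = iter(per_worker_dataset)

# Schedule a finite number of steps (the dataset is repeated), rather than
# relying on OutOfRangeError from the dataset.
num_steps = 100  # placeholder
for _ in range(num_steps):
    coordinator.schedule(per_worker_train_step, args=(per_worker_iterator,))

# Block until all scheduled functions have been executed.
coordinator.join()
```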
|
|
1227 | 1226 | "2. Avoid creating a hotspot variable that is required by all parameter servers in a single step if possible. For example, use a constant learning rate or subclass `tf.keras.optimizers.schedules.LearningRateSchedule` in optimizers since the default behavior is that the learning rate will become a variable placed on a particular parameter server and requested by all other parameter servers in each step.\n",
|
1228 | 1227 | "3. Shuffle your large vocabularies before passing them to Keras preprocessing layers.\n",
|
1229 | 1228 | "\n",
|
1230 |
| - "Another possible reason for performance issues is the coordinator. The implementation of `schedule`/`join` is Python-based and thus may have threading overhead. Also, the latency between the coordinator and the workers can be large. If this is the case,\n", |
| 1229 | + "Another possible reason for performance issues is the coordinator. The implementation of `schedule`/`join` is Python-based and thus may have threading overhead. Also, the latency between the coordinator and the workers can be large. If this is the case:\n", |
1231 | 1230 | "\n",
|
1232 |
| - "- For `Model.fit`, you can set `steps_per_execution` argument provided at `Model.compile` to a value larger than 1.\n", |
| 1231 | + "- For `Model.fit`, you can set the `steps_per_execution` argument provided at `Model.compile` to a value larger than 1.\n", |
1233 | 1232 | "\n",
|
1234 | 1233 | "- For a custom training loop, you can pack multiple steps into a single `tf.function`:\n",
|
1235 | 1234 | "\n",
|
|
1271 | 1270 | "- It is not supported to load a saved_model via `tf.saved_model.load` containing sharded variables. Note loading such a saved_model using TensorFlow Serving is expected to work.\n",
|
1272 | 1271 | "- It is not supported to load a checkpoint containing sharded optimizer slot variables into a different number of shards.\n",
|
1273 | 1272 | "- It is not supported to recover from parameter server failure without restarting the coordinator task.\n",
|
1274 |
| - "- Creation of `tf.lookup.StaticHashTable`, commonly employed by some Keras preprocessing layers, such as `tf.keras.layers.IntegerLookup`, `tf.keras.layers.StringLookup`, and `tf.keras.layers.TextVectorization`, should be placed under `strategy.scope()`. Otherwise, resources will be placed on the coordinator, and lookup RPCs from workers to the coordinator incur performance implications. \n", |
1275 |
| - "\n", |
1276 |
| - "\n" |
| 1273 | + "- Creation of `tf.lookup.StaticHashTable`, commonly employed by some Keras preprocessing layers, such as `tf.keras.layers.IntegerLookup`, `tf.keras.layers.StringLookup`, and `tf.keras.layers.TextVectorization`, should be placed under `Strategy.scope`. Otherwise, resources will be placed on the coordinator, and lookup RPCs from workers to the coordinator incur performance implications.\n" |
1277 | 1274 | ]
|
1278 | 1275 | },
|
1279 | 1276 | {
|
|
1286 | 1283 | "\n",
|
1287 | 1284 | "- `steps_per_epoch` argument is required in `Model.fit`. You can select a value that provides appropriate intervals in an epoch.\n",
|
1288 | 1285 | "- `ParameterServerStrategy` does not have support for custom callbacks that have batch-level calls for performance reasons. You should convert those calls into epoch-level calls with suitably picked `steps_per_epoch`, so that they are called every `steps_per_epoch` number of steps. Built-in callbacks are not affected: their batch-level calls have been modified to be performant. Supporting batch-level calls for `ParameterServerStrategy` is being planned.\n",
|
1289 |
| - "- For the same reason, unlike other strategies, progress bar and metrics are logged only at epoch boundaries.\n", |
1290 |
| - "- `run_eagerly` is not supported.\n", |
1291 |
| - "\n" |
| 1286 | + "- For the same reason, unlike other strategies, progress bars and metrics are logged only at epoch boundaries.\n", |
| 1287 | + "- `run_eagerly` is not supported.\n" |
1292 | 1288 | ]
|
1293 | 1289 | },
|
1294 | 1290 | {
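As a small illustration of converting batch-level hooks into epoch-level ones, here is a hedged sketch of a custom callback that only implements `on_epoch_end`; the class name and logging behavior are placeholders:

```python
import tensorflow as tf

# A sketch of an epoch-level custom callback: batch-level hooks are avoided,
# and work is done once per epoch instead.
class EpochLogger(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        print(f"Finished epoch {epoch}: {logs}")

# model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=[EpochLogger()])
```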
|
|
1310 | 1306 | "colab": {
|
1311 | 1307 | "collapsed_sections": [],
|
1312 | 1308 | "name": "parameter_server_training.ipynb",
|
1313 |
| - "provenance": [], |
1314 | 1309 | "toc_visible": true
|
1315 | 1310 | },
|
1316 | 1311 | "kernelspec": {
|
|