Skip to content

Commit f292e1c

Browse files
w-xinyicopybara-github
authored andcommitted
Update documentation for data preprocessing and lookup table with tf.distribute.
PiperOrigin-RevId: 430111286
1 parent 465c891 commit f292e1c

File tree

1 file changed

+240
-0
lines changed

1 file changed

+240
-0
lines changed

site/en/tutorials/distribute/input.ipynb

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,245 @@
626626
"metadata": {
627627
"id": "-OAa6svUzuWm"
628628
},
629+
"source": [
630+
"## Data Preprocessing"
631+
]
632+
},
633+
{
634+
"cell_type": "markdown",
635+
"metadata": {
636+
"id": "pSMrs3kJQexW"
637+
},
638+
"source": [
639+
"So far, we have discussed how to distribute a `tf.data.Dataset`. Yet before the data is ready for the model, we have the crucial step of preprocessing the data, e.g., cleansing, transforming, augmenting. Two sets of those handy tools are:\n",
640+
"\n",
641+
"* [Keras preprocessing layers](https://www.tensorflow.org/guide/keras/preprocessing_layers): a set of Keras layers that allow developers to build Keras-native input processing pipelines. Some Keras preprocessing layers contain non-trainable states, which can be set on initialization or [\"adapted\"](https://www.tensorflow.org/guide/keras/preprocessing_layers#the_adapt_method). When distributing stateful preprocessing layers, we want the states replicated to all workers. To use these layers, you can either make them part of the model or apply them to the datasets.\n",
642+
"\n",
643+
"* [TensorFlow Transform (tf.Transform)](https://www.tensorflow.org/tfx/transform/get_started): a library for TensorFlow that allows you to define both instance-level and full-pass data transformation through data preprocessing pipelines. Tensorflow Transform has two phases. The first is the Analyze phase, where the raw training data is analyzed in a full-pass process to compute the statistics needed for the transformations, and the transformation logic is generated as instance-level operations. The second is the Transform phase, where the raw training data is transformed in an instance-level process.\n",
644+
"\n"
645+
]
646+
},
647+
{
648+
"cell_type": "markdown",
649+
"metadata": {
650+
"id": "Pd4aUCFdVlZ1"
651+
},
652+
"source": [
653+
"### Keras preprocessing layers vs. Tensorflow Transform \n",
654+
"\n",
655+
"Both Tensorflow Transform and Keras preprocessing layers provide a way to split out preprocessing during training and bundle preprocessing with a model during inference, reducing train/serve skew.\n",
656+
"\n",
657+
"Tensorflow Transform, deeply integrated with [TFX](https://www.tensorflow.org/tfx), provides a scalable map-reduce solution to analyzing and transforming datasets of any size in a job separate from the training pipeline. If you need to run an analysis on a dataset that cannot fit on a single machine, Tensorflow Transform should be your first choice.\n",
658+
"\n",
659+
"Keras preprocessing layers are more geared towards preprocessing applied during training, after reading data from disk. They fit seamlessly with model development in the Keras library. They support analysis of a smaller dataset via [`adapt`](https://www.tensorflow.org/guide/keras/preprocessing_layers#the_adapt_method) and supports use cases like image data augmentation, where each pass over the input dataset will yield different examples for training.\n",
660+
"\n",
661+
"The two libraries can also be mixed, where Tensorflow Transform is used for analysis and static transformations of input data, and Keras preprocessing layers are used for train-time transformations (e.g., one-hot encoding or data augmentation).\n",
662+
"\n"
663+
]
664+
},
665+
{
666+
"cell_type": "markdown",
667+
"metadata": {
668+
"id": "MReKhhZpHUpj"
669+
},
670+
"source": [
671+
"### Best Practice with tf.distribute\n",
672+
"\n",
673+
"Working with both tools involves initializing the transformation logic to apply to data, which might create Tensorflow resources. We would want these resources or states replicated to all workers to save inter-workers or worker-coordinator communication. To do so, we recommend you create Keras preprocessing layers, `tft.TFTransformOutput.transform_features_layer`, or `tft.TransformFeaturesLayer` under `tf.distribute.Strategy.scope()`, just like you would for any other Keras layers.\n",
674+
"\n",
675+
"We will demonstrate examples with the high-level Keras `Model.fit` API and the custom training loop separately."
676+
]
677+
},
678+
{
679+
"cell_type": "markdown",
680+
"metadata": {
681+
"id": "rwEGMWuoX7kJ"
682+
},
683+
"source": [
684+
"#### Extra notes for Keras preprocessing layers users:\n",
685+
"\n",
686+
"**Preprocessing layers and large vocabularies**\n",
687+
"\n",
688+
"When dealing with large vocabularies (over one gigabyte) in a multi-worker setting (i.e., `tf.distribute.MultiWorkerMirroredStrategy`, `tf.distribute.experimental.ParameterServerStrategy`, `tf.distribute.TPUStrategy`), we recommend saving the vocabulary to a static file accessible from all workers (e.g., with Cloud Storage). This will reduce the time spent replicating the vocabulary to all workers during training.\n",
689+
"\n",
690+
"**Preprocessing in data pipeline vs. in model**\n",
691+
"\n",
692+
"While Keras preprocessing layers can be applied either as part of the model or directly to a `tf.data.Dataset`, each of the options come with their edge:\n",
693+
"\n",
694+
"* Applying in the model makes your model portable, and it helps reduce the training/serving skew. ([more details](https://www.tensorflow.org/guide/keras/preprocessing_layers#benefits_of_doing_preprocessing_inside_the_model_at_inference_time))\n",
695+
"* Applying in the `tf.data` pipeline allows prefetching or offloading to the CPU, which generally gives better performance when using accelerators.\n",
696+
"\n",
697+
"When running on TPU, users should almost always place preprocessing layers in the `tf.data` pipeline, as not all layers support TPU, and string ops do not execute on TPU. (The two exceptions are `Normalization` and `Rescaling`, which run fine on TPU and are commonly used as the first layer is an image model.)"
698+
]
699+
},
700+
{
701+
"cell_type": "markdown",
702+
"metadata": {
703+
"id": "hNCYZ9L-BD2R"
704+
},
705+
"source": [
706+
"### Model.fit"
707+
]
708+
},
709+
{
710+
"cell_type": "markdown",
711+
"metadata": {
712+
"id": "NhRB2Xe8B6bX"
713+
},
714+
"source": [
715+
"Users of Keras `Model.fit` do not need to distribute data with `tf.distribute.Strategy.experimental_distribute_dataset` nor `tf.distribute.Strategy.distribute_datasets_from_function` themselves. Check out the [Working with Preprocessing Layers](https://www.tensorflow.org/guide/keras/preprocessing_layers) guide and [Distributed Training with Keras](https://www.tensorflow.org/tutorials/distribute/keras) guide for details. A shortened example may look as below:\n",
716+
"\n",
717+
"```\n",
718+
"strategy = tf.distribute.MirroredStrategy()\n",
719+
"with strategy.scope():\n",
720+
" # Create the layer(s) under scope.\n",
721+
" integer_preprocessing_layer = tf.keras.layers.IntegerLookup(vocabulary=FILE_PATH)\n",
722+
" model = ...\n",
723+
" model.compile(...)\n",
724+
"dataset = dataset.map(lambda x, y: (integer_preprocessing_layer(x), y))\n",
725+
"model.fit(dataset)\n",
726+
"```\n"
727+
]
728+
},
729+
{
730+
"cell_type": "markdown",
731+
"metadata": {
732+
"id": "3zL2vzJ-G0yg"
733+
},
734+
"source": [
735+
"Users of `tf.distribute.experimental.ParameterServerStrategy` with the `Model.fit` API need to use a `tf.keras.utils.experimental.DatasetCreator` as the input. (See the [Parameter Server Training](https://www.tensorflow.org/tutorials/distribute/parameter_server_training#parameter_server_training_with_modelfit_api) guide for more)\n",
736+
"\n",
737+
"```\n",
738+
"strategy = tf.distribute.experimental.ParameterServerStrategy(\n",
739+
" cluster_resolver,\n",
740+
" variable_partitioner=variable_partitioner)\n",
741+
"\n",
742+
"with strategy.scope():\n",
743+
" preprocessing_layer = tf.keras.layers.StringLookup(vocabulary=FILE_PATH)\n",
744+
" model = ...\n",
745+
" model.compile(...)\n",
746+
"\n",
747+
"def dataset_fn(input_context):\n",
748+
" ...\n",
749+
" dataset = dataset.map(preprocessing_layer)\n",
750+
" ...\n",
751+
" return dataset\n",
752+
"\n",
753+
"dataset_creator = tf.keras.utils.experimental.DatasetCreator(dataset_fn)\n",
754+
"model.fit(dataset_creator, epochs=5, steps_per_epoch=20, callbacks=callbacks)\n",
755+
"\n",
756+
"```"
757+
]
758+
},
759+
{
760+
"cell_type": "markdown",
761+
"metadata": {
762+
"id": "imZLQUOYBJyW"
763+
},
764+
"source": [
765+
"### Custom Training Loop"
766+
]
767+
},
768+
{
769+
"cell_type": "markdown",
770+
"metadata": {
771+
"id": "r2PX1QH_OwU3"
772+
},
773+
"source": [
774+
"When writing a [custom training loop](https://www.tensorflow.org/tutorials/distribute/custom_training), you will distribute your data with either the `tf.distribute.Strategy.experimental_distribute_dataset` API or the `tf.distribute.Strategy.distribute_datasets_from_function` API. If you distribute your dataset through `tf.distribute.Strategy.experimental_distribute_dataset`, applying these preprocessing APIs in your data pipeline will lead the resources automatically co-located with the data pipeline to avoid remote resource access. Thus we will only demonstrate examples with `tf.distribute.Strategy.distribute_datasets_from_function`, in which case it is crucial to place initialization of these APIs under `strategy.scope()` for efficiency:"
775+
]
776+
},
777+
{
778+
"cell_type": "code",
779+
"execution_count": null,
780+
"metadata": {
781+
"id": "wJS1UmcWQeab"
782+
},
783+
"outputs": [],
784+
"source": [
785+
"strategy = tf.distribute.MirroredStrategy()\n",
786+
"vocab = [\"a\", \"b\", \"c\", \"d\", \"f\"]\n",
787+
"\n",
788+
"with strategy.scope():\n",
789+
" # Create the layer(s) under scope.\n",
790+
" layer = tf.keras.layers.StringLookup(vocabulary=vocab)\n",
791+
"\n",
792+
"def dataset_fn(input_context):\n",
793+
" # a tf.data.Dataset\n",
794+
" dataset = tf.data.Dataset.from_tensor_slices([\"a\", \"c\", \"e\"]).repeat()\n",
795+
"\n",
796+
" # Custom your batching, sharding, prefetching, etc.\n",
797+
" global_batch_size = 4\n",
798+
" batch_size = input_context.get_per_replica_batch_size(global_batch_size)\n",
799+
" dataset = dataset.batch(batch_size)\n",
800+
" dataset = dataset.shard(\n",
801+
" input_context.num_input_pipelines,\n",
802+
" input_context.input_pipeline_id)\n",
803+
"\n",
804+
" # Apply the preprocessing layer(s) to the tf.data.Dataset\n",
805+
" def preprocess_with_kpl(input):\n",
806+
" return layer(input)\n",
807+
"\n",
808+
" processed_ds = dataset.map(preprocess_with_kpl)\n",
809+
" return processed_ds\n",
810+
"\n",
811+
"distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn)\n",
812+
"\n",
813+
"# Print out a few example batches.\n",
814+
"distributed_dataset_iterator = iter(distributed_dataset)\n",
815+
"for _ in range(3):\n",
816+
" print(next(distributed_dataset_iterator))"
817+
]
818+
},
819+
{
820+
"cell_type": "markdown",
821+
"metadata": {
822+
"id": "PVl1cblWQy8b"
823+
},
824+
"source": [
825+
"Note that if you are training with `tf.distribute.experimental.ParameterServerStrategy`, you'll also call `tf.distribute.experimental.coordinator.ClusterCoordinator.create_per_worker_dataset`\n",
826+
"\n",
827+
"```\n",
828+
"@tf.function\n",
829+
"def per_worker_dataset_fn():\n",
830+
" return strategy.distribute_datasets_from_function(dataset_fn)\n",
831+
"\n",
832+
"per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)\n",
833+
"per_worker_iterator = iter(per_worker_dataset)\n",
834+
"```\n"
835+
]
836+
},
837+
{
838+
"cell_type": "markdown",
839+
"metadata": {
840+
"id": "Ol7SmPID1dAt"
841+
},
842+
"source": [
843+
"For Tensorflow Transform, as mentioned above, the Analyze stage is done separately from training and thus omitted here. See the [tutorial](https://www.tensorflow.org/tfx/tutorials/transform/census) for a detailed how-to. Usually, this stage includes creating a `tf.Transform` preprocessing function and transforming the data in an [Apache Beam](https://beam.apache.org/) pipeline with this preprocessing function. At the end of the Analyze stage, the output can be exported as a TensorFlow graph which you can use for both training and serving. Our example covers only the training pipeline part:\n",
844+
"\n",
845+
"```\n",
846+
"with strategy.scope():\n",
847+
" # working_dir contains the tf.Transform output.\n",
848+
" tf_transform_output = tft.TFTransformOutput(working_dir)\n",
849+
" # Loading from working_dir to create a Keras layer for applying the tf.Transform output to data\n",
850+
" tft_layer = tf_transform_output.transform_features_layer()\n",
851+
" ...\n",
852+
"\n",
853+
"def dataset_fn(input_context):\n",
854+
" ...\n",
855+
" dataset.map(tft_layer, num_parallel_calls=tf.data.AUTOTUNE)\n",
856+
" ...\n",
857+
" return dataset\n",
858+
"\n",
859+
"distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn)\n",
860+
"```"
861+
]
862+
},
863+
{
864+
"cell_type": "markdown",
865+
"metadata": {
866+
"id": "3_IQxRXxQWof"
867+
},
629868
"source": [
630869
"## Partial Batches"
631870
]
@@ -827,6 +1066,7 @@
8271066
"colab": {
8281067
"collapsed_sections": [],
8291068
"name": "input.ipynb",
1069+
"provenance": [],
8301070
"toc_visible": true
8311071
},
8321072
"kernelspec": {

0 commit comments

Comments
 (0)