From 3855831cd4da2ad114e1262784fd9368553153d8 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Thu, 27 Mar 2025 09:02:42 +0000 Subject: [PATCH 01/19] fix: restrict policy var save for distributed setup --- .../dynamic_embedding/python/keras/models.py | 51 +++++++++++-------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py index bbec4da24..523e3e242 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py @@ -98,7 +98,34 @@ def _check_saveable_and_redirect_new_de_dir(hvd_rank=0): # Redirect new de_dir if hasattr(de_var, 'saveable'): de_var.saveable._saver_config.save_path = de_dir - + + def _save_de_var(de_var, proc_size=1, proc_rank=0): + a2a_emb = de_var._created_in_class + if de_var._saveable_object_creator is not None: + if not isinstance(de_var.kv_creator.saver, de.FileSystemSaver): + # This function only serves FileSystemSaver. + return + # save optimizer parameters of Dynamic Embedding + if include_optimizer is True: + de_opt_vars = a2a_emb.optimizer_vars.as_list() if hasattr( + a2a_emb.optimizer_vars, "as_list") else a2a_emb.optimizer_vars + for de_opt_var in de_opt_vars: + de_opt_var.save_to_file_system(dirpath=de_dir, + proc_size=proc_size, + proc_rank=proc_rank) + if proc_rank == 0: + # FileSystemSaver works well at rank 0. + return + # save Dynamic Embedding Parameters + de_var.save_to_file_system(dirpath=de_dir, + proc_size=proc_size, + proc_rank=proc_rank) + + def _maybe_save_restrict_policy_params(var, proc_size=1, proc_rank=0): + if var.restrict_policy is not None: + de_var = var.restrict_policy._restrict_var + _save_de_var(de_var, proc_size=proc_size, proc_rank=proc_rank) + def _traverse_emb_layers_and_save(proc_size=1, proc_rank=0): for var in model.variables: if not hasattr(var, "params"): @@ -106,26 +133,8 @@ def _traverse_emb_layers_and_save(proc_size=1, proc_rank=0): if not hasattr(var.params, "_created_in_class"): continue de_var = var.params - a2a_emb = de_var._created_in_class - if de_var._saveable_object_creator is not None: - if not isinstance(de_var.kv_creator.saver, de.FileSystemSaver): - # This function only serves FileSystemSaver. - continue - # save optimizer parameters of Dynamic Embedding - if include_optimizer is True: - de_opt_vars = a2a_emb.optimizer_vars.as_list() if hasattr( - a2a_emb.optimizer_vars, "as_list") else a2a_emb.optimizer_vars - for de_opt_var in de_opt_vars: - de_opt_var.save_to_file_system(dirpath=de_dir, - proc_size=proc_size, - proc_rank=proc_rank) - if proc_rank == 0: - # FileSystemSaver works well at rank 0. - continue - # save Dynamic Embedding Parameters - de_var.save_to_file_system(dirpath=de_dir, - proc_size=proc_size, - proc_rank=proc_rank) + _save_de_var(de_var, proc_size=proc_size, proc_rank=proc_rank) + _maybe_save_restrict_policy_params(var, proc_size=proc_size, proc_rank=proc_rank) if hvd is None: call_original_save_func() From 9d20bac8c368fb3a1accccbe4c4b4e224f4e03b3 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Thu, 27 Mar 2025 14:01:21 +0000 Subject: [PATCH 02/19] update --- .../movielens-1m-keras-with-horovod.py | 7 ++++--- .../movielens-1m-keras-with-horovod/start.sh | 5 +++-- .../dynamic_embedding/python/keras/models.py | 2 ++ 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py index 5eeaa307f..53ccd10ff 100644 --- a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py +++ b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py @@ -29,12 +29,13 @@ def has_horovod() -> bool: def config(): # callback calls hvd.rank() so we need to initialize horovod here hvd.init() + print("Size: ", hvd.size()) if has_horovod(): print("Horovod is enabled.") if hvd.rank() > 0: os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # Horovod: pin GPU to be used to process local rank (one GPU per process) - config_gpu(hvd.local_rank()) + # config_gpu(hvd.local_rank()) else: config_gpu() @@ -460,8 +461,8 @@ def call(self, features): def get_dataset(batch_size=1): ds = tfds.load("movielens/1m-ratings", split="train", - data_dir="~/dataset", - download=True) + # data_dir="~/dataset", + download=False) features = ds.map( lambda x: { "movie_id": diff --git a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/start.sh b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/start.sh index 34d2ed1d9..cefe5a5bb 100644 --- a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/start.sh +++ b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/start.sh @@ -1,6 +1,7 @@ #!/bin/bash rm -rf ./export_dir -gpu_num=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l) +#gpu_num=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l) +gpu_num=5 export gpu_num horovodrun -np $gpu_num python movielens-1m-keras-with-horovod.py --mode="train" --model_dir="./model_dir" --export_dir="./export_dir" \ - --steps_per_epoch=${1:-20000} --shuffle=${2:-True} \ No newline at end of file + --steps_per_epoch=${1:-10} --shuffle=${2:-False} \ No newline at end of file diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py index 523e3e242..b5e3a402a 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py @@ -122,6 +122,8 @@ def _save_de_var(de_var, proc_size=1, proc_rank=0): proc_rank=proc_rank) def _maybe_save_restrict_policy_params(var, proc_size=1, proc_rank=0): + if not hasattr(var, "restrict_policy"): + return if var.restrict_policy is not None: de_var = var.restrict_policy._restrict_var _save_de_var(de_var, proc_size=proc_size, proc_rank=proc_rank) From 30319b0a662b74941a639d45c4051a8ee89a94af Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Thu, 27 Mar 2025 14:32:50 +0000 Subject: [PATCH 03/19] udpate --- README.md | 2 -- .../movielens-1m-keras-with-horovod.py | 9 +++++++-- .../dynamic_embedding/python/keras/models.py | 10 +++++----- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index ece12d425..df1fd0bec 100644 --- a/README.md +++ b/README.md @@ -165,10 +165,8 @@ cd recommenders-addons # This script links project with TensorFlow dependency python configure.py - bazel build --enable_runfiles build_pip_pkg bazel-bin/build_pip_pkg artifacts - pip install artifacts/tensorflow_recommenders_addons-*.whl ``` #### GPU Support diff --git a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py index 53ccd10ff..ff6ad8b85 100644 --- a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py +++ b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py @@ -23,7 +23,8 @@ def has_horovod() -> bool: - return 'OMPI_COMM_WORLD_RANK' in os.environ or 'PMI_RANK' in os.environ + #return 'OMPI_COMM_WORLD_RANK' in os.environ or 'PMI_RANK' in os.environ + return True def config(): @@ -275,6 +276,7 @@ def __init__(self, init_capacity=init_capacity, kv_creator=kv_creator_dense, short_file_name=True, + restrict_policy=de.TimestampRestrictPolicy, ) kv_creator_sparse = get_kv_creator(mpi_size, mpi_rank, init_capacity, @@ -290,6 +292,7 @@ def __init__(self, init_capacity=init_capacity, kv_creator=kv_creator_sparse, short_file_name=True, + restrict_policy=de.TimestampRestrictPolicy, ) self.dnn = tf.keras.layers.Dense( @@ -648,7 +651,7 @@ def train(): # horovod callback is used to broadcast the value generated by initializer of rank0. hvd_opt_init_callback = de.keras.callbacks.DEHvdBroadcastGlobalVariablesCallback( root_rank=0) - callbacks_list = [hvd_opt_init_callback, ckpt_callback] + callbacks_list = [hvd_opt_init_callback]#, ckpt_callback] else: callbacks_list = [ckpt_callback] @@ -664,6 +667,8 @@ def train(): steps_per_epoch=FLAGS.steps_per_epoch, verbose=1 if get_rank() == 0 else 0) + print(model.user_embedding.sparse_embedding_layer.params.restrict_policy) + export_to_savedmodel(model, FLAGS.model_dir) export_for_serving(model, FLAGS.export_dir) diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py index b5e3a402a..99caa73d4 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py @@ -121,11 +121,11 @@ def _save_de_var(de_var, proc_size=1, proc_rank=0): proc_size=proc_size, proc_rank=proc_rank) - def _maybe_save_restrict_policy_params(var, proc_size=1, proc_rank=0): - if not hasattr(var, "restrict_policy"): + def _maybe_save_restrict_policy_params(de_var, proc_size=1, proc_rank=0): + if not hasattr(de_var, "restrict_policy"): return - if var.restrict_policy is not None: - de_var = var.restrict_policy._restrict_var + if de_var.restrict_policy is not None: + de_var = de_var.restrict_policy._restrict_var _save_de_var(de_var, proc_size=proc_size, proc_rank=proc_rank) def _traverse_emb_layers_and_save(proc_size=1, proc_rank=0): @@ -136,7 +136,7 @@ def _traverse_emb_layers_and_save(proc_size=1, proc_rank=0): continue de_var = var.params _save_de_var(de_var, proc_size=proc_size, proc_rank=proc_rank) - _maybe_save_restrict_policy_params(var, proc_size=proc_size, proc_rank=proc_rank) + _maybe_save_restrict_policy_params(de_var, proc_size=proc_size, proc_rank=proc_rank) if hvd is None: call_original_save_func() From 0d24b09ab5d91d24e39400596e056aed38a93655 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Tue, 1 Apr 2025 10:48:54 +0000 Subject: [PATCH 04/19] update logic --- .../movielens-1m-keras-with-horovod.py | 6 +-- .../dynamic_embedding/python/keras/models.py | 49 ++++++++++--------- 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py index ff6ad8b85..011fa0f91 100644 --- a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py +++ b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py @@ -19,7 +19,7 @@ import tensorflow_datasets as tfds import horovod.tensorflow as hvd # optimal performance -os.environ['TF_XLA_FLAGS'] = '--tf_xla_auto_jit=2 --tf_xla_cpu_global_jit' +# os.environ['TF_XLA_FLAGS'] = '--tf_xla_auto_jit=2 --tf_xla_cpu_global_jit' def has_horovod() -> bool: @@ -665,9 +665,7 @@ def train(): callbacks=callbacks_list, epochs=FLAGS.epochs, steps_per_epoch=FLAGS.steps_per_epoch, - verbose=1 if get_rank() == 0 else 0) - - print(model.user_embedding.sparse_embedding_layer.params.restrict_policy) + verbose=1)# if get_rank() == 0 else 0) export_to_savedmodel(model, FLAGS.model_dir) export_for_serving(model, FLAGS.export_dir) diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py index 99caa73d4..a7ebe5393 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py @@ -99,34 +99,15 @@ def _check_saveable_and_redirect_new_de_dir(hvd_rank=0): if hasattr(de_var, 'saveable'): de_var.saveable._saver_config.save_path = de_dir - def _save_de_var(de_var, proc_size=1, proc_rank=0): - a2a_emb = de_var._created_in_class - if de_var._saveable_object_creator is not None: - if not isinstance(de_var.kv_creator.saver, de.FileSystemSaver): - # This function only serves FileSystemSaver. - return - # save optimizer parameters of Dynamic Embedding - if include_optimizer is True: - de_opt_vars = a2a_emb.optimizer_vars.as_list() if hasattr( - a2a_emb.optimizer_vars, "as_list") else a2a_emb.optimizer_vars - for de_opt_var in de_opt_vars: - de_opt_var.save_to_file_system(dirpath=de_dir, - proc_size=proc_size, - proc_rank=proc_rank) - if proc_rank == 0: - # FileSystemSaver works well at rank 0. - return - # save Dynamic Embedding Parameters - de_var.save_to_file_system(dirpath=de_dir, - proc_size=proc_size, - proc_rank=proc_rank) - def _maybe_save_restrict_policy_params(de_var, proc_size=1, proc_rank=0): if not hasattr(de_var, "restrict_policy"): return if de_var.restrict_policy is not None: + # Only save restrict policy var if policy created de_var = de_var.restrict_policy._restrict_var - _save_de_var(de_var, proc_size=proc_size, proc_rank=proc_rank) + de_var.save_to_file_system(dirpath=de_dir, + proc_size=proc_size, + proc_rank=proc_rank) def _traverse_emb_layers_and_save(proc_size=1, proc_rank=0): for var in model.variables: @@ -135,7 +116,27 @@ def _traverse_emb_layers_and_save(proc_size=1, proc_rank=0): if not hasattr(var.params, "_created_in_class"): continue de_var = var.params - _save_de_var(de_var, proc_size=proc_size, proc_rank=proc_rank) + a2a_emb = de_var._created_in_class + if de_var._saveable_object_creator is not None: + if not isinstance(de_var.kv_creator.saver, de.FileSystemSaver): + # This function only serves FileSystemSaver. + return + # save optimizer parameters of Dynamic Embedding + if include_optimizer is True: + de_opt_vars = a2a_emb.optimizer_vars.as_list() if hasattr( + a2a_emb.optimizer_vars, "as_list") else a2a_emb.optimizer_vars + for de_opt_var in de_opt_vars: + de_opt_var.save_to_file_system(dirpath=de_dir, + proc_size=proc_size, + proc_rank=proc_rank) + if proc_rank == 0: + # FileSystemSaver works well at rank 0. + return + # save Dynamic Embedding Parameters + de_var.save_to_file_system(dirpath=de_dir, + proc_size=proc_size, + proc_rank=proc_rank) + # Save restrict policy for each hvd.rank() _maybe_save_restrict_policy_params(de_var, proc_size=proc_size, proc_rank=proc_rank) if hvd is None: From 670eb05c79b7e5c8d8e0ffcb091bb0fa6ab01374 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Tue, 1 Apr 2025 12:19:54 +0000 Subject: [PATCH 05/19] cleanup --- README.md | 2 + .../movielens-1m-keras-with-horovod.py | 530 +++++++++--------- .../movielens-1m-keras-with-horovod/start.sh | 5 +- .../dynamic_embedding/python/keras/models.py | 4 +- 4 files changed, 269 insertions(+), 272 deletions(-) diff --git a/README.md b/README.md index df1fd0bec..ece12d425 100644 --- a/README.md +++ b/README.md @@ -165,8 +165,10 @@ cd recommenders-addons # This script links project with TensorFlow dependency python configure.py + bazel build --enable_runfiles build_pip_pkg bazel-bin/build_pip_pkg artifacts + pip install artifacts/tensorflow_recommenders_addons-*.whl ``` #### GPU Support diff --git a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py index 011fa0f91..a8f7fde38 100644 --- a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py +++ b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py @@ -19,24 +19,22 @@ import tensorflow_datasets as tfds import horovod.tensorflow as hvd # optimal performance -# os.environ['TF_XLA_FLAGS'] = '--tf_xla_auto_jit=2 --tf_xla_cpu_global_jit' +os.environ['TF_XLA_FLAGS'] = '--tf_xla_auto_jit=2 --tf_xla_cpu_global_jit' def has_horovod() -> bool: - #return 'OMPI_COMM_WORLD_RANK' in os.environ or 'PMI_RANK' in os.environ - return True + return 'OMPI_COMM_WORLD_RANK' in os.environ or 'PMI_RANK' in os.environ def config(): # callback calls hvd.rank() so we need to initialize horovod here hvd.init() - print("Size: ", hvd.size()) if has_horovod(): print("Horovod is enabled.") if hvd.rank() > 0: os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # Horovod: pin GPU to be used to process local rank (one GPU per process) - # config_gpu(hvd.local_rank()) + config_gpu(hvd.local_rank()) else: config_gpu() @@ -73,105 +71,105 @@ def get_rank() -> int: FLAGS = flags.FLAGS input_spec = { - 'user_id': - tf.TensorSpec(shape=[ - None, - 1, - ], dtype=tf.int64, name='user_id'), - 'user_gender': - tf.TensorSpec(shape=[ - None, - 1, - ], dtype=tf.int64, name='user_gender'), - 'user_occupation_label': - tf.TensorSpec(shape=[ - None, - 1, - ], - dtype=tf.int64, - name='user_occupation_label'), - 'bucketized_user_age': - tf.TensorSpec(shape=[ - None, - 1, - ], - dtype=tf.int64, - name='bucketized_user_age'), - 'movie_id': - tf.TensorSpec(shape=[ - None, - 1, - ], dtype=tf.int64, name='movie_id'), - 'movie_genres': - tf.TensorSpec(shape=[ - None, - 1, - ], dtype=tf.int64, name='movie_genres'), - 'timestamp': - tf.TensorSpec(shape=[ - None, - 1, - ], dtype=tf.int64, name='timestamp') + 'user_id': + tf.TensorSpec(shape=[ + None, + 1, + ], dtype=tf.int64, name='user_id'), + 'user_gender': + tf.TensorSpec(shape=[ + None, + 1, + ], dtype=tf.int64, name='user_gender'), + 'user_occupation_label': + tf.TensorSpec(shape=[ + None, + 1, + ], + dtype=tf.int64, + name='user_occupation_label'), + 'bucketized_user_age': + tf.TensorSpec(shape=[ + None, + 1, + ], + dtype=tf.int64, + name='bucketized_user_age'), + 'movie_id': + tf.TensorSpec(shape=[ + None, + 1, + ], dtype=tf.int64, name='movie_id'), + 'movie_genres': + tf.TensorSpec(shape=[ + None, + 1, + ], dtype=tf.int64, name='movie_genres'), + 'timestamp': + tf.TensorSpec(shape=[ + None, + 1, + ], dtype=tf.int64, name='timestamp') } feature_info_spec = { - 'movie_id': { - 'code': 101, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'sparse_cpu', - 'input_tensor': None, - 'pretreated_tensor': None - }, - 'movie_genres': { - 'code': 102, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'normal_gpu', - 'input_tensor': None, - 'pretreated_tensor': None, - }, - 'user_id': { - 'code': 103, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'sparse_cpu', - 'input_tensor': None, - 'pretreated_tensor': None, - }, - 'user_gender': { - 'code': 104, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'normal_gpu', - 'input_tensor': None, - 'pretreated_tensor': None, - }, - 'user_occupation_label': { - 'code': 105, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'normal_gpu', - 'input_tensor': None, - 'pretreated_tensor': None, - }, - 'bucketized_user_age': { - 'code': 106, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'normal_gpu', - 'input_tensor': None, - 'pretreated_tensor': None, - 'boundaries': [i for i in range(0, 100, 10)], - }, - 'timestamp': { - 'code': 107, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'normal_gpu', - 'input_tensor': None, - 'pretreated_tensor': None, - } + 'movie_id': { + 'code': 101, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'sparse_cpu', + 'input_tensor': None, + 'pretreated_tensor': None + }, + 'movie_genres': { + 'code': 102, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'normal_gpu', + 'input_tensor': None, + 'pretreated_tensor': None, + }, + 'user_id': { + 'code': 103, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'sparse_cpu', + 'input_tensor': None, + 'pretreated_tensor': None, + }, + 'user_gender': { + 'code': 104, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'normal_gpu', + 'input_tensor': None, + 'pretreated_tensor': None, + }, + 'user_occupation_label': { + 'code': 105, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'normal_gpu', + 'input_tensor': None, + 'pretreated_tensor': None, + }, + 'bucketized_user_age': { + 'code': 106, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'normal_gpu', + 'input_tensor': None, + 'pretreated_tensor': None, + 'boundaries': [i for i in range(0, 100, 10)], + }, + 'timestamp': { + 'code': 107, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'normal_gpu', + 'input_tensor': None, + 'pretreated_tensor': None, + } } @@ -205,21 +203,21 @@ def embedding_inputs_concat(input_tensors, input_dims): def embedding_out_split(embedding_out_concat, input_split_dims): embedding_out = list() embedding_out.extend( - tf.split(embedding_out_concat, input_split_dims, - axis=1)) # (feature_combin_num, (batch, dim, emb_size)) + tf.split(embedding_out_concat, input_split_dims, + axis=1)) # (feature_combin_num, (batch, dim, emb_size)) assert (len(input_split_dims) == len(embedding_out)) return embedding_out class Bucketize(tf.keras.layers.Layer): - + def __init__(self, boundaries, **kwargs): self.boundaries = boundaries super(Bucketize, self).__init__(**kwargs) - + def call(self, x, **kwargs): return tf.raw_ops.Bucketize(input=x, boundaries=self.boundaries) - + def get_config(self,): config = {'boundaries': self.boundaries} base_config = super(Bucketize, self).get_config() @@ -240,9 +238,9 @@ def get_kv_creator(mpi_size: int, # so set the factor larger than max_capacity to avoid this case factor = mpi_size * 0.7 config = de.HkvHashTableConfig( - init_capacity=math.ceil(vocab_size / factor), - max_capacity=math.ceil(max_capacity / factor), - max_hbm_for_values=math.ceil(max_capacity * value_size * dim / factor)) + init_capacity=math.ceil(vocab_size / factor), + max_capacity=math.ceil(max_capacity / factor), + max_hbm_for_values=math.ceil(max_capacity * value_size * dim / factor)) return de.HkvHashTableCreator(config=config, saver=saver) else: # for CuckooHashTable case the init_capacity passed in by Embedding layer @@ -251,7 +249,7 @@ def get_kv_creator(mpi_size: int, class ChannelEmbeddingLayers(tf.keras.layers.Layer): - + def __init__(self, name='', dense_embedding_size=1, @@ -259,48 +257,46 @@ def __init__(self, embedding_initializer=tf.keras.initializers.Zeros(), mpi_size=1, mpi_rank=0): - + super(ChannelEmbeddingLayers, self).__init__() init_capacity = 4096000 kv_creator_dense = get_kv_creator(mpi_size, mpi_rank, init_capacity, tf.dtypes.float32.size, dense_embedding_size) - + self.dense_embedding_layer = de.keras.layers.HvdAllToAllEmbedding( - mpi_size=mpi_size, - embedding_size=dense_embedding_size, - key_dtype=tf.int64, - value_dtype=tf.float32, - initializer=embedding_initializer, - name=name + '_DenseUnifiedEmbeddingLayer', - init_capacity=init_capacity, - kv_creator=kv_creator_dense, - short_file_name=True, - restrict_policy=de.TimestampRestrictPolicy, + mpi_size=mpi_size, + embedding_size=dense_embedding_size, + key_dtype=tf.int64, + value_dtype=tf.float32, + initializer=embedding_initializer, + name=name + '_DenseUnifiedEmbeddingLayer', + init_capacity=init_capacity, + kv_creator=kv_creator_dense, + short_file_name=True, ) - + kv_creator_sparse = get_kv_creator(mpi_size, mpi_rank, init_capacity, tf.dtypes.float32.size, sparse_embedding_size) self.sparse_embedding_layer = de.keras.layers.HvdAllToAllEmbedding( - mpi_size=mpi_size, - embedding_size=sparse_embedding_size, - key_dtype=tf.int64, - value_dtype=tf.float32, - initializer=embedding_initializer, - name=name + '_SparseUnifiedEmbeddingLayer', - init_capacity=init_capacity, - kv_creator=kv_creator_sparse, - short_file_name=True, - restrict_policy=de.TimestampRestrictPolicy, + mpi_size=mpi_size, + embedding_size=sparse_embedding_size, + key_dtype=tf.int64, + value_dtype=tf.float32, + initializer=embedding_initializer, + name=name + '_SparseUnifiedEmbeddingLayer', + init_capacity=init_capacity, + kv_creator=kv_creator_sparse, + short_file_name=True, ) - + self.dnn = tf.keras.layers.Dense( - 128, - activation='relu', - kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), - bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) - + 128, + activation='relu', + kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), + bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) + def __call__(self, features_info): dense_inputs = [] dense_input_dims = [] @@ -337,20 +333,20 @@ def __call__(self, features_info): if input_is_sequence_feature[i] == True: # Deal with the embedding from vector features. embedding_vec = tf.math.reduce_mean( - embedding, axis=1, - keepdims=True) # (feature_combin_num, (batch, x, emb_size)) + embedding, axis=1, + keepdims=True) # (feature_combin_num, (batch, x, emb_size)) else: embedding_vec = embedding embedding_vec = tf.keras.layers.Flatten()(embedding_vec) embedding_outs.append(embedding_vec) # Final embedding result. embeddings_concat = tf.keras.layers.Concatenate(axis=1)(embedding_outs) - + return self.dnn(embeddings_concat) class DualChannelsDeepModel(tf.keras.Model): - + def __init__(self, user_embedding_size=1, movie_embedding_size=1, @@ -358,7 +354,7 @@ def __init__(self, is_training=True, mpi_size=1, mpi_rank=0): - + if is_training: de.enable_train_mode() if embedding_initializer is None: @@ -367,47 +363,47 @@ def __init__(self, de.enable_inference_mode() if embedding_initializer is None: embedding_initializer = tf.keras.initializers.Zeros() - + super(DualChannelsDeepModel, self).__init__() self.user_embedding_size = user_embedding_size self.movie_embedding_size = movie_embedding_size print(f"mpi_size {mpi_size}, mpi_rank {mpi_rank}") self.user_embedding = ChannelEmbeddingLayers( - name='user', - dense_embedding_size=user_embedding_size, - sparse_embedding_size=user_embedding_size * 2, - embedding_initializer=embedding_initializer, - mpi_size=mpi_size, - mpi_rank=mpi_rank) + name='user', + dense_embedding_size=user_embedding_size, + sparse_embedding_size=user_embedding_size * 2, + embedding_initializer=embedding_initializer, + mpi_size=mpi_size, + mpi_rank=mpi_rank) self.movie_embedding = ChannelEmbeddingLayers( - name='movie', - dense_embedding_size=movie_embedding_size, - sparse_embedding_size=movie_embedding_size * 2, - embedding_initializer=embedding_initializer, - mpi_size=mpi_size, - mpi_rank=mpi_rank) + name='movie', + dense_embedding_size=movie_embedding_size, + sparse_embedding_size=movie_embedding_size * 2, + embedding_initializer=embedding_initializer, + mpi_size=mpi_size, + mpi_rank=mpi_rank) self.dynamic_layer_norm = de.keras.layers.LayerNormalization() self.dnn1 = tf.keras.layers.Dense( - 64, - activation='relu', - kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), - bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) + 64, + activation='relu', + kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), + bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) self.dnn2 = tf.keras.layers.Dense( - 16, - activation='relu', - kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), - bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) + 16, + activation='relu', + kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), + bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) self.dnn3 = tf.keras.layers.Dense( - 5, - activation='softmax', - kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), - bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) + 5, + activation='softmax', + kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), + bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) self.bias_net = tf.keras.layers.Dense( - 5, - activation='softmax', - kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), - bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) - + 5, + activation='softmax', + kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), + bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) + @tf.function def call(self, features): # Construct input layers @@ -420,7 +416,7 @@ def call(self, features): fea_info['input_tensor'] = input_tensor if fea_info.__contains__('boundaries'): input_tensor = Bucketize( - boundaries=fea_info['boundaries'])(input_tensor) + boundaries=fea_info['boundaries'])(input_tensor) # To prepare for GPU table combined queries, use a prefix to distinguish different features in a table. if fea_info['ptype'] == 'user_occupation_label': input_tensor_prefix_code = int(fea_info['code']) << 48 @@ -431,30 +427,30 @@ def call(self, features): # xor operation can be replaced with addition operation to facilitate subsequent optimization of TRT and OpenVino. input_tensor = tf.add(input_tensor, input_tensor_prefix_code) fea_info['pretreated_tensor'] = input_tensor - + user_fea = ['user_id', 'user_gender', 'user_occupation_label'] user_fea = [i for i in features.keys() if i in user_fea] user_fea_info = { - key: value - for key, value in feature_info_spec.items() - if key in user_fea + key: value + for key, value in feature_info_spec.items() + if key in user_fea } movie_fea = ['movie_id', 'movie_genres', 'user_occupation_label'] movie_fea = [i for i in features.keys() if i in movie_fea] movie_fea_info = { - key: value - for key, value in feature_info_spec.items() - if key in movie_fea + key: value + for key, value in feature_info_spec.items() + if key in movie_fea } user_latent = self.user_embedding(user_fea_info) movie_latent = self.movie_embedding(movie_fea_info) latent = tf.concat([user_latent, movie_latent], axis=1) - + normalized_emb = self.dynamic_layer_norm(latent) x = self.dnn1(normalized_emb) x = self.dnn2(x) x = self.dnn3(x) - + bias = self.bias_net(normalized_emb) x = 0.2 * x + 0.8 * bias user_rating = tf.keras.layers.Lambda(lambda x: x, name='user_rating')(x) @@ -464,29 +460,29 @@ def call(self, features): def get_dataset(batch_size=1): ds = tfds.load("movielens/1m-ratings", split="train", - # data_dir="~/dataset", - download=False) + data_dir="~/dataset", + download=True) features = ds.map( - lambda x: { - "movie_id": - tf.strings.to_number(x["movie_id"], tf.int64), - "movie_genres": - tf.cast(x["movie_genres"][0], tf.int64), - "user_id": - tf.strings.to_number(x["user_id"], tf.int64), - "user_gender": - tf.cast(x["user_gender"], tf.int64), - "user_occupation_label": - tf.cast(x["user_occupation_label"], tf.int64), - "bucketized_user_age": - tf.cast(x["bucketized_user_age"], tf.int64), - "timestamp": - tf.cast(x["timestamp"] - 880000000, tf.int64), - }) - + lambda x: { + "movie_id": + tf.strings.to_number(x["movie_id"], tf.int64), + "movie_genres": + tf.cast(x["movie_genres"][0], tf.int64), + "user_id": + tf.strings.to_number(x["user_id"], tf.int64), + "user_gender": + tf.cast(x["user_gender"], tf.int64), + "user_occupation_label": + tf.cast(x["user_occupation_label"], tf.int64), + "bucketized_user_age": + tf.cast(x["bucketized_user_age"], tf.int64), + "timestamp": + tf.cast(x["timestamp"] - 880000000, tf.int64), + }) + ratings = ds.map(lambda x: { - "user_rating": - tf.one_hot(tf.cast(x["user_rating"] - 1, dtype=tf.int64), 5) + "user_rating": + tf.one_hot(tf.cast(x["user_rating"] - 1, dtype=tf.int64), 5) }) dataset = tf.data.Dataset.zip((features, ratings)) if FLAGS.shuffle: @@ -496,16 +492,16 @@ def get_dataset(batch_size=1): dataset = dataset.repeat(1).batch(batch_size).prefetch(tf.data.AUTOTUNE) # Only GPU:0 since TF is set to be visible to GPU:X dataset = dataset.apply( - tf.data.experimental.prefetch_to_device('GPU:0', buffer_size=2)) + tf.data.experimental.prefetch_to_device('GPU:0', buffer_size=2)) return dataset def export_to_savedmodel(model, savedmodel_dir): save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA']) - + if not os.path.exists(savedmodel_dir): os.mkdir(savedmodel_dir) - + ########################## What really happened ########################## # # Calling the TF save API for all ranks causes file conflicts, so KV files other than rank0 need to be saved by calling the underlying API separately. # if hvd.rank() == 0: @@ -530,7 +526,7 @@ def export_to_savedmodel(model, savedmodel_dir): # opt_de_var.save_to_file_system(dirpath=de_dir, # proc_size=hvd.size(), # proc_rank=hvd.rank()) - + # TFRA modify the Keras save function with a patch. # !!!! Run save_model function in all rank !!!! de.keras.models.save_model(model, @@ -560,12 +556,12 @@ def serve(save_model, *args, **kwargs): def export_for_serving(model, export_dir): save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA']) - + if not os.path.exists(export_dir): os.mkdir(export_dir) - + arg_specs, kwarg_specs = save_spec(model) - + ########################## What really happened ########################## # if hvd.rank() == 0: # # Remember to remove optimizer parameters when ready to serve. @@ -587,21 +583,21 @@ def export_for_serving(model, export_dir): # layer.params.save_to_file_system(dirpath=de_dir, # proc_size=hvd.size(), # proc_rank=hvd.rank()) - + # TFRA modify the Keras save function with a patch. # !!!! Run save_model function in all rank !!!! de.keras.models.save_model( - model, - export_dir, - overwrite=True, - include_optimizer=False, - options=save_options, - signatures={ - 'serving_default': - serve.get_concrete_function(model, *arg_specs, **kwarg_specs) - }, + model, + export_dir, + overwrite=True, + include_optimizer=False, + options=save_options, + signatures={ + 'serving_default': + serve.get_concrete_function(model, *arg_specs, **kwarg_specs) + }, ) - + if get_rank() == 0: # Modify the inference graph to a stand-alone version tf.keras.backend.clear_session() @@ -617,10 +613,10 @@ def export_for_serving(model, export_dir): options=save_options, experimental_skip_checkpoint=True, signatures={ - 'serving_default': - serve.get_concrete_function( - export_model, *arg_specs, - **kwarg_specs) + 'serving_default': + serve.get_concrete_function( + export_model, *arg_specs, + **kwarg_specs) }) @@ -631,42 +627,42 @@ def train(): True, get_cluster_size(), get_rank()) optimizer = Adam(1E-3) optimizer = de.DynamicEmbeddingOptimizer(optimizer, synchronous=True) - + auc = tf.keras.metrics.AUC(num_thresholds=1000) model.compile(optimizer=optimizer, loss=tf.keras.losses.MeanSquaredError(), metrics=[ - auc, + auc, ]) - + if os.path.exists(FLAGS.model_dir + '/variables'): model.load_weights(FLAGS.model_dir) - + tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=FLAGS.model_dir) save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA']) ckpt_callback = de.keras.callbacks.ModelCheckpoint( - filepath=FLAGS.model_dir + '/weights_epoch{epoch:03d}_loss{loss:.4f}', - options=save_options) + filepath=FLAGS.model_dir + '/weights_epoch{epoch:03d}_loss{loss:.4f}', + options=save_options) if has_horovod(): # horovod callback is used to broadcast the value generated by initializer of rank0. hvd_opt_init_callback = de.keras.callbacks.DEHvdBroadcastGlobalVariablesCallback( - root_rank=0) - callbacks_list = [hvd_opt_init_callback]#, ckpt_callback] + root_rank=0) + callbacks_list = [hvd_opt_init_callback, ckpt_callback] else: callbacks_list = [ckpt_callback] - + # The log class callback only takes effect in rank0 for convenience if get_rank() == 0: callbacks_list.extend([tensorboard_callback]) # If there are callbacks such as evaluation metrics that call model calculations, take effect on all ranks. # callbacks_list.extend([my_auc_callback]) - + model.fit(dataset, callbacks=callbacks_list, epochs=FLAGS.epochs, steps_per_epoch=FLAGS.steps_per_epoch, - verbose=1)# if get_rank() == 0 else 0) - + verbose=1 if get_rank() == 0 else 0) + export_to_savedmodel(model, FLAGS.model_dir) export_for_serving(model, FLAGS.export_dir) @@ -678,19 +674,19 @@ def export(): export_model = DualChannelsDeepModel(FLAGS.embedding_size, FLAGS.embedding_size, tf.keras.initializers.RandomNormal( - 0.0, 0.5), + 0.0, 0.5), False, mpi_size=1, mpi_rank=0) save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA']) dummy_features = { - 'movie_id': tf.constant([0], dtype=tf.int64), - 'movie_genres': tf.constant([0], dtype=tf.int64), - 'user_id': tf.constant([0], dtype=tf.int64), - 'user_gender': tf.constant([0], dtype=tf.int64), - 'user_occupation_label': tf.constant([0], dtype=tf.int64), - 'bucketized_user_age': tf.constant([0], dtype=tf.int64), - 'timestamp': tf.constant([0], dtype=tf.int64) + 'movie_id': tf.constant([0], dtype=tf.int64), + 'movie_genres': tf.constant([0], dtype=tf.int64), + 'user_id': tf.constant([0], dtype=tf.int64), + 'user_gender': tf.constant([0], dtype=tf.int64), + 'user_occupation_label': tf.constant([0], dtype=tf.int64), + 'bucketized_user_age': tf.constant([0], dtype=tf.int64), + 'timestamp': tf.constant([0], dtype=tf.int64) } export_model(dummy_features) arg_specs, kwarg_specs = save_spec(export_model) @@ -702,36 +698,36 @@ def export(): options=save_options, experimental_skip_checkpoint=True, signatures={ - 'serving_default': - serve.get_concrete_function( - export_model, *arg_specs, - **kwarg_specs) + 'serving_default': + serve.get_concrete_function( + export_model, *arg_specs, + **kwarg_specs) }) def test(): de.enable_inference_mode() - + dataset = get_dataset(batch_size=FLAGS.test_batch) model = tf.keras.models.load_model(FLAGS.export_dir) - + def get_close_or_equal_cnt(model, features, ratings): preds = model(features) preds = tf.math.argmax(preds, axis=1) ratings = tf.math.argmax(ratings, axis=1) close_cnt = tf.reduce_sum( - tf.cast(tf.math.abs(preds - ratings) <= 1, dtype=tf.int32)) + tf.cast(tf.math.abs(preds - ratings) <= 1, dtype=tf.int32)) equal_cnt = tf.reduce_sum( - tf.cast(tf.math.abs(preds - ratings) == 0, dtype=tf.int32)) + tf.cast(tf.math.abs(preds - ratings) == 0, dtype=tf.int32)) return close_cnt, equal_cnt - + it = iter(dataset) for step in range(FLAGS.test_steps): features, ratings = it.get_next() close_cnt, equal_cnt = get_close_or_equal_cnt(model, features, ratings) print( - f'In batch prediction, step: {step}, {close_cnt}/{FLAGS.test_batch} are closely' - f' accurate, {equal_cnt}/{FLAGS.test_batch} are absolutely accurate.') + f'In batch prediction, step: {step}, {close_cnt}/{FLAGS.test_batch} are closely' + f' accurate, {equal_cnt}/{FLAGS.test_batch} are absolutely accurate.') def inference(): @@ -739,30 +735,30 @@ def inference(): model = tf.keras.models.load_model(FLAGS.export_dir) print(f"model signature keys: {model.signatures.keys()} {model.signatures}") inference_func = model.signatures['serving_default'] - + dataset = get_dataset(batch_size=FLAGS.test_batch) it = iter(dataset) - + def get_close_or_equal_cnt(preds, ratings): preds = tf.math.argmax(preds['user_rating'], axis=1) ratings = tf.math.argmax(ratings['user_rating'], axis=1) close_cnt = tf.reduce_sum( - tf.cast(tf.math.abs(preds - ratings) <= 1, dtype=tf.int32)) + tf.cast(tf.math.abs(preds - ratings) <= 1, dtype=tf.int32)) equal_cnt = tf.reduce_sum( - tf.cast(tf.math.abs(preds - ratings) == 0, dtype=tf.int32)) + tf.cast(tf.math.abs(preds - ratings) == 0, dtype=tf.int32)) return close_cnt, equal_cnt - + for step in range(FLAGS.test_steps): features, ratings = next(it) ratings = ratings['user_rating'] outputs = inference_func(**features) preds = outputs['user_rating'] - + close_cnt, equal_cnt = get_close_or_equal_cnt(preds, ratings) - + print( - f'In batch prediction, step: {step}, {close_cnt}/{FLAGS.test_batch} are closely' - f' accurate, {equal_cnt}/{FLAGS.test_batch} are absolutely accurate.') + f'In batch prediction, step: {step}, {close_cnt}/{FLAGS.test_batch} are closely' + f' accurate, {equal_cnt}/{FLAGS.test_batch} are absolutely accurate.') def main(argv): diff --git a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/start.sh b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/start.sh index cefe5a5bb..34d2ed1d9 100644 --- a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/start.sh +++ b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/start.sh @@ -1,7 +1,6 @@ #!/bin/bash rm -rf ./export_dir -#gpu_num=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l) -gpu_num=5 +gpu_num=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l) export gpu_num horovodrun -np $gpu_num python movielens-1m-keras-with-horovod.py --mode="train" --model_dir="./model_dir" --export_dir="./export_dir" \ - --steps_per_epoch=${1:-10} --shuffle=${2:-False} \ No newline at end of file + --steps_per_epoch=${1:-20000} --shuffle=${2:-True} \ No newline at end of file diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py index a7ebe5393..2eed9ad43 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py @@ -120,7 +120,7 @@ def _traverse_emb_layers_and_save(proc_size=1, proc_rank=0): if de_var._saveable_object_creator is not None: if not isinstance(de_var.kv_creator.saver, de.FileSystemSaver): # This function only serves FileSystemSaver. - return + continue # save optimizer parameters of Dynamic Embedding if include_optimizer is True: de_opt_vars = a2a_emb.optimizer_vars.as_list() if hasattr( @@ -131,7 +131,7 @@ def _traverse_emb_layers_and_save(proc_size=1, proc_rank=0): proc_rank=proc_rank) if proc_rank == 0: # FileSystemSaver works well at rank 0. - return + continue # save Dynamic Embedding Parameters de_var.save_to_file_system(dirpath=de_dir, proc_size=proc_size, From 6885b3b276f7059e1d4aa677f3a4c3705767e490 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Tue, 1 Apr 2025 12:24:28 +0000 Subject: [PATCH 06/19] lint --- .../movielens-1m-keras-with-horovod.py | 512 +++++++++--------- 1 file changed, 256 insertions(+), 256 deletions(-) diff --git a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py index a8f7fde38..5eeaa307f 100644 --- a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py +++ b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py @@ -71,105 +71,105 @@ def get_rank() -> int: FLAGS = flags.FLAGS input_spec = { - 'user_id': - tf.TensorSpec(shape=[ - None, - 1, - ], dtype=tf.int64, name='user_id'), - 'user_gender': - tf.TensorSpec(shape=[ - None, - 1, - ], dtype=tf.int64, name='user_gender'), - 'user_occupation_label': - tf.TensorSpec(shape=[ - None, - 1, - ], - dtype=tf.int64, - name='user_occupation_label'), - 'bucketized_user_age': - tf.TensorSpec(shape=[ - None, - 1, - ], - dtype=tf.int64, - name='bucketized_user_age'), - 'movie_id': - tf.TensorSpec(shape=[ - None, - 1, - ], dtype=tf.int64, name='movie_id'), - 'movie_genres': - tf.TensorSpec(shape=[ - None, - 1, - ], dtype=tf.int64, name='movie_genres'), - 'timestamp': - tf.TensorSpec(shape=[ - None, - 1, - ], dtype=tf.int64, name='timestamp') + 'user_id': + tf.TensorSpec(shape=[ + None, + 1, + ], dtype=tf.int64, name='user_id'), + 'user_gender': + tf.TensorSpec(shape=[ + None, + 1, + ], dtype=tf.int64, name='user_gender'), + 'user_occupation_label': + tf.TensorSpec(shape=[ + None, + 1, + ], + dtype=tf.int64, + name='user_occupation_label'), + 'bucketized_user_age': + tf.TensorSpec(shape=[ + None, + 1, + ], + dtype=tf.int64, + name='bucketized_user_age'), + 'movie_id': + tf.TensorSpec(shape=[ + None, + 1, + ], dtype=tf.int64, name='movie_id'), + 'movie_genres': + tf.TensorSpec(shape=[ + None, + 1, + ], dtype=tf.int64, name='movie_genres'), + 'timestamp': + tf.TensorSpec(shape=[ + None, + 1, + ], dtype=tf.int64, name='timestamp') } feature_info_spec = { - 'movie_id': { - 'code': 101, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'sparse_cpu', - 'input_tensor': None, - 'pretreated_tensor': None - }, - 'movie_genres': { - 'code': 102, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'normal_gpu', - 'input_tensor': None, - 'pretreated_tensor': None, - }, - 'user_id': { - 'code': 103, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'sparse_cpu', - 'input_tensor': None, - 'pretreated_tensor': None, - }, - 'user_gender': { - 'code': 104, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'normal_gpu', - 'input_tensor': None, - 'pretreated_tensor': None, - }, - 'user_occupation_label': { - 'code': 105, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'normal_gpu', - 'input_tensor': None, - 'pretreated_tensor': None, - }, - 'bucketized_user_age': { - 'code': 106, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'normal_gpu', - 'input_tensor': None, - 'pretreated_tensor': None, - 'boundaries': [i for i in range(0, 100, 10)], - }, - 'timestamp': { - 'code': 107, - 'dtype': tf.int64, - 'dim': 1, - 'ptype': 'normal_gpu', - 'input_tensor': None, - 'pretreated_tensor': None, - } + 'movie_id': { + 'code': 101, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'sparse_cpu', + 'input_tensor': None, + 'pretreated_tensor': None + }, + 'movie_genres': { + 'code': 102, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'normal_gpu', + 'input_tensor': None, + 'pretreated_tensor': None, + }, + 'user_id': { + 'code': 103, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'sparse_cpu', + 'input_tensor': None, + 'pretreated_tensor': None, + }, + 'user_gender': { + 'code': 104, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'normal_gpu', + 'input_tensor': None, + 'pretreated_tensor': None, + }, + 'user_occupation_label': { + 'code': 105, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'normal_gpu', + 'input_tensor': None, + 'pretreated_tensor': None, + }, + 'bucketized_user_age': { + 'code': 106, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'normal_gpu', + 'input_tensor': None, + 'pretreated_tensor': None, + 'boundaries': [i for i in range(0, 100, 10)], + }, + 'timestamp': { + 'code': 107, + 'dtype': tf.int64, + 'dim': 1, + 'ptype': 'normal_gpu', + 'input_tensor': None, + 'pretreated_tensor': None, + } } @@ -203,21 +203,21 @@ def embedding_inputs_concat(input_tensors, input_dims): def embedding_out_split(embedding_out_concat, input_split_dims): embedding_out = list() embedding_out.extend( - tf.split(embedding_out_concat, input_split_dims, - axis=1)) # (feature_combin_num, (batch, dim, emb_size)) + tf.split(embedding_out_concat, input_split_dims, + axis=1)) # (feature_combin_num, (batch, dim, emb_size)) assert (len(input_split_dims) == len(embedding_out)) return embedding_out class Bucketize(tf.keras.layers.Layer): - + def __init__(self, boundaries, **kwargs): self.boundaries = boundaries super(Bucketize, self).__init__(**kwargs) - + def call(self, x, **kwargs): return tf.raw_ops.Bucketize(input=x, boundaries=self.boundaries) - + def get_config(self,): config = {'boundaries': self.boundaries} base_config = super(Bucketize, self).get_config() @@ -238,9 +238,9 @@ def get_kv_creator(mpi_size: int, # so set the factor larger than max_capacity to avoid this case factor = mpi_size * 0.7 config = de.HkvHashTableConfig( - init_capacity=math.ceil(vocab_size / factor), - max_capacity=math.ceil(max_capacity / factor), - max_hbm_for_values=math.ceil(max_capacity * value_size * dim / factor)) + init_capacity=math.ceil(vocab_size / factor), + max_capacity=math.ceil(max_capacity / factor), + max_hbm_for_values=math.ceil(max_capacity * value_size * dim / factor)) return de.HkvHashTableCreator(config=config, saver=saver) else: # for CuckooHashTable case the init_capacity passed in by Embedding layer @@ -249,7 +249,7 @@ def get_kv_creator(mpi_size: int, class ChannelEmbeddingLayers(tf.keras.layers.Layer): - + def __init__(self, name='', dense_embedding_size=1, @@ -257,46 +257,46 @@ def __init__(self, embedding_initializer=tf.keras.initializers.Zeros(), mpi_size=1, mpi_rank=0): - + super(ChannelEmbeddingLayers, self).__init__() init_capacity = 4096000 kv_creator_dense = get_kv_creator(mpi_size, mpi_rank, init_capacity, tf.dtypes.float32.size, dense_embedding_size) - + self.dense_embedding_layer = de.keras.layers.HvdAllToAllEmbedding( - mpi_size=mpi_size, - embedding_size=dense_embedding_size, - key_dtype=tf.int64, - value_dtype=tf.float32, - initializer=embedding_initializer, - name=name + '_DenseUnifiedEmbeddingLayer', - init_capacity=init_capacity, - kv_creator=kv_creator_dense, - short_file_name=True, + mpi_size=mpi_size, + embedding_size=dense_embedding_size, + key_dtype=tf.int64, + value_dtype=tf.float32, + initializer=embedding_initializer, + name=name + '_DenseUnifiedEmbeddingLayer', + init_capacity=init_capacity, + kv_creator=kv_creator_dense, + short_file_name=True, ) - + kv_creator_sparse = get_kv_creator(mpi_size, mpi_rank, init_capacity, tf.dtypes.float32.size, sparse_embedding_size) self.sparse_embedding_layer = de.keras.layers.HvdAllToAllEmbedding( - mpi_size=mpi_size, - embedding_size=sparse_embedding_size, - key_dtype=tf.int64, - value_dtype=tf.float32, - initializer=embedding_initializer, - name=name + '_SparseUnifiedEmbeddingLayer', - init_capacity=init_capacity, - kv_creator=kv_creator_sparse, - short_file_name=True, + mpi_size=mpi_size, + embedding_size=sparse_embedding_size, + key_dtype=tf.int64, + value_dtype=tf.float32, + initializer=embedding_initializer, + name=name + '_SparseUnifiedEmbeddingLayer', + init_capacity=init_capacity, + kv_creator=kv_creator_sparse, + short_file_name=True, ) - + self.dnn = tf.keras.layers.Dense( - 128, - activation='relu', - kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), - bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) - + 128, + activation='relu', + kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), + bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) + def __call__(self, features_info): dense_inputs = [] dense_input_dims = [] @@ -333,20 +333,20 @@ def __call__(self, features_info): if input_is_sequence_feature[i] == True: # Deal with the embedding from vector features. embedding_vec = tf.math.reduce_mean( - embedding, axis=1, - keepdims=True) # (feature_combin_num, (batch, x, emb_size)) + embedding, axis=1, + keepdims=True) # (feature_combin_num, (batch, x, emb_size)) else: embedding_vec = embedding embedding_vec = tf.keras.layers.Flatten()(embedding_vec) embedding_outs.append(embedding_vec) # Final embedding result. embeddings_concat = tf.keras.layers.Concatenate(axis=1)(embedding_outs) - + return self.dnn(embeddings_concat) class DualChannelsDeepModel(tf.keras.Model): - + def __init__(self, user_embedding_size=1, movie_embedding_size=1, @@ -354,7 +354,7 @@ def __init__(self, is_training=True, mpi_size=1, mpi_rank=0): - + if is_training: de.enable_train_mode() if embedding_initializer is None: @@ -363,47 +363,47 @@ def __init__(self, de.enable_inference_mode() if embedding_initializer is None: embedding_initializer = tf.keras.initializers.Zeros() - + super(DualChannelsDeepModel, self).__init__() self.user_embedding_size = user_embedding_size self.movie_embedding_size = movie_embedding_size print(f"mpi_size {mpi_size}, mpi_rank {mpi_rank}") self.user_embedding = ChannelEmbeddingLayers( - name='user', - dense_embedding_size=user_embedding_size, - sparse_embedding_size=user_embedding_size * 2, - embedding_initializer=embedding_initializer, - mpi_size=mpi_size, - mpi_rank=mpi_rank) + name='user', + dense_embedding_size=user_embedding_size, + sparse_embedding_size=user_embedding_size * 2, + embedding_initializer=embedding_initializer, + mpi_size=mpi_size, + mpi_rank=mpi_rank) self.movie_embedding = ChannelEmbeddingLayers( - name='movie', - dense_embedding_size=movie_embedding_size, - sparse_embedding_size=movie_embedding_size * 2, - embedding_initializer=embedding_initializer, - mpi_size=mpi_size, - mpi_rank=mpi_rank) + name='movie', + dense_embedding_size=movie_embedding_size, + sparse_embedding_size=movie_embedding_size * 2, + embedding_initializer=embedding_initializer, + mpi_size=mpi_size, + mpi_rank=mpi_rank) self.dynamic_layer_norm = de.keras.layers.LayerNormalization() self.dnn1 = tf.keras.layers.Dense( - 64, - activation='relu', - kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), - bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) + 64, + activation='relu', + kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), + bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) self.dnn2 = tf.keras.layers.Dense( - 16, - activation='relu', - kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), - bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) + 16, + activation='relu', + kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), + bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) self.dnn3 = tf.keras.layers.Dense( - 5, - activation='softmax', - kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), - bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) + 5, + activation='softmax', + kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), + bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) self.bias_net = tf.keras.layers.Dense( - 5, - activation='softmax', - kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), - bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) - + 5, + activation='softmax', + kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), + bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) + @tf.function def call(self, features): # Construct input layers @@ -416,7 +416,7 @@ def call(self, features): fea_info['input_tensor'] = input_tensor if fea_info.__contains__('boundaries'): input_tensor = Bucketize( - boundaries=fea_info['boundaries'])(input_tensor) + boundaries=fea_info['boundaries'])(input_tensor) # To prepare for GPU table combined queries, use a prefix to distinguish different features in a table. if fea_info['ptype'] == 'user_occupation_label': input_tensor_prefix_code = int(fea_info['code']) << 48 @@ -427,30 +427,30 @@ def call(self, features): # xor operation can be replaced with addition operation to facilitate subsequent optimization of TRT and OpenVino. input_tensor = tf.add(input_tensor, input_tensor_prefix_code) fea_info['pretreated_tensor'] = input_tensor - + user_fea = ['user_id', 'user_gender', 'user_occupation_label'] user_fea = [i for i in features.keys() if i in user_fea] user_fea_info = { - key: value - for key, value in feature_info_spec.items() - if key in user_fea + key: value + for key, value in feature_info_spec.items() + if key in user_fea } movie_fea = ['movie_id', 'movie_genres', 'user_occupation_label'] movie_fea = [i for i in features.keys() if i in movie_fea] movie_fea_info = { - key: value - for key, value in feature_info_spec.items() - if key in movie_fea + key: value + for key, value in feature_info_spec.items() + if key in movie_fea } user_latent = self.user_embedding(user_fea_info) movie_latent = self.movie_embedding(movie_fea_info) latent = tf.concat([user_latent, movie_latent], axis=1) - + normalized_emb = self.dynamic_layer_norm(latent) x = self.dnn1(normalized_emb) x = self.dnn2(x) x = self.dnn3(x) - + bias = self.bias_net(normalized_emb) x = 0.2 * x + 0.8 * bias user_rating = tf.keras.layers.Lambda(lambda x: x, name='user_rating')(x) @@ -463,26 +463,26 @@ def get_dataset(batch_size=1): data_dir="~/dataset", download=True) features = ds.map( - lambda x: { - "movie_id": - tf.strings.to_number(x["movie_id"], tf.int64), - "movie_genres": - tf.cast(x["movie_genres"][0], tf.int64), - "user_id": - tf.strings.to_number(x["user_id"], tf.int64), - "user_gender": - tf.cast(x["user_gender"], tf.int64), - "user_occupation_label": - tf.cast(x["user_occupation_label"], tf.int64), - "bucketized_user_age": - tf.cast(x["bucketized_user_age"], tf.int64), - "timestamp": - tf.cast(x["timestamp"] - 880000000, tf.int64), - }) - + lambda x: { + "movie_id": + tf.strings.to_number(x["movie_id"], tf.int64), + "movie_genres": + tf.cast(x["movie_genres"][0], tf.int64), + "user_id": + tf.strings.to_number(x["user_id"], tf.int64), + "user_gender": + tf.cast(x["user_gender"], tf.int64), + "user_occupation_label": + tf.cast(x["user_occupation_label"], tf.int64), + "bucketized_user_age": + tf.cast(x["bucketized_user_age"], tf.int64), + "timestamp": + tf.cast(x["timestamp"] - 880000000, tf.int64), + }) + ratings = ds.map(lambda x: { - "user_rating": - tf.one_hot(tf.cast(x["user_rating"] - 1, dtype=tf.int64), 5) + "user_rating": + tf.one_hot(tf.cast(x["user_rating"] - 1, dtype=tf.int64), 5) }) dataset = tf.data.Dataset.zip((features, ratings)) if FLAGS.shuffle: @@ -492,16 +492,16 @@ def get_dataset(batch_size=1): dataset = dataset.repeat(1).batch(batch_size).prefetch(tf.data.AUTOTUNE) # Only GPU:0 since TF is set to be visible to GPU:X dataset = dataset.apply( - tf.data.experimental.prefetch_to_device('GPU:0', buffer_size=2)) + tf.data.experimental.prefetch_to_device('GPU:0', buffer_size=2)) return dataset def export_to_savedmodel(model, savedmodel_dir): save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA']) - + if not os.path.exists(savedmodel_dir): os.mkdir(savedmodel_dir) - + ########################## What really happened ########################## # # Calling the TF save API for all ranks causes file conflicts, so KV files other than rank0 need to be saved by calling the underlying API separately. # if hvd.rank() == 0: @@ -526,7 +526,7 @@ def export_to_savedmodel(model, savedmodel_dir): # opt_de_var.save_to_file_system(dirpath=de_dir, # proc_size=hvd.size(), # proc_rank=hvd.rank()) - + # TFRA modify the Keras save function with a patch. # !!!! Run save_model function in all rank !!!! de.keras.models.save_model(model, @@ -556,12 +556,12 @@ def serve(save_model, *args, **kwargs): def export_for_serving(model, export_dir): save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA']) - + if not os.path.exists(export_dir): os.mkdir(export_dir) - + arg_specs, kwarg_specs = save_spec(model) - + ########################## What really happened ########################## # if hvd.rank() == 0: # # Remember to remove optimizer parameters when ready to serve. @@ -583,21 +583,21 @@ def export_for_serving(model, export_dir): # layer.params.save_to_file_system(dirpath=de_dir, # proc_size=hvd.size(), # proc_rank=hvd.rank()) - + # TFRA modify the Keras save function with a patch. # !!!! Run save_model function in all rank !!!! de.keras.models.save_model( - model, - export_dir, - overwrite=True, - include_optimizer=False, - options=save_options, - signatures={ - 'serving_default': - serve.get_concrete_function(model, *arg_specs, **kwarg_specs) - }, + model, + export_dir, + overwrite=True, + include_optimizer=False, + options=save_options, + signatures={ + 'serving_default': + serve.get_concrete_function(model, *arg_specs, **kwarg_specs) + }, ) - + if get_rank() == 0: # Modify the inference graph to a stand-alone version tf.keras.backend.clear_session() @@ -613,10 +613,10 @@ def export_for_serving(model, export_dir): options=save_options, experimental_skip_checkpoint=True, signatures={ - 'serving_default': - serve.get_concrete_function( - export_model, *arg_specs, - **kwarg_specs) + 'serving_default': + serve.get_concrete_function( + export_model, *arg_specs, + **kwarg_specs) }) @@ -627,42 +627,42 @@ def train(): True, get_cluster_size(), get_rank()) optimizer = Adam(1E-3) optimizer = de.DynamicEmbeddingOptimizer(optimizer, synchronous=True) - + auc = tf.keras.metrics.AUC(num_thresholds=1000) model.compile(optimizer=optimizer, loss=tf.keras.losses.MeanSquaredError(), metrics=[ - auc, + auc, ]) - + if os.path.exists(FLAGS.model_dir + '/variables'): model.load_weights(FLAGS.model_dir) - + tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=FLAGS.model_dir) save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA']) ckpt_callback = de.keras.callbacks.ModelCheckpoint( - filepath=FLAGS.model_dir + '/weights_epoch{epoch:03d}_loss{loss:.4f}', - options=save_options) + filepath=FLAGS.model_dir + '/weights_epoch{epoch:03d}_loss{loss:.4f}', + options=save_options) if has_horovod(): # horovod callback is used to broadcast the value generated by initializer of rank0. hvd_opt_init_callback = de.keras.callbacks.DEHvdBroadcastGlobalVariablesCallback( - root_rank=0) + root_rank=0) callbacks_list = [hvd_opt_init_callback, ckpt_callback] else: callbacks_list = [ckpt_callback] - + # The log class callback only takes effect in rank0 for convenience if get_rank() == 0: callbacks_list.extend([tensorboard_callback]) # If there are callbacks such as evaluation metrics that call model calculations, take effect on all ranks. # callbacks_list.extend([my_auc_callback]) - + model.fit(dataset, callbacks=callbacks_list, epochs=FLAGS.epochs, steps_per_epoch=FLAGS.steps_per_epoch, verbose=1 if get_rank() == 0 else 0) - + export_to_savedmodel(model, FLAGS.model_dir) export_for_serving(model, FLAGS.export_dir) @@ -674,19 +674,19 @@ def export(): export_model = DualChannelsDeepModel(FLAGS.embedding_size, FLAGS.embedding_size, tf.keras.initializers.RandomNormal( - 0.0, 0.5), + 0.0, 0.5), False, mpi_size=1, mpi_rank=0) save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA']) dummy_features = { - 'movie_id': tf.constant([0], dtype=tf.int64), - 'movie_genres': tf.constant([0], dtype=tf.int64), - 'user_id': tf.constant([0], dtype=tf.int64), - 'user_gender': tf.constant([0], dtype=tf.int64), - 'user_occupation_label': tf.constant([0], dtype=tf.int64), - 'bucketized_user_age': tf.constant([0], dtype=tf.int64), - 'timestamp': tf.constant([0], dtype=tf.int64) + 'movie_id': tf.constant([0], dtype=tf.int64), + 'movie_genres': tf.constant([0], dtype=tf.int64), + 'user_id': tf.constant([0], dtype=tf.int64), + 'user_gender': tf.constant([0], dtype=tf.int64), + 'user_occupation_label': tf.constant([0], dtype=tf.int64), + 'bucketized_user_age': tf.constant([0], dtype=tf.int64), + 'timestamp': tf.constant([0], dtype=tf.int64) } export_model(dummy_features) arg_specs, kwarg_specs = save_spec(export_model) @@ -698,36 +698,36 @@ def export(): options=save_options, experimental_skip_checkpoint=True, signatures={ - 'serving_default': - serve.get_concrete_function( - export_model, *arg_specs, - **kwarg_specs) + 'serving_default': + serve.get_concrete_function( + export_model, *arg_specs, + **kwarg_specs) }) def test(): de.enable_inference_mode() - + dataset = get_dataset(batch_size=FLAGS.test_batch) model = tf.keras.models.load_model(FLAGS.export_dir) - + def get_close_or_equal_cnt(model, features, ratings): preds = model(features) preds = tf.math.argmax(preds, axis=1) ratings = tf.math.argmax(ratings, axis=1) close_cnt = tf.reduce_sum( - tf.cast(tf.math.abs(preds - ratings) <= 1, dtype=tf.int32)) + tf.cast(tf.math.abs(preds - ratings) <= 1, dtype=tf.int32)) equal_cnt = tf.reduce_sum( - tf.cast(tf.math.abs(preds - ratings) == 0, dtype=tf.int32)) + tf.cast(tf.math.abs(preds - ratings) == 0, dtype=tf.int32)) return close_cnt, equal_cnt - + it = iter(dataset) for step in range(FLAGS.test_steps): features, ratings = it.get_next() close_cnt, equal_cnt = get_close_or_equal_cnt(model, features, ratings) print( - f'In batch prediction, step: {step}, {close_cnt}/{FLAGS.test_batch} are closely' - f' accurate, {equal_cnt}/{FLAGS.test_batch} are absolutely accurate.') + f'In batch prediction, step: {step}, {close_cnt}/{FLAGS.test_batch} are closely' + f' accurate, {equal_cnt}/{FLAGS.test_batch} are absolutely accurate.') def inference(): @@ -735,30 +735,30 @@ def inference(): model = tf.keras.models.load_model(FLAGS.export_dir) print(f"model signature keys: {model.signatures.keys()} {model.signatures}") inference_func = model.signatures['serving_default'] - + dataset = get_dataset(batch_size=FLAGS.test_batch) it = iter(dataset) - + def get_close_or_equal_cnt(preds, ratings): preds = tf.math.argmax(preds['user_rating'], axis=1) ratings = tf.math.argmax(ratings['user_rating'], axis=1) close_cnt = tf.reduce_sum( - tf.cast(tf.math.abs(preds - ratings) <= 1, dtype=tf.int32)) + tf.cast(tf.math.abs(preds - ratings) <= 1, dtype=tf.int32)) equal_cnt = tf.reduce_sum( - tf.cast(tf.math.abs(preds - ratings) == 0, dtype=tf.int32)) + tf.cast(tf.math.abs(preds - ratings) == 0, dtype=tf.int32)) return close_cnt, equal_cnt - + for step in range(FLAGS.test_steps): features, ratings = next(it) ratings = ratings['user_rating'] outputs = inference_func(**features) preds = outputs['user_rating'] - + close_cnt, equal_cnt = get_close_or_equal_cnt(preds, ratings) - + print( - f'In batch prediction, step: {step}, {close_cnt}/{FLAGS.test_batch} are closely' - f' accurate, {equal_cnt}/{FLAGS.test_batch} are absolutely accurate.') + f'In batch prediction, step: {step}, {close_cnt}/{FLAGS.test_batch} are closely' + f' accurate, {equal_cnt}/{FLAGS.test_batch} are absolutely accurate.') def main(argv): From 69ce3574f4ba2cc6eba80b2a51d6497bb3d3dc21 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Tue, 1 Apr 2025 12:25:10 +0000 Subject: [PATCH 07/19] lint --- .../dynamic_embedding/python/keras/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py index 2eed9ad43..e1d6ac67c 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py @@ -124,7 +124,7 @@ def _traverse_emb_layers_and_save(proc_size=1, proc_rank=0): # save optimizer parameters of Dynamic Embedding if include_optimizer is True: de_opt_vars = a2a_emb.optimizer_vars.as_list() if hasattr( - a2a_emb.optimizer_vars, "as_list") else a2a_emb.optimizer_vars + a2a_emb.optimizer_vars, "as_list") else a2a_emb.optimizer_vars for de_opt_var in de_opt_vars: de_opt_var.save_to_file_system(dirpath=de_dir, proc_size=proc_size, From e53c0694254aa4f33ef87101199fb089247d4a3a Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Tue, 1 Apr 2025 12:25:43 +0000 Subject: [PATCH 08/19] lint --- .../dynamic_embedding/python/keras/models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py index e1d6ac67c..3477bf0da 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py @@ -98,7 +98,7 @@ def _check_saveable_and_redirect_new_de_dir(hvd_rank=0): # Redirect new de_dir if hasattr(de_var, 'saveable'): de_var.saveable._saver_config.save_path = de_dir - + def _maybe_save_restrict_policy_params(de_var, proc_size=1, proc_rank=0): if not hasattr(de_var, "restrict_policy"): return @@ -108,7 +108,7 @@ def _maybe_save_restrict_policy_params(de_var, proc_size=1, proc_rank=0): de_var.save_to_file_system(dirpath=de_dir, proc_size=proc_size, proc_rank=proc_rank) - + def _traverse_emb_layers_and_save(proc_size=1, proc_rank=0): for var in model.variables: if not hasattr(var, "params"): From e8abbe6c3a17afbda5b4d33289dd3afad4f73c3d Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Tue, 1 Apr 2025 18:53:23 +0000 Subject: [PATCH 09/19] Add test-case for restrict policy save --- pytest.txt | 7 + .../horovod_embedding_restrict_save_test.py | 125 ++++++++++++++++++ 2 files changed, 132 insertions(+) create mode 100644 pytest.txt create mode 100644 tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py diff --git a/pytest.txt b/pytest.txt new file mode 100644 index 000000000..9183d6375 --- /dev/null +++ b/pytest.txt @@ -0,0 +1,7 @@ +pytest~=6.2.5 +pytest-xdist~=1.31 +pytest-extra-durations~=0.1.3 +scikit-learn<=1.2.2 +scikit-image<=0.20.0 +Pillow~=9.4.0 +tqdm>=4.36.1 diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py b/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py new file mode 100644 index 000000000..a4af30b63 --- /dev/null +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py @@ -0,0 +1,125 @@ +""" +unit tests of save model that uses HvdAllToAllEmbedding +""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import shutil +from time import sleep + +import tensorflow as tf + +from tensorflow_recommenders_addons import dynamic_embedding as de + +from tensorflow.python.framework import dtypes +from tensorflow.python.framework.errors_impl import NotFoundError +from tensorflow.python.ops import math_ops +from tensorflow.python.platform import test + +try: + from tf_keras import layers, Sequential, models, backend + from tf_keras.initializers import Zeros + from tf_keras.optimizers import Adam +except: + from tensorflow.keras import layers, Sequential, models, backend + from tensorflow.keras.initializers import Zeros + try: + from tensorflow.keras.optimizers import Adam + except: + from tensorflow.keras.legacy.optimizers import Adam + + +def get_all_to_all_emb_model(emb_t, opt, *args, **kwargs): + l0 = layers.InputLayer(input_shape=(None,), dtype=dtypes.int64) + l1 = emb_t(*args, **kwargs) + l2 = layers.Dense(8, 'relu', kernel_initializer='zeros') + l3 = layers.Dense(1, 'sigmoid', kernel_initializer='zeros') + if emb_t == de.keras.layers.HvdAllToAllEmbedding: + model = Sequential([l0, l1, l2, l3]) + else: + raise TypeError('Unsupported embedding layer {}'.format(emb_t)) + + model.compile(optimizer=opt, loss='mean_absolute_error') + return model + + +class HorovodAllToAllRestrictPolicyTest(test.TestCase): + def test_all_to_all_embedding_restrict_policy_save(self): + try: + import horovod.tensorflow as hvd + except (NotFoundError): + self.skipTest( + "Skip the test for horovod import error with Tensorflow-2.7.0 on MacOS-12." + ) + + hvd.init() + + name = "all2all_emb" + keras_base_opt = Adam(1.0) + base_opt = de.DynamicEmbeddingOptimizer(keras_base_opt, synchronous=True) + + init = Zeros() + kv_creator = de.CuckooHashTableCreator( + saver=de.FileSystemSaver(proc_size=hvd.size(), proc_rank=hvd.rank())) + batch_size = 8 + start = 0 + dim = 10 + run_step = 10 + + save_dir = "/tmp/hvd_distributed_restrict_policy_save" + str( + hvd.size()) + str( + dim) # All ranks should share same save directory + + base_model = get_all_to_all_emb_model( + de.keras.layers.HvdAllToAllEmbedding, + base_opt, + embedding_size=dim, + initializer=init, + bp_v2=False, + kv_creator=kv_creator, + restrict_policy=de.TimestampRestrictPolicy, # Embedding table with restrict policy + name='all2all_emb') + + for i in range(1, run_step): + x = math_ops.range(start, start + batch_size, dtype=dtypes.int64) + x = tf.reshape(x, (batch_size, -1)) + start += batch_size + y = tf.zeros((batch_size, 1), dtype=dtypes.float32) + base_model.fit(x, y, verbose=0) + + save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA']) + if hvd.rank() == 0: + if os.path.exists(save_dir): + shutil.rmtree(save_dir) + hvd.join() # Sync for avoiding files conflict + base_model.save(save_dir, options=save_options) + de.keras.models.save_model(base_model, save_dir, options=save_options) + + sleep(4) # Wait for filesystem operation + hvd_size = hvd.size() + if hvd_size <= 1: + hvd_size = 1 + base_dir = os.path.join(save_dir, "variables", "TFRADynamicEmbedding") + for tag in ['keys', 'values']: + for rank in range(hvd_size): + self.assertTrue(os.path.exists( + base_dir + + f'/{name}-parameter_mht_1of1_rank{rank}_size{hvd_size}-{tag}')) + self.assertTrue(os.path.exists( + base_dir + + f'/{name}-parameter_DynamicEmbedding_{name}-shadow_m_mht_1of1_rank{rank}_size{hvd_size}-{tag}' + )) + self.assertTrue(os.path.exists( + base_dir + + f'/{name}-parameter_DynamicEmbedding_{name}-shadow_v_mht_1of1_rank{rank}_size{hvd_size}-{tag}' + )) + # Restrict policy var saved for all ranks + self.assertTrue(os.path.exists( + base_dir + + f'/{name}-parameter_timestamp_mht_1of1_rank{rank}_size{hvd_size}-{tag}')) + + +if __name__ == "__main__": + test.main() From 170fe7151af8f218109527c666e0be55d13892a6 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Tue, 1 Apr 2025 18:53:56 +0000 Subject: [PATCH 10/19] remove extra file --- pytest.txt | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 pytest.txt diff --git a/pytest.txt b/pytest.txt deleted file mode 100644 index 9183d6375..000000000 --- a/pytest.txt +++ /dev/null @@ -1,7 +0,0 @@ -pytest~=6.2.5 -pytest-xdist~=1.31 -pytest-extra-durations~=0.1.3 -scikit-learn<=1.2.2 -scikit-image<=0.20.0 -Pillow~=9.4.0 -tqdm>=4.36.1 From 173038281abe1290f181edab6d85566d6773ce66 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Fri, 11 Apr 2025 08:13:35 +0530 Subject: [PATCH 11/19] update --- .../kernel_tests/horovod_embedding_restrict_save_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py b/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py index a4af30b63..8b056ff3c 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py @@ -93,7 +93,8 @@ def test_all_to_all_embedding_restrict_policy_save(self): if hvd.rank() == 0: if os.path.exists(save_dir): shutil.rmtree(save_dir) - hvd.join() # Sync for avoiding files conflict + # Sync for avoiding files conflict + hvd.join() base_model.save(save_dir, options=save_options) de.keras.models.save_model(base_model, save_dir, options=save_options) From 3d09a96a3a22dbd4acbe92946633cdf778422086 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Fri, 11 Apr 2025 08:14:50 +0530 Subject: [PATCH 12/19] update --- .../kernel_tests/horovod_embedding_restrict_save_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py b/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py index 8b056ff3c..01de3f681 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py @@ -93,8 +93,7 @@ def test_all_to_all_embedding_restrict_policy_save(self): if hvd.rank() == 0: if os.path.exists(save_dir): shutil.rmtree(save_dir) - # Sync for avoiding files conflict - hvd.join() + hvd.join() # Sync for avoiding files conflict base_model.save(save_dir, options=save_options) de.keras.models.save_model(base_model, save_dir, options=save_options) From a98ad2ac9a2f478c0e8c46514a61fcbc265b6095 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Fri, 11 Apr 2025 04:53:02 +0000 Subject: [PATCH 13/19] lint --- .../dynamic_embedding/python/keras/models.py | 4 +- .../horovod_embedding_restrict_save_test.py | 78 ++++++++++--------- 2 files changed, 45 insertions(+), 37 deletions(-) diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py index 3477bf0da..26d8af29b 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/models.py @@ -137,7 +137,9 @@ def _traverse_emb_layers_and_save(proc_size=1, proc_rank=0): proc_size=proc_size, proc_rank=proc_rank) # Save restrict policy for each hvd.rank() - _maybe_save_restrict_policy_params(de_var, proc_size=proc_size, proc_rank=proc_rank) + _maybe_save_restrict_policy_params(de_var, + proc_size=proc_size, + proc_rank=proc_rank) if hvd is None: call_original_save_func() diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py b/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py index 01de3f681..afbbd8d25 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py @@ -40,63 +40,64 @@ def get_all_to_all_emb_model(emb_t, opt, *args, **kwargs): model = Sequential([l0, l1, l2, l3]) else: raise TypeError('Unsupported embedding layer {}'.format(emb_t)) - + model.compile(optimizer=opt, loss='mean_absolute_error') return model class HorovodAllToAllRestrictPolicyTest(test.TestCase): + def test_all_to_all_embedding_restrict_policy_save(self): try: import horovod.tensorflow as hvd except (NotFoundError): self.skipTest( - "Skip the test for horovod import error with Tensorflow-2.7.0 on MacOS-12." + "Skip the test for horovod import error with Tensorflow-2.7.0 on MacOS-12." ) - + hvd.init() - + name = "all2all_emb" keras_base_opt = Adam(1.0) base_opt = de.DynamicEmbeddingOptimizer(keras_base_opt, synchronous=True) - + init = Zeros() kv_creator = de.CuckooHashTableCreator( - saver=de.FileSystemSaver(proc_size=hvd.size(), proc_rank=hvd.rank())) + saver=de.FileSystemSaver(proc_size=hvd.size(), proc_rank=hvd.rank())) batch_size = 8 start = 0 dim = 10 run_step = 10 - + save_dir = "/tmp/hvd_distributed_restrict_policy_save" + str( - hvd.size()) + str( - dim) # All ranks should share same save directory - + hvd.size()) + str(dim) # All ranks should share same save directory + base_model = get_all_to_all_emb_model( - de.keras.layers.HvdAllToAllEmbedding, - base_opt, - embedding_size=dim, - initializer=init, - bp_v2=False, - kv_creator=kv_creator, - restrict_policy=de.TimestampRestrictPolicy, # Embedding table with restrict policy - name='all2all_emb') - + de.keras.layers.HvdAllToAllEmbedding, + base_opt, + embedding_size=dim, + initializer=init, + bp_v2=False, + kv_creator=kv_creator, + restrict_policy=de. + TimestampRestrictPolicy, # Embedding table with restrict policy + name='all2all_emb') + for i in range(1, run_step): x = math_ops.range(start, start + batch_size, dtype=dtypes.int64) x = tf.reshape(x, (batch_size, -1)) start += batch_size y = tf.zeros((batch_size, 1), dtype=dtypes.float32) base_model.fit(x, y, verbose=0) - + save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA']) if hvd.rank() == 0: if os.path.exists(save_dir): shutil.rmtree(save_dir) - hvd.join() # Sync for avoiding files conflict + hvd.join() # Sync for avoiding files conflict base_model.save(save_dir, options=save_options) de.keras.models.save_model(base_model, save_dir, options=save_options) - + sleep(4) # Wait for filesystem operation hvd_size = hvd.size() if hvd_size <= 1: @@ -104,21 +105,26 @@ def test_all_to_all_embedding_restrict_policy_save(self): base_dir = os.path.join(save_dir, "variables", "TFRADynamicEmbedding") for tag in ['keys', 'values']: for rank in range(hvd_size): - self.assertTrue(os.path.exists( - base_dir + - f'/{name}-parameter_mht_1of1_rank{rank}_size{hvd_size}-{tag}')) - self.assertTrue(os.path.exists( - base_dir + - f'/{name}-parameter_DynamicEmbedding_{name}-shadow_m_mht_1of1_rank{rank}_size{hvd_size}-{tag}' - )) - self.assertTrue(os.path.exists( - base_dir + - f'/{name}-parameter_DynamicEmbedding_{name}-shadow_v_mht_1of1_rank{rank}_size{hvd_size}-{tag}' - )) + self.assertTrue( + os.path.exists( + base_dir + + f'/{name}-parameter_mht_1of1_rank{rank}_size{hvd_size}-{tag}')) + self.assertTrue( + os.path.exists( + base_dir + + f'/{name}-parameter_DynamicEmbedding_{name}-shadow_m_mht_1of1_rank{rank}_size{hvd_size}-{tag}' + )) + self.assertTrue( + os.path.exists( + base_dir + + f'/{name}-parameter_DynamicEmbedding_{name}-shadow_v_mht_1of1_rank{rank}_size{hvd_size}-{tag}' + )) # Restrict policy var saved for all ranks - self.assertTrue(os.path.exists( - base_dir + - f'/{name}-parameter_timestamp_mht_1of1_rank{rank}_size{hvd_size}-{tag}')) + self.assertTrue( + os.path.exists( + base_dir + + f'/{name}-parameter_timestamp_mht_1of1_rank{rank}_size{hvd_size}-{tag}' + )) if __name__ == "__main__": From 3cb9d423f88f112635d6f0646a2f1de7eb4f47fe Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Fri, 11 Apr 2025 05:15:08 +0000 Subject: [PATCH 14/19] Update tests + linting --- pytest.txt | 7 +++++++ tools/testing/build_and_run_tests.sh | 7 +++++-- 2 files changed, 12 insertions(+), 2 deletions(-) create mode 100644 pytest.txt diff --git a/pytest.txt b/pytest.txt new file mode 100644 index 000000000..9183d6375 --- /dev/null +++ b/pytest.txt @@ -0,0 +1,7 @@ +pytest~=6.2.5 +pytest-xdist~=1.31 +pytest-extra-durations~=0.1.3 +scikit-learn<=1.2.2 +scikit-image<=0.20.0 +Pillow~=9.4.0 +tqdm>=4.36.1 diff --git a/tools/testing/build_and_run_tests.sh b/tools/testing/build_and_run_tests.sh index 9d2a6553f..6998f3fc7 100644 --- a/tools/testing/build_and_run_tests.sh +++ b/tools/testing/build_and_run_tests.sh @@ -60,7 +60,7 @@ if [ "$TF_NEED_CUDA" -ne 0 ]; then bash /install/install_horovod.sh $HOROVOD_VERSION --only-cpu fi # TODO(jamesrong): Test on GPU. - CUDA_VISIBLE_DEVICES="" mpirun -np 2 -H localhost:2 --allow-run-as-root pytest -v ./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_sync_train_test.py + CUDA_VISIBLE_DEVICES="" mpirun -np 2 -H localhost:2 --allow-run-as-root pytest -v ./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_sync_train_test.py ./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py # Reinstall Horovod after tests if [ "$(uname)" != "Darwin" ]; then # Mac only with MPI @@ -74,12 +74,15 @@ if [ "$TF_NEED_CUDA" -eq 0 ]; then IGNORE_HKV="--ignore=./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/hkv_hashtable_ops_test.py" fi +# Test only with horovod on GPU +IGNORE_HOROVOD_DIST_TRAINING_TEST = "--ignore=./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py" + # Only use GPU 0 if available. if [ -x "$(command -v nvidia-smi)" ]; then export CUDA_VISIBLE_DEVICES=0 fi -python -m pytest -v -s --functions-durations=20 --modules-durations=5 $IGNORE_HKV $SKIP_CUSTOM_OP_TESTS_FLAG $EXTRA_ARGS ./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/ +python -m pytest -v -s --functions-durations=20 --modules-durations=5 $IGNORE_HKV $IGNORE_HOROVOD_DIST_TRAINING_TEST $SKIP_CUSTOM_OP_TESTS_FLAG $EXTRA_ARGS ./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/ # Release disk space bazel clean --expunge From b2ab454eed8d34c671f4374686ab76b6641a3c63 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Fri, 11 Apr 2025 05:15:30 +0000 Subject: [PATCH 15/19] remove pytest.txt --- pytest.txt | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 pytest.txt diff --git a/pytest.txt b/pytest.txt deleted file mode 100644 index 9183d6375..000000000 --- a/pytest.txt +++ /dev/null @@ -1,7 +0,0 @@ -pytest~=6.2.5 -pytest-xdist~=1.31 -pytest-extra-durations~=0.1.3 -scikit-learn<=1.2.2 -scikit-image<=0.20.0 -Pillow~=9.4.0 -tqdm>=4.36.1 From c309f329a85c3d4840d5807ec71d67dd5c9f1086 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Fri, 11 Apr 2025 06:01:23 +0000 Subject: [PATCH 16/19] Support restrict var save for DEHvdModelCheckpoint --- .../dynamic_embedding/python/keras/callbacks.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/callbacks.py b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/callbacks.py index a6a6d9fcb..0cf5cd09d 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/callbacks.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/callbacks.py @@ -113,6 +113,15 @@ def __init__(self, *args, **kwargs): super(DEHvdModelCheckpoint, self).__init__(*args, **kwargs) def _save_de_model(self, filepath): + def _maybe_save_restrict_policy_params(de_var, proc_size=1, proc_rank=0): + if not hasattr(de_var, "restrict_policy"): + return + if de_var.restrict_policy is not None: + # Only save restrict policy var if policy created + de_var = de_var.restrict_policy._restrict_var + de_var.save_to_file_system(dirpath=de_dir, + proc_size=proc_size, + proc_rank=proc_rank) if hvd.rank() == 0: if self.save_weights_only: self.model.save_weights(filepath, overwrite=True, options=self._options) @@ -143,6 +152,11 @@ def _save_de_model(self, filepath): de_opt_var.save_to_file_system(dirpath=de_dir, proc_size=hvd.size(), proc_rank=hvd.rank()) + + # Save restrict policy for each hvd.rank() + _maybe_save_restrict_policy_params(de_var, + proc_size=hvd.size(), + proc_rank=hvd.rank()) hvd.join() # Sync for avoiding data conflict or missing rank def _save_model(self, epoch, logs): From 1859041188556e72b9f6415635c6239b109cb122 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Mon, 14 Apr 2025 10:38:35 +0530 Subject: [PATCH 17/19] Test cases + linting --- .github/workflows/make_wheel_macOS_arm64.sh | 3 ++- .../kernels/cuckoo_hashtable_op_gpu.cu.cc | 1 + .../core/kernels/fill_functor.cu.cc | 3 ++- .../kernels/lookup_impl/lookup_table_op_cpu.h | 20 +++++++++---------- .../kernels/lookup_impl/lookup_table_op_gpu.h | 20 +++++++++---------- .../core/kernels/redis_impl/json.cc | 4 ++-- .../core/kernels/segment_reduction_ops_impl.h | 2 +- .../core/lib/nvhash/nv_util.h | 6 ++++-- .../dynamic_embedding/core/utils/utils.h | 3 ++- .../python/keras/callbacks.py | 4 +++- tools/docker/cpu_tests.Dockerfile | 2 +- tools/testing/build_and_run_tests.sh | 2 +- 12 files changed, 39 insertions(+), 31 deletions(-) diff --git a/.github/workflows/make_wheel_macOS_arm64.sh b/.github/workflows/make_wheel_macOS_arm64.sh index 3607d0e59..af23c3f55 100644 --- a/.github/workflows/make_wheel_macOS_arm64.sh +++ b/.github/workflows/make_wheel_macOS_arm64.sh @@ -7,6 +7,7 @@ export TF_NEED_CUDA=0 export IGNORE_HKV="--ignore=./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/hkv_hashtable_ops_test.py" export IGNORE_REDIS="--ignore=./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/redis_table_ops_test.py" export IGNORE_REDIS_VAR="--ignore=./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/redis_table_variable_test.py" +export IGNORE_HOROVOD_DIST_TRAINING_TEST="--ignore=./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py" export USE_BAZEL_VERSION='5.1.1' # For TensorFlow version 2.12 or earlier: @@ -59,7 +60,7 @@ delocate-wheel -w wheelhouse -v --ignore-missing-dependencies artifacts/*.whl # Test pip install --default-timeout=1000 -r tools/install_deps/pytest.txt cp ./bazel-bin/tensorflow_recommenders_addons/dynamic_embedding/core/_*_ops.so ./tensorflow_recommenders_addons/dynamic_embedding/core/ -python -m pytest -v -s --functions-durations=20 --modules-durations=5 $IGNORE_HKV $IGNORE_REDIS $IGNORE_REDIS_VAR $SKIP_CUSTOM_OP_TESTS ./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/ +python -m pytest -v -s --functions-durations=20 --modules-durations=5 $IGNORE_HKV $IGNORE_HOROVOD_DIST_TRAINING_TEST $IGNORE_REDIS $IGNORE_REDIS_VAR $SKIP_CUSTOM_OP_TESTS ./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/ # Clean bazel clean \ No newline at end of file diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/cuckoo_hashtable_op_gpu.cu.cc b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/cuckoo_hashtable_op_gpu.cu.cc index eef020d09..099ec1b66 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/cuckoo_hashtable_op_gpu.cu.cc +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/cuckoo_hashtable_op_gpu.cu.cc @@ -15,6 +15,7 @@ limitations under the License. #if GOOGLE_CUDA #include "tensorflow_recommenders_addons/dynamic_embedding/core/kernels/cuckoo_hashtable_op_gpu.h" + #include "tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_gpu.h" #include "tensorflow_recommenders_addons/dynamic_embedding/core/utils/utils.h" diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/fill_functor.cu.cc b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/fill_functor.cu.cc index c3e9ab7e0..4a838c798 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/fill_functor.cu.cc +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/fill_functor.cu.cc @@ -18,9 +18,10 @@ limitations under the License. #define EIGEN_USE_GPU +#include "tensorflow/core/kernels/fill_functor.h" + #include "tensorflow/core/framework/register_types.h" #include "tensorflow/core/framework/tensor_types.h" -#include "tensorflow/core/kernels/fill_functor.h" #include "tensorflow/core/platform/types.h" namespace Eigen { diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_cpu.h b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_cpu.h index b74285e1a..d294450f2 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_cpu.h +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_cpu.h @@ -429,16 +429,16 @@ struct TableDispatcher { #define CREATE_TABLE_PARTIAL_BRANCHES(PREFIX) \ do { \ - CREATE_A_TABLE((PREFIX)*10 + 0); \ - CREATE_A_TABLE((PREFIX)*10 + 1); \ - CREATE_A_TABLE((PREFIX)*10 + 2); \ - CREATE_A_TABLE((PREFIX)*10 + 3); \ - CREATE_A_TABLE((PREFIX)*10 + 4); \ - CREATE_A_TABLE((PREFIX)*10 + 5); \ - CREATE_A_TABLE((PREFIX)*10 + 6); \ - CREATE_A_TABLE((PREFIX)*10 + 7); \ - CREATE_A_TABLE((PREFIX)*10 + 8); \ - CREATE_A_TABLE((PREFIX)*10 + 9); \ + CREATE_A_TABLE((PREFIX) * 10 + 0); \ + CREATE_A_TABLE((PREFIX) * 10 + 1); \ + CREATE_A_TABLE((PREFIX) * 10 + 2); \ + CREATE_A_TABLE((PREFIX) * 10 + 3); \ + CREATE_A_TABLE((PREFIX) * 10 + 4); \ + CREATE_A_TABLE((PREFIX) * 10 + 5); \ + CREATE_A_TABLE((PREFIX) * 10 + 6); \ + CREATE_A_TABLE((PREFIX) * 10 + 7); \ + CREATE_A_TABLE((PREFIX) * 10 + 8); \ + CREATE_A_TABLE((PREFIX) * 10 + 9); \ } while (0) // create branches with dim range [1, 100] diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_gpu.h b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_gpu.h index f00a4f3cd..ba9e21d2b 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_gpu.h +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_gpu.h @@ -214,16 +214,16 @@ class TableWrapper final : public TableWrapperBase { #define CREATE_TABLE_PARTIAL_BRANCHES(PERIFX) \ do { \ - CREATE_A_TABLE((PERIFX)*10 + 0); \ - CREATE_A_TABLE((PERIFX)*10 + 1); \ - CREATE_A_TABLE((PERIFX)*10 + 2); \ - CREATE_A_TABLE((PERIFX)*10 + 3); \ - CREATE_A_TABLE((PERIFX)*10 + 4); \ - CREATE_A_TABLE((PERIFX)*10 + 5); \ - CREATE_A_TABLE((PERIFX)*10 + 6); \ - CREATE_A_TABLE((PERIFX)*10 + 7); \ - CREATE_A_TABLE((PERIFX)*10 + 8); \ - CREATE_A_TABLE((PERIFX)*10 + 9); \ + CREATE_A_TABLE((PERIFX) * 10 + 0); \ + CREATE_A_TABLE((PERIFX) * 10 + 1); \ + CREATE_A_TABLE((PERIFX) * 10 + 2); \ + CREATE_A_TABLE((PERIFX) * 10 + 3); \ + CREATE_A_TABLE((PERIFX) * 10 + 4); \ + CREATE_A_TABLE((PERIFX) * 10 + 5); \ + CREATE_A_TABLE((PERIFX) * 10 + 6); \ + CREATE_A_TABLE((PERIFX) * 10 + 7); \ + CREATE_A_TABLE((PERIFX) * 10 + 8); \ + CREATE_A_TABLE((PERIFX) * 10 + 9); \ } while (0) // create branches with dim range: diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/json.cc b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/json.cc index 184548436..7fbe4206b 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/json.cc +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/json.cc @@ -402,8 +402,8 @@ json_value *json_parse_ex(json_settings *settings, const json_char *json, case json_object: if (state.first_pass) - (*(json_char * - __attribute((__may_alias__)) *)&top->u.object.values) += + (*(json_char *__attribute(( + __may_alias__)) *)&top->u.object.values) += string_length + 1; else { top->u.object.values[top->u.object.length].name = diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/segment_reduction_ops_impl.h b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/segment_reduction_ops_impl.h index 638bf5dfb..b70349382 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/segment_reduction_ops_impl.h +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/segment_reduction_ops_impl.h @@ -82,7 +82,7 @@ template class SparseSegmentSumGpuOp : public AsyncOpKernel { public: explicit SparseSegmentSumGpuOp(OpKernelConstruction* context) - : AsyncOpKernel(context){}; + : AsyncOpKernel(context) {}; void ComputeAsync(OpKernelContext* context, DoneCallback done) override { const Tensor& input_data = context->input(0); diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/lib/nvhash/nv_util.h b/tensorflow_recommenders_addons/dynamic_embedding/core/lib/nvhash/nv_util.h index a5d935a22..91cc5e4c3 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/lib/nvhash/nv_util.h +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/lib/nvhash/nv_util.h @@ -14,8 +14,10 @@ #include "cuda_runtime_api.h" -#define CUDA_CHECK(val) \ - { nv::cuda_check_((val), __FILE__, __LINE__); } +#define CUDA_CHECK(val) \ + { \ + nv::cuda_check_((val), __FILE__, __LINE__); \ + } namespace nv { diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/utils/utils.h b/tensorflow_recommenders_addons/dynamic_embedding/core/utils/utils.h index 0467e753e..aba5978f1 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/utils/utils.h +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/utils/utils.h @@ -44,7 +44,8 @@ This code is for compatibility.*/ #ifndef MAYBE_ADD_SOURCE_LOCATION #define MAYBE_ADD_SOURCE_LOCATION(status) \ - {} + { \ + } #endif // MAYBE_ADD_SOURCE_LOCATION // For propagating errors when calling a function but not return status. diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/callbacks.py b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/callbacks.py index 0cf5cd09d..2339873de 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/callbacks.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/callbacks.py @@ -113,6 +113,7 @@ def __init__(self, *args, **kwargs): super(DEHvdModelCheckpoint, self).__init__(*args, **kwargs) def _save_de_model(self, filepath): + def _maybe_save_restrict_policy_params(de_var, proc_size=1, proc_rank=0): if not hasattr(de_var, "restrict_policy"): return @@ -122,6 +123,7 @@ def _maybe_save_restrict_policy_params(de_var, proc_size=1, proc_rank=0): de_var.save_to_file_system(dirpath=de_dir, proc_size=proc_size, proc_rank=proc_rank) + if hvd.rank() == 0: if self.save_weights_only: self.model.save_weights(filepath, overwrite=True, options=self._options) @@ -152,7 +154,7 @@ def _maybe_save_restrict_policy_params(de_var, proc_size=1, proc_rank=0): de_opt_var.save_to_file_system(dirpath=de_dir, proc_size=hvd.size(), proc_rank=hvd.rank()) - + # Save restrict policy for each hvd.rank() _maybe_save_restrict_policy_params(de_var, proc_size=hvd.size(), diff --git a/tools/docker/cpu_tests.Dockerfile b/tools/docker/cpu_tests.Dockerfile index 93adefe93..210274ea0 100644 --- a/tools/docker/cpu_tests.Dockerfile +++ b/tools/docker/cpu_tests.Dockerfile @@ -35,7 +35,7 @@ RUN python configure.py RUN pip install -e ./ RUN --mount=type=cache,id=cache_bazel,target=/root/.cache/bazel \ bash tools/install_so_files.sh -RUN pytest -v -s -n auto --durations=25 --ignore-glob="*/hkv_hashtable_ops_test.py" --doctest-modules ./tensorflow_recommenders_addons \ +RUN pytest -v -s -n auto --durations=25 --ignore-glob="*/hkv_hashtable_ops_test.py" --ignore-glob="*/horovod_embedding_restrict_save_test.py" --doctest-modules ./tensorflow_recommenders_addons \ --cov=tensorflow_recommenders_addons ./tensorflow_recommenders_addons/ RUN bazel build --enable_runfiles build_pip_pkg diff --git a/tools/testing/build_and_run_tests.sh b/tools/testing/build_and_run_tests.sh index 6998f3fc7..d587dc91c 100644 --- a/tools/testing/build_and_run_tests.sh +++ b/tools/testing/build_and_run_tests.sh @@ -75,7 +75,7 @@ if [ "$TF_NEED_CUDA" -eq 0 ]; then fi # Test only with horovod on GPU -IGNORE_HOROVOD_DIST_TRAINING_TEST = "--ignore=./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py" +IGNORE_HOROVOD_DIST_TRAINING_TEST="--ignore=./tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_embedding_restrict_save_test.py" # Only use GPU 0 if available. if [ -x "$(command -v nvidia-smi)" ]; then From 1f5b0040a8b9433be4724a30646a5400241bbc3c Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Mon, 14 Apr 2025 07:25:29 +0000 Subject: [PATCH 18/19] lint --- .../dynamic_embedding/python/keras/callbacks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/callbacks.py b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/callbacks.py index 0cf5cd09d..2339873de 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/python/keras/callbacks.py +++ b/tensorflow_recommenders_addons/dynamic_embedding/python/keras/callbacks.py @@ -113,6 +113,7 @@ def __init__(self, *args, **kwargs): super(DEHvdModelCheckpoint, self).__init__(*args, **kwargs) def _save_de_model(self, filepath): + def _maybe_save_restrict_policy_params(de_var, proc_size=1, proc_rank=0): if not hasattr(de_var, "restrict_policy"): return @@ -122,6 +123,7 @@ def _maybe_save_restrict_policy_params(de_var, proc_size=1, proc_rank=0): de_var.save_to_file_system(dirpath=de_dir, proc_size=proc_size, proc_rank=proc_rank) + if hvd.rank() == 0: if self.save_weights_only: self.model.save_weights(filepath, overwrite=True, options=self._options) @@ -152,7 +154,7 @@ def _maybe_save_restrict_policy_params(de_var, proc_size=1, proc_rank=0): de_opt_var.save_to_file_system(dirpath=de_dir, proc_size=hvd.size(), proc_rank=hvd.rank()) - + # Save restrict policy for each hvd.rank() _maybe_save_restrict_policy_params(de_var, proc_size=hvd.size(), From 74b29a350f15c98d7b96c08d23d4df43187e9bf7 Mon Sep 17 00:00:00 2001 From: Jatin Mandav Date: Wed, 16 Apr 2025 04:18:56 +0000 Subject: [PATCH 19/19] clang format --- .../kernels/cuckoo_hashtable_op_gpu.cu.cc | 1 - .../core/kernels/fill_functor.cu.cc | 3 +-- .../kernels/lookup_impl/lookup_table_op_cpu.h | 20 +++++++++---------- .../kernels/lookup_impl/lookup_table_op_gpu.h | 20 +++++++++---------- .../core/kernels/redis_impl/json.cc | 4 ++-- .../core/kernels/segment_reduction_ops_impl.h | 2 +- .../core/lib/nvhash/nv_util.h | 6 ++---- .../dynamic_embedding/core/utils/utils.h | 3 +-- 8 files changed, 27 insertions(+), 32 deletions(-) diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/cuckoo_hashtable_op_gpu.cu.cc b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/cuckoo_hashtable_op_gpu.cu.cc index 099ec1b66..eef020d09 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/cuckoo_hashtable_op_gpu.cu.cc +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/cuckoo_hashtable_op_gpu.cu.cc @@ -15,7 +15,6 @@ limitations under the License. #if GOOGLE_CUDA #include "tensorflow_recommenders_addons/dynamic_embedding/core/kernels/cuckoo_hashtable_op_gpu.h" - #include "tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_gpu.h" #include "tensorflow_recommenders_addons/dynamic_embedding/core/utils/utils.h" diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/fill_functor.cu.cc b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/fill_functor.cu.cc index 4a838c798..c3e9ab7e0 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/fill_functor.cu.cc +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/fill_functor.cu.cc @@ -18,10 +18,9 @@ limitations under the License. #define EIGEN_USE_GPU -#include "tensorflow/core/kernels/fill_functor.h" - #include "tensorflow/core/framework/register_types.h" #include "tensorflow/core/framework/tensor_types.h" +#include "tensorflow/core/kernels/fill_functor.h" #include "tensorflow/core/platform/types.h" namespace Eigen { diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_cpu.h b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_cpu.h index d294450f2..b74285e1a 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_cpu.h +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_cpu.h @@ -429,16 +429,16 @@ struct TableDispatcher { #define CREATE_TABLE_PARTIAL_BRANCHES(PREFIX) \ do { \ - CREATE_A_TABLE((PREFIX) * 10 + 0); \ - CREATE_A_TABLE((PREFIX) * 10 + 1); \ - CREATE_A_TABLE((PREFIX) * 10 + 2); \ - CREATE_A_TABLE((PREFIX) * 10 + 3); \ - CREATE_A_TABLE((PREFIX) * 10 + 4); \ - CREATE_A_TABLE((PREFIX) * 10 + 5); \ - CREATE_A_TABLE((PREFIX) * 10 + 6); \ - CREATE_A_TABLE((PREFIX) * 10 + 7); \ - CREATE_A_TABLE((PREFIX) * 10 + 8); \ - CREATE_A_TABLE((PREFIX) * 10 + 9); \ + CREATE_A_TABLE((PREFIX)*10 + 0); \ + CREATE_A_TABLE((PREFIX)*10 + 1); \ + CREATE_A_TABLE((PREFIX)*10 + 2); \ + CREATE_A_TABLE((PREFIX)*10 + 3); \ + CREATE_A_TABLE((PREFIX)*10 + 4); \ + CREATE_A_TABLE((PREFIX)*10 + 5); \ + CREATE_A_TABLE((PREFIX)*10 + 6); \ + CREATE_A_TABLE((PREFIX)*10 + 7); \ + CREATE_A_TABLE((PREFIX)*10 + 8); \ + CREATE_A_TABLE((PREFIX)*10 + 9); \ } while (0) // create branches with dim range [1, 100] diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_gpu.h b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_gpu.h index ba9e21d2b..f00a4f3cd 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_gpu.h +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/lookup_impl/lookup_table_op_gpu.h @@ -214,16 +214,16 @@ class TableWrapper final : public TableWrapperBase { #define CREATE_TABLE_PARTIAL_BRANCHES(PERIFX) \ do { \ - CREATE_A_TABLE((PERIFX) * 10 + 0); \ - CREATE_A_TABLE((PERIFX) * 10 + 1); \ - CREATE_A_TABLE((PERIFX) * 10 + 2); \ - CREATE_A_TABLE((PERIFX) * 10 + 3); \ - CREATE_A_TABLE((PERIFX) * 10 + 4); \ - CREATE_A_TABLE((PERIFX) * 10 + 5); \ - CREATE_A_TABLE((PERIFX) * 10 + 6); \ - CREATE_A_TABLE((PERIFX) * 10 + 7); \ - CREATE_A_TABLE((PERIFX) * 10 + 8); \ - CREATE_A_TABLE((PERIFX) * 10 + 9); \ + CREATE_A_TABLE((PERIFX)*10 + 0); \ + CREATE_A_TABLE((PERIFX)*10 + 1); \ + CREATE_A_TABLE((PERIFX)*10 + 2); \ + CREATE_A_TABLE((PERIFX)*10 + 3); \ + CREATE_A_TABLE((PERIFX)*10 + 4); \ + CREATE_A_TABLE((PERIFX)*10 + 5); \ + CREATE_A_TABLE((PERIFX)*10 + 6); \ + CREATE_A_TABLE((PERIFX)*10 + 7); \ + CREATE_A_TABLE((PERIFX)*10 + 8); \ + CREATE_A_TABLE((PERIFX)*10 + 9); \ } while (0) // create branches with dim range: diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/json.cc b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/json.cc index 7fbe4206b..184548436 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/json.cc +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/json.cc @@ -402,8 +402,8 @@ json_value *json_parse_ex(json_settings *settings, const json_char *json, case json_object: if (state.first_pass) - (*(json_char *__attribute(( - __may_alias__)) *)&top->u.object.values) += + (*(json_char * + __attribute((__may_alias__)) *)&top->u.object.values) += string_length + 1; else { top->u.object.values[top->u.object.length].name = diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/segment_reduction_ops_impl.h b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/segment_reduction_ops_impl.h index b70349382..638bf5dfb 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/segment_reduction_ops_impl.h +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/kernels/segment_reduction_ops_impl.h @@ -82,7 +82,7 @@ template class SparseSegmentSumGpuOp : public AsyncOpKernel { public: explicit SparseSegmentSumGpuOp(OpKernelConstruction* context) - : AsyncOpKernel(context) {}; + : AsyncOpKernel(context){}; void ComputeAsync(OpKernelContext* context, DoneCallback done) override { const Tensor& input_data = context->input(0); diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/lib/nvhash/nv_util.h b/tensorflow_recommenders_addons/dynamic_embedding/core/lib/nvhash/nv_util.h index 91cc5e4c3..a5d935a22 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/lib/nvhash/nv_util.h +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/lib/nvhash/nv_util.h @@ -14,10 +14,8 @@ #include "cuda_runtime_api.h" -#define CUDA_CHECK(val) \ - { \ - nv::cuda_check_((val), __FILE__, __LINE__); \ - } +#define CUDA_CHECK(val) \ + { nv::cuda_check_((val), __FILE__, __LINE__); } namespace nv { diff --git a/tensorflow_recommenders_addons/dynamic_embedding/core/utils/utils.h b/tensorflow_recommenders_addons/dynamic_embedding/core/utils/utils.h index aba5978f1..0467e753e 100644 --- a/tensorflow_recommenders_addons/dynamic_embedding/core/utils/utils.h +++ b/tensorflow_recommenders_addons/dynamic_embedding/core/utils/utils.h @@ -44,8 +44,7 @@ This code is for compatibility.*/ #ifndef MAYBE_ADD_SOURCE_LOCATION #define MAYBE_ADD_SOURCE_LOCATION(status) \ - { \ - } + {} #endif // MAYBE_ADD_SOURCE_LOCATION // For propagating errors when calling a function but not return status.