Skip to content

Commit 265e1ed

Browse files
MoFHekarhdong
authored andcommitted
[fix] FileSystem saver didn't restore parameter properly when user create their Keras model with lazy building. Also now fully support using CheckpointManager.
1 parent d0beac3 commit 265e1ed

File tree

4 files changed

+377
-146
lines changed

4 files changed

+377
-146
lines changed

tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/horovod_sync_train_test.py

Lines changed: 170 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import itertools
2222
import os
23+
import numpy as np
2324
import shutil
2425

2526
import tensorflow as tf
@@ -34,6 +35,7 @@
3435
from tensorflow.python.ops import variables
3536
from tensorflow.python.platform import test
3637
from tensorflow.python.training import adam
38+
from tensorflow.python.training import checkpoint_management
3739
from tensorflow.python.training import monitored_session
3840
from tensorflow.python.training.optimizer import Optimizer as tf1_opt
3941
from tensorflow.python.training import training_util
@@ -97,6 +99,8 @@ def test_all_to_all_embedding_trainable(self):
9799
self.common_all_to_all_embedding_trainable_v2(keras_base_opt,
98100
keras_test_opt,
99101
name="keras_adam")
102+
self.common_lazy_build_model_with_checkpoint_management_v2(
103+
name="keras_adam_lazy_build")
100104

101105
def common_minimize_trainable_v1(self, base_opt, test_opt, name):
102106
# TODO(rhdong): Recover the testing, if the horovod import error is fixed on macOS+TF2.7+.
@@ -326,20 +330,23 @@ def common_all_to_all_embedding_trainable_v2(self, base_opt, test_opt, name):
326330
de.keras.models.de_hvd_save_model(base_model,
327331
save_dir,
328332
options=save_options)
329-
ckpt = de.train.DEHvdCheckpoint(base_model)
333+
ckpt = de.train.DECheckpoint(
334+
my_model=base_model) # Test custom model key "my_model"
330335
ckpt.save(save_dir + '/ckpt/test')
331-
tf.keras.backend.clear_session()
332336
del base_model
337+
del base_opt
338+
tf.keras.backend.clear_session()
339+
new_opt = de.DynamicEmbeddingOptimizer(Adam(1.1), synchronous=True)
333340
new_base_model = get_emb_sequential_model(
334341
de.keras.layers.HvdAllToAllEmbedding,
335-
base_opt,
342+
new_opt,
336343
dense_init='ones',
337344
embedding_size=dim,
338345
initializer=init,
339346
bp_v2=False,
340347
kv_creator=kv_creator,
341348
name='all2all_emb')
342-
ckpt = de.train.DEHvdCheckpoint(my_model=new_base_model)
349+
ckpt = de.train.DECheckpoint(my_model=new_base_model)
343350
hvd.join() # Sync for avoiding files conflict
344351
ckpt.restore(tf.train.latest_checkpoint(save_dir + '/ckpt/'))
345352
new_a2aemb_size = new_base_model.layers[0].params.size()
@@ -351,6 +358,165 @@ def common_all_to_all_embedding_trainable_v2(self, base_opt, test_opt, name):
351358
self.assertEqual(a2aemb_size, new_a2aemb_size)
352359
hvd.join() # Sync for avoiding files conflict
353360

361+
def common_lazy_build_model_with_checkpoint_management_v2(self, name):
362+
# TODO(rhdong): Recover the testing, if the horovod import error is fixed on macOS+TF2.7+.
363+
try:
364+
import horovod.tensorflow as hvd
365+
except (NotFoundError):
366+
self.skipTest(
367+
"Skip the test for horovod import error with Tensorflow-2.7.0 on MacOS-12."
368+
)
369+
370+
tf.config.set_soft_device_placement(True)
371+
372+
hvd.init()
373+
374+
# These cases need 2 GPUs at least if available.
375+
logical_devices = tf.config.list_logical_devices('GPU')
376+
_device = "GPU" if len(logical_devices) >= hvd.size() else "CPU"
377+
_device_id = hvd.local_rank(
378+
) if _device == "GPU" and len(logical_devices) >= 2 else 0
379+
380+
if _device == "GPU":
381+
os.environ["CUDA_VISIBLE_DEVICES"] = str(_device_id)
382+
383+
dim = 8
384+
385+
class NoCompileModel(tf.keras.models.Model):
386+
387+
def __init__(self, init, dynamic=False):
388+
super().__init__(dynamic=dynamic)
389+
kv_creator = de.CuckooHashTableCreator(saver=de.FileSystemSaver(
390+
proc_size=hvd.size(), proc_rank=hvd.rank()))
391+
self.emb = de.keras.layers.HvdAllToAllEmbedding(embedding_size=dim,
392+
devices=['/GPU:0'],
393+
initializer=0,
394+
kv_creator=kv_creator,
395+
name=name)
396+
self.l1 = tf.keras.layers.Dense(8, 'relu', kernel_initializer=init)
397+
self.l2 = tf.keras.layers.Dense(1, 'sigmoid', kernel_initializer=init)
398+
399+
def build(self, input_shape):
400+
self.emb.build(input_shape)
401+
self.l1.build(input_shape + dim)
402+
self.l2.build(input_shape + 8)
403+
404+
def call(self, x):
405+
out = self.emb(x)
406+
out = self.l1(out)
407+
return self.l2(out)
408+
409+
def check_TFRADynamicEmbedding_directory(save_dir,
410+
save_it,
411+
should_be_exist=True):
412+
hvd_size = hvd.size()
413+
if hvd_size <= 1:
414+
hvd_size = 1
415+
for tag in ['keys', 'values']:
416+
for rank in range(hvd_size):
417+
self.assertTrue(not (os.path.exists(
418+
save_dir +
419+
f'/TFRADynamicEmbedding-{save_it}/{name}-parameter_mht_1of1_rank{rank}_size{hvd_size}-{tag}'
420+
) ^ should_be_exist))
421+
self.assertTrue(not (os.path.exists(
422+
save_dir +
423+
f'/TFRADynamicEmbedding-{save_it}/{name}-parameter_DynamicEmbedding_keras_adam_lazy_build-shadow_m_mht_1of1_rank{rank}_size{hvd_size}-{tag}'
424+
) ^ should_be_exist))
425+
# f'/TFRADynamicEmbedding-{save_it}/{name}-parameter_no_compile_model_DynamicEmbedding_keras_adam_lazy_build-shadow_m_mht_1of1_rank{rank}_size{hvd_size}-{tag}'
426+
self.assertTrue(not (os.path.exists(
427+
save_dir +
428+
f'/TFRADynamicEmbedding-{save_it}/{name}-parameter_DynamicEmbedding_keras_adam_lazy_build-shadow_v_mht_1of1_rank{rank}_size{hvd_size}-{tag}'
429+
) ^ should_be_exist))
430+
# f'/TFRADynamicEmbedding-{save_it}/{name}-parameter_no_compile_model_DynamicEmbedding_keras_adam_lazy_build-shadow_v_mht_1of1_rank{rank}_size{hvd_size}-{tag}'
431+
432+
with tf.device("/{}:{}".format(_device, _device_id)):
433+
x = tf.reshape(tf.range(0, 32, dtype=tf.int64), [32, 1])
434+
y = tf.random.uniform(shape=[32, 1])
435+
436+
save_dir = self.get_temp_dir()
437+
438+
model = NoCompileModel('ones')
439+
base_opt = Adam(1.0)
440+
base_opt = de.DynamicEmbeddingOptimizer(base_opt, synchronous=True)
441+
ckpt = de.train.DECheckpoint(model=model, optimizer=base_opt)
442+
model.compile(optimizer=base_opt, loss='mean_absolute_error')
443+
manager = checkpoint_management.CheckpointManager(ckpt,
444+
save_dir,
445+
max_to_keep=1)
446+
model.fit(x, y, verbose=0)
447+
manager.save()
448+
if hvd.rank() == 0:
449+
check_TFRADynamicEmbedding_directory(save_dir,
450+
save_it=1,
451+
should_be_exist=True)
452+
for l in model.layers:
453+
if name in l.name:
454+
l.params.upsert(x * 10, tf.random.uniform(shape=[32, 1, dim]))
455+
emb_size = l.params.size()
456+
emb_keys, emb_values = l.params.export()
457+
break
458+
for v in base_opt.variables():
459+
if name in v.name:
460+
v.params.upsert(x * 10, tf.random.uniform(shape=[32, 1, dim]))
461+
opt_size = v.params.size()
462+
opt_keys, opt_values = l.params.export()
463+
break
464+
manager.save()
465+
if hvd.rank() == 0:
466+
check_TFRADynamicEmbedding_directory(save_dir,
467+
save_it=2,
468+
should_be_exist=True)
469+
# CheckpointManager delete checkpoint after the write functuon, but DE KV checkpoint saving and deleting inside the write functuon.
470+
# So DE KV checkpoint TFRADynamicEmbedding directory will be always one more than TF checkpoint file.
471+
manager.save()
472+
if hvd.rank() == 0:
473+
check_TFRADynamicEmbedding_directory(
474+
save_dir, save_it=1, should_be_exist=False
475+
) # Check delete TFRADynamicEmbedding directory properly.
476+
477+
del base_opt
478+
del model
479+
del ckpt
480+
tf.keras.backend.clear_session()
481+
tf.compat.v1.reset_default_graph()
482+
483+
new_model = NoCompileModel('zeros')
484+
new_opt = Adam(1.1)
485+
new_opt = de.DynamicEmbeddingOptimizer(new_opt, synchronous=True)
486+
new_ckpt = de.train.DECheckpoint(model=new_model, optimizer=new_opt)
487+
manager = checkpoint_management.CheckpointManager(new_ckpt,
488+
save_dir,
489+
max_to_keep=1)
490+
manager.restore_or_initialize()
491+
new_model.compile(optimizer=new_opt, loss='mean_absolute_error')
492+
new_model(x) # Build vairiables
493+
try:
494+
new_opt._create_all_weights(new_model.variables)
495+
except:
496+
#TODO(MoFHejia) raise ValueError: Cannot convert a partially known TensorShape <unknown> to a Tensor.
497+
pass
498+
for l in new_model.layers:
499+
if name in l.name:
500+
new_emb_size = l.params.size()
501+
new_emb_keys, new_emb_values = l.params.export()
502+
break
503+
for v in new_opt.variables():
504+
if name in v.name:
505+
new_opt_size = v.params.size()
506+
new_opt_keys, new_opt_values = l.params.export()
507+
break
508+
509+
self.assertEqual(emb_size, new_emb_size)
510+
self.assertEqual(opt_size, new_opt_size)
511+
self.assertAllEqual(np.sort(emb_keys, axis=0),
512+
np.sort(new_emb_keys, axis=0))
513+
self.assertAllClose(np.sort(emb_values, axis=0),
514+
np.sort(new_emb_values, axis=0))
515+
self.assertAllEqual(np.sort(opt_keys, axis=0),
516+
np.sort(new_opt_keys, axis=0))
517+
self.assertAllClose(np.sort(opt_values, axis=0),
518+
np.sort(new_opt_values, axis=0))
519+
354520

355521
if __name__ == "__main__":
356522
test.main()

tensorflow_recommenders_addons/dynamic_embedding/python/kernel_tests/shadow_embedding_ops_test.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -560,16 +560,16 @@ def size(self):
560560

561561
model_dir = tempfile.mkdtemp(prefix=self.get_temp_dir())
562562
save_ckpt_dir = os.path.join(model_dir, 'model')
563-
restore_ckpt_path = os.path.join(model_dir, 'model-1')
564563

565564
options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA'])
566-
ckpt = tf.train.Checkpoint(module)
565+
ckpt = de.train.DECheckpoint(module)
567566
ckpt.save(save_ckpt_dir)
568567
shadow_value = module.shadow.read_value(False)
569568
self.assertAllEqual(shadow_value.shape, (0, 2)) # clear when saving
570569

571570
new_module = TestModule()
572-
new_ckpt = tf.train.Checkpoint(new_module)
571+
new_ckpt = de.train.DECheckpoint(new_module)
572+
restore_ckpt_path = tf.train.latest_checkpoint(model_dir)
573573
new_ckpt.read(restore_ckpt_path)
574574
self.assertEqual(new_module.size(), 3)
575575
expected_values = module(keys)
@@ -640,18 +640,18 @@ def size(self):
640640

641641
model_dir = tempfile.mkdtemp(prefix=self.get_temp_dir())
642642
save_ckpt_dir = os.path.join(model_dir, 'model')
643-
restore_ckpt_path = os.path.join(model_dir, 'model-1')
644643

645644
options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA'])
646-
ckpt = tf.train.Checkpoint(module)
645+
ckpt = de.train.DECheckpoint(module)
647646
ckpt.save(save_ckpt_dir)
648647
shadow_value = module.shadow.read_value(False)
649648
self.assertAllEqual(shadow_value.shape, (0, 1)) # clear when saving
650649

651650
tf.keras.backend.clear_session()
652651
del module, ckpt
653652
new_module = TestNewModule(table_devices_)
654-
new_ckpt = tf.train.Checkpoint(new_module)
653+
new_ckpt = de.train.DECheckpoint(new_module)
654+
restore_ckpt_path = tf.train.latest_checkpoint(model_dir)
655655
new_ckpt.read(restore_ckpt_path)
656656
self.assertEqual(new_module.size(), test_size)
657657
expected_values = new_module(keys)
@@ -663,7 +663,7 @@ def size(self):
663663
shard_num = 5
664664
table_devices_ = table_device * shard_num
665665
new_module = TestNewModule(table_devices_)
666-
new_ckpt = tf.train.Checkpoint(new_module)
666+
new_ckpt = de.train.DECheckpoint(new_module)
667667
new_ckpt.read(restore_ckpt_path)
668668
self.assertEqual(new_module.size(), test_size)
669669
expected_values = new_module(keys)
@@ -675,7 +675,7 @@ def size(self):
675675
shard_num = 2
676676
table_devices_ = table_device * shard_num
677677
new_module = TestNewModule(table_devices_)
678-
new_ckpt = tf.train.Checkpoint(new_module)
678+
new_ckpt = de.train.DECheckpoint(new_module)
679679
new_ckpt.read(restore_ckpt_path)
680680
self.assertEqual(new_module.size(), test_size)
681681
expected_values = new_module(keys)
@@ -687,7 +687,7 @@ def size(self):
687687
shard_num = 1
688688
table_devices_ = table_device * shard_num
689689
new_module = TestNewModule(table_devices_)
690-
new_ckpt = tf.train.Checkpoint(new_module)
690+
new_ckpt = de.train.DECheckpoint(new_module)
691691
new_ckpt.read(restore_ckpt_path)
692692
self.assertEqual(new_module.size(), test_size)
693693
expected_values = new_module(keys)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
from tensorflow_recommenders_addons.dynamic_embedding.python.train.saver import DEHvdSaver
2-
from tensorflow_recommenders_addons.dynamic_embedding.python.train.checkpoint import DEHvdCheckpoint
2+
from tensorflow_recommenders_addons.dynamic_embedding.python.train.checkpoint import DECheckpoint

0 commit comments

Comments
 (0)