
Fix flaky test: test_model_checkpoint_score_and_ckpt_val_check_interval #9429

@awaelchli

Description

🐛 Bug

The test case test_model_checkpoint_score_and_ckpt_val_check_interval fails sporadically; the most recent failure I saw was in the CI job "macOS-10.15, 3.6, minimal, stable".

Error message:

___ test_model_checkpoint_score_and_ckpt_val_check_interval[0.25-False-True] ___

tmpdir = local('/private/var/folders/24/8k48jl6d249_n_qfxwsl6xvm0000gn/T/pytest-of-runner/pytest-0/test_model_checkpoint_score_an7')
val_check_interval = 0.25, reduce_lr_on_plateau = False, epoch_aligned = True

    @mock.patch.dict(os.environ, {"PL_DEV_DEBUG": "1"})
    @pytest.mark.parametrize(
        "val_check_interval,reduce_lr_on_plateau,epoch_aligned",
        [(0.25, True, True), (0.25, False, True), (0.42, False, False)],
    )
    def test_model_checkpoint_score_and_ckpt_val_check_interval(
        tmpdir, val_check_interval, reduce_lr_on_plateau, epoch_aligned
    ):
        """Test that when a model checkpoint is saved, it saves with the correct score appended to ckpt_path and
        checkpoint data with val_check_interval."""
        max_epochs = 3
        limit_train_batches = 12
        limit_val_batches = 7
        lr, gamma = 1e-1, 2
        monitor = "val_log"
        per_val_train_batches = int(limit_train_batches * val_check_interval)
        per_epoch_val_checks, leftover_train_batches = divmod(limit_train_batches, per_val_train_batches)
    
        class CustomBoringModel(BoringModel):
            def __init__(self):
                super().__init__()
                self.val_logs = torch.randn(per_epoch_val_checks * max_epochs, limit_val_batches)
                self.val_loop_count = 0
                self.scores = []
    
            def validation_step(self, batch, batch_idx):
                log_value = self.val_logs[self.val_loop_count, batch_idx]
                self.log("val_log", log_value)
                return super().validation_step(batch, batch_idx)
    
            def validation_epoch_end(self, outputs):
                self.val_loop_count += 1
                super().validation_epoch_end(outputs)
                self.scores.append(self.trainer.logged_metrics[monitor])
    
            def configure_optimizers(self):
                optimizer = optim.SGD(self.parameters(), lr=lr)
    
                if reduce_lr_on_plateau:
                    lr_scheduler = {
                        "scheduler": optim.lr_scheduler.ReduceLROnPlateau(optimizer),
                        "monitor": monitor,
                        "strict": True,
                    }
                else:
                    lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=gamma)
    
                return [optimizer], [lr_scheduler]
    
        filename = "{" + f"{monitor}" + ":.4f}-{epoch}"
        checkpoint = ModelCheckpoint(dirpath=tmpdir, filename=filename, monitor=monitor, save_top_k=-1)
    
        model = CustomBoringModel()
    
        trainer = Trainer(
            default_root_dir=tmpdir,
            callbacks=[checkpoint],
            limit_train_batches=limit_train_batches,
            limit_val_batches=limit_val_batches,
            max_epochs=max_epochs,
            val_check_interval=val_check_interval,
            progress_bar_refresh_rate=0,
            num_sanity_val_steps=0,
        )
        trainer.fit(model)
        assert trainer.state.finished, f"Training failed with {trainer.state}"
    
        ckpt_files = list(Path(tmpdir).glob("*.ckpt"))
        lr_scheduler_debug = trainer.dev_debugger.saved_lr_scheduler_updates
    
        assert len(ckpt_files) == len(model.scores) == per_epoch_val_checks * max_epochs
        assert len(lr_scheduler_debug) == max_epochs
    
        def _make_assertions(epoch, ix, version=""):
            global_ix = ix + per_epoch_val_checks * epoch
            duplicated = bool(version)
    
            # checkpoint saved at the end of training epoch will have updated lr_scheduler states
            epoch_end_checkpoint = duplicated
            if epoch_aligned:
                epoch_end_checkpoint = ix == (per_epoch_val_checks - 1)
    
            score = model.scores[global_ix]
            expected_score = getattr(model, f"{monitor}s")[global_ix].mean().item()
            expected_filename = f"{monitor}={score:.4f}-epoch={epoch}{version}.ckpt"
            assert math.isclose(score, expected_score, rel_tol=1e-4)
    
            chk = pl_load(os.path.join(checkpoint.dirpath, expected_filename))
            assert chk["epoch"] == epoch + 1
            epoch_num = epoch + duplicated
            expected_global_step = per_val_train_batches * (global_ix + 1) + (leftover_train_batches * epoch_num)
            assert chk["global_step"] == expected_global_step
    
            mc_specific_data = chk["callbacks"][
                f"ModelCheckpoint{{'monitor': '{monitor}', 'mode': 'min', 'every_n_train_steps': 0, 'every_n_epochs': 1,"
                " 'train_time_interval': None, 'save_on_train_epoch_end': False}"
            ]
            assert mc_specific_data["dirpath"] == checkpoint.dirpath
            assert mc_specific_data["monitor"] == monitor
            assert mc_specific_data["current_score"] == score
    
            if not reduce_lr_on_plateau:
                actual_step_count = chk["lr_schedulers"][0]["_step_count"]
                actual_lr = chk["lr_schedulers"][0]["_last_lr"][0]
                assert actual_step_count == epoch + 1 + epoch_end_checkpoint
                assert actual_lr == lr * gamma ** (epoch + epoch_end_checkpoint)
    
            return score
    
        for epoch in range(max_epochs):
            for i in range(per_epoch_val_checks):
>               score = _make_assertions(epoch, i)

tests/checkpointing/test_model_checkpoint.py:288: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

epoch = 2, ix = 2, version = ''

    def _make_assertions(epoch, ix, version=""):
        global_ix = ix + per_epoch_val_checks * epoch
        duplicated = bool(version)
    
        # checkpoint saved at the end of training epoch will have updated lr_scheduler states
        epoch_end_checkpoint = duplicated
        if epoch_aligned:
            epoch_end_checkpoint = ix == (per_epoch_val_checks - 1)
    
        score = model.scores[global_ix]
        expected_score = getattr(model, f"{monitor}s")[global_ix].mean().item()
        expected_filename = f"{monitor}={score:.4f}-epoch={epoch}{version}.ckpt"
        assert math.isclose(score, expected_score, rel_tol=1e-4)
    
        chk = pl_load(os.path.join(checkpoint.dirpath, expected_filename))
        assert chk["epoch"] == epoch + 1
        epoch_num = epoch + duplicated
        expected_global_step = per_val_train_batches * (global_ix + 1) + (leftover_train_batches * epoch_num)
>       assert chk["global_step"] == expected_global_step
E       assert 27 == 33
E         +27
E         -33

tests/checkpointing/test_model_checkpoint.py:268: AssertionError
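
For reference, the expected value in the failing assertion follows directly from the bookkeeping at the top of the test. The sketch below re-derives it for the failing parametrization (val_check_interval=0.25, epoch=2, ix=2, as shown in the traceback); all constants are copied from the test body, nothing else is assumed:

    # Re-derive expected_global_step for the failing frame.
    limit_train_batches = 12
    val_check_interval = 0.25

    per_val_train_batches = int(limit_train_batches * val_check_interval)  # 3
    per_epoch_val_checks, leftover_train_batches = divmod(
        limit_train_batches, per_val_train_batches
    )  # (4, 0)

    epoch, ix = 2, 2  # values from the failing frame
    global_ix = ix + per_epoch_val_checks * epoch  # 10
    epoch_num = epoch  # duplicated is False, since version == ""
    expected_global_step = (
        per_val_train_batches * (global_ix + 1)
        + leftover_train_batches * epoch_num
    )  # 3 * 11 + 0 == 33

Note that the checkpoint actually loaded reported global_step == 27 == 3 * 9, which is the value a checkpoint written at global_ix == 8 would carry, i.e. the file on disk appears to come from two validation checks earlier than the one the expected filename was supposed to match.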

To Reproduce

The failure cannot be reproduced locally; so far it only appears intermittently in CI.
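
The only local approach I can suggest is a brute-force stress loop. The sketch below is a hedged suggestion (the repetition budget is arbitrary) that reruns the test from a Lightning dev checkout until it fails:

    import os
    import subprocess

    env = {**os.environ, "PL_DEV_DEBUG": "1"}
    node_id = (
        "tests/checkpointing/test_model_checkpoint.py"
        "::test_model_checkpoint_score_and_ckpt_val_check_interval"
    )

    # Rerun the test repeatedly; 200 iterations is an arbitrary budget.
    for i in range(200):
        result = subprocess.run(["python", "-m", "pytest", "-x", "-q", node_id], env=env)
        if result.returncode != 0:
            print(f"reproduced on iteration {i}")
            break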

Expected behavior

The test should pass consistently: the global_step stored in each saved checkpoint should match the expected_global_step computed in the test.

Environment:
CI job: macOS-10.15, Python 3.6, minimal requirements, stable

Additional context

Labels

bug (Something isn't working), ci (Continuous Integration), help wanted (Open to be worked on)
