@@ -696,7 +696,7 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None:
696696 updater = self ._run_refresh ()
697697 updater .get_targetinfo ("non_existent_target" )
698698
699- # Clear statistics for calls and metadata requests
699+ # Clear statistics for open() calls and metadata requests
700700 wrapped_open .reset_mock ()
701701 self .sim .fetch_tracker .metadata .clear ()
702702
@@ -719,6 +719,87 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None:
719719 expected_calls = [("root" , 2 ), ("timestamp" , None )]
720720 self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
721721
722+
723+ @patch .object (builtins , "open" , wraps = builtins .open )
724+ def test_intermediate_root_cache (self , wrapped_open : MagicMock ) -> None :
725+ """Test that refresh uses the intermediate roots from cache"""
726+ # Add root versions 2, 3
727+ self .sim .root .version += 1
728+ self .sim .publish_root ()
729+ self .sim .root .version += 1
730+ self .sim .publish_root ()
731+
732+ # Make a successful update of valid metadata which stores it in cache
733+ self ._run_refresh ()
734+
735+ # assert that cache lookups happened but data was downloaded from remote
736+ wrapped_open .assert_has_calls (
737+ [
738+ call (os .path .join (self .metadata_dir , "root_history/2.root.json" ), "rb" ),
739+ call (os .path .join (self .metadata_dir , "root_history/3.root.json" ), "rb" ),
740+ call (os .path .join (self .metadata_dir , "root_history/4.root.json" ), "rb" ),
741+ call (os .path .join (self .metadata_dir , "timestamp.json" ), "rb" ),
742+ call (os .path .join (self .metadata_dir , "snapshot.json" ), "rb" ),
743+ call (os .path .join (self .metadata_dir , "targets.json" ), "rb" ),
744+ ]
745+ )
746+ expected_calls = [("root" , 2 ), ("root" , 3 ), ("root" , 4 ), ("timestamp" , None ), ("snapshot" , 1 ), ("targets" , 1 )]
747+ self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
748+
749+ # Clear statistics for open() calls and metadata requests
750+ wrapped_open .reset_mock ()
751+ self .sim .fetch_tracker .metadata .clear ()
752+
753+ # Run update again, assert that metadata from cache was used (including intermediate roots)
754+ self ._run_refresh ()
755+ wrapped_open .assert_has_calls (
756+ [
757+ call (os .path .join (self .metadata_dir , "root_history/2.root.json" ), "rb" ),
758+ call (os .path .join (self .metadata_dir , "root_history/3.root.json" ), "rb" ),
759+ call (os .path .join (self .metadata_dir , "root_history/4.root.json" ), "rb" ),
760+ call (os .path .join (self .metadata_dir , "timestamp.json" ), "rb" ),
761+ call (os .path .join (self .metadata_dir , "snapshot.json" ), "rb" ),
762+ call (os .path .join (self .metadata_dir , "targets.json" ), "rb" ),
763+ ]
764+ )
765+ expected_calls = [("root" , 4 ), ("timestamp" , None )]
766+ self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
767+
768+ def test_intermediate_root_cache_poisoning (self ) -> None :
769+ """Test that refresh works as expected when intermediate roots in cache are poisoned"""
770+ # Add root versions 2, 3
771+ self .sim .root .version += 1
772+ self .sim .publish_root ()
773+ self .sim .root .version += 1
774+ self .sim .publish_root ()
775+
776+ # Make a successful update of valid metadata which stores it in cache
777+ self ._run_refresh ()
778+
779+ # Modify cached intermediate root v2 so that it's no longer signed correctly
780+ root_path = os .path .join (self .metadata_dir , "root_history" , "2.root.json" )
781+ md = Metadata .from_file (root_path )
782+ md .signatures .clear ()
783+ md .to_file (root_path )
784+
785+ # Clear statistics for metadata requests
786+ self .sim .fetch_tracker .metadata .clear ()
787+
788+ # Update again, assert that intermediate root v2 was downloaded again
789+ self ._run_refresh ()
790+
791+ expected_calls = [("root" , 2 ), ("root" , 4 ), ("timestamp" , None )]
792+ self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
793+
794+ # Clear statistics for metadata requests
795+ self .sim .fetch_tracker .metadata .clear ()
796+
797+ # Update again, this time assert that intermediate root v2 was used from cache
798+ self ._run_refresh ()
799+
800+ expected_calls = [("root" , 4 ), ("timestamp" , None )]
801+ self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
802+
722803 def test_expired_metadata (self ) -> None :
723804 """Verifies that expired local timestamp/snapshot can be used for
724805 updating from remote.
0 commit comments