@@ -690,7 +690,7 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None:
690690 updater = self ._run_refresh ()
691691 updater .get_targetinfo ("non_existent_target" )
692692
693- # Clear statistics for calls and metadata requests
693+ # Clear statistics for open() calls and metadata requests
694694 wrapped_open .reset_mock ()
695695 self .sim .fetch_tracker .metadata .clear ()
696696
@@ -713,6 +713,87 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None:
713713 expected_calls = [("root" , 2 ), ("timestamp" , None )]
714714 self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
715715
716+
717+ @patch .object (builtins , "open" , wraps = builtins .open )
718+ def test_intermediate_root_cache (self , wrapped_open : MagicMock ) -> None :
719+ """Test that refresh uses the intermediate roots from cache"""
720+ # Add root versions 2, 3
721+ self .sim .root .version += 1
722+ self .sim .publish_root ()
723+ self .sim .root .version += 1
724+ self .sim .publish_root ()
725+
726+ # Make a successful update of valid metadata which stores it in cache
727+ self ._run_refresh ()
728+
729+ # assert that cache lookups happened but data was downloaded from remote
730+ wrapped_open .assert_has_calls (
731+ [
732+ call (os .path .join (self .metadata_dir , "root_history/2.root.json" ), "rb" ),
733+ call (os .path .join (self .metadata_dir , "root_history/3.root.json" ), "rb" ),
734+ call (os .path .join (self .metadata_dir , "root_history/4.root.json" ), "rb" ),
735+ call (os .path .join (self .metadata_dir , "timestamp.json" ), "rb" ),
736+ call (os .path .join (self .metadata_dir , "snapshot.json" ), "rb" ),
737+ call (os .path .join (self .metadata_dir , "targets.json" ), "rb" ),
738+ ]
739+ )
740+ expected_calls = [("root" , 2 ), ("root" , 3 ), ("root" , 4 ), ("timestamp" , None ), ("snapshot" , 1 ), ("targets" , 1 )]
741+ self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
742+
743+ # Clear statistics for open() calls and metadata requests
744+ wrapped_open .reset_mock ()
745+ self .sim .fetch_tracker .metadata .clear ()
746+
747+ # Run update again, assert that metadata from cache was used (including intermediate roots)
748+ self ._run_refresh ()
749+ wrapped_open .assert_has_calls (
750+ [
751+ call (os .path .join (self .metadata_dir , "root_history/2.root.json" ), "rb" ),
752+ call (os .path .join (self .metadata_dir , "root_history/3.root.json" ), "rb" ),
753+ call (os .path .join (self .metadata_dir , "root_history/4.root.json" ), "rb" ),
754+ call (os .path .join (self .metadata_dir , "timestamp.json" ), "rb" ),
755+ call (os .path .join (self .metadata_dir , "snapshot.json" ), "rb" ),
756+ call (os .path .join (self .metadata_dir , "targets.json" ), "rb" ),
757+ ]
758+ )
759+ expected_calls = [("root" , 4 ), ("timestamp" , None )]
760+ self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
761+
762+ def test_intermediate_root_cache_poisoning (self ) -> None :
763+ """Test that refresh works as expected when intermediate roots in cache are poisoned"""
764+ # Add root versions 2, 3
765+ self .sim .root .version += 1
766+ self .sim .publish_root ()
767+ self .sim .root .version += 1
768+ self .sim .publish_root ()
769+
770+ # Make a successful update of valid metadata which stores it in cache
771+ self ._run_refresh ()
772+
773+ # Modify cached intermediate root v2 so that it's no longer signed correctly
774+ root_path = os .path .join (self .metadata_dir , "root_history" , "2.root.json" )
775+ md = Metadata .from_file (root_path )
776+ md .signatures .clear ()
777+ md .to_file (root_path )
778+
779+ # Clear statistics for metadata requests
780+ self .sim .fetch_tracker .metadata .clear ()
781+
782+ # Update again, assert that intermediate root v2 was downloaded again
783+ self ._run_refresh ()
784+
785+ expected_calls = [("root" , 2 ), ("root" , 4 ), ("timestamp" , None )]
786+ self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
787+
788+ # Clear statistics for metadata requests
789+ self .sim .fetch_tracker .metadata .clear ()
790+
791+ # Update again, this time assert that intermediate root v2 was used from cache
792+ self ._run_refresh ()
793+
794+ expected_calls = [("root" , 4 ), ("timestamp" , None )]
795+ self .assertListEqual (self .sim .fetch_tracker .metadata , expected_calls )
796+
716797 def test_expired_metadata (self ) -> None :
717798 """Verifies that expired local timestamp/snapshot can be used for
718799 updating from remote.
0 commit comments