|  | 
| 38 | 38 | from scanpipe.models import CodebaseResource | 
| 39 | 39 | from scanpipe.models import DiscoveredPackage | 
| 40 | 40 | from scanpipe.models import Project | 
|  | 41 | +from scanpipe.pipes import alpine | 
| 41 | 42 | from scanpipe.pipes import codebase | 
| 42 | 43 | from scanpipe.pipes import docker | 
| 43 | 44 | from scanpipe.pipes import fetch | 
| @@ -756,6 +757,126 @@ def test_scanpipe_pipes_rootfs_has_hash_diff(self): | 
| 756 | 757 |         codebase_resource = CodebaseResource(sha256="sha256", md5="md5") | 
| 757 | 758 |         self.assertFalse(rootfs.has_hash_diff(install_file, codebase_resource)) | 
| 758 | 759 | 
 | 
|  | 760 | +    @mock.patch("scanpipe.pipes.alpine.fetch_via_git") | 
|  | 761 | +    def test_scanpipe_pipes_alpine_download_or_checkout_aports(self, fetch_via_git): | 
|  | 762 | +        example_path = Path() | 
|  | 763 | +        aports_path = str(example_path / alpine.APORTS_DIR_NAME) | 
|  | 764 | + | 
|  | 765 | +        alpine.download_or_checkout_aports( | 
|  | 766 | +            aports_dir_path=example_path, alpine_version="3.13.14" | 
|  | 767 | +        ) | 
|  | 768 | +        fetch_via_git.assert_called_with( | 
|  | 769 | +            url=f"git+{alpine.APORTS_URL}@3.13-stable", location=aports_path | 
|  | 770 | +        ) | 
|  | 771 | + | 
|  | 772 | +        alpine.download_or_checkout_aports( | 
|  | 773 | +            aports_dir_path=example_path, alpine_version="3.13.14", commit_id="1" | 
|  | 774 | +        ) | 
|  | 775 | +        fetch_via_git.assert_called_with( | 
|  | 776 | +            url=f"git+{alpine.APORTS_URL}@1", location=aports_path | 
|  | 777 | +        ) | 
|  | 778 | + | 
|  | 779 | +    def test_scanpipe_pipes_alpine_get_unscanned_packages_from_db(self): | 
|  | 780 | +        project = Project.objects.create(name="example") | 
|  | 781 | +        alpine_versions = {"1": "3.12", "2": "3.13"} | 
|  | 782 | +        package_field_names = ( | 
|  | 783 | +            "type", | 
|  | 784 | +            "name", | 
|  | 785 | +            "version", | 
|  | 786 | +            "vcs_url", | 
|  | 787 | +            "source_packages", | 
|  | 788 | +            "extra_data", | 
|  | 789 | +        ) | 
|  | 790 | +        package_data = [ | 
|  | 791 | +            ("debian",), | 
|  | 792 | +            ("rpm",), | 
|  | 793 | +            ("alpine", "A", "1.0", "id=A", [], {"image_id": "1"}), | 
|  | 794 | +            ("alpine", "B", "1.0", "id=B", [], {"image_id": "2"}), | 
|  | 795 | +        ] | 
|  | 796 | +        #The test will get bigger (thus arrays and loops instead of consecutive function calls) - futher patches for this function expected | 
|  | 797 | +        expected_package_tuples = [ | 
|  | 798 | +            ( | 
|  | 799 | +                "3.13", | 
|  | 800 | +                "B", | 
|  | 801 | +                project.tmp_path / "B_1.0", | 
|  | 802 | +                project.output_path / "B_1.0.json", | 
|  | 803 | +            ), | 
|  | 804 | +        ] | 
|  | 805 | +        (project.output_path / "A_1.0.json").touch() | 
|  | 806 | +        for package_data_tuple in package_data: | 
|  | 807 | +            DiscoveredPackage.objects.create( | 
|  | 808 | +                project=project, **dict(zip(package_field_names, package_data_tuple)) | 
|  | 809 | +            ) | 
|  | 810 | +        yielded_package_tuples = alpine.get_unscanned_packages_from_db( | 
|  | 811 | +            project=project, alpine_versions=alpine_versions | 
|  | 812 | +        ) | 
|  | 813 | +        for i, package_tuple in enumerate(yielded_package_tuples): | 
|  | 814 | +            self.assertEqual(expected_package_tuples[i], package_tuple[:4]) | 
|  | 815 | + | 
|  | 816 | +    @mock.patch("scanpipe.pipes.alpine.alpine.parse_apkbuild") | 
|  | 817 | +    @mock.patch("scanpipe.pipes.alpine.copytree") | 
|  | 818 | +    def test_scanpipe_pipes_alpine_prepare_scan_dir(self, copytree, parse_apkbuild): | 
|  | 819 | +        example_path = Path() | 
|  | 820 | + | 
|  | 821 | +        (self.data_location / alpine.APORTS_DIR_NAME / "main" / "A").mkdir( | 
|  | 822 | +            parents=True, exist_ok=True | 
|  | 823 | +        ) | 
|  | 824 | +        (self.data_location / alpine.APORTS_DIR_NAME / "non-free" / "A").mkdir( | 
|  | 825 | +            parents=True, exist_ok=True | 
|  | 826 | +        ) | 
|  | 827 | +        (self.data_location / alpine.APORTS_DIR_NAME / "community" / "B").mkdir( | 
|  | 828 | +            parents=True, exist_ok=True | 
|  | 829 | +        ) | 
|  | 830 | + | 
|  | 831 | +        returned_value = alpine.prepare_scan_dir( | 
|  | 832 | +            package_name="A", | 
|  | 833 | +            scan_target_path=example_path, | 
|  | 834 | +            aports_dir_path=self.data_location, | 
|  | 835 | +        ) | 
|  | 836 | +        self.assertEqual(returned_value, None) | 
|  | 837 | + | 
|  | 838 | +        returned_value = alpine.prepare_scan_dir( | 
|  | 839 | +            package_name="B", | 
|  | 840 | +            scan_target_path=example_path, | 
|  | 841 | +            aports_dir_path=self.data_location, | 
|  | 842 | +        ) | 
|  | 843 | +        self.assertEqual(returned_value, None) | 
|  | 844 | + | 
|  | 845 | +        returned_value = alpine.prepare_scan_dir( | 
|  | 846 | +            package_name="C", | 
|  | 847 | +            scan_target_path=example_path, | 
|  | 848 | +            aports_dir_path=self.data_location, | 
|  | 849 | +        ) | 
|  | 850 | +        self.assertEqual(returned_value, None) | 
|  | 851 | + | 
|  | 852 | +        returned_value = alpine.prepare_scan_dir( | 
|  | 853 | +            package_name="D", | 
|  | 854 | +            scan_target_path=example_path, | 
|  | 855 | +            aports_dir_path=self.data_location, | 
|  | 856 | +        ) | 
|  | 857 | +        self.assertEqual(returned_value, example_path) | 
|  | 858 | + | 
|  | 859 | +        returned_value = alpine.prepare_scan_dir( | 
|  | 860 | +            package_name="E", | 
|  | 861 | +            scan_target_path=example_path, | 
|  | 862 | +            aports_dir_path=self.data_location, | 
|  | 863 | +        ) | 
|  | 864 | +        self.assertEqual(returned_value, example_path) | 
|  | 865 | + | 
|  | 866 | +    def test_scanpipe_pipes_alpine_extract_summary_fields(self): | 
|  | 867 | +        returned_value = alpine.extract_summary_fields( | 
|  | 868 | +            self.data_location / "example_scan_summary.json", | 
|  | 869 | +            ["copyrights", "holders", "authors"], | 
|  | 870 | +        ) | 
|  | 871 | +        self.assertEqual( | 
|  | 872 | +            returned_value, | 
|  | 873 | +            { | 
|  | 874 | +                "copyrights": ["Copyright (c) A B", "Copyright (c) C D"], | 
|  | 875 | +                "holders": ["A B", "C D"], | 
|  | 876 | +                "authors": ["A B", "C D"], | 
|  | 877 | +            }, | 
|  | 878 | +        ) | 
|  | 879 | + | 
| 759 | 880 | 
 | 
| 760 | 881 | class ScanPipePipesTransactionTest(TransactionTestCase): | 
| 761 | 882 |     """ | 
|  | 
0 commit comments