|
13 | 13 | from tests.common.db.organizations import ( |
14 | 14 | OrganizationFactory, |
15 | 15 | OrganizationManualActivationFactory, |
| 16 | + OrganizationOIDCIssuerFactory, |
16 | 17 | OrganizationRoleFactory, |
17 | 18 | OrganizationStripeCustomerFactory, |
18 | 19 | ) |
19 | 20 | from tests.common.db.subscriptions import StripeCustomerFactory |
20 | 21 | from warehouse.admin.views import organizations as views |
21 | 22 | from warehouse.organizations.models import ( |
| 23 | + OIDCIssuerType, |
22 | 24 | OrganizationManualActivation, |
| 25 | + OrganizationOIDCIssuer, |
23 | 26 | OrganizationRole, |
24 | 27 | OrganizationRoleType, |
25 | 28 | OrganizationType, |
@@ -1736,3 +1739,317 @@ def test_set_total_size_limit_below_default(self, db_request): |
1736 | 1739 | ) |
1737 | 1740 | ] |
1738 | 1741 | assert result.status_code == 303 |
| 1742 | + |
| 1743 | + |
| 1744 | +class TestAddOIDCIssuer: |
| 1745 | + def test_add_oidc_issuer_success(self, db_request, monkeypatch): |
| 1746 | + organization = OrganizationFactory.create() |
| 1747 | + admin_user = UserFactory.create(username="admin") |
| 1748 | + |
| 1749 | + # Mock record_event |
| 1750 | + record_event = pretend.call_recorder(lambda **kwargs: None) |
| 1751 | + monkeypatch.setattr(organization, "record_event", record_event) |
| 1752 | + |
| 1753 | + db_request.matchdict = {"organization_id": str(organization.id)} |
| 1754 | + db_request.user = admin_user |
| 1755 | + db_request.route_path = pretend.call_recorder( |
| 1756 | + lambda *a, **kw: "/admin/organizations/" |
| 1757 | + ) |
| 1758 | + db_request.session = pretend.stub( |
| 1759 | + flash=pretend.call_recorder(lambda *a, **kw: None) |
| 1760 | + ) |
| 1761 | + db_request.POST = MultiDict( |
| 1762 | + { |
| 1763 | + "issuer_type": "gitlab", |
| 1764 | + "issuer_url": "https://gitlab.company.com", |
| 1765 | + } |
| 1766 | + ) |
| 1767 | + |
| 1768 | + result = views.add_oidc_issuer(db_request) |
| 1769 | + |
| 1770 | + assert isinstance(result, HTTPSeeOther) |
| 1771 | + assert result.location == "/admin/organizations/" |
| 1772 | + |
| 1773 | + assert db_request.session.flash.calls == [ |
| 1774 | + pretend.call( |
| 1775 | + "OIDC issuer 'https://gitlab.company.com' (gitlab) added to " |
| 1776 | + f"'{organization.name}'", |
| 1777 | + queue="success", |
| 1778 | + ) |
| 1779 | + ] |
| 1780 | + |
| 1781 | + issuer = db_request.db.query(OrganizationOIDCIssuer).one() |
| 1782 | + assert issuer.issuer_type == OIDCIssuerType.GitLab |
| 1783 | + assert issuer.issuer_url == "https://gitlab.company.com" |
| 1784 | + assert issuer.organization == organization |
| 1785 | + assert issuer.created_by == admin_user |
| 1786 | + |
| 1787 | + # Check event was recorded |
| 1788 | + assert record_event.calls == [ |
| 1789 | + pretend.call( |
| 1790 | + request=db_request, |
| 1791 | + tag="admin:organization:oidc_issuer:add", |
| 1792 | + additional={ |
| 1793 | + "issuer_type": "gitlab", |
| 1794 | + "issuer_url": "https://gitlab.company.com", |
| 1795 | + }, |
| 1796 | + ) |
| 1797 | + ] |
| 1798 | + |
| 1799 | + def test_add_oidc_issuer_invalid_form(self, db_request): |
| 1800 | + organization = OrganizationFactory.create() |
| 1801 | + admin_user = UserFactory.create(username="admin") |
| 1802 | + |
| 1803 | + db_request.matchdict = {"organization_id": str(organization.id)} |
| 1804 | + db_request.user = admin_user |
| 1805 | + db_request.route_path = pretend.call_recorder( |
| 1806 | + lambda *a, **kw: "/admin/organizations/" |
| 1807 | + ) |
| 1808 | + db_request.session = pretend.stub( |
| 1809 | + flash=pretend.call_recorder(lambda *a, **kw: None) |
| 1810 | + ) |
| 1811 | + # Missing issuer_url |
| 1812 | + db_request.POST = MultiDict({"issuer_type": "gitlab"}) |
| 1813 | + |
| 1814 | + result = views.add_oidc_issuer(db_request) |
| 1815 | + assert isinstance(result, HTTPSeeOther) |
| 1816 | + |
| 1817 | + # Should flash form validation errors |
| 1818 | + assert len(db_request.session.flash.calls) > 0 |
| 1819 | + assert "error" in str(db_request.session.flash.calls[0]) |
| 1820 | + |
| 1821 | + def test_add_oidc_issuer_invalid_url(self, db_request): |
| 1822 | + organization = OrganizationFactory.create() |
| 1823 | + admin_user = UserFactory.create(username="admin") |
| 1824 | + |
| 1825 | + db_request.matchdict = {"organization_id": str(organization.id)} |
| 1826 | + db_request.user = admin_user |
| 1827 | + db_request.route_path = pretend.call_recorder( |
| 1828 | + lambda *a, **kw: "/admin/organizations/" |
| 1829 | + ) |
| 1830 | + db_request.session = pretend.stub( |
| 1831 | + flash=pretend.call_recorder(lambda *a, **kw: None) |
| 1832 | + ) |
| 1833 | + # Invalid URL (not https) |
| 1834 | + db_request.POST = MultiDict( |
| 1835 | + { |
| 1836 | + "issuer_type": "gitlab", |
| 1837 | + "issuer_url": "http://gitlab.company.com", |
| 1838 | + } |
| 1839 | + ) |
| 1840 | + |
| 1841 | + result = views.add_oidc_issuer(db_request) |
| 1842 | + assert isinstance(result, HTTPSeeOther) |
| 1843 | + |
| 1844 | + # Should flash form validation errors |
| 1845 | + flash_messages = [call.args[0] for call in db_request.session.flash.calls] |
| 1846 | + assert any("https://" in msg for msg in flash_messages) |
| 1847 | + |
| 1848 | + def test_add_oidc_issuer_duplicate(self, db_request, monkeypatch): |
| 1849 | + organization = OrganizationFactory.create() |
| 1850 | + admin_user = UserFactory.create(username="admin") |
| 1851 | + |
| 1852 | + # Create existing issuer |
| 1853 | + OrganizationOIDCIssuerFactory.create( |
| 1854 | + organization=organization, |
| 1855 | + issuer_type=OIDCIssuerType.GitLab, |
| 1856 | + issuer_url="https://gitlab.company.com", |
| 1857 | + created_by=admin_user, |
| 1858 | + ) |
| 1859 | + |
| 1860 | + # Mock record_event (should not be called on duplicate) |
| 1861 | + record_event = pretend.call_recorder(lambda **kwargs: None) |
| 1862 | + monkeypatch.setattr(organization, "record_event", record_event) |
| 1863 | + |
| 1864 | + db_request.matchdict = {"organization_id": str(organization.id)} |
| 1865 | + db_request.user = admin_user |
| 1866 | + db_request.route_path = pretend.call_recorder( |
| 1867 | + lambda *a, **kw: "/admin/organizations/" |
| 1868 | + ) |
| 1869 | + db_request.session = pretend.stub( |
| 1870 | + flash=pretend.call_recorder(lambda *a, **kw: None) |
| 1871 | + ) |
| 1872 | + db_request.POST = MultiDict( |
| 1873 | + { |
| 1874 | + "issuer_type": "gitlab", |
| 1875 | + "issuer_url": "https://gitlab.company.com", |
| 1876 | + } |
| 1877 | + ) |
| 1878 | + |
| 1879 | + result = views.add_oidc_issuer(db_request) |
| 1880 | + assert isinstance(result, HTTPSeeOther) |
| 1881 | + |
| 1882 | + assert db_request.session.flash.calls == [ |
| 1883 | + pretend.call( |
| 1884 | + "Issuer 'https://gitlab.company.com' already exists " |
| 1885 | + f"for organization '{organization.name}'", |
| 1886 | + queue="error", |
| 1887 | + ) |
| 1888 | + ] |
| 1889 | + |
| 1890 | + # No new event recorded |
| 1891 | + assert record_event.calls == [] |
| 1892 | + |
| 1893 | + def test_add_oidc_issuer_organization_not_found(self, db_request): |
| 1894 | + admin_user = UserFactory.create(username="admin") |
| 1895 | + |
| 1896 | + db_request.matchdict = { |
| 1897 | + "organization_id": "00000000-0000-0000-0000-000000000000" |
| 1898 | + } |
| 1899 | + db_request.user = admin_user |
| 1900 | + |
| 1901 | + with pytest.raises(HTTPNotFound): |
| 1902 | + views.add_oidc_issuer(db_request) |
| 1903 | + |
| 1904 | + |
| 1905 | +class TestDeleteOIDCIssuer: |
| 1906 | + def test_delete_oidc_issuer_success(self, db_request, monkeypatch): |
| 1907 | + organization = OrganizationFactory.create() |
| 1908 | + admin_user = UserFactory.create(username="admin") |
| 1909 | + |
| 1910 | + issuer = OrganizationOIDCIssuerFactory.create( |
| 1911 | + organization=organization, |
| 1912 | + issuer_type=OIDCIssuerType.GitLab, |
| 1913 | + issuer_url="https://gitlab.company.com", |
| 1914 | + created_by=admin_user, |
| 1915 | + ) |
| 1916 | + |
| 1917 | + # Mock record_event |
| 1918 | + record_event = pretend.call_recorder(lambda **kwargs: None) |
| 1919 | + monkeypatch.setattr(organization, "record_event", record_event) |
| 1920 | + |
| 1921 | + db_request.matchdict = { |
| 1922 | + "organization_id": str(organization.id), |
| 1923 | + "issuer_id": str(issuer.id), |
| 1924 | + } |
| 1925 | + db_request.route_path = pretend.call_recorder( |
| 1926 | + lambda *a, **kw: "/admin/organizations/" |
| 1927 | + ) |
| 1928 | + db_request.session = pretend.stub( |
| 1929 | + flash=pretend.call_recorder(lambda *a, **kw: None) |
| 1930 | + ) |
| 1931 | + db_request.POST = MultiDict({"confirm": "https://gitlab.company.com"}) |
| 1932 | + |
| 1933 | + result = views.delete_oidc_issuer(db_request) |
| 1934 | + |
| 1935 | + assert isinstance(result, HTTPSeeOther) |
| 1936 | + assert result.location == "/admin/organizations/" |
| 1937 | + |
| 1938 | + assert db_request.session.flash.calls == [ |
| 1939 | + pretend.call( |
| 1940 | + "OIDC issuer 'https://gitlab.company.com' removed " |
| 1941 | + f"from '{organization.name}'", |
| 1942 | + queue="success", |
| 1943 | + ) |
| 1944 | + ] |
| 1945 | + |
| 1946 | + assert db_request.db.query(OrganizationOIDCIssuer).count() == 0 |
| 1947 | + |
| 1948 | + # Check event was recorded |
| 1949 | + assert record_event.calls == [ |
| 1950 | + pretend.call( |
| 1951 | + request=db_request, |
| 1952 | + tag="admin:organization:oidc_issuer:delete", |
| 1953 | + additional={ |
| 1954 | + "issuer_type": "gitlab", |
| 1955 | + "issuer_url": "https://gitlab.company.com", |
| 1956 | + }, |
| 1957 | + ) |
| 1958 | + ] |
| 1959 | + |
| 1960 | + def test_delete_oidc_issuer_not_found(self, db_request): |
| 1961 | + organization = OrganizationFactory.create() |
| 1962 | + |
| 1963 | + db_request.matchdict = { |
| 1964 | + "organization_id": str(organization.id), |
| 1965 | + "issuer_id": "00000000-0000-0000-0000-000000000000", |
| 1966 | + } |
| 1967 | + db_request.route_path = pretend.call_recorder( |
| 1968 | + lambda *a, **kw: "/admin/organizations/" |
| 1969 | + ) |
| 1970 | + db_request.session = pretend.stub( |
| 1971 | + flash=pretend.call_recorder(lambda *a, **kw: None) |
| 1972 | + ) |
| 1973 | + db_request.POST = MultiDict({"confirm": "https://gitlab.company.com"}) |
| 1974 | + |
| 1975 | + result = views.delete_oidc_issuer(db_request) |
| 1976 | + assert isinstance(result, HTTPSeeOther) |
| 1977 | + |
| 1978 | + assert db_request.session.flash.calls == [ |
| 1979 | + pretend.call("This issuer does not exist", queue="error") |
| 1980 | + ] |
| 1981 | + |
| 1982 | + def test_delete_oidc_issuer_wrong_confirmation(self, db_request): |
| 1983 | + organization = OrganizationFactory.create() |
| 1984 | + admin_user = UserFactory.create(username="admin") |
| 1985 | + |
| 1986 | + issuer = OrganizationOIDCIssuerFactory.create( |
| 1987 | + organization=organization, |
| 1988 | + issuer_type=OIDCIssuerType.GitLab, |
| 1989 | + issuer_url="https://gitlab.company.com", |
| 1990 | + created_by=admin_user, |
| 1991 | + ) |
| 1992 | + |
| 1993 | + db_request.matchdict = { |
| 1994 | + "organization_id": str(organization.id), |
| 1995 | + "issuer_id": str(issuer.id), |
| 1996 | + } |
| 1997 | + db_request.route_path = pretend.call_recorder( |
| 1998 | + lambda *a, **kw: "/admin/organizations/" |
| 1999 | + ) |
| 2000 | + db_request.session = pretend.stub( |
| 2001 | + flash=pretend.call_recorder(lambda *a, **kw: None) |
| 2002 | + ) |
| 2003 | + db_request.POST = MultiDict({"confirm": "https://wrong-url.com"}) |
| 2004 | + |
| 2005 | + result = views.delete_oidc_issuer(db_request) |
| 2006 | + assert isinstance(result, HTTPSeeOther) |
| 2007 | + |
| 2008 | + assert db_request.session.flash.calls == [ |
| 2009 | + pretend.call("Confirm the request", queue="error") |
| 2010 | + ] |
| 2011 | + |
| 2012 | + # Issuer should still exist |
| 2013 | + assert db_request.db.query(OrganizationOIDCIssuer).count() == 1 |
| 2014 | + |
| 2015 | + def test_delete_oidc_issuer_no_confirmation(self, db_request): |
| 2016 | + organization = OrganizationFactory.create() |
| 2017 | + admin_user = UserFactory.create(username="admin") |
| 2018 | + |
| 2019 | + issuer = OrganizationOIDCIssuerFactory.create( |
| 2020 | + organization=organization, |
| 2021 | + issuer_type=OIDCIssuerType.GitLab, |
| 2022 | + issuer_url="https://gitlab.company.com", |
| 2023 | + created_by=admin_user, |
| 2024 | + ) |
| 2025 | + |
| 2026 | + db_request.matchdict = { |
| 2027 | + "organization_id": str(organization.id), |
| 2028 | + "issuer_id": str(issuer.id), |
| 2029 | + } |
| 2030 | + db_request.route_path = pretend.call_recorder( |
| 2031 | + lambda *a, **kw: "/admin/organizations/" |
| 2032 | + ) |
| 2033 | + db_request.session = pretend.stub( |
| 2034 | + flash=pretend.call_recorder(lambda *a, **kw: None) |
| 2035 | + ) |
| 2036 | + db_request.POST = MultiDict({}) |
| 2037 | + |
| 2038 | + result = views.delete_oidc_issuer(db_request) |
| 2039 | + assert isinstance(result, HTTPSeeOther) |
| 2040 | + |
| 2041 | + assert db_request.session.flash.calls == [ |
| 2042 | + pretend.call("Confirm the request", queue="error") |
| 2043 | + ] |
| 2044 | + |
| 2045 | + # Issuer should still exist |
| 2046 | + assert db_request.db.query(OrganizationOIDCIssuer).count() == 1 |
| 2047 | + |
| 2048 | + def test_delete_oidc_issuer_organization_not_found(self, db_request): |
| 2049 | + db_request.matchdict = { |
| 2050 | + "organization_id": "00000000-0000-0000-0000-000000000000", |
| 2051 | + "issuer_id": "00000000-0000-0000-0000-000000000001", |
| 2052 | + } |
| 2053 | + |
| 2054 | + with pytest.raises(HTTPNotFound): |
| 2055 | + views.delete_oidc_issuer(db_request) |
0 commit comments