diff --git a/tests/unit/admin/views/test_organizations.py b/tests/unit/admin/views/test_organizations.py
index b74a38cd5a82..0317192e3375 100644
--- a/tests/unit/admin/views/test_organizations.py
+++ b/tests/unit/admin/views/test_organizations.py
@@ -1,275 +1,286 @@
# SPDX-License-Identifier: Apache-2.0
+import datetime
+
+import freezegun
import pretend
import pytest
from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound, HTTPSeeOther
-from webob.multidict import MultiDict
+from pyramid.response import Response
+from sqlalchemy import and_
+from sqlalchemy.orm.exc import NoResultFound
+from warehouse.accounts.models import User
from warehouse.admin.views import organizations as views
+from warehouse.constants import ONE_GIB, ONE_MIB
from warehouse.organizations.models import (
+ Organization,
+ OrganizationApplication,
OrganizationApplicationStatus,
OrganizationRole,
OrganizationRoleType,
OrganizationType,
)
-from warehouse.subscriptions.interfaces import IBillingService
+from warehouse.subscriptions.models import StripeCustomer
-from ....common.db.accounts import UserFactory
from ....common.db.organizations import (
OrganizationApplicationFactory,
OrganizationFactory,
OrganizationRoleFactory,
- OrganizationStripeCustomerFactory,
)
+from ....common.db.accounts import UserFactory
from ....common.db.subscriptions import StripeCustomerFactory
-class TestOrganizationForm:
- def test_validate_success(self):
- form_data = MultiDict(
- {
- "display_name": "My Organization",
- "link_url": "https://example.com",
- "description": "A test organization",
- "orgtype": "Company",
- }
- )
- form = views.OrganizationForm(formdata=form_data)
- assert form.validate(), str(form.errors)
-
- def test_validate_invalid_url(self):
- form_data = MultiDict(
- {
- "display_name": "My Organization",
- "link_url": "not-a-url",
- "description": "A test organization",
- "orgtype": "Company",
- }
- )
- form = views.OrganizationForm(formdata=form_data)
- assert not form.validate()
- assert "Organization URL must start with http:// or https://" in str(
- form.link_url.errors
- )
-
- def test_validate_missing_required_fields(self):
- form_data = MultiDict({})
- form = views.OrganizationForm(formdata=form_data)
- assert not form.validate()
- assert form.display_name.errors
- assert form.link_url.errors
- assert form.description.errors
- assert form.orgtype.errors
-
- def test_validate_field_too_long(self):
- form_data = MultiDict(
- {
- "display_name": "x" * 101, # Max is 100
- "link_url": "https://example.com/" + "x" * 381, # Max is 400
- "description": "x" * 401, # Max is 400
- "orgtype": "Company",
- }
- )
- form = views.OrganizationForm(formdata=form_data)
- assert not form.validate()
- assert "100 characters or less" in str(form.display_name.errors)
- assert "400 characters or less" in str(form.link_url.errors)
- assert "400 characters or less" in str(form.description.errors)
+@pytest.fixture
+def enable_organizations(request, db_request, monkeypatch):
+ monkeypatch.setattr(db_request, "organization_access", True)
class TestOrganizationList:
-
@pytest.mark.usefixtures("_enable_organizations")
def test_no_query(self, db_request):
- organizations = sorted(
- OrganizationFactory.create_batch(30),
- key=lambda o: o.normalized_name,
- )
- result = views.organization_list(db_request)
-
- assert result == {"organizations": organizations[:25], "query": "", "terms": []}
+ page = pretend.stub()
+ organizations_query = pretend.stub()
+ db_request.db.query = pretend.call_recorder(lambda *a: organizations_query)
+ organization_query_paginate = pretend.call_recorder(lambda *a, **kw: page)
+ monkeypatch.setattr(views, "SQLAlchemyORMPage", organization_query_paginate)
+
+ assert views.organization_list(db_request) == {
+ "organizations": page,
+ "query": "",
+ "terms": [],
+ }
+ assert db_request.db.query.calls == [pretend.call(Organization)]
@pytest.mark.usefixtures("_enable_organizations")
def test_with_page(self, db_request):
- organizations = sorted(
- OrganizationFactory.create_batch(30),
- key=lambda o: o.normalized_name,
- )
+ page = pretend.stub()
+ organizations_query = pretend.stub()
+ db_request.db.query = pretend.call_recorder(lambda *a: organizations_query)
+ organization_query_paginate = pretend.call_recorder(lambda *a, **kw: page)
+ monkeypatch.setattr(views, "SQLAlchemyORMPage", organization_query_paginate)
db_request.GET["page"] = "2"
- result = views.organization_list(db_request)
- assert result == {"organizations": organizations[25:], "query": "", "terms": []}
+ assert views.organization_list(db_request) == {
+ "organizations": page,
+ "query": "",
+ "terms": [],
+ }
+ assert db_request.db.query.calls == [pretend.call(Organization)]
@pytest.mark.usefixtures("_enable_organizations")
- def test_with_invalid_page(self):
- request = pretend.stub(
- flags=pretend.stub(enabled=lambda *a: False),
- params={"page": "not an integer"},
- )
+ def test_with_invalid_page(self, db_request):
+ db_request.GET["page"] = "not integer"
with pytest.raises(HTTPBadRequest):
- views.organization_list(request)
+ views.organization_list(db_request)
@pytest.mark.usefixtures("_enable_organizations")
def test_basic_query(self, db_request):
- organizations = sorted(
- OrganizationFactory.create_batch(5),
- key=lambda o: o.normalized_name,
- )
- db_request.GET["q"] = organizations[0].name
- result = views.organization_list(db_request)
-
- assert organizations[0] in result["organizations"]
- assert result["query"] == organizations[0].name
- assert result["terms"] == [organizations[0].name]
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_name_query(self, db_request):
- organizations = sorted(
- OrganizationFactory.create_batch(5),
- key=lambda o: o.normalized_name,
- )
- db_request.GET["q"] = f"name:{organizations[0].name}"
- result = views.organization_list(db_request)
-
- assert organizations[0] in result["organizations"]
- assert result["query"] == f"name:{organizations[0].name}"
- assert result["terms"] == [f"name:{organizations[0].name}"]
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_organization_query(self, db_request):
- organizations = sorted(
- OrganizationFactory.create_batch(5),
- key=lambda o: o.normalized_name,
- )
- db_request.GET["q"] = f"organization:{organizations[0].display_name}"
- result = views.organization_list(db_request)
-
- assert organizations[0] in result["organizations"]
- assert result["query"] == f"organization:{organizations[0].display_name}"
- assert result["terms"] == [f"organization:{organizations[0].display_name}"]
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_url_query(self, db_request):
- organizations = sorted(
- OrganizationFactory.create_batch(5),
- key=lambda o: o.normalized_name,
- )
- db_request.GET["q"] = f"url:{organizations[0].link_url}"
- result = views.organization_list(db_request)
-
- assert organizations[0] in result["organizations"]
- assert result["query"] == f"url:{organizations[0].link_url}"
- assert result["terms"] == [f"url:{organizations[0].link_url}"]
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_description_query(self, db_request):
- organizations = sorted(
- OrganizationFactory.create_batch(5),
- key=lambda o: o.normalized_name,
- )
- db_request.GET["q"] = f"description:'{organizations[0].description}'"
- result = views.organization_list(db_request)
-
- assert organizations[0] in result["organizations"]
- assert result["query"] == f"description:'{organizations[0].description}'"
- assert result["terms"] == [f"description:{organizations[0].description}"]
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_is_active_query(self, db_request):
- organizations = sorted(
- OrganizationFactory.create_batch(5),
- key=lambda o: o.normalized_name,
- )
- organizations[0].is_active = True
- organizations[1].is_active = True
- organizations[2].is_active = False
- organizations[3].is_active = False
- organizations[4].is_active = False
- db_request.GET["q"] = "is:active"
- result = views.organization_list(db_request)
-
- assert result == {
- "organizations": organizations[:2],
- "query": "is:active",
- "terms": ["is:active"],
+ page = pretend.stub()
+ organizations_query = pretend.stub(
+ filter=pretend.call_recorder(lambda *a: organizations_query),
+ options=pretend.call_recorder(lambda *a: organizations_query),
+ order_by=pretend.call_recorder(lambda *a: organizations_query),
+ )
+ db_request.db.query = pretend.call_recorder(lambda *a: organizations_query)
+ organization_query_paginate = pretend.call_recorder(lambda *a, **kw: page)
+ monkeypatch.setattr(views, "SQLAlchemyORMPage", organization_query_paginate)
+ db_request.GET["q"] = "foo"
+
+ assert views.organization_list(db_request) == {
+ "organizations": page,
+ "query": "foo",
+ "terms": ["foo"],
}
+ assert db_request.db.query.calls == [pretend.call(Organization)]
+ assert organizations_query.filter.calls == [pretend.call(False)]
@pytest.mark.usefixtures("_enable_organizations")
- def test_is_inactive_query(self, db_request):
- organizations = sorted(
- OrganizationFactory.create_batch(5),
- key=lambda o: o.normalized_name,
- )
- organizations[0].is_active = True
- organizations[1].is_active = True
- organizations[2].is_active = False
- organizations[3].is_active = False
- organizations[4].is_active = False
- db_request.GET["q"] = "is:inactive"
- result = views.organization_list(db_request)
-
- assert result == {
- "organizations": organizations[2:],
- "query": "is:inactive",
- "terms": ["is:inactive"],
+ def test_wildcard_query(self, db_request):
+ page = pretend.stub()
+ organizations_query = pretend.stub(
+ filter=pretend.call_recorder(lambda *a: organizations_query),
+ options=pretend.call_recorder(lambda *a: organizations_query),
+ order_by=pretend.call_recorder(lambda *a: organizations_query),
+ )
+ db_request.db.query = pretend.call_recorder(lambda *a: organizations_query)
+ organization_query_paginate = pretend.call_recorder(lambda *a, **kw: page)
+ monkeypatch.setattr(views, "SQLAlchemyORMPage", organization_query_paginate)
+ db_request.GET["q"] = "foo%"
+
+ assert views.organization_list(db_request) == {
+ "organizations": page,
+ "query": "foo%",
+ "terms": ["foo%"],
}
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_type_query(self, db_request):
- company_org = OrganizationFactory.create(orgtype=OrganizationType.Company)
- community_org = OrganizationFactory.create(orgtype=OrganizationType.Community)
- db_request.GET["q"] = "type:company"
- result = views.organization_list(db_request)
-
- assert result == {
- "organizations": [company_org],
- "query": "type:company",
- "terms": ["type:company"],
+ assert db_request.db.query.calls == [pretend.call(Organization)]
+ assert organizations_query.filter.calls == [pretend.call(False)]
+
+ @pytest.mark.parametrize("field", ["name", "org", "organization", "url", "link_url", "desc", "description"])
+ def test_field_query(self, db_request, field):
+ page = pretend.stub()
+ organizations_query = pretend.stub(
+ filter=pretend.call_recorder(lambda *a: organizations_query),
+ options=pretend.call_recorder(lambda *a: organizations_query),
+ order_by=pretend.call_recorder(lambda *a: organizations_query),
+ )
+ db_request.db.query = pretend.call_recorder(lambda *a: organizations_query)
+ organization_query_paginate = pretend.call_recorder(lambda *a, **kw: page)
+ monkeypatch.setattr(views, "SQLAlchemyORMPage", organization_query_paginate)
+ db_request.GET["q"] = f"{field}:foo"
+
+ assert views.organization_list(db_request) == {
+ "organizations": page,
+ "query": f"{field}:foo",
+ "terms": [f"{field}:foo"],
}
-
- db_request.GET["q"] = "type:community"
- result = views.organization_list(db_request)
-
- assert result == {
- "organizations": [community_org],
- "query": "type:community",
- "terms": ["type:community"],
+ assert db_request.db.query.calls == [pretend.call(Organization)]
+
+ @pytest.mark.parametrize("query", ["is:active", "is:inactive"])
+ def test_is_query(self, db_request, query):
+ page = pretend.stub()
+ organizations_query = pretend.stub(
+ filter=pretend.call_recorder(lambda *a: organizations_query),
+ options=pretend.call_recorder(lambda *a: organizations_query),
+ order_by=pretend.call_recorder(lambda *a: organizations_query),
+ )
+ db_request.db.query = pretend.call_recorder(lambda *a: organizations_query)
+ organization_query_paginate = pretend.call_recorder(lambda *a, **kw: page)
+ monkeypatch.setattr(views, "SQLAlchemyORMPage", organization_query_paginate)
+ db_request.GET["q"] = query
+
+ assert views.organization_list(db_request) == {
+ "organizations": page,
+ "query": query,
+ "terms": [query],
+ }
+ assert db_request.db.query.calls == [pretend.call(Organization)]
+
+ @pytest.mark.parametrize("query", ["type:company", "type:community"])
+ def test_type_query(self, db_request, query):
+ page = pretend.stub()
+ organizations_query = pretend.stub(
+ filter=pretend.call_recorder(lambda *a: organizations_query),
+ options=pretend.call_recorder(lambda *a: organizations_query),
+ order_by=pretend.call_recorder(lambda *a: organizations_query),
+ )
+ db_request.db.query = pretend.call_recorder(lambda *a: organizations_query)
+ organization_query_paginate = pretend.call_recorder(lambda *a, **kw: page)
+ monkeypatch.setattr(views, "SQLAlchemyORMPage", organization_query_paginate)
+ db_request.GET["q"] = query
+
+ assert views.organization_list(db_request) == {
+ "organizations": page,
+ "query": query,
+ "terms": [query],
}
+ assert db_request.db.query.calls == [pretend.call(Organization)]
+
+ def test_quoted_name_query(self, db_request):
+ page = pretend.stub()
+ organizations_query = pretend.stub(
+ filter=pretend.call_recorder(lambda *a: organizations_query),
+ options=pretend.call_recorder(lambda *a: organizations_query),
+ order_by=pretend.call_recorder(lambda *a: organizations_query),
+ )
+ db_request.db.query = pretend.call_recorder(lambda *a: organizations_query)
+ organization_query_paginate = pretend.call_recorder(lambda *a, **kw: page)
+ monkeypatch.setattr(views, "SQLAlchemyORMPage", organization_query_paginate)
+ db_request.GET["q"] = 'name:"foo bar"'
+
+ assert views.organization_list(db_request) == {
+ "organizations": page,
+ "query": 'name:"foo bar"',
+ "terms": ["name:foo bar"],
+ }
+ assert db_request.db.query.calls == [pretend.call(Organization)]
+
+class TestOrganizationDetail:
@pytest.mark.usefixtures("_enable_organizations")
- def test_invalid_type_query(self, db_request):
- company_org = OrganizationFactory.create(orgtype=OrganizationType.Company)
+ def test_not_found(self, db_request):
+ organization_service = pretend.stub(
+ get_organization=pretend.call_recorder(lambda *a: None)
+ )
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: organization_service
+ )
+ db_request.matchdict = {"organization_id": "00000000-0000-0000-0000-000000000000"}
- db_request.GET["q"] = "type:invalid"
- result = views.organization_list(db_request)
+ with pytest.raises(HTTPNotFound):
+ views.organization_detail(db_request)
- assert result == {
- "organizations": [company_org],
- "query": "type:invalid",
- "terms": ["type:invalid"],
- }
+ assert organization_service.get_organization.calls == [
+ pretend.call("00000000-0000-0000-0000-000000000000")
+ ]
@pytest.mark.usefixtures("_enable_organizations")
- def test_is_invalid_query(self, db_request):
- organizations = sorted(
- OrganizationFactory.create_batch(5),
- key=lambda o: o.normalized_name,
+ def test_post_update_billing_name(self, db_request, monkeypatch):
+ billing_service = pretend.stub(
+ update_customer=pretend.call_recorder(lambda *a: None)
)
- db_request.GET["q"] = "is:not-actually-a-valid-query"
- result = views.organization_list(db_request)
-
- assert result == {
- "organizations": organizations[:25],
- "query": "is:not-actually-a-valid-query",
- "terms": ["is:not-actually-a-valid-query"],
+ customer = StripeCustomerFactory.create()
+ organization = OrganizationFactory.create(
+ customer=customer,
+ name="example",
+ display_name="Example",
+ orgtype=OrganizationType.Company,
+ link_url="https://www.example.com/",
+ description="An example organization for testing",
+ is_active=False,
+ )
+ monkeypatch.setattr(
+ organization,
+ "customer_name",
+ lambda site_name: f"{organization.name} (via {site_name})",
+ )
+ db_request.registry.settings = {"site.name": "PyPI"}
+ organization_service = pretend.stub(
+ get_organization=pretend.call_recorder(lambda *a: organization)
+ )
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: {
+ views.IOrganizationService: organization_service,
+ views.IBillingService: billing_service,
+ }[service]
+ )
+ db_request.matchdict = {"organization_id": str(organization.id)}
+ db_request.method = "POST"
+ db_request.POST = {
+ "display_name": "New Example",
+ "link_url": "https://www.new-example.com/",
+ "description": "A new example organization for testing",
+ "orgtype": str(OrganizationType.Company.value),
}
+ db_request.route_path = pretend.call_recorder(lambda *a, **kw: "/foo/bar/")
+ db_request.session = pretend.stub(
+ flash=pretend.call_recorder(lambda *a, **kw: None)
+ )
+ result = views.organization_detail(db_request)
+
+ assert billing_service.update_customer.calls == [
+ pretend.call(
+ customer.customer_id,
+ "example (via PyPI)",
+ "A new example organization for testing",
+ )
+ ]
+ assert isinstance(result, HTTPSeeOther)
+ assert result.location == "/foo/bar/"
+ assert organization.display_name == "New Example"
+ assert organization.link_url == "https://www.new-example.com/"
+ assert organization.description == "A new example organization for testing"
+ assert organization.orgtype == OrganizationType.Company
+ assert db_request.session.flash.calls == [
+ pretend.call("Organization 'example' updated successfully", queue="success")
+ ]
-class TestOrganizationDetail:
@pytest.mark.usefixtures("_enable_organizations")
- def test_detail(self, db_request):
+ def test_detail_is_approved_false(self, db_request):
organization = OrganizationFactory.create(
name="example",
display_name="Example",
@@ -288,6 +299,11 @@ def test_detail(self, db_request):
result = views.organization_detail(db_request)
assert result["organization"] == organization
assert isinstance(result["form"], views.OrganizationForm)
+ assert result["ONE_MIB"] == views.ONE_MIB
+ assert result["MAX_FILESIZE"] == views.MAX_FILESIZE
+ assert result["ONE_GIB"] == views.ONE_GIB
+ assert result["MAX_PROJECT_SIZE"] == views.MAX_PROJECT_SIZE
+ assert result["UPLOAD_LIMIT_CAP"] == views.UPLOAD_LIMIT_CAP
assert result["roles"] == []
assert result["role_forms"] == {}
assert isinstance(result["add_role_form"], views.AddOrganizationRoleForm)
@@ -312,13 +328,20 @@ def test_detail_is_approved_true(self, db_request):
result = views.organization_detail(db_request)
assert result["organization"] == organization
assert isinstance(result["form"], views.OrganizationForm)
+ assert result["ONE_MIB"] == views.ONE_MIB
+ assert result["MAX_FILESIZE"] == views.MAX_FILESIZE
+ assert result["ONE_GIB"] == views.ONE_GIB
+ assert result["MAX_PROJECT_SIZE"] == views.MAX_PROJECT_SIZE
+ assert result["UPLOAD_LIMIT_CAP"] == views.UPLOAD_LIMIT_CAP
assert result["roles"] == []
assert result["role_forms"] == {}
assert isinstance(result["add_role_form"], views.AddOrganizationRoleForm)
@pytest.mark.usefixtures("_enable_organizations")
- def test_detail_is_approved_false(self, db_request):
+ def test_detail_with_subscription(self, db_request, monkeypatch):
+ customer = StripeCustomerFactory.create()
organization = OrganizationFactory.create(
+ customer=customer,
name="example",
display_name="Example",
orgtype=OrganizationType.Company,
@@ -328,7 +351,7 @@ def test_detail_is_approved_false(self, db_request):
"You may use this company in literature without prior "
"coordination or asking for permission."
),
- is_active=False,
+ is_active=True,
)
db_request.matchdict = {"organization_id": str(organization.id)}
db_request.method = "GET"
@@ -336,277 +359,131 @@ def test_detail_is_approved_false(self, db_request):
result = views.organization_detail(db_request)
assert result["organization"] == organization
assert isinstance(result["form"], views.OrganizationForm)
+ assert result["ONE_MIB"] == views.ONE_MIB
+ assert result["MAX_FILESIZE"] == views.MAX_FILESIZE
+ assert result["ONE_GIB"] == views.ONE_GIB
+ assert result["MAX_PROJECT_SIZE"] == views.MAX_PROJECT_SIZE
+ assert result["UPLOAD_LIMIT_CAP"] == views.UPLOAD_LIMIT_CAP
assert result["roles"] == []
assert result["role_forms"] == {}
assert isinstance(result["add_role_form"], views.AddOrganizationRoleForm)
@pytest.mark.usefixtures("_enable_organizations")
- def test_detail_not_found(self, db_request):
- db_request.matchdict = {
- "organization_id": "00000000-0000-0000-0000-000000000000"
- }
+ def test_detail_with_roles(self, db_request):
+ organization = OrganizationFactory.create(name="example", is_active=True)
+ user1 = UserFactory.create(username="alice")
+ user2 = UserFactory.create(username="bob")
+ role1 = OrganizationRoleFactory.create(
+ organization=organization,
+ user=user1,
+ role_name=OrganizationRoleType.Manager,
+ )
+ role2 = OrganizationRoleFactory.create(
+ organization=organization,
+ user=user2,
+ role_name=OrganizationRoleType.Member,
+ )
+ db_request.matchdict = {"organization_id": str(organization.id)}
db_request.method = "GET"
- with pytest.raises(HTTPNotFound):
- views.organization_detail(db_request)
+ result = views.organization_detail(db_request)
+ assert result["organization"] == organization
+ assert isinstance(result["form"], views.OrganizationForm)
+ # Roles should be sorted by username
+ assert result["roles"] == [role1, role2] # alice before bob
+ assert len(result["role_forms"]) == 2
+ assert role1.id in result["role_forms"]
+ assert role2.id in result["role_forms"]
+ assert isinstance(result["add_role_form"], views.AddOrganizationRoleForm)
- def test_updates_organization(self, db_request):
- organization = OrganizationFactory.create(
- display_name="Old Name",
- link_url="https://old-url.com",
- description="Old description",
- orgtype=OrganizationType.Company,
- )
- organization.customer = None # No Stripe customer
- db_request.matchdict = {"organization_id": str(organization.id)}
- db_request.method = "POST"
- db_request.POST = MultiDict(
- {
- "display_name": "New Name",
- "link_url": "https://new-url.com",
- "description": "New description",
- "orgtype": "Community",
- }
- )
- db_request.route_path = pretend.call_recorder(
- lambda name, **kwargs: f"/admin/organizations/{organization.id}/"
+class TestOrganizationRename:
+ @pytest.mark.usefixtures("_enable_organizations")
+ def test_not_found(self, db_request):
+ organization_service = pretend.stub(
+ get_organization=pretend.call_recorder(lambda *a: None)
)
- db_request.session = pretend.stub(
- flash=pretend.call_recorder(lambda *a, **kw: None)
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: organization_service
)
+ db_request.matchdict = {"organization_id": "00000000-0000-0000-0000-000000000000"}
- result = views.organization_detail(db_request)
+ with pytest.raises(HTTPNotFound):
+ views.organization_rename(db_request)
- assert isinstance(result, HTTPSeeOther)
- assert result.location == f"/admin/organizations/{organization.id}/"
- assert organization.display_name == "New Name"
- assert organization.link_url == "https://new-url.com"
- assert organization.description == "New description"
- assert organization.orgtype == OrganizationType.Community
- assert db_request.session.flash.calls == [
- pretend.call(
- f"Organization {organization.name!r} updated successfully",
- queue="success",
- )
+ assert organization_service.get_organization.calls == [
+ pretend.call("00000000-0000-0000-0000-000000000000")
]
- def test_updates_organization_with_stripe_customer(self, db_request, monkeypatch):
- organization = OrganizationFactory.create(
- name="acme",
- display_name="Old Name",
- link_url="https://old-url.com",
- description="Old description",
- orgtype=OrganizationType.Company,
+ @pytest.mark.usefixtures("_enable_organizations")
+ def test_rename_success(self, db_request):
+ organization = OrganizationFactory.create(name="oldname")
+ organization_service = pretend.stub(
+ get_organization=pretend.call_recorder(lambda *a: organization),
+ rename_organization=pretend.call_recorder(lambda *a: None),
)
- stripe_customer = StripeCustomerFactory.create(customer_id="cus_123456")
- OrganizationStripeCustomerFactory.create(
- organization=organization, customer=stripe_customer
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: organization_service
)
-
db_request.matchdict = {"organization_id": str(organization.id)}
- db_request.method = "POST"
- db_request.POST = MultiDict(
- {
- "display_name": "New Name",
- "link_url": "https://new-url.com",
- "description": "New description",
- "orgtype": "Community",
- }
- )
+ db_request.params = {"new_organization_name": "newname"}
db_request.route_path = pretend.call_recorder(
- lambda name, **kwargs: f"/admin/organizations/{organization.id}/"
+ lambda *a, **kw: "/admin/organizations/1/"
)
db_request.session = pretend.stub(
flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.registry = pretend.stub(settings={"site.name": "TestPyPI"})
-
- # Patch the billing service's update_customer method
- billing_service = db_request.find_service(IBillingService)
- update_customer = pretend.call_recorder(lambda *a, **kw: None)
- monkeypatch.setattr(billing_service, "update_customer", update_customer)
- result = views.organization_detail(db_request)
+ result = views.organization_rename(db_request)
assert isinstance(result, HTTPSeeOther)
- assert result.location == f"/admin/organizations/{organization.id}/"
- assert organization.display_name == "New Name"
- assert organization.link_url == "https://new-url.com"
- assert organization.description == "New description"
- assert organization.orgtype == OrganizationType.Community
- assert update_customer.calls == [
- pretend.call(
- "cus_123456",
- "TestPyPI Organization - New Name (acme)",
- "New description",
- )
+ assert result.location == "/admin/organizations/1/"
+ assert organization_service.rename_organization.calls == [
+ pretend.call(str(organization.id), "newname")
]
assert db_request.session.flash.calls == [
- pretend.call(
- f"Organization {organization.name!r} updated successfully",
- queue="success",
- )
+ pretend.call('"oldname" organization renamed "newname"', queue="success")
]
- def test_does_not_update_with_invalid_form(self, db_request):
- organization = OrganizationFactory.create()
-
- db_request.matchdict = {"organization_id": str(organization.id)}
- db_request.method = "POST"
- db_request.POST = MultiDict(
- {
- "display_name": "", # Required field
- "link_url": "invalid-url", # Invalid URL
- "description": "Some description",
- "orgtype": "Company",
- }
- )
-
- result = views.organization_detail(db_request)
-
- assert result["organization"] == organization
- assert isinstance(result["form"], views.OrganizationForm)
- assert result["form"].errors
- assert "display_name" in result["form"].errors
- assert "link_url" in result["form"].errors
-
@pytest.mark.usefixtures("_enable_organizations")
- def test_detail_with_roles(self, db_request):
- """Test that organization detail view includes roles"""
- organization = OrganizationFactory.create(name="pypi")
-
- # Create some users with roles
- # Intentionally not ordered to test order later
- user3 = UserFactory.create(username="charlie")
- user2 = UserFactory.create(username="bob")
- user1 = UserFactory.create(username="alice")
-
- OrganizationRoleFactory.create(
- organization=organization, user=user1, role_name=OrganizationRoleType.Owner
- )
- OrganizationRoleFactory.create(
- organization=organization,
- user=user2,
- role_name=OrganizationRoleType.Manager,
+ def test_rename_validation_error(self, db_request):
+ organization = OrganizationFactory.create(name="oldname")
+ organization_service = pretend.stub(
+ get_organization=pretend.call_recorder(lambda *a: organization),
+ rename_organization=pretend.raiser(
+ ValueError("Organization name already exists")
+ ),
)
- OrganizationRoleFactory.create(
- organization=organization, user=user3, role_name=OrganizationRoleType.Member
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: organization_service
)
-
db_request.matchdict = {"organization_id": str(organization.id)}
- db_request.method = "GET"
-
- result = views.organization_detail(db_request)
-
- assert result["organization"] == organization
- assert isinstance(result["form"], views.OrganizationForm)
-
- # Check that roles are included and sorted by username
- assert len(result["roles"]) == 3
- assert result["roles"][0].user.username == "alice"
- assert result["roles"][1].user.username == "bob"
- assert result["roles"][2].user.username == "charlie"
-
- # Check that role forms are created for each role
- assert len(result["role_forms"]) == 3
- assert set(result["role_forms"].keys()) == {role.id for role in result["roles"]}
- for role_id, form in result["role_forms"].items():
- assert isinstance(form, views.OrganizationRoleForm)
-
- assert isinstance(result["add_role_form"], views.AddOrganizationRoleForm)
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_detail_no_roles(self, db_request):
- """Test that organization detail view works with no roles"""
- organization = OrganizationFactory.create(name="pypi")
-
- db_request.matchdict = {"organization_id": str(organization.id)}
- db_request.method = "GET"
-
- result = views.organization_detail(db_request)
-
- assert result["organization"] == organization
- assert isinstance(result["form"], views.OrganizationForm)
- assert result["roles"] == []
- assert result["role_forms"] == {}
- assert isinstance(result["add_role_form"], views.AddOrganizationRoleForm)
-
-
-class TestOrganizationActions:
- @pytest.mark.usefixtures("_enable_organizations")
- def test_rename_not_found(self, db_request):
- admin = UserFactory.create()
-
- db_request.matchdict = {
- "organization_id": "deadbeef-dead-beef-dead-beefdeadbeef"
- }
- db_request.params = {
- "new_organization_name": "widget",
- }
- db_request.user = admin
- db_request.route_path = pretend.call_recorder(_organization_application_routes)
-
- with pytest.raises(HTTPNotFound):
- views.organization_rename(db_request)
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_rename(self, db_request):
- admin = UserFactory.create()
- organization = OrganizationFactory.create(name="example")
-
- db_request.matchdict = {"organization_id": organization.id}
- db_request.params = {
- "new_organization_name": "widget",
- }
- db_request.user = admin
- db_request.route_path = pretend.call_recorder(_organization_application_routes)
- db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None)
-
- result = views.organization_rename(db_request)
-
- assert db_request.session.flash.calls == [
- pretend.call(
- '"example" organization renamed "widget"',
- queue="success",
- ),
- ]
- assert result.status_code == 303
- assert result.location == f"/admin/organizations/{organization.id}/"
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_rename_fails_on_conflict(self, db_request):
- admin = UserFactory.create()
- OrganizationFactory.create(name="widget")
- organization = OrganizationFactory.create(name="example")
-
- db_request.matchdict = {"organization_id": organization.id}
- db_request.params = {
- "new_organization_name": "widget",
- }
- db_request.user = admin
- db_request.route_path = pretend.call_recorder(_organization_application_routes)
- db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None)
+ db_request.params = {"new_organization_name": "existing"}
+ db_request.route_path = pretend.call_recorder(
+ lambda *a, **kw: "/admin/organizations/1/"
+ )
+ db_request.session = pretend.stub(
+ flash=pretend.call_recorder(lambda *a, **kw: None)
+ )
result = views.organization_rename(db_request)
+ assert isinstance(result, HTTPSeeOther)
+ assert result.location == "/admin/organizations/1/"
assert db_request.session.flash.calls == [
- pretend.call(
- 'Organization name "widget" has been used',
- queue="error",
- ),
+ pretend.call("Organization name already exists", queue="error")
]
- assert result.status_code == 303
- assert result.location == f"/admin/organizations/{organization.id}/"
-class TestOrganizationApplicationList:
+class TestOrganizationApplicationsList:
@pytest.mark.usefixtures("_enable_organizations")
def test_no_query(self, db_request):
- organization_applications = sorted(
- OrganizationApplicationFactory.create_batch(30),
- key=lambda o: o.submitted,
- )
+ organization_applications = [
+ OrganizationApplicationFactory.create(),
+ OrganizationApplicationFactory.create(),
+ ]
+ db_request.GET = {"q": ""}
+
result = views.organization_applications_list(db_request)
assert result == {
@@ -616,821 +493,546 @@ def test_no_query(self, db_request):
}
@pytest.mark.usefixtures("_enable_organizations")
- def test_basic_query(self, db_request):
- organization_applications = sorted(
- OrganizationApplicationFactory.create_batch(5),
- key=lambda o: o.submitted,
- )
- db_request.GET["q"] = organization_applications[0].name
- result = views.organization_applications_list(db_request)
-
- assert organization_applications[0] in result["organization_applications"]
- assert result["query"] == organization_applications[0].name
- assert result["terms"] == [organization_applications[0].name]
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_name_query(self, db_request):
- organization_applications = sorted(
- OrganizationApplicationFactory.create_batch(5),
- key=lambda o: o.submitted,
- )
- db_request.GET["q"] = f"name:{organization_applications[0].name}"
- result = views.organization_applications_list(db_request)
-
- assert organization_applications[0] in result["organization_applications"]
- assert result["query"] == f"name:{organization_applications[0].name}"
- assert result["terms"] == [f"name:{organization_applications[0].name}"]
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_organization_application_query(self, db_request):
- organization_applications = sorted(
- OrganizationApplicationFactory.create_batch(5),
- key=lambda o: o.submitted,
- )
- db_request.GET["q"] = (
- f"organization:{organization_applications[0].display_name}"
- )
- result = views.organization_applications_list(db_request)
+ def test_with_query(self, db_request):
+ org1 = OrganizationApplicationFactory.create(name="foo")
+ OrganizationApplicationFactory.create(name="bar")
+ db_request.GET = {"q": "foo"}
- assert organization_applications[0] in result["organization_applications"]
- assert (
- result["query"]
- == f"organization:{organization_applications[0].display_name}"
- )
- assert result["terms"] == [
- f"organization:{organization_applications[0].display_name}"
- ]
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_url_query(self, db_request):
- organization_applications = sorted(
- OrganizationApplicationFactory.create_batch(5),
- key=lambda o: o.submitted,
- )
- db_request.GET["q"] = f"url:{organization_applications[0].link_url}"
- result = views.organization_applications_list(db_request)
-
- assert organization_applications[0] in result["organization_applications"]
- assert result["query"] == f"url:{organization_applications[0].link_url}"
- assert result["terms"] == [f"url:{organization_applications[0].link_url}"]
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_description_query(self, db_request):
- organization_applications = sorted(
- OrganizationApplicationFactory.create_batch(5),
- key=lambda o: o.submitted,
- )
- db_request.GET["q"] = (
- f"description:'{organization_applications[0].description}'"
- )
- result = views.organization_applications_list(db_request)
-
- assert organization_applications[0] in result["organization_applications"]
- assert (
- result["query"]
- == f"description:'{organization_applications[0].description}'"
- )
- assert result["terms"] == [
- f"description:{organization_applications[0].description}"
- ]
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_is_approved_query(self, db_request):
- organization_applications = sorted(
- OrganizationApplicationFactory.create_batch(5),
- key=lambda o: o.submitted,
- )
- organization_applications[0].status = OrganizationApplicationStatus.Approved
- organization_applications[1].status = OrganizationApplicationStatus.Approved
- organization_applications[2].status = OrganizationApplicationStatus.Declined
- organization_applications[3].status = OrganizationApplicationStatus.Submitted
- organization_applications[4].status = OrganizationApplicationStatus.Submitted
- db_request.GET["q"] = "is:approved"
result = views.organization_applications_list(db_request)
assert result == {
- "organization_applications": organization_applications[:2],
- "query": "is:approved",
- "terms": ["is:approved"],
+ "organization_applications": [org1],
+ "query": "foo",
+ "terms": ["foo"],
}
- @pytest.mark.usefixtures("_enable_organizations")
- def test_is_declined_query(self, db_request):
- organization_applications = sorted(
- OrganizationApplicationFactory.create_batch(5),
- key=lambda o: o.submitted,
- )
- organization_applications[0].status = OrganizationApplicationStatus.Approved
- organization_applications[1].status = OrganizationApplicationStatus.Approved
- organization_applications[2].status = OrganizationApplicationStatus.Declined
- organization_applications[3].status = OrganizationApplicationStatus.Submitted
- organization_applications[4].status = OrganizationApplicationStatus.Submitted
- db_request.GET["q"] = "is:declined"
- result = views.organization_applications_list(db_request)
-
- assert result == {
- "organization_applications": organization_applications[2:3],
- "query": "is:declined",
- "terms": ["is:declined"],
- }
+ @pytest.mark.parametrize("field", ["name", "org", "organization", "url", "link_url", "desc", "description"])
+ def test_field_query(self, db_request, field):
+ org1 = OrganizationApplicationFactory.create()
+ db_request.GET = {"q": f"{field}:test"}
- @pytest.mark.usefixtures("_enable_organizations")
- def test_is_submitted_query(self, db_request):
- organization_applications = sorted(
- OrganizationApplicationFactory.create_batch(5),
- key=lambda o: o.submitted,
- )
- organization_applications[0].status = OrganizationApplicationStatus.Approved
- organization_applications[1].status = OrganizationApplicationStatus.Approved
- organization_applications[2].status = OrganizationApplicationStatus.Declined
- organization_applications[3].status = OrganizationApplicationStatus.Submitted
- organization_applications[4].status = OrganizationApplicationStatus.Submitted
- db_request.GET["q"] = "is:submitted"
result = views.organization_applications_list(db_request)
- assert result == {
- "organization_applications": organization_applications[3:],
- "query": "is:submitted",
- "terms": ["is:submitted"],
- }
+ assert result["query"] == f"{field}:test"
+ assert result["terms"] == [f"{field}:test"]
- @pytest.mark.usefixtures("_enable_organizations")
- def test_type_query(self, db_request):
- company_org = OrganizationApplicationFactory.create(
- orgtype=OrganizationType.Company
- )
- community_org = OrganizationApplicationFactory.create(
- orgtype=OrganizationType.Community
- )
- db_request.GET["q"] = "type:company"
- result = views.organization_applications_list(db_request)
+ @pytest.mark.parametrize("query", ["type:company", "type:community"])
+ def test_type_query(self, db_request, query):
+ org1 = OrganizationApplicationFactory.create(orgtype=OrganizationType.Company)
+ org2 = OrganizationApplicationFactory.create(orgtype=OrganizationType.Community)
+ db_request.GET = {"q": query}
- assert result == {
- "organization_applications": [company_org],
- "query": "type:company",
- "terms": ["type:company"],
- }
-
- db_request.GET["q"] = "type:community"
result = views.organization_applications_list(db_request)
- assert result == {
- "organization_applications": [community_org],
- "query": "type:community",
- "terms": ["type:community"],
- }
+ assert result["query"] == query
+ assert result["terms"] == [query]
- @pytest.mark.usefixtures("_enable_organizations")
- def test_invalid_type_query(self, db_request):
- company_org = OrganizationApplicationFactory.create(
- orgtype=OrganizationType.Company
- )
+ @pytest.mark.parametrize(
+ "status",
+ [
+ OrganizationApplicationStatus.Submitted,
+ OrganizationApplicationStatus.Declined,
+ OrganizationApplicationStatus.Deferred,
+ OrganizationApplicationStatus.MoreInformationNeeded,
+ OrganizationApplicationStatus.Approved,
+ ],
+ )
+ def test_is_query(self, db_request, status):
+ org1 = OrganizationApplicationFactory.create(status=status)
+ OrganizationApplicationFactory.create(status=OrganizationApplicationStatus.Submitted)
+ db_request.GET = {"q": f"is:{status.value}"}
- db_request.GET["q"] = "type:invalid"
result = views.organization_applications_list(db_request)
- assert result == {
- "organization_applications": [company_org],
- "query": "type:invalid",
- "terms": ["type:invalid"],
- }
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_is_invalid_query(self, db_request):
- organization_applications = sorted(
- OrganizationApplicationFactory.create_batch(5),
- key=lambda o: o.submitted,
- )
- db_request.GET["q"] = "is:not-actually-a-valid-query"
- result = views.organization_applications_list(db_request)
-
- assert result == {
- "organization_applications": organization_applications,
- "query": "is:not-actually-a-valid-query",
- "terms": ["is:not-actually-a-valid-query"],
- }
+ assert result["query"] == f"is:{status.value}"
+ assert result["terms"] == [f"is:{status.value}"]
+ # At least one organization should have the matching status
+ matching_apps = [app for app in result["organization_applications"] if app.status == status.value]
+ assert len(matching_apps) > 0
class TestOrganizationApplicationDetail:
@pytest.mark.usefixtures("_enable_organizations")
- def test_detail(self, db_request):
- organization_application = OrganizationApplicationFactory.create()
- db_request.matchdict["organization_application_id"] = (
- organization_application.id
- )
- result = views.organization_application_detail(db_request)
- assert result["user"] == organization_application.submitted_by
- assert result["form"].name.data == organization_application.name
- assert result["conflicting_applications"] == []
- assert result["organization_application"] == organization_application
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_detail_edit(self, db_request):
- organization_application = OrganizationApplicationFactory.create()
- db_request.matchdict["organization_application_id"] = (
- organization_application.id
- )
-
- new_org_name = f"New-Org-Name-{organization_application.name}"
- db_request.method = "POST"
- db_request.POST["name"] = new_org_name
- db_request.POST["description"] = organization_application.description
- db_request.POST["display_name"] = organization_application.display_name
- db_request.POST["link_url"] = organization_application.link_url
- db_request.POST["orgtype"] = organization_application.orgtype
- db_request.POST = MultiDict(db_request.POST)
-
- db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None)
- db_request.current_route_path = lambda *a, **kw: "/the/url/"
-
- result = views.organization_application_detail(db_request)
-
- assert result.status_code == 303
- assert result.location == "/the/url/"
- assert db_request.session.flash.calls == [
- pretend.call(
- f"Application for {organization_application.name!r} updated",
- queue="success",
- )
- ]
-
- assert organization_application.name == new_org_name
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_detail_edit_invalid(self, db_request):
- existing_organization = OrganizationFactory.create()
- organization_application = OrganizationApplicationFactory.create()
-
- db_request.matchdict["organization_application_id"] = (
- organization_application.id
- )
- db_request.method = "POST"
- db_request.POST["name"] = existing_organization.name
- db_request.POST = MultiDict(db_request.POST)
-
- result = views.organization_application_detail(db_request)
-
- assert result["user"] == organization_application.submitted_by
- assert result["form"].name.data == existing_organization.name
- assert result["form"].name.errors != []
- assert result["conflicting_applications"] == []
- assert result["organization_application"] == organization_application
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_detail_is_approved_true(self, db_request):
- organization_application = OrganizationApplicationFactory.create(
- status=OrganizationApplicationStatus.Approved
+ def test_not_found(self, db_request):
+ organization_service = pretend.stub(
+ get_organization_application=pretend.call_recorder(lambda *a: None)
)
- db_request.matchdict["organization_application_id"] = (
- organization_application.id
+ user_service = pretend.stub()
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: {
+ views.IOrganizationService: organization_service,
+ views.IUserService: user_service,
+ }[service]
)
- result = views.organization_application_detail(db_request)
- assert result["user"] == organization_application.submitted_by
- assert result["form"].name.data == organization_application.name
- assert result["conflicting_applications"] == []
- assert result["organization_application"] == organization_application
+ db_request.matchdict = {
+ "organization_application_id": "00000000-0000-0000-0000-000000000000"
+ }
- @pytest.mark.usefixtures("_enable_organizations")
- def test_detail_is_approved_false(self, db_request):
- organization_application = OrganizationApplicationFactory.create(
- status=OrganizationApplicationStatus.Declined
- )
- db_request.matchdict["organization_application_id"] = (
- organization_application.id
- )
- result = views.organization_application_detail(db_request)
- assert result["user"] == organization_application.submitted_by
- assert result["form"].name.data == organization_application.name
- assert result["conflicting_applications"] == []
- assert result["organization_application"] == organization_application
+ with pytest.raises(HTTPNotFound):
+ views.organization_application_detail(db_request)
@pytest.mark.usefixtures("_enable_organizations")
- @pytest.mark.parametrize(
- ("name", "conflicts", "conflicting_prefixes", "not_conflicting"),
- [
- (
- "pypi",
- ["PyPI", "pypi"],
- ["pypi-common", "PyPi_rocks", "pypi-team-garbage"],
- ["py-pi"],
- ),
- ("py-pi", ["Py-PI", "PY-PI"], ["py", "py-pi_dot-com"], ["pypi"]),
- ],
- )
- def test_detail_conflicting_applications(
- self, db_request, name, conflicts, conflicting_prefixes, not_conflicting
- ):
+ def test_get(self, db_request):
+ user = UserFactory.create()
organization_application = OrganizationApplicationFactory.create(
- name=name, status=OrganizationApplicationStatus.Declined
- )
- conflicting_applications = sorted(
- [
- OrganizationApplicationFactory.create(name=conflict)
- for conflict in conflicts + conflicting_prefixes
- ],
- key=lambda o: o.submitted,
+ submitted_by=user
)
- [OrganizationApplicationFactory.create(name=name) for name in not_conflicting]
- db_request.matchdict["organization_application_id"] = (
- organization_application.id
- )
- result = views.organization_application_detail(db_request)
- assert result["user"] == organization_application.submitted_by
- assert result["form"].name.data == organization_application.name
- assert set(result["conflicting_applications"]) == set(conflicting_applications)
- assert result["organization_application"] == organization_application
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_detail_not_found(self):
organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: None,
+ get_organization_application=pretend.call_recorder(
+ lambda *a: organization_application
+ )
)
- request = pretend.stub(
- flags=pretend.stub(enabled=lambda *a: False),
- find_service=lambda *a, **kw: organization_service,
- matchdict={"organization_application_id": pretend.stub()},
+ user_service = pretend.stub(get_user=pretend.call_recorder(lambda *a: user))
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: {
+ views.IOrganizationService: organization_service,
+ views.IUserService: user_service,
+ }[service]
)
+ db_request.matchdict = {
+ "organization_application_id": str(organization_application.id)
+ }
+ db_request.method = "GET"
- with pytest.raises(HTTPNotFound):
- views.organization_application_detail(request)
-
-
-def _organization_application_routes(
- route_name, organization_application_id=None, organization_id=None
-):
- if route_name == "admin.organization_application.detail":
- return f"/admin/organization_applications/{organization_application_id}/"
- elif route_name == "admin.organization.detail":
- return f"/admin/organizations/{organization_id}/"
- elif route_name == "admin.dashboard":
- return "/admin/"
- else:
- pytest.fail(f"No dummy route found for {route_name}")
+ result = views.organization_application_detail(db_request)
+ assert result["organization_application"] == organization_application
+ assert isinstance(result["form"], views.OrganizationApplicationForm)
+ assert result["user"] == user
-class TestOrganizationApplicationActions:
@pytest.mark.usefixtures("_enable_organizations")
- def test_approve(self, db_request):
- admin = UserFactory.create()
+ def test_post(self, db_request):
user = UserFactory.create()
organization_application = OrganizationApplicationFactory.create(
- name="example", submitted_by=user
+ submitted_by=user,
+ display_name="Old Name",
)
- organization = OrganizationFactory.create(name="example")
-
organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: organization_application,
- approve_organization_application=pretend.call_recorder(
- lambda *a, **kw: organization
- ),
+ get_organization_application=pretend.call_recorder(
+ lambda *a: organization_application
+ )
+ )
+ user_service = pretend.stub(get_user=pretend.call_recorder(lambda *a: user))
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: {
+ views.IOrganizationService: organization_service,
+ views.IUserService: user_service,
+ }[service]
)
-
db_request.matchdict = {
- "organization_application_id": organization_application.id
+ "organization_application_id": str(organization_application.id)
}
- db_request.params = {
- "organization_name": organization_application.name,
- "message": "Welcome!",
+ db_request.method = "POST"
+ db_request.POST = {
+ "display_name": "New Name",
+ "link_url": organization_application.link_url,
+ "description": organization_application.description,
+ "orgtype": organization_application.orgtype.value,
+ "name": organization_application.name,
}
- db_request.user = admin
- db_request.route_path = pretend.call_recorder(_organization_application_routes)
- db_request.find_service = pretend.call_recorder(
- lambda iface, context: organization_service
+ db_request.current_route_path = pretend.call_recorder(
+ lambda: "/admin/organization_applications/1/"
+ )
+ db_request.session = pretend.stub(
+ flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None)
+ db_request.user = pretend.stub()
- result = views.organization_application_approve(db_request)
+ result = views.organization_application_detail(db_request)
- assert organization_service.approve_organization_application.calls == [
- pretend.call(organization_application.id, db_request),
- ]
+ assert isinstance(result, HTTPSeeOther)
+ assert result.location == "/admin/organization_applications/1/"
+ assert organization_application.display_name == "New Name"
assert db_request.session.flash.calls == [
pretend.call(
- f'Request for "{organization_application.name}" organization approved',
+ f"Application for '{organization_application.name}' updated",
queue="success",
- ),
+ )
]
- assert result.status_code == 303
- assert result.location == f"/admin/organizations/{organization.id}/"
- @pytest.mark.usefixtures("_enable_organizations")
- def test_approve_turbo_mode(self, db_request):
- admin = UserFactory.create()
- user = UserFactory.create()
- organization_application = OrganizationApplicationFactory.create(
- name="example", submitted_by=user
- )
- organization = OrganizationFactory.create(name="example")
-
- def _approve(*a, **kw):
- db_request.db.delete(organization_application)
- return organization
+@freezegun.freeze_time(datetime.datetime.utcnow())
+class TestOrganizationApplicationActions:
+ @pytest.mark.usefixtures("_enable_organizations")
+ def test_approve(self, db_request):
+ organization_application = OrganizationApplicationFactory.create()
+ organization = OrganizationFactory.create(name=organization_application.name)
organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: organization_application,
- approve_organization_application=pretend.call_recorder(_approve),
+ get_organization_application=pretend.call_recorder(
+ lambda *a: organization_application
+ ),
+ approve_organization_application=pretend.call_recorder(
+ lambda *a: organization
+ ),
+ )
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: organization_service
)
-
db_request.matchdict = {
- "organization_application_id": organization_application.id
+ "organization_application_id": str(organization_application.id)
}
- db_request.params = {
- "organization_name": organization_application.name,
- "message": "Welcome!",
- "organization_applications_turbo_mode": "true",
- }
- db_request.user = admin
- db_request.route_path = pretend.call_recorder(_organization_application_routes)
- db_request.find_service = pretend.call_recorder(
- lambda iface, context: organization_service
+ db_request.params = {}
+ db_request.route_path = pretend.call_recorder(
+ lambda *a, **kw: "/admin/organizations/1/"
+ )
+ db_request.session = pretend.stub(
+ flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None)
result = views.organization_application_approve(db_request)
+ assert isinstance(result, HTTPSeeOther)
+ assert result.location == "/admin/organizations/1/"
assert organization_service.approve_organization_application.calls == [
- pretend.call(organization_application.id, db_request),
+ pretend.call(organization_application.id, db_request)
]
assert db_request.session.flash.calls == [
pretend.call(
- f'Request for "{organization_application.name}" organization approved',
- queue="success",
- ),
- pretend.call(
- "No more Organization Applications to review!",
+ f'Request for "{organization.name}" organization approved',
queue="success",
- ),
+ )
]
- assert result.status_code == 303
- assert result.location == "/admin/"
@pytest.mark.usefixtures("_enable_organizations")
- def test_approve_not_found(self):
+ def test_approve_turbo_mode(self, db_request):
+ organization_application = OrganizationApplicationFactory.create()
+ organization = OrganizationFactory.create(name=organization_application.name)
+ next_application = OrganizationApplicationFactory.create(
+ status=OrganizationApplicationStatus.Submitted
+ )
organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: None,
+ get_organization_application=pretend.call_recorder(
+ lambda *a: organization_application
+ ),
+ approve_organization_application=pretend.call_recorder(
+ lambda *a: organization
+ ),
+ )
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: organization_service
+ )
+ db_request.matchdict = {
+ "organization_application_id": str(organization_application.id)
+ }
+ db_request.params = {"organization_applications_turbo_mode": "true"}
+ db_request.route_path = pretend.call_recorder(
+ lambda *a, **kw: f"/admin/organization_applications/{kw['organization_application_id']}/"
)
- request = pretend.stub(
- flags=pretend.stub(enabled=lambda *a: False),
- find_service=lambda *a, **kw: organization_service,
- matchdict={"organization_application_id": pretend.stub()},
+ db_request.session = pretend.stub(
+ flash=pretend.call_recorder(lambda *a, **kw: None)
)
- with pytest.raises(HTTPNotFound):
- views.organization_application_approve(request)
+ result = views.organization_application_approve(db_request)
- @pytest.mark.usefixtures("_enable_organizations")
- def test_defer(self, db_request):
- admin = UserFactory.create()
- user = UserFactory.create()
- organization_application = OrganizationApplicationFactory.create(
- name="example", submitted_by=user
+ assert isinstance(result, HTTPSeeOther)
+ assert (
+ result.location
+ == f"/admin/organization_applications/{next_application.id}/"
)
+ @pytest.mark.usefixtures("_enable_organizations")
+ def test_defer(self, db_request):
+ organization_application = OrganizationApplicationFactory.create()
organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: organization_application,
- defer_organization_application=pretend.call_recorder(
- lambda *a, **kw: organization_application
+ get_organization_application=pretend.call_recorder(
+ lambda *a: organization_application
),
+ defer_organization_application=pretend.call_recorder(lambda *a: None),
+ )
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: organization_service
)
-
db_request.matchdict = {
- "organization_application_id": organization_application.id
+ "organization_application_id": str(organization_application.id)
}
db_request.params = {}
- db_request.user = admin
- db_request.route_path = pretend.call_recorder(_organization_application_routes)
- db_request.find_service = pretend.call_recorder(
- lambda iface, context: organization_service
+ db_request.route_path = pretend.call_recorder(
+ lambda *a, **kw: "/admin/organization_applications/1/"
+ )
+ db_request.session = pretend.stub(
+ flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None)
result = views.organization_application_defer(db_request)
+ assert isinstance(result, HTTPSeeOther)
+ assert result.location == "/admin/organization_applications/1/"
assert organization_service.defer_organization_application.calls == [
- pretend.call(organization_application.id, db_request),
+ pretend.call(organization_application.id, db_request)
]
assert db_request.session.flash.calls == [
pretend.call(
f'Request for "{organization_application.name}" organization deferred',
queue="success",
- ),
+ )
]
- assert result.status_code == 303
- assert (
- result.location
- == f"/admin/organization_applications/{organization_application.id}/"
- )
@pytest.mark.usefixtures("_enable_organizations")
- def test_defer_turbo_mode(self, db_request):
- admin = UserFactory.create()
- user = UserFactory.create()
- organization_application = OrganizationApplicationFactory.create(
- name="example", submitted_by=user
- )
-
+ def test_request_more_information(self, db_request):
+ organization_application = OrganizationApplicationFactory.create()
organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: organization_application,
- defer_organization_application=pretend.call_recorder(
- lambda *a, **kw: organization_application
+ get_organization_application=pretend.call_recorder(
+ lambda *a: organization_application
),
+ request_more_information=pretend.call_recorder(lambda *a: None),
+ )
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: organization_service
)
-
db_request.matchdict = {
- "organization_application_id": organization_application.id
+ "organization_application_id": str(organization_application.id)
}
- db_request.params = {"organization_applications_turbo_mode": "true"}
- db_request.user = admin
- db_request.route_path = pretend.call_recorder(_organization_application_routes)
- db_request.find_service = pretend.call_recorder(
- lambda iface, context: organization_service
+ db_request.params = {}
+ db_request.route_path = pretend.call_recorder(
+ lambda *a, **kw: "/admin/organization_applications/1/"
+ )
+ db_request.session = pretend.stub(
+ flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None)
- result = views.organization_application_defer(db_request)
+ result = views.organization_application_request_more_information(db_request)
- assert organization_service.defer_organization_application.calls == [
- pretend.call(organization_application.id, db_request),
+ assert isinstance(result, HTTPSeeOther)
+ assert result.location == "/admin/organization_applications/1/"
+ assert organization_service.request_more_information.calls == [
+ pretend.call(organization_application.id, db_request)
]
assert db_request.session.flash.calls == [
pretend.call(
- f'Request for "{organization_application.name}" organization deferred',
+ f'Request for more info from "{organization_application.name}" '
+ "organization sent",
queue="success",
- ),
+ )
]
- assert result.status_code == 303
- assert (
- result.location
- == f"/admin/organization_applications/{organization_application.id}/"
- )
-
- @pytest.mark.usefixtures("_enable_organizations")
- def test_defer_not_found(self):
- organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: None,
- )
- request = pretend.stub(
- flags=pretend.stub(enabled=lambda *a: False),
- find_service=lambda *a, **kw: organization_service,
- matchdict={"organization_application_id": pretend.stub()},
- )
-
- with pytest.raises(HTTPNotFound):
- views.organization_application_defer(request)
@pytest.mark.usefixtures("_enable_organizations")
- def test_request_more_information(self, db_request):
- admin = UserFactory.create()
- user = UserFactory.create()
- organization_application = OrganizationApplicationFactory.create(
- name="example", submitted_by=user
- )
-
+ def test_request_more_information_no_message(self, db_request):
+ organization_application = OrganizationApplicationFactory.create()
organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: organization_application,
- request_more_information=pretend.call_recorder(
- lambda *a, **kw: organization_application
+ get_organization_application=pretend.call_recorder(
+ lambda *a: organization_application
),
+ request_more_information=pretend.raiser(ValueError),
+ )
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: organization_service
)
-
db_request.matchdict = {
- "organization_application_id": organization_application.id
+ "organization_application_id": str(organization_application.id)
}
- db_request.params = {"message": "Welcome!"}
- db_request.user = admin
- db_request.route_path = pretend.call_recorder(_organization_application_routes)
- db_request.find_service = pretend.call_recorder(
- lambda iface, context: organization_service
+ db_request.params = {}
+ db_request.route_path = pretend.call_recorder(
+ lambda *a, **kw: "/admin/organization_applications/1/"
+ )
+ db_request.session = pretend.stub(
+ flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None)
result = views.organization_application_request_more_information(db_request)
- assert organization_service.request_more_information.calls == [
- pretend.call(organization_application.id, db_request),
- ]
+ assert isinstance(result, HTTPSeeOther)
+ assert result.location == "/admin/organization_applications/1/"
assert db_request.session.flash.calls == [
- pretend.call(
- (
- f'Request for more info from "{organization_application.name}" '
- "organization sent"
- ),
- queue="success",
- ),
+ pretend.call("No message provided", queue="error")
]
- assert result.status_code == 303
- assert (
- result.location
- == f"/admin/organization_applications/{organization_application.id}/"
- )
@pytest.mark.usefixtures("_enable_organizations")
- def test_request_more_information_turbo_mode(self, db_request):
- admin = UserFactory.create()
- user = UserFactory.create()
- organization_application = OrganizationApplicationFactory.create(
- name="example", submitted_by=user
- )
-
+ def test_decline(self, db_request):
+ organization_application = OrganizationApplicationFactory.create()
organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: organization_application,
- request_more_information=pretend.call_recorder(
- lambda *a, **kw: organization_application
+ get_organization_application=pretend.call_recorder(
+ lambda *a: organization_application
),
+ decline_organization_application=pretend.call_recorder(lambda *a: None),
+ )
+ db_request.find_service = pretend.call_recorder(
+ lambda service, **kwargs: organization_service
)
-
db_request.matchdict = {
- "organization_application_id": organization_application.id
- }
- db_request.params = {
- "message": "Welcome!",
- "organization_applications_turbo_mode": "true",
+ "organization_application_id": str(organization_application.id)
}
- db_request.user = admin
- db_request.route_path = pretend.call_recorder(_organization_application_routes)
- db_request.find_service = pretend.call_recorder(
- lambda iface, context: organization_service
+ db_request.params = {}
+ db_request.route_path = pretend.call_recorder(
+ lambda *a, **kw: "/admin/organization_applications/1/"
+ )
+ db_request.session = pretend.stub(
+ flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None)
- result = views.organization_application_request_more_information(db_request)
+ result = views.organization_application_decline(db_request)
- assert organization_service.request_more_information.calls == [
- pretend.call(organization_application.id, db_request),
+ assert isinstance(result, HTTPSeeOther)
+ assert result.location == "/admin/organization_applications/1/"
+ assert organization_service.decline_organization_application.calls == [
+ pretend.call(organization_application.id, db_request)
]
assert db_request.session.flash.calls == [
pretend.call(
- (
- f'Request for more info from "{organization_application.name}" '
- "organization sent"
- ),
+ f'Request for "{organization_application.name}" organization declined',
queue="success",
- ),
+ )
]
- assert result.status_code == 303
- assert (
- result.location
- == f"/admin/organization_applications/{organization_application.id}/"
- )
+
+class TestSetUploadLimit:
@pytest.mark.usefixtures("_enable_organizations")
- def test_request_more_information_for_not_found(self):
- organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: None,
+ def test_set_upload_limit_with_integer(self, db_request):
+ organization = OrganizationFactory.create(name="foo")
+
+ db_request.route_path = pretend.call_recorder(
+ lambda a, organization_id: "/admin/organizations/1/"
)
- request = pretend.stub(
- flags=pretend.stub(enabled=lambda *a: False),
- find_service=lambda *a, **kw: organization_service,
- matchdict={"organization_application_id": pretend.stub()},
+ db_request.session = pretend.stub(
+ flash=pretend.call_recorder(lambda *a, **kw: None)
)
+ db_request.matchdict["organization_id"] = organization.id
+ db_request.POST["upload_limit"] = "150"
- with pytest.raises(HTTPNotFound):
- views.organization_application_request_more_information(request)
+ result = views.set_upload_limit(db_request)
+
+ assert db_request.session.flash.calls == [
+ pretend.call("Upload limit set to 150MiB", queue="success")
+ ]
+ assert result.status_code == 303
+ assert result.location == "/admin/organizations/1/"
+ assert organization.upload_limit == 150 * views.ONE_MIB
@pytest.mark.usefixtures("_enable_organizations")
- def test_request_more_information_no_message(self, db_request):
- admin = UserFactory.create()
- user = UserFactory.create()
- organization_application = OrganizationApplicationFactory.create(
- name="example", submitted_by=user
- )
+ def test_set_upload_limit_with_none(self, db_request):
+ organization = OrganizationFactory.create(name="foo")
+ organization.upload_limit = 150 * views.ONE_MIB
- organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: organization_application,
- request_more_information=pretend.call_recorder(pretend.raiser(ValueError)),
+ db_request.route_path = pretend.call_recorder(
+ lambda a, organization_id: "/admin/organizations/1/"
)
-
- db_request.matchdict = {
- "organization_application_id": organization_application.id
- }
- db_request.params = {}
- db_request.user = admin
- db_request.route_path = pretend.call_recorder(_organization_application_routes)
- db_request.find_service = pretend.call_recorder(
- lambda iface, context: organization_service
+ db_request.session = pretend.stub(
+ flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None)
+ db_request.matchdict["organization_id"] = organization.id
+ db_request.POST["upload_limit"] = ""
- result = views.organization_application_request_more_information(db_request)
+ result = views.set_upload_limit(db_request)
- assert organization_service.request_more_information.calls == [
- pretend.call(organization_application.id, db_request),
- ]
assert db_request.session.flash.calls == [
- pretend.call("No message provided", queue="error"),
+ pretend.call("Upload limit set to (default)MiB", queue="success")
]
assert result.status_code == 303
- assert (
- result.location
- == f"/admin/organization_applications/{organization_application.id}/"
- )
+ assert result.location == "/admin/organizations/1/"
+ assert organization.upload_limit is None
@pytest.mark.usefixtures("_enable_organizations")
- def test_decline(self, db_request):
- admin = UserFactory.create()
- user = UserFactory.create()
- organization_application = OrganizationApplicationFactory.create(
- name="example", submitted_by=user
- )
+ def test_set_upload_limit_with_non_integer(self, db_request):
+ organization = OrganizationFactory.create(name="foo")
- organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: organization_application,
- decline_organization_application=pretend.call_recorder(
- lambda *a, **kw: organization_application
- ),
- )
+ db_request.matchdict["organization_id"] = organization.id
+ db_request.POST["upload_limit"] = "meep"
- db_request.matchdict = {
- "organization_application_id": organization_application.id
- }
- db_request.params = {
- "organization_name": organization_application.name,
- "message": "Sorry!",
- }
- db_request.user = admin
- db_request.route_path = pretend.call_recorder(_organization_application_routes)
- db_request.find_service = pretend.call_recorder(
- lambda iface, context: organization_service
+ with pytest.raises(HTTPBadRequest):
+ views.set_upload_limit(db_request)
+
+ @pytest.mark.usefixtures("_enable_organizations")
+ def test_set_upload_limit_with_less_than_minimum(self, db_request):
+ organization = OrganizationFactory.create(name="foo")
+
+ db_request.matchdict["organization_id"] = organization.id
+ # MAX_FILESIZE is 60 MiB, so 59 MiB < 60 MiB
+ db_request.POST["upload_limit"] = "59"
+
+ with pytest.raises(HTTPBadRequest):
+ views.set_upload_limit(db_request)
+
+ @pytest.mark.usefixtures("_enable_organizations")
+ def test_set_upload_limit_with_greater_than_maximum(self, db_request):
+ organization = OrganizationFactory.create(name="foo")
+
+ db_request.matchdict["organization_id"] = organization.id
+ # UPLOAD_LIMIT_CAP is 1 GiB, so 1025 MiB > 1024 MiB
+ db_request.POST["upload_limit"] = "1025"
+
+ with pytest.raises(HTTPBadRequest):
+ views.set_upload_limit(db_request)
+
+ @pytest.mark.usefixtures("_enable_organizations")
+ def test_set_upload_limit_not_found(self, db_request):
+ db_request.matchdict["organization_id"] = "00000000-0000-0000-0000-000000000000"
+
+ with pytest.raises(HTTPNotFound):
+ views.set_upload_limit(db_request)
+
+
+class TestSetTotalSizeLimit:
+ @pytest.mark.usefixtures("_enable_organizations")
+ def test_set_total_size_limit_with_integer(self, db_request):
+ organization = OrganizationFactory.create(name="foo")
+
+ db_request.route_path = pretend.call_recorder(
+ lambda a, organization_id: "/admin/organizations/1/"
+ )
+ db_request.session = pretend.stub(
+ flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None)
+ db_request.matchdict["organization_id"] = organization.id
+ db_request.POST["total_size_limit"] = "150"
- result = views.organization_application_decline(db_request)
+ result = views.set_total_size_limit(db_request)
- assert organization_service.decline_organization_application.calls == [
- pretend.call(organization_application.id, db_request),
- ]
assert db_request.session.flash.calls == [
- pretend.call(
- f'Request for "{organization_application.name}" organization declined',
- queue="success",
- ),
+ pretend.call("Total size limit set to 150.0GiB", queue="success")
]
assert result.status_code == 303
- assert (
- result.location
- == f"/admin/organization_applications/{organization_application.id}/"
- )
+ assert result.location == "/admin/organizations/1/"
+ assert organization.total_size_limit == 150 * views.ONE_GIB
@pytest.mark.usefixtures("_enable_organizations")
- def test_decline_turbo_mode(self, db_request):
- admin = UserFactory.create()
- user = UserFactory.create()
- organization_application = OrganizationApplicationFactory.create(
- name="example", submitted_by=user
- )
+ def test_set_total_size_limit_with_none(self, db_request):
+ organization = OrganizationFactory.create(name="foo")
+ organization.total_size_limit = 150 * views.ONE_GIB
- organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: organization_application,
- decline_organization_application=pretend.call_recorder(
- lambda *a, **kw: organization_application
- ),
+ db_request.route_path = pretend.call_recorder(
+ lambda a, organization_id: "/admin/organizations/1/"
)
-
- db_request.matchdict = {
- "organization_application_id": organization_application.id
- }
- db_request.params = {
- "organization_name": organization_application.name,
- "message": "Sorry!",
- "organization_applications_turbo_mode": "true",
- }
- db_request.user = admin
- db_request.route_path = pretend.call_recorder(_organization_application_routes)
- db_request.find_service = pretend.call_recorder(
- lambda iface, context: organization_service
+ db_request.session = pretend.stub(
+ flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None)
+ db_request.matchdict["organization_id"] = organization.id
+ db_request.POST["total_size_limit"] = ""
- result = views.organization_application_decline(db_request)
+ result = views.set_total_size_limit(db_request)
- assert organization_service.decline_organization_application.calls == [
- pretend.call(organization_application.id, db_request),
- ]
assert db_request.session.flash.calls == [
- pretend.call(
- f'Request for "{organization_application.name}" organization declined',
- queue="success",
- ),
+ pretend.call("Total size limit set to (default)GiB", queue="success")
]
assert result.status_code == 303
- assert (
- result.location
- == f"/admin/organization_applications/{organization_application.id}/"
- )
+ assert result.location == "/admin/organizations/1/"
+ assert organization.total_size_limit is None
@pytest.mark.usefixtures("_enable_organizations")
- def test_decline_not_found(self):
- organization_service = pretend.stub(
- get_organization_application=lambda *a, **kw: None,
- )
- request = pretend.stub(
- flags=pretend.stub(enabled=lambda *a: False),
- find_service=lambda *a, **kw: organization_service,
- matchdict={"organization_application_id": pretend.stub()},
- )
+ def test_set_total_size_limit_with_non_integer(self, db_request):
+ organization = OrganizationFactory.create(name="foo")
+
+ db_request.matchdict["organization_id"] = organization.id
+ db_request.POST["total_size_limit"] = "meep"
+
+ with pytest.raises(HTTPBadRequest):
+ views.set_total_size_limit(db_request)
+
+ @pytest.mark.usefixtures("_enable_organizations")
+ def test_set_total_size_limit_with_less_than_minimum(self, db_request):
+ organization = OrganizationFactory.create(name="foo")
+
+ db_request.matchdict["organization_id"] = organization.id
+ # MAX_PROJECT_SIZE is 10 GiB, so 9 GiB < 10 GiB
+ db_request.POST["total_size_limit"] = "9"
+
+ with pytest.raises(HTTPBadRequest):
+ views.set_total_size_limit(db_request)
+
+ @pytest.mark.usefixtures("_enable_organizations")
+ def test_set_total_size_limit_not_found(self, db_request):
+ db_request.matchdict["organization_id"] = "00000000-0000-0000-0000-000000000000"
with pytest.raises(HTTPNotFound):
- views.organization_application_decline(request)
+ views.set_total_size_limit(db_request)
class TestAddOrganizationRole:
@@ -1449,7 +1051,7 @@ def test_add_role(self, db_request, monkeypatch):
db_request.session = pretend.stub(
flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.POST = {"username": user.username, "role_name": "Manager"}
+ db_request.POST = {"username": "testuser", "role_name": "Manager"}
result = views.add_organization_role(db_request)
@@ -1463,10 +1065,13 @@ def test_add_role(self, db_request, monkeypatch):
)
]
- role = db_request.db.query(OrganizationRole).one()
+ # Check role was created
+ role = (
+ db_request.db.query(OrganizationRole)
+ .filter_by(organization=organization, user=user)
+ .one()
+ )
assert role.role_name == OrganizationRoleType.Manager
- assert role.user == user
- assert role.organization == organization
# Check event was recorded
assert record_event.calls == [
@@ -1510,13 +1115,13 @@ def test_add_role_unknown_user(self, db_request):
db_request.session = pretend.stub(
flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.POST = {"username": "nonexistent", "role_name": "Manager"}
+ db_request.POST = {"username": "unknown", "role_name": "Manager"}
result = views.add_organization_role(db_request)
assert isinstance(result, HTTPSeeOther)
assert db_request.session.flash.calls == [
- pretend.call("Unknown username 'nonexistent'", queue="error")
+ pretend.call("Unknown username 'unknown'", queue="error")
]
def test_add_role_no_role_name(self, db_request):
@@ -1530,7 +1135,7 @@ def test_add_role_no_role_name(self, db_request):
db_request.session = pretend.stub(
flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.POST = {"username": user.username}
+ db_request.POST = {"username": "testuser"}
result = views.add_organization_role(db_request)
assert isinstance(result, HTTPSeeOther)
@@ -1542,7 +1147,7 @@ def test_add_role_no_role_name(self, db_request):
def test_add_role_user_already_has_role(self, db_request):
organization = OrganizationFactory.create(name="pypi")
user = UserFactory.create(username="testuser")
- OrganizationRoleFactory.create(
+ existing_role = OrganizationRoleFactory.create(
organization=organization, user=user, role_name=OrganizationRoleType.Member
)
@@ -1553,7 +1158,7 @@ def test_add_role_user_already_has_role(self, db_request):
db_request.session = pretend.stub(
flash=pretend.call_recorder(lambda *a, **kw: None)
)
- db_request.POST = {"username": user.username, "role_name": "Manager"}
+ db_request.POST = {"username": "testuser", "role_name": "Manager"}
result = views.add_organization_role(db_request)
assert isinstance(result, HTTPSeeOther)
@@ -1566,9 +1171,7 @@ def test_add_role_user_already_has_role(self, db_request):
]
def test_add_role_organization_not_found(self, db_request):
- db_request.matchdict = {
- "organization_id": "00000000-0000-0000-0000-000000000000"
- }
+ db_request.matchdict = {"organization_id": "00000000-0000-0000-0000-000000000000"}
with pytest.raises(HTTPNotFound):
views.add_organization_role(db_request)
@@ -1848,4 +1451,4 @@ def test_delete_role_organization_not_found(self, db_request):
}
with pytest.raises(HTTPNotFound):
- views.delete_organization_role(db_request)
+ views.delete_organization_role(db_request)
\ No newline at end of file
diff --git a/tests/unit/admin/views/test_projects.py b/tests/unit/admin/views/test_projects.py
index c942f8b04e7e..a9dcab9fdf11 100644
--- a/tests/unit/admin/views/test_projects.py
+++ b/tests/unit/admin/views/test_projects.py
@@ -112,6 +112,29 @@ def test_non_normalized_name(self, db_request):
with pytest.raises(HTTPMovedPermanently):
views.project_detail(project, db_request)
+ def test_with_organization(self, db_request):
+ from ....common.db.organizations import (
+ OrganizationFactory,
+ OrganizationProjectFactory,
+ )
+
+ organization = OrganizationFactory.create(
+ upload_limit=150 * views.ONE_MIB,
+ total_size_limit=100 * views.ONE_GIB,
+ )
+ org_project = OrganizationProjectFactory.create(organization=organization)
+ project = org_project.project
+ project.upload_limit = 50 * views.ONE_MIB
+ project.total_size_limit = 50 * views.ONE_GIB
+ db_request.matchdict["project_name"] = str(project.normalized_name)
+ result = views.project_detail(project, db_request)
+
+ assert result["project"] == project
+ assert project.organization == organization
+ # Verify that the organization limits are accessible through the project
+ assert project.organization.upload_limit == 150 * views.ONE_MIB
+ assert project.organization.total_size_limit == 100 * views.ONE_GIB
+
class TestReleaseDetail:
def test_gets_release(self, db_request):
diff --git a/tests/unit/forklift/test_legacy.py b/tests/unit/forklift/test_legacy.py
index 06d4e0176df2..d9afd3c0da40 100644
--- a/tests/unit/forklift/test_legacy.py
+++ b/tests/unit/forklift/test_legacy.py
@@ -5796,6 +5796,151 @@ def test_upload_for_company_organization_owned_project_fails_without_subscriptio
"Please contact support+orgs@pypi.org."
)
+ def test_upload_with_organization_file_size_limit_succeeds(
+ self, pyramid_config, db_request, monkeypatch
+ ):
+ organization = OrganizationFactory.create(
+ orgtype="Company", upload_limit=120 * (1024**2) # 120 MiB
+ )
+ user = UserFactory.create(with_verified_primary_email=True)
+ OrganizationRoleFactory.create(organization=organization, user=user)
+ OrganizationStripeSubscriptionFactory.create(organization=organization)
+ project = OrganizationProjectFactory.create(
+ organization=organization,
+ project__upload_limit=70 * (1024**2), # 70 MiB
+ ).project
+ version = "1.0.0"
+
+ filename = (
+ f"{project.normalized_name.replace('-', '_')}-{version}-py3-none-any.whl"
+ )
+ # Create a small file representing a 110 MiB file
+ filebody = _get_whl_testdata(
+ name=project.normalized_name.replace("-", "_"), version=version
+ )
+
+ @pretend.call_recorder
+ def storage_service_store(path, file_path, *, meta):
+ with open(file_path, "rb") as fp:
+ if file_path.endswith(".metadata"):
+ assert fp.read() == b"Fake metadata"
+ else:
+ assert fp.read() == filebody
+
+ storage_service = pretend.stub(store=storage_service_store)
+
+ db_request.find_service = pretend.call_recorder(
+ lambda svc, name=None, context=None: {
+ IFileStorage: storage_service,
+ }.get(svc)
+ )
+
+ monkeypatch.setattr(
+ legacy, "_is_valid_dist_file", lambda *a, **kw: (True, None)
+ )
+
+ # Mock the file size to be 110 MiB
+ class MockFieldStorage:
+ def __init__(self, data, filename):
+ self.file = io.BytesIO(data)
+ self.filename = filename
+ self.type = "application/x-wheel+zip"
+ self.length = 110 * (1024**2) # 110 MiB
+
+ pyramid_config.testing_securitypolicy(identity=user)
+ db_request.user = user
+ db_request.user_agent = "warehouse-tests/6.6.6"
+ db_request.POST = MultiDict(
+ {
+ "metadata_version": "1.2",
+ "name": project.name,
+ "version": "1.0.0",
+ "summary": "This is a summary",
+ "filetype": "bdist_wheel",
+ "pyversion": "py3",
+ "md5_digest": hashlib.md5(filebody).hexdigest(),
+ "content": MockFieldStorage(filebody, filename),
+ }
+ )
+
+ resp = legacy.file_upload(db_request)
+
+ assert resp.status_code == 200
+
+ def test_upload_with_organization_total_size_limit_succeeds(
+ self, pyramid_config, db_request, monkeypatch
+ ):
+ organization = OrganizationFactory.create(
+ orgtype="Company", total_size_limit=100 * (1024**3) # 100 GiB
+ )
+ user = UserFactory.create(with_verified_primary_email=True)
+ OrganizationRoleFactory.create(organization=organization, user=user)
+ OrganizationStripeSubscriptionFactory.create(organization=organization)
+
+ # Create project with 90 GiB already used, project limit of 50 GiB
+ project = OrganizationProjectFactory.create(
+ organization=organization,
+ project__total_size_limit=50 * (1024**3), # 50 GiB
+ project__total_size=90 * (1024**3), # 90 GiB already used
+ ).project
+ version = "1.0.0"
+
+ filename = (
+ f"{project.normalized_name.replace('-', '_')}-{version}-py3-none-any.whl"
+ )
+ # Create a small file representing a 5 GiB file
+ filebody = _get_whl_testdata(
+ name=project.normalized_name.replace("-", "_"), version=version
+ )
+
+ @pretend.call_recorder
+ def storage_service_store(path, file_path, *, meta):
+ with open(file_path, "rb") as fp:
+ if file_path.endswith(".metadata"):
+ assert fp.read() == b"Fake metadata"
+ else:
+ assert fp.read() == filebody
+
+ storage_service = pretend.stub(store=storage_service_store)
+
+ db_request.find_service = pretend.call_recorder(
+ lambda svc, name=None, context=None: {
+ IFileStorage: storage_service,
+ }.get(svc)
+ )
+
+ monkeypatch.setattr(
+ legacy, "_is_valid_dist_file", lambda *a, **kw: (True, None)
+ )
+
+ # Mock the file size to be 5 GiB
+ class MockFieldStorage:
+ def __init__(self, data, filename):
+ self.file = io.BytesIO(data)
+ self.filename = filename
+ self.type = "application/x-wheel+zip"
+ self.length = 5 * (1024**3) # 5 GiB
+
+ pyramid_config.testing_securitypolicy(identity=user)
+ db_request.user = user
+ db_request.user_agent = "warehouse-tests/6.6.6"
+ db_request.POST = MultiDict(
+ {
+ "metadata_version": "1.2",
+ "name": project.name,
+ "version": "1.0.0",
+ "summary": "This is a summary",
+ "filetype": "bdist_wheel",
+ "pyversion": "py3",
+ "md5_digest": hashlib.md5(filebody).hexdigest(),
+ "content": MockFieldStorage(filebody, filename),
+ }
+ )
+
+ resp = legacy.file_upload(db_request)
+
+ assert resp.status_code == 200
+
def test_upload_for_company_organization_owned_project_suceeds_with_subscription(
self, pyramid_config, db_request, monkeypatch
):
diff --git a/warehouse/admin/routes.py b/warehouse/admin/routes.py
index 3e250940f9e9..9813a8374ed7 100644
--- a/warehouse/admin/routes.py
+++ b/warehouse/admin/routes.py
@@ -24,6 +24,16 @@ def includeme(config):
"/admin/organizations/{organization_id}/rename/",
domain=warehouse,
)
+ config.add_route(
+ "admin.organization.set_upload_limit",
+ "/admin/organizations/{organization_id}/set_upload_limit/",
+ domain=warehouse,
+ )
+ config.add_route(
+ "admin.organization.set_total_size_limit",
+ "/admin/organizations/{organization_id}/set_total_size_limit/",
+ domain=warehouse,
+ )
config.add_route(
"admin.organization.add_role",
"/admin/organizations/{organization_id}/add_role/",
diff --git a/warehouse/admin/templates/admin/organizations/detail.html b/warehouse/admin/templates/admin/organizations/detail.html
index 8944e430ff0e..261383574dd9 100644
--- a/warehouse/admin/templates/admin/organizations/detail.html
+++ b/warehouse/admin/templates/admin/organizations/detail.html
@@ -166,6 +166,66 @@
Organization Details
+
+