|
23 | 23 | AccountManagerPermissionChecker, |
24 | 24 | ObjectPermissionChecker, |
25 | 25 | PermissionManagerPermissionChecker, |
| 26 | + RepositoryManagerPermissionChecker, |
26 | 27 | ) |
27 | 28 | from infrahub.graphql.initialization import prepare_graphql_params |
28 | 29 | from infrahub.permissions.local_backend import LocalPermissionBackend |
|
182 | 183 | } |
183 | 184 | """ |
184 | 185 |
|
| 186 | +MUTATION_REPOSITORY = """ |
| 187 | +mutation { |
| 188 | + CoreRepositoryCreate(data: { |
| 189 | + name: {value: "Test"} |
| 190 | + location: {value: "/var/random"} |
| 191 | + }) { |
| 192 | + ok |
| 193 | + } |
| 194 | +} |
| 195 | +""" |
| 196 | + |
| 197 | +MUTATION_READONLY_REPOSITORY = """ |
| 198 | +mutation { |
| 199 | + CoreReadOnlyRepositoryCreate(data: { |
| 200 | + name: {value: "Test"} |
| 201 | + location: {value: "/var/random"} |
| 202 | + }) { |
| 203 | + ok |
| 204 | + } |
| 205 | +} |
| 206 | +""" |
| 207 | + |
| 208 | +MUTATION_GENERIC_REPOSITORY = """ |
| 209 | +mutation { |
| 210 | + CoreGenericRepositoryUpdate(data: { |
| 211 | + name: {value: "Test"} |
| 212 | + location: {value: "/var/random"} |
| 213 | + }) { |
| 214 | + ok |
| 215 | + } |
| 216 | +} |
| 217 | +""" |
| 218 | + |
185 | 219 |
|
186 | 220 | class TestObjectPermissions: |
187 | 221 | async def test_setup( |
@@ -560,3 +594,117 @@ async def test_account_without_permission( |
560 | 594 | query_parameters=gql_params, |
561 | 595 | branch=permissions_helper.default_branch, |
562 | 596 | ) |
| 597 | + |
| 598 | + |
| 599 | +class TestRepositoryManagerPermissions: |
| 600 | + async def test_setup( |
| 601 | + self, |
| 602 | + db: InfrahubDatabase, |
| 603 | + register_core_models_schema: None, |
| 604 | + default_branch: Branch, |
| 605 | + permissions_helper: PermissionsHelper, |
| 606 | + first_account: CoreAccount, |
| 607 | + second_account: CoreAccount, |
| 608 | + ): |
| 609 | + registry.permission_backends = [LocalPermissionBackend()] |
| 610 | + permissions_helper._default_branch = default_branch |
| 611 | + |
| 612 | + permission = await Node.init(db=db, schema=InfrahubKind.GLOBALPERMISSION) |
| 613 | + await permission.new( |
| 614 | + db=db, name=GlobalPermissions.MANAGE_REPOSITORIES.value, action=GlobalPermissions.MANAGE_REPOSITORIES.value |
| 615 | + ) |
| 616 | + await permission.save(db=db) |
| 617 | + |
| 618 | + role = await Node.init(db=db, schema=InfrahubKind.ACCOUNTROLE) |
| 619 | + await role.new(db=db, name="admin", permissions=[permission]) |
| 620 | + await role.save(db=db) |
| 621 | + |
| 622 | + group = await Node.init(db=db, schema=InfrahubKind.ACCOUNTGROUP) |
| 623 | + await group.new(db=db, name="admin", roles=[role]) |
| 624 | + await group.save(db=db) |
| 625 | + |
| 626 | + await group.members.add(db=db, data={"id": first_account.id}) |
| 627 | + await group.members.save(db=db) |
| 628 | + |
| 629 | + permissions_helper._first = first_account |
| 630 | + permissions_helper._second = second_account |
| 631 | + |
| 632 | + @pytest.mark.parametrize( |
| 633 | + "user", |
| 634 | + [ |
| 635 | + AccountSession(account_id="abc", auth_type=AuthType.JWT, role=AccountRole.ADMIN), |
| 636 | + AccountSession(authenticated=False, account_id="anonymous", auth_type=AuthType.NONE), |
| 637 | + ], |
| 638 | + ) |
| 639 | + async def test_supports_manage_repositories_permission_accounts( |
| 640 | + self, user: AccountSession, db: InfrahubDatabase, permissions_helper: PermissionsHelper |
| 641 | + ): |
| 642 | + checker = AccountManagerPermissionChecker() |
| 643 | + is_supported = await checker.supports(db=db, account_session=user, branch=permissions_helper.default_branch) |
| 644 | + assert is_supported == user.authenticated |
| 645 | + |
| 646 | + @pytest.mark.parametrize( |
| 647 | + "operation", [MUTATION_REPOSITORY, MUTATION_READONLY_REPOSITORY, MUTATION_GENERIC_REPOSITORY] |
| 648 | + ) |
| 649 | + async def test_account_with_permission( |
| 650 | + self, db: InfrahubDatabase, permissions_helper: PermissionsHelper, operation: str |
| 651 | + ): |
| 652 | + checker = RepositoryManagerPermissionChecker() |
| 653 | + session = AccountSession( |
| 654 | + authenticated=True, account_id=permissions_helper.first.id, session_id=str(uuid4()), auth_type=AuthType.JWT |
| 655 | + ) |
| 656 | + |
| 657 | + gql_params = prepare_graphql_params(db=db, include_mutation=True, branch=permissions_helper.default_branch) |
| 658 | + analyzed_query = InfrahubGraphQLQueryAnalyzer( |
| 659 | + query=operation, schema=gql_params.schema, branch=permissions_helper.default_branch |
| 660 | + ) |
| 661 | + |
| 662 | + resolution = await checker.check( |
| 663 | + db=db, |
| 664 | + account_session=session, |
| 665 | + analyzed_query=analyzed_query, |
| 666 | + query_parameters=gql_params, |
| 667 | + branch=permissions_helper.default_branch, |
| 668 | + ) |
| 669 | + assert resolution == CheckerResolution.NEXT_CHECKER |
| 670 | + |
| 671 | + @pytest.mark.parametrize( |
| 672 | + "operation,must_raise", |
| 673 | + [ |
| 674 | + (MUTATION_REPOSITORY, True), |
| 675 | + (MUTATION_READONLY_REPOSITORY, True), |
| 676 | + (MUTATION_GENERIC_REPOSITORY, True), |
| 677 | + (QUERY_TAGS, False), |
| 678 | + ], |
| 679 | + ) |
| 680 | + async def test_account_without_permission( |
| 681 | + self, db: InfrahubDatabase, permissions_helper: PermissionsHelper, operation: str, must_raise: bool |
| 682 | + ): |
| 683 | + checker = RepositoryManagerPermissionChecker() |
| 684 | + session = AccountSession( |
| 685 | + authenticated=True, account_id=permissions_helper.second.id, session_id=str(uuid4()), auth_type=AuthType.JWT |
| 686 | + ) |
| 687 | + |
| 688 | + gql_params = prepare_graphql_params(db=db, include_mutation=True, branch=permissions_helper.default_branch) |
| 689 | + analyzed_query = InfrahubGraphQLQueryAnalyzer( |
| 690 | + query=operation, schema=gql_params.schema, branch=permissions_helper.default_branch |
| 691 | + ) |
| 692 | + |
| 693 | + if not must_raise: |
| 694 | + resolution = await checker.check( |
| 695 | + db=db, |
| 696 | + account_session=session, |
| 697 | + analyzed_query=analyzed_query, |
| 698 | + query_parameters=gql_params, |
| 699 | + branch=permissions_helper.default_branch, |
| 700 | + ) |
| 701 | + assert resolution == CheckerResolution.NEXT_CHECKER |
| 702 | + else: |
| 703 | + with pytest.raises(PermissionDeniedError, match=r"You do not have the permission to manage repositories"): |
| 704 | + await checker.check( |
| 705 | + db=db, |
| 706 | + account_session=session, |
| 707 | + analyzed_query=analyzed_query, |
| 708 | + query_parameters=gql_params, |
| 709 | + branch=permissions_helper.default_branch, |
| 710 | + ) |
0 commit comments