|
22 | 22 | from infrahub.graphql.auth.query_permission_checker.object_permission_checker import ( |
23 | 23 | AccountManagerPermissionChecker, |
24 | 24 | ObjectPermissionChecker, |
| 25 | + PermissionManagerPermissionChecker, |
25 | 26 | ) |
26 | 27 | from infrahub.graphql.initialization import prepare_graphql_params |
27 | 28 | from infrahub.permissions.local_backend import LocalPermissionBackend |
|
127 | 128 | } |
128 | 129 | """ |
129 | 130 |
|
| 131 | +MUTATION_GLOBAL_PERMISSION = """ |
| 132 | +mutation { |
| 133 | + CoreGlobalPermissionCreate(data: { |
| 134 | + name: {value: "Merge branch"} |
| 135 | + action: {value: "merge_branch"} |
| 136 | + }) { |
| 137 | + ok |
| 138 | + object { |
| 139 | + identifier { |
| 140 | + value |
| 141 | + } |
| 142 | + } |
| 143 | + } |
| 144 | +} |
| 145 | +""" |
| 146 | + |
| 147 | +MUTATION_OBJECT_PERMISSION = """ |
| 148 | +mutation { |
| 149 | + CoreObjectPermissionCreate(data: { |
| 150 | + branch: {value: "*"} |
| 151 | + namespace: {value: "*"} |
| 152 | + name: {value: "*"} |
| 153 | + }) { |
| 154 | + ok |
| 155 | + object { |
| 156 | + identifier { |
| 157 | + value |
| 158 | + } |
| 159 | + } |
| 160 | + } |
| 161 | +} |
| 162 | +""" |
| 163 | + |
| 164 | +QUERY_ACCOUNT_PERMISSIONS = """ |
| 165 | +query { |
| 166 | + InfrahubPermissions { |
| 167 | + global_permissions { |
| 168 | + edges { |
| 169 | + node { |
| 170 | + identifier |
| 171 | + } |
| 172 | + } |
| 173 | + } |
| 174 | + object_permissions { |
| 175 | + edges { |
| 176 | + node { |
| 177 | + identifier |
| 178 | + } |
| 179 | + } |
| 180 | + } |
| 181 | + } |
| 182 | +} |
| 183 | +""" |
| 184 | + |
130 | 185 |
|
131 | 186 | class TestObjectPermissions: |
132 | 187 | async def test_setup( |
@@ -293,7 +348,7 @@ async def test_setup( |
293 | 348 |
|
294 | 349 | permission = await Node.init(db=db, schema=InfrahubKind.GLOBALPERMISSION) |
295 | 350 | await permission.new( |
296 | | - db=db, name=GlobalPermissions.EDIT_DEFAULT_BRANCH.value, action=GlobalPermissions.MANAGE_ACCOUNTS.value |
| 351 | + db=db, name=GlobalPermissions.MANAGE_ACCOUNTS.value, action=GlobalPermissions.MANAGE_ACCOUNTS.value |
297 | 352 | ) |
298 | 353 | await permission.save(db=db) |
299 | 354 |
|
@@ -391,3 +446,117 @@ async def test_account_without_permission( |
391 | 446 | query_parameters=gql_params, |
392 | 447 | branch=permissions_helper.default_branch, |
393 | 448 | ) |
| 449 | + |
| 450 | + |
| 451 | +class TestPermissionManagerPermissions: |
| 452 | + async def test_setup( |
| 453 | + self, |
| 454 | + db: InfrahubDatabase, |
| 455 | + register_core_models_schema: None, |
| 456 | + default_branch: Branch, |
| 457 | + permissions_helper: PermissionsHelper, |
| 458 | + first_account: CoreAccount, |
| 459 | + second_account: CoreAccount, |
| 460 | + ): |
| 461 | + registry.permission_backends = [LocalPermissionBackend()] |
| 462 | + permissions_helper._default_branch = default_branch |
| 463 | + |
| 464 | + permission = await Node.init(db=db, schema=InfrahubKind.GLOBALPERMISSION) |
| 465 | + await permission.new( |
| 466 | + db=db, name=GlobalPermissions.MANAGE_PERMISSIONS.value, action=GlobalPermissions.MANAGE_PERMISSIONS.value |
| 467 | + ) |
| 468 | + await permission.save(db=db) |
| 469 | + |
| 470 | + role = await Node.init(db=db, schema=InfrahubKind.ACCOUNTROLE) |
| 471 | + await role.new(db=db, name="admin", permissions=[permission]) |
| 472 | + await role.save(db=db) |
| 473 | + |
| 474 | + group = await Node.init(db=db, schema=InfrahubKind.ACCOUNTGROUP) |
| 475 | + await group.new(db=db, name="admin", roles=[role]) |
| 476 | + await group.save(db=db) |
| 477 | + |
| 478 | + await group.members.add(db=db, data={"id": first_account.id}) |
| 479 | + await group.members.save(db=db) |
| 480 | + |
| 481 | + permissions_helper._first = first_account |
| 482 | + permissions_helper._second = second_account |
| 483 | + |
| 484 | + @pytest.mark.parametrize( |
| 485 | + "user", |
| 486 | + [ |
| 487 | + AccountSession(account_id="abc", auth_type=AuthType.JWT, role=AccountRole.ADMIN), |
| 488 | + AccountSession(authenticated=False, account_id="anonymous", auth_type=AuthType.NONE), |
| 489 | + ], |
| 490 | + ) |
| 491 | + async def test_supports_manage_accounts_permission_accounts( |
| 492 | + self, user: AccountSession, db: InfrahubDatabase, permissions_helper: PermissionsHelper |
| 493 | + ): |
| 494 | + checker = PermissionManagerPermissionChecker() |
| 495 | + is_supported = await checker.supports(db=db, account_session=user, branch=permissions_helper.default_branch) |
| 496 | + assert is_supported == user.authenticated |
| 497 | + |
| 498 | + @pytest.mark.parametrize( |
| 499 | + "operation", [MUTATION_GLOBAL_PERMISSION, MUTATION_OBJECT_PERMISSION, QUERY_ACCOUNT_PERMISSIONS] |
| 500 | + ) |
| 501 | + async def test_account_with_permission( |
| 502 | + self, db: InfrahubDatabase, permissions_helper: PermissionsHelper, operation: str |
| 503 | + ): |
| 504 | + checker = PermissionManagerPermissionChecker() |
| 505 | + session = AccountSession( |
| 506 | + authenticated=True, account_id=permissions_helper.first.id, session_id=str(uuid4()), auth_type=AuthType.JWT |
| 507 | + ) |
| 508 | + |
| 509 | + gql_params = prepare_graphql_params(db=db, include_mutation=True, branch=permissions_helper.default_branch) |
| 510 | + analyzed_query = InfrahubGraphQLQueryAnalyzer( |
| 511 | + query=operation, schema=gql_params.schema, branch=permissions_helper.default_branch |
| 512 | + ) |
| 513 | + |
| 514 | + resolution = await checker.check( |
| 515 | + db=db, |
| 516 | + account_session=session, |
| 517 | + analyzed_query=analyzed_query, |
| 518 | + query_parameters=gql_params, |
| 519 | + branch=permissions_helper.default_branch, |
| 520 | + ) |
| 521 | + assert resolution == CheckerResolution.NEXT_CHECKER |
| 522 | + |
| 523 | + @pytest.mark.parametrize( |
| 524 | + "operation,must_raise", |
| 525 | + [ |
| 526 | + (MUTATION_GLOBAL_PERMISSION, True), |
| 527 | + (MUTATION_OBJECT_PERMISSION, True), |
| 528 | + (QUERY_TAGS, False), |
| 529 | + (QUERY_ACCOUNT_PERMISSIONS, False), |
| 530 | + ], |
| 531 | + ) |
| 532 | + async def test_account_without_permission( |
| 533 | + self, db: InfrahubDatabase, permissions_helper: PermissionsHelper, operation: str, must_raise: bool |
| 534 | + ): |
| 535 | + checker = PermissionManagerPermissionChecker() |
| 536 | + session = AccountSession( |
| 537 | + authenticated=True, account_id=permissions_helper.second.id, session_id=str(uuid4()), auth_type=AuthType.JWT |
| 538 | + ) |
| 539 | + |
| 540 | + gql_params = prepare_graphql_params(db=db, include_mutation=True, branch=permissions_helper.default_branch) |
| 541 | + analyzed_query = InfrahubGraphQLQueryAnalyzer( |
| 542 | + query=operation, schema=gql_params.schema, branch=permissions_helper.default_branch |
| 543 | + ) |
| 544 | + |
| 545 | + if not must_raise: |
| 546 | + resolution = await checker.check( |
| 547 | + db=db, |
| 548 | + account_session=session, |
| 549 | + analyzed_query=analyzed_query, |
| 550 | + query_parameters=gql_params, |
| 551 | + branch=permissions_helper.default_branch, |
| 552 | + ) |
| 553 | + assert resolution == CheckerResolution.NEXT_CHECKER |
| 554 | + else: |
| 555 | + with pytest.raises(PermissionDeniedError, match=r"You do not have the permission to manage permissions"): |
| 556 | + await checker.check( |
| 557 | + db=db, |
| 558 | + account_session=session, |
| 559 | + analyzed_query=analyzed_query, |
| 560 | + query_parameters=gql_params, |
| 561 | + branch=permissions_helper.default_branch, |
| 562 | + ) |
0 commit comments