1414from ansible_base .jwt_consumer .common .cert import JWTCert , JWTCertException
1515from ansible_base .jwt_consumer .common .exceptions import InvalidTokenException
1616from ansible_base .lib .utils .translations import translatableConditionally as _
17+ from ansible_base .rbac .claims import get_or_create_resource , save_user_claims
1718from ansible_base .rbac .models import RoleDefinition , RoleUserAssignment
1819from ansible_base .rbac .permission_registry import permission_registry
1920from ansible_base .resource_registry .models import Resource
2021from test_app .models import Organization , Team
2122
2223default_logger = 'ansible_base.jwt_consumer.common.auth.logger'
2324
25+ claims_logger = 'ansible_base.rbac.claims.logger'
26+
2427
2528@pytest .fixture
2629def organization_admin_role ():
@@ -300,8 +303,8 @@ def test_apply_rbac_permissions_system_roles(
300303 authentication = JWTCommonAuth ()
301304 authentication .user = admin_user
302305 if logs_error :
303- with expected_log (default_logger , 'error' , 'Unable to grant' ):
304- authentication ._apply_rbac_permissions ( {}, {}, global_roles )
306+ with expected_log (claims_logger , 'error' , 'Unable to grant' ):
307+ save_user_claims ( authentication .user , {}, {}, global_roles )
305308 elif logs_error is not None :
306309 # Make sure we have a System Auditor role
307310 RoleDefinition .objects .get_or_create (
@@ -311,17 +314,17 @@ def test_apply_rbac_permissions_system_roles(
311314 'managed' : True ,
312315 },
313316 )
314- with expected_log (default_logger , 'info' , 'Granted user' ):
315- authentication ._apply_rbac_permissions ( {}, {}, global_roles )
317+ with expected_log (claims_logger , 'info' , 'Granted user' ):
318+ save_user_claims ( authentication .user , {}, {}, global_roles )
316319 else :
317- authentication ._apply_rbac_permissions ( {}, {}, global_roles )
320+ save_user_claims ( authentication .user , {}, {}, global_roles )
318321
319322 def test_apply_rbac_permissions_object_roles_role_dne (self , expected_log , admin_user ):
320323 authentication = JWTCommonAuth ()
321324 authentication .user = admin_user
322325 object_roles = {'Junk' : ['a' ]}
323- with expected_log (default_logger , 'error' , 'Unable to grant' ):
324- authentication ._apply_rbac_permissions ( {}, object_roles , [])
326+ with expected_log (claims_logger , 'error' , 'Unable to grant' ):
327+ save_user_claims ( authentication .user , {}, object_roles , [])
325328
326329 @pytest .mark .parametrize (
327330 "object_roles,log_level,log_substring" ,
@@ -337,16 +340,16 @@ def test_apply_rbac_permissions_object_role_exists_object_exists(
337340 authentication .user = admin_user
338341 objects = {'organization' : [{'ansible_id' : organization .resource .ansible_id , 'name' : organization .name }]}
339342 if log_level :
340- with expected_log (default_logger , log_level , log_substring ):
341- authentication ._apply_rbac_permissions ( objects , object_roles , [])
343+ with expected_log (claims_logger , log_level , log_substring ):
344+ save_user_claims ( authentication .user , objects , object_roles , [])
342345
343346 def test_apply_rbac_permissions_org_duplicate_name_error (self , expected_log , admin_user , organization , organization_admin_role ):
344347 authentication = JWTCommonAuth ()
345348 authentication .user = admin_user
346349 objects = {'organization' : [{'ansible_id' : str (uuid4 ()), 'name' : organization .name }]}
347350 object_roles = {"Organization Admin" : {'content_type' : 'organization' , 'objects' : [0 ]}}
348- with expected_log (default_logger , "warning" , "Got integrity error" ):
349- authentication ._apply_rbac_permissions ( objects , object_roles , [])
351+ with expected_log (claims_logger , "warning" , "Got integrity error" ):
352+ save_user_claims ( authentication .user , objects , object_roles , [])
350353
351354 def test_apply_rbac_permissions_removed_when_removed_from_jwt (self , admin_user , organization , organization_admin_role ):
352355 # Make sure we have a System Auditor role
@@ -364,27 +367,25 @@ def test_apply_rbac_permissions_removed_when_removed_from_jwt(self, admin_user,
364367 object_roles = {organization_admin_role .name : {'content_type' : 'organization' , 'objects' : [0 ]}}
365368 global_roles = ["Platform Auditor" ]
366369
367- authentication ._apply_rbac_permissions ( objects , object_roles , global_roles )
370+ save_user_claims ( authentication .user , objects , object_roles , global_roles )
368371
369372 assert RoleUserAssignment .objects .filter (user = admin_user ).count () == 2
370373
371374 # Test removing all roles
372- authentication ._apply_rbac_permissions ( {}, {}, [])
375+ save_user_claims ( authentication .user , {}, {}, [])
373376
374377 assert RoleUserAssignment .objects .filter (user = admin_user ).count () == 0
375378
376379 @pytest .mark .django_db
377380 def test_get_or_create_resource_invalid_content_type (self ):
378- authentication = JWTCommonAuth ()
379- assert authentication .get_or_create_resource ('junk' , {'ansible_id' : uuid4 ()}) == (None , None )
381+ assert get_or_create_resource ({}, 'junk' , {'ansible_id' : uuid4 ()}) == (None , None )
380382
381383 @pytest .mark .django_db
382384 def test_get_or_create_resource_organization (self ):
383- authentication = JWTCommonAuth ()
384385 data = {'ansible_id' : uuid4 (), 'name' : 'Test Organization' }
385386 assert not Organization .objects .filter (name = data ['name' ]).exists ()
386387 assert not Resource .objects .filter (ansible_id = data ['ansible_id' ]).exists ()
387- resource , obj = authentication . get_or_create_resource ('organization' , data )
388+ resource , obj = get_or_create_resource (data , 'organization' , data )
388389 assert resource is not None and obj is not None
389390 assert Organization .objects .filter (name = data ['name' ]).exists ()
390391 assert Resource .objects .filter (ansible_id = data ['ansible_id' ]).exists ()
@@ -411,7 +412,7 @@ def test_get_or_create_resource_team(self):
411412 assert not Team .objects .filter (name = data ['name' ]).exists ()
412413 assert not Organization .objects .filter (name = org_name ).exists ()
413414 assert not Resource .objects .filter (ansible_id = data ['ansible_id' ]).exists ()
414- resource , obj = authentication . get_or_create_resource ('team' , data )
415+ resource , obj = get_or_create_resource (authentication . token [ 'objects' ], 'team' , data )
415416 assert resource is not None and obj is not None
416417 assert Organization .objects .filter (name = org_name ).exists ()
417418 assert Resource .objects .filter (ansible_id = data ['ansible_id' ]).exists ()
@@ -483,7 +484,7 @@ def test_process_rbac_permissions_cache_scenarios(
483484 mock .patch ('ansible_base.jwt_consumer.common.auth.get_user_claims_hashable_form' ) as mock_get_hashable ,
484485 mock .patch ('ansible_base.jwt_consumer.common.auth.get_claims_hash' ) as mock_get_hash ,
485486 mock .patch .object (authentication , '_fetch_jwt_claims_from_gateway' ) as mock_gateway ,
486- mock .patch . object ( authentication , '_apply_rbac_permissions ' ) as mock_apply ,
487+ mock .patch ( 'ansible_base.jwt_consumer.common.auth.save_user_claims ' ) as mock_apply ,
487488 ):
488489
489490 # Setup mocks
@@ -532,7 +533,7 @@ def test_process_rbac_permissions_cache_scenarios(
532533
533534 # Verify RBAC application behavior
534535 if expected_rbac_call :
535- mock_apply .assert_called_once ( )
536+ mock_apply .assert_called_once_with ( admin_user , gateway_response [ 'objects' ], gateway_response [ 'object_roles' ], gateway_response [ 'global_roles' ] )
536537 else :
537538 mock_apply .assert_not_called ()
538539
0 commit comments