11"""Tests for RBAC scope seeding."""
22
3+ from uuid import uuid4
4+
35import pytest
4- from sqlalchemy import select
6+ from sqlalchemy import func , select
57
68from tracecat .authz .enums import ScopeSource
79from tracecat .authz .seeding import (
10+ PRESET_ROLE_DEFINITIONS ,
811 SYSTEM_SCOPE_DEFINITIONS ,
9- seed_registry_scope ,
10- seed_registry_scopes_bulk ,
12+ seed_registry_scopes ,
13+ seed_system_roles_for_all_orgs ,
1114 seed_system_scopes ,
1215)
13- from tracecat .db .models import Scope
16+ from tracecat .db .models import Organization , Role , RoleScope , Scope
1417
1518
1619@pytest .mark .anyio
@@ -61,41 +64,7 @@ async def test_seed_system_scopes_idempotent(session):
6164
6265
6366@pytest .mark .anyio
64- async def test_seed_registry_scope (session ):
65- """Test seeding a single registry scope."""
66- action_key = "tools.test_integration.test_action"
67-
68- scope = await seed_registry_scope (session , action_key , "Test action scope" )
69- await session .commit ()
70-
71- assert scope is not None
72- assert scope .name == f"action:{ action_key } :execute"
73- assert scope .resource == "action"
74- assert scope .action == "execute"
75- assert scope .source == ScopeSource .PLATFORM
76- assert scope .source_ref == action_key
77- assert scope .organization_id is None
78-
79-
80- @pytest .mark .anyio
81- async def test_seed_registry_scope_idempotent (session ):
82- """Test that seeding the same registry scope twice returns existing scope."""
83- action_key = "tools.test_integration.test_action_idempotent"
84-
85- # Seed first time
86- scope1 = await seed_registry_scope (session , action_key )
87- await session .commit ()
88-
89- # Seed second time
90- scope2 = await seed_registry_scope (session , action_key )
91-
92- assert scope1 is not None
93- assert scope2 is not None
94- assert scope1 .id == scope2 .id
95-
96-
97- @pytest .mark .anyio
98- async def test_seed_registry_scopes_bulk (session ):
67+ async def test_seed_registry_scopes (session ):
9968 """Test bulk seeding of registry scopes."""
10069 action_keys = [
10170 "tools.okta.list_users" ,
@@ -104,7 +73,7 @@ async def test_seed_registry_scopes_bulk(session):
10473 "core.http_request" ,
10574 ]
10675
107- inserted_count = await seed_registry_scopes_bulk (session , action_keys )
76+ inserted_count = await seed_registry_scopes (session , action_keys )
10877 await session .commit ()
10978
11079 assert inserted_count == len (action_keys )
@@ -125,28 +94,91 @@ async def test_seed_registry_scopes_bulk(session):
12594
12695
12796@pytest .mark .anyio
128- async def test_seed_registry_scopes_bulk_idempotent (session ):
97+ async def test_seed_registry_scopes_idempotent (session ):
12998 """Test that bulk seeding is idempotent."""
13099 action_keys = ["tools.test.action1" , "tools.test.action2" ]
131100
132101 # First seed
133- first_count = await seed_registry_scopes_bulk (session , action_keys )
102+ first_count = await seed_registry_scopes (session , action_keys )
134103 await session .commit ()
135104 assert first_count == 2
136105
137106 # Second seed
138- second_count = await seed_registry_scopes_bulk (session , action_keys )
107+ second_count = await seed_registry_scopes (session , action_keys )
139108 await session .commit ()
140109 assert second_count == 0
141110
142111
143112@pytest .mark .anyio
144- async def test_seed_registry_scopes_bulk_empty (session ):
113+ async def test_seed_registry_scopes_empty (session ):
145114 """Test bulk seeding with empty list."""
146- inserted_count = await seed_registry_scopes_bulk (session , [])
115+ inserted_count = await seed_registry_scopes (session , [])
147116 assert inserted_count == 0
148117
149118
119+ @pytest .mark .anyio
120+ async def test_seed_system_roles_for_all_orgs_creates_roles_and_links (session ):
121+ """Seed preset roles for all orgs and link expected system scopes."""
122+ # Ensure scope IDs exist for role->scope links.
123+ await seed_system_scopes (session )
124+
125+ # Add an extra org so the function processes multiple orgs in one call.
126+ extra_org = Organization (
127+ id = uuid4 (),
128+ name = "Extra test org" ,
129+ slug = f"extra-test-org-{ uuid4 ().hex [:8 ]} " ,
130+ is_active = True ,
131+ )
132+ session .add (extra_org )
133+ await session .flush ()
134+
135+ # Capture target org IDs in this isolated session.
136+ org_result = await session .execute (select (Organization .id ))
137+ org_ids = {org_id for (org_id ,) in org_result .tuples ().all ()}
138+
139+ created_by_org = await seed_system_roles_for_all_orgs (session )
140+
141+ for org_id in org_ids :
142+ assert created_by_org [org_id ] == len (PRESET_ROLE_DEFINITIONS )
143+
144+ roles_result = await session .execute (
145+ select (Role .id , Role .organization_id , Role .slug ).where (
146+ Role .organization_id .in_ (org_ids ),
147+ Role .slug .in_ (PRESET_ROLE_DEFINITIONS ),
148+ )
149+ )
150+ roles = roles_result .tuples ().all ()
151+ assert len (roles ) == len (org_ids ) * len (PRESET_ROLE_DEFINITIONS )
152+
153+ role_scope_count_stmt = (
154+ select (Role .slug , func .count (RoleScope .scope_id ))
155+ .select_from (Role )
156+ .join (RoleScope , RoleScope .role_id == Role .id )
157+ .where (
158+ Role .organization_id .in_ (org_ids ),
159+ Role .slug .in_ (PRESET_ROLE_DEFINITIONS ),
160+ )
161+ .group_by (Role .slug )
162+ )
163+ role_scope_count_result = await session .execute (role_scope_count_stmt )
164+ role_scope_counts = dict (role_scope_count_result .tuples ().all ())
165+ expected_org_count = len (org_ids )
166+ for slug , role_def in PRESET_ROLE_DEFINITIONS .items ():
167+ assert role_scope_counts [slug ] == len (role_def .scopes ) * expected_org_count
168+
169+
170+ @pytest .mark .anyio
171+ async def test_seed_system_roles_for_all_orgs_idempotent (session ):
172+ """Running role seeding twice should not create duplicate roles."""
173+ await seed_system_scopes (session )
174+
175+ first = await seed_system_roles_for_all_orgs (session )
176+ second = await seed_system_roles_for_all_orgs (session )
177+
178+ assert sum (first .values ()) > 0
179+ assert all (created == 0 for created in second .values ())
180+
181+
150182@pytest .mark .anyio
151183async def test_system_scope_definitions_format (session ):
152184 """Test that all system scope definitions follow the expected format."""
0 commit comments