|
1 | | -"""Tests for RBAC scope and role seeding.""" |
2 | | - |
3 | | -from uuid import uuid4 |
| 1 | +"""Tests for RBAC scope seeding.""" |
4 | 2 |
|
5 | 3 | import pytest |
6 | 4 | from sqlalchemy import select |
7 | 5 |
|
8 | | -from tracecat.authz.enums import ScopeSource, WorkspaceRole |
9 | | -from tracecat.authz.scopes import SYSTEM_ROLE_SCOPES |
| 6 | +from tracecat.authz.enums import ScopeSource |
10 | 7 | from tracecat.authz.seeding import ( |
11 | | - SYSTEM_ROLE_DEFINITIONS, |
12 | 8 | SYSTEM_SCOPE_DEFINITIONS, |
13 | | - get_system_scope_ids, |
14 | 9 | seed_registry_scope, |
15 | 10 | seed_registry_scopes_bulk, |
16 | | - seed_system_roles_for_org, |
17 | 11 | seed_system_scopes, |
18 | 12 | ) |
19 | | -from tracecat.db.models import Organization, Role, RoleScope, Scope |
| 13 | +from tracecat.db.models import Scope |
20 | 14 |
|
21 | 15 |
|
22 | 16 | @pytest.mark.anyio |
@@ -153,148 +147,6 @@ async def test_seed_registry_scopes_bulk_empty(session): |
153 | 147 | assert inserted_count == 0 |
154 | 148 |
|
155 | 149 |
|
156 | | -@pytest.mark.anyio |
157 | | -async def test_get_system_scope_ids(session): |
158 | | - """Test retrieving system scope IDs by name.""" |
159 | | - # First seed the system scopes |
160 | | - await seed_system_scopes(session) |
161 | | - |
162 | | - # Get scope IDs for a subset of scopes |
163 | | - scope_names = frozenset({"workflow:read", "workflow:create", "case:read"}) |
164 | | - scope_ids = await get_system_scope_ids(session, scope_names) |
165 | | - |
166 | | - assert len(scope_ids) == 3 |
167 | | - assert "workflow:read" in scope_ids |
168 | | - assert "workflow:create" in scope_ids |
169 | | - assert "case:read" in scope_ids |
170 | | - |
171 | | - |
172 | | -@pytest.mark.anyio |
173 | | -async def test_get_system_scope_ids_with_wildcards(session): |
174 | | - """Test retrieving scope IDs including wildcard scopes.""" |
175 | | - # First seed the system scopes |
176 | | - await seed_system_scopes(session) |
177 | | - |
178 | | - # ADMIN_SCOPES includes wildcard scopes like "action:*:execute" |
179 | | - scope_names = frozenset({"workflow:read", "action:*:execute"}) |
180 | | - scope_ids = await get_system_scope_ids(session, scope_names) |
181 | | - |
182 | | - # Should find both exact and wildcard scopes |
183 | | - assert "workflow:read" in scope_ids |
184 | | - assert "action:*:execute" in scope_ids |
185 | | - |
186 | | - |
187 | | -@pytest.mark.anyio |
188 | | -async def test_seed_system_roles_for_org(session): |
189 | | - """Test seeding system roles for an organization.""" |
190 | | - # Create a test organization first |
191 | | - org = Organization( |
192 | | - id=uuid4(), |
193 | | - name="Test Org for Roles", |
194 | | - slug=f"test-org-roles-{uuid4().hex[:8]}", |
195 | | - ) |
196 | | - session.add(org) |
197 | | - await session.flush() |
198 | | - |
199 | | - # Seed system scopes first (roles depend on these) |
200 | | - await seed_system_scopes(session) |
201 | | - |
202 | | - # Seed system roles for the organization |
203 | | - created_count = await seed_system_roles_for_org(session, org.id) |
204 | | - |
205 | | - assert created_count == len(SYSTEM_ROLE_DEFINITIONS) |
206 | | - |
207 | | - # Verify roles exist |
208 | | - result = await session.execute( |
209 | | - select(Role).where( |
210 | | - Role.organization_id == org.id, |
211 | | - Role.is_system.is_(True), |
212 | | - ) |
213 | | - ) |
214 | | - roles = result.scalars().all() |
215 | | - assert len(roles) == len(SYSTEM_ROLE_DEFINITIONS) |
216 | | - |
217 | | - role_names = {r.name for r in roles} |
218 | | - expected_names = {name for name, _ in SYSTEM_ROLE_DEFINITIONS.values()} |
219 | | - assert role_names == expected_names |
220 | | - |
221 | | - |
222 | | -@pytest.mark.anyio |
223 | | -async def test_seed_system_roles_for_org_idempotent(session): |
224 | | - """Test that role seeding is idempotent.""" |
225 | | - # Create a test organization |
226 | | - org = Organization( |
227 | | - id=uuid4(), |
228 | | - name="Test Org for Idempotent Roles", |
229 | | - slug=f"test-org-idem-{uuid4().hex[:8]}", |
230 | | - ) |
231 | | - session.add(org) |
232 | | - await session.flush() |
233 | | - |
234 | | - # Seed system scopes first |
235 | | - await seed_system_scopes(session) |
236 | | - |
237 | | - # Seed roles twice |
238 | | - first_count = await seed_system_roles_for_org(session, org.id) |
239 | | - second_count = await seed_system_roles_for_org(session, org.id) |
240 | | - |
241 | | - assert first_count == len(SYSTEM_ROLE_DEFINITIONS) |
242 | | - assert second_count == 0 # No new roles on second run |
243 | | - |
244 | | - |
245 | | -@pytest.mark.anyio |
246 | | -async def test_seed_system_roles_have_scopes(session): |
247 | | - """Test that seeded system roles have the correct scopes assigned.""" |
248 | | - # Create a test organization |
249 | | - org = Organization( |
250 | | - id=uuid4(), |
251 | | - name="Test Org for Role Scopes", |
252 | | - slug=f"test-org-scopes-{uuid4().hex[:8]}", |
253 | | - ) |
254 | | - session.add(org) |
255 | | - await session.flush() |
256 | | - |
257 | | - # Seed system scopes first |
258 | | - await seed_system_scopes(session) |
259 | | - |
260 | | - # Seed system roles |
261 | | - await seed_system_roles_for_org(session, org.id) |
262 | | - |
263 | | - # Verify Viewer role has correct scopes |
264 | | - result = await session.execute( |
265 | | - select(Role).where( |
266 | | - Role.organization_id == org.id, |
267 | | - Role.name == "Viewer", |
268 | | - ) |
269 | | - ) |
270 | | - viewer_role = result.scalar_one() |
271 | | - |
272 | | - # Get the role-scope assignments |
273 | | - result = await session.execute( |
274 | | - select(RoleScope).where(RoleScope.role_id == viewer_role.id) |
275 | | - ) |
276 | | - role_scopes = result.scalars().all() |
277 | | - |
278 | | - # Viewer role should have scopes assigned |
279 | | - # The exact count depends on which scopes exist in the database |
280 | | - # (wildcards may not all have individual scope entries) |
281 | | - viewer_scope_names = SYSTEM_ROLE_SCOPES[WorkspaceRole.VIEWER] |
282 | | - assert len(role_scopes) > 0 |
283 | | - |
284 | | - # Verify at least some expected scopes are present |
285 | | - result = await session.execute( |
286 | | - select(Scope.name).where(Scope.id.in_([rs.scope_id for rs in role_scopes])) |
287 | | - ) |
288 | | - assigned_scope_names = set(result.scalars().all()) |
289 | | - |
290 | | - # Check that non-wildcard scopes are assigned |
291 | | - for scope_name in viewer_scope_names: |
292 | | - if "*" not in scope_name: |
293 | | - assert scope_name in assigned_scope_names, ( |
294 | | - f"Expected {scope_name} to be assigned to Viewer role" |
295 | | - ) |
296 | | - |
297 | | - |
298 | 150 | @pytest.mark.anyio |
299 | 151 | async def test_system_scope_definitions_format(session): |
300 | 152 | """Test that all system scope definitions follow the expected format.""" |
|
0 commit comments