Skip to content

Commit 37f760c

Browse files
committed
feat(tables): enforce model-a RLS on dynamic workspace schemas
1 parent 5e27081 commit 37f760c

File tree

4 files changed

+369
-16
lines changed

4 files changed

+369
-16
lines changed
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
"""apply model a rls to dynamic workspace schemas
2+
3+
Revision ID: b5fc4168fe22
4+
Revises: c76f9b01fad7
5+
Create Date: 2026-02-27 11:43:07.059399
6+
7+
"""
8+
9+
from __future__ import annotations
10+
11+
from collections.abc import Sequence
12+
from uuid import UUID
13+
14+
import sqlalchemy as sa
15+
16+
from alembic import op
17+
from tracecat.identifiers.workflow import WorkspaceUUID
18+
19+
# revision identifiers, used by Alembic.
20+
revision: str = "b5fc4168fe22"
21+
down_revision: str | None = "c76f9b01fad7"
22+
branch_labels: str | Sequence[str] | None = None
23+
depends_on: str | Sequence[str] | None = None
24+
25+
INTERNAL_TENANT_COLUMN = "__tc_workspace_id"
26+
DYNAMIC_WORKSPACE_RLS_POLICY = "rls_policy_dynamic_workspace"
27+
RLS_WORKSPACE_VAR = "app.current_workspace_id"
28+
RLS_BYPASS_VAR = "app.rls_bypass"
29+
RLS_BYPASS_ON = "on"
30+
DYNAMIC_WORKSPACE_SCHEMA_PREFIXES = ("tables_", "custom_fields_")
31+
32+
33+
def _workspace_id_from_schema(schema_name: str) -> UUID | None:
34+
"""Resolve workspace UUID from a dynamic workspace schema name."""
35+
for prefix in DYNAMIC_WORKSPACE_SCHEMA_PREFIXES:
36+
if not schema_name.startswith(prefix):
37+
continue
38+
short_workspace_id = schema_name.removeprefix(prefix)
39+
try:
40+
return WorkspaceUUID.new(short_workspace_id)
41+
except ValueError:
42+
return None
43+
return None
44+
45+
46+
def _qualified_table(
47+
preparer: sa.sql.compiler.IdentifierPreparer, schema_name: str, table_name: str
48+
) -> str:
49+
"""Return a properly quoted schema-qualified table reference."""
50+
return f"{preparer.quote_schema(schema_name)}.{preparer.quote(table_name)}"
51+
52+
53+
def _workspace_dynamic_tables() -> list[tuple[str, str, UUID]]:
54+
"""Collect all physical tables in workspace-scoped dynamic schemas."""
55+
bind = op.get_bind()
56+
inspector = sa.inspect(bind)
57+
tables: list[tuple[str, str, UUID]] = []
58+
for schema_name in sorted(inspector.get_schema_names()):
59+
workspace_id = _workspace_id_from_schema(schema_name)
60+
if workspace_id is None:
61+
continue
62+
for table_name in sorted(inspector.get_table_names(schema=schema_name)):
63+
tables.append((schema_name, table_name, workspace_id))
64+
return tables
65+
66+
67+
def _policy_expression() -> str:
68+
"""Build the RLS tenant policy expression for dynamic workspace tables."""
69+
return (
70+
f"current_setting('{RLS_BYPASS_VAR}', true) = '{RLS_BYPASS_ON}' "
71+
f'OR "{INTERNAL_TENANT_COLUMN}" = '
72+
f"NULLIF(current_setting('{RLS_WORKSPACE_VAR}', true), '')::uuid"
73+
)
74+
75+
76+
def _ensure_tenant_column(
77+
*,
78+
schema_name: str,
79+
table_name: str,
80+
workspace_id: UUID,
81+
preparer: sa.sql.compiler.IdentifierPreparer,
82+
) -> None:
83+
"""Add/backfill the internal tenant column on a physical dynamic table."""
84+
bind = op.get_bind()
85+
inspector = sa.inspect(bind)
86+
qualified_table = _qualified_table(preparer, schema_name, table_name)
87+
88+
has_tenant_column = any(
89+
column["name"] == INTERNAL_TENANT_COLUMN
90+
for column in inspector.get_columns(table_name, schema=schema_name)
91+
)
92+
if not has_tenant_column:
93+
op.execute(
94+
sa.text(
95+
f"""
96+
ALTER TABLE {qualified_table}
97+
ADD COLUMN "{INTERNAL_TENANT_COLUMN}" UUID
98+
"""
99+
)
100+
)
101+
102+
bind.execute(
103+
sa.text(
104+
f"""
105+
UPDATE {qualified_table}
106+
SET "{INTERNAL_TENANT_COLUMN}" = :workspace_id
107+
WHERE "{INTERNAL_TENANT_COLUMN}" IS NULL
108+
"""
109+
),
110+
{"workspace_id": str(workspace_id)},
111+
)
112+
113+
op.execute(
114+
sa.text(
115+
f"""
116+
ALTER TABLE {qualified_table}
117+
ALTER COLUMN "{INTERNAL_TENANT_COLUMN}"
118+
SET DEFAULT '{workspace_id}'::uuid
119+
"""
120+
)
121+
)
122+
op.execute(
123+
sa.text(
124+
f"""
125+
ALTER TABLE {qualified_table}
126+
ALTER COLUMN "{INTERNAL_TENANT_COLUMN}"
127+
SET NOT NULL
128+
"""
129+
)
130+
)
131+
132+
133+
def _enable_table_rls(
134+
*, schema_name: str, table_name: str, preparer: sa.sql.compiler.IdentifierPreparer
135+
) -> None:
136+
"""Enable RLS policy for a physical dynamic table."""
137+
qualified_table = _qualified_table(preparer, schema_name, table_name)
138+
policy_expr = _policy_expression()
139+
140+
op.execute(sa.text(f"ALTER TABLE {qualified_table} ENABLE ROW LEVEL SECURITY"))
141+
op.execute(
142+
sa.text(
143+
f"""
144+
DROP POLICY IF EXISTS {DYNAMIC_WORKSPACE_RLS_POLICY}
145+
ON {qualified_table}
146+
"""
147+
)
148+
)
149+
op.execute(
150+
sa.text(
151+
f"""
152+
CREATE POLICY {DYNAMIC_WORKSPACE_RLS_POLICY} ON {qualified_table}
153+
FOR ALL
154+
USING ({policy_expr})
155+
WITH CHECK ({policy_expr})
156+
"""
157+
)
158+
)
159+
160+
161+
def _disable_table_rls(
162+
*, schema_name: str, table_name: str, preparer: sa.sql.compiler.IdentifierPreparer
163+
) -> None:
164+
"""Disable RLS policy for a physical dynamic table."""
165+
qualified_table = _qualified_table(preparer, schema_name, table_name)
166+
op.execute(
167+
sa.text(
168+
f"""
169+
DROP POLICY IF EXISTS {DYNAMIC_WORKSPACE_RLS_POLICY}
170+
ON {qualified_table}
171+
"""
172+
)
173+
)
174+
op.execute(sa.text(f"ALTER TABLE {qualified_table} DISABLE ROW LEVEL SECURITY"))
175+
176+
177+
def upgrade() -> None:
178+
"""Apply workspace-tenant column + RLS to dynamic workspace schemas."""
179+
bind = op.get_bind()
180+
preparer = sa.sql.compiler.IdentifierPreparer(bind.dialect)
181+
182+
for schema_name, table_name, workspace_id in _workspace_dynamic_tables():
183+
_ensure_tenant_column(
184+
schema_name=schema_name,
185+
table_name=table_name,
186+
workspace_id=workspace_id,
187+
preparer=preparer,
188+
)
189+
_enable_table_rls(
190+
schema_name=schema_name,
191+
table_name=table_name,
192+
preparer=preparer,
193+
)
194+
195+
196+
def downgrade() -> None:
197+
"""Rollback workspace-tenant column + RLS on dynamic workspace schemas."""
198+
bind = op.get_bind()
199+
preparer = sa.sql.compiler.IdentifierPreparer(bind.dialect)
200+
201+
for schema_name, table_name, _ in _workspace_dynamic_tables():
202+
_disable_table_rls(
203+
schema_name=schema_name,
204+
table_name=table_name,
205+
preparer=preparer,
206+
)
207+
qualified_table = _qualified_table(preparer, schema_name, table_name)
208+
op.execute(
209+
sa.text(
210+
f"""
211+
ALTER TABLE {qualified_table}
212+
DROP COLUMN IF EXISTS "{INTERNAL_TENANT_COLUMN}"
213+
"""
214+
)
215+
)

tracecat/cases/service.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@
114114
from tracecat.service import BaseWorkspaceService, requires_entitlement
115115
from tracecat.tables.common import normalize_column_options
116116
from tracecat.tables.enums import SqlType
117-
from tracecat.tables.service import TablesService
117+
from tracecat.tables.service import DYNAMIC_WORKSPACE_TENANT_COLUMN, TablesService
118118
from tracecat.tiers.enums import Entitlement
119119

120120

@@ -1002,7 +1002,14 @@ class CaseFieldsService(CustomFieldsService):
10021002
# Hardcoded to preserve existing workspace-scoped table names
10031003
# (metadata table was renamed from case_fields to case_field)
10041004
_table = "case_fields"
1005-
_reserved_columns = {"id", "case_id", "created_at", "updated_at", "workspace_id"}
1005+
_reserved_columns = {
1006+
"id",
1007+
"case_id",
1008+
"created_at",
1009+
"updated_at",
1010+
"workspace_id",
1011+
DYNAMIC_WORKSPACE_TENANT_COLUMN,
1012+
}
10061013

10071014
def _table_definition(self) -> sa.Table:
10081015
"""Return the SQLAlchemy Table definition for the case_fields workspace table."""
@@ -1023,6 +1030,12 @@ def _table_definition(self) -> sa.Table:
10231030
nullable=False,
10241031
server_default=sa.func.now(),
10251032
),
1033+
sa.Column(
1034+
DYNAMIC_WORKSPACE_TENANT_COLUMN,
1035+
UUID(as_uuid=True),
1036+
nullable=False,
1037+
server_default=self._workspace_tenant_default_sql(),
1038+
),
10261039
# Use the actual Case table column to avoid metadata resolution issues
10271040
sa.ForeignKeyConstraint(
10281041
["case_id"],

tracecat/custom_fields/service.py

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,16 @@
1313
from tracecat.db.locks import derive_lock_key_from_parts, pg_advisory_lock
1414
from tracecat.identifiers.workflow import WorkspaceUUID
1515
from tracecat.service import BaseWorkspaceService
16-
from tracecat.tables.service import TableEditorService, sanitize_identifier
16+
from tracecat.tables.service import (
17+
DYNAMIC_WORKSPACE_RLS_POLICY,
18+
DYNAMIC_WORKSPACE_TENANT_COLUMN,
19+
RLS_BYPASS_ON,
20+
RLS_BYPASS_VAR,
21+
RLS_WORKSPACE_VAR,
22+
TableEditorService,
23+
is_internal_column_name,
24+
sanitize_identifier,
25+
)
1726

1827

1928
class CustomFieldsService(BaseWorkspaceService, ABC):
@@ -50,6 +59,49 @@ def _get_schema_name(self) -> str:
5059

5160
return f"{self._schema_prefix}{self._workspace_uuid.short()}"
5261

62+
def _full_table_name(self) -> str:
63+
"""Get the fully-qualified workspace table name."""
64+
return f'"{self.schema_name}".{self.sanitized_table_name}'
65+
66+
def _workspace_tenant_default_sql(self) -> sa.TextClause:
67+
"""Build the server default expression for the tenant column."""
68+
return sa.text(f"'{self._workspace_uuid}'::uuid")
69+
70+
async def _enable_workspace_rls_for_base_table(self) -> None:
71+
"""Enable workspace RLS policy on the workspace base table."""
72+
conn = await self.session.connection()
73+
policy_expr = (
74+
f"current_setting('{RLS_BYPASS_VAR}', true) = '{RLS_BYPASS_ON}' "
75+
f'OR "{DYNAMIC_WORKSPACE_TENANT_COLUMN}" = '
76+
f"NULLIF(current_setting('{RLS_WORKSPACE_VAR}', true), '')::uuid"
77+
)
78+
full_table_name = self._full_table_name()
79+
await conn.execute(
80+
sa.DDL("ALTER TABLE %s ENABLE ROW LEVEL SECURITY", full_table_name)
81+
)
82+
await conn.execute(
83+
sa.DDL(
84+
f"DROP POLICY IF EXISTS {DYNAMIC_WORKSPACE_RLS_POLICY} ON %s",
85+
full_table_name,
86+
)
87+
)
88+
await conn.execute(
89+
sa.DDL(
90+
f"""
91+
CREATE POLICY {DYNAMIC_WORKSPACE_RLS_POLICY} ON %s
92+
FOR ALL
93+
USING ({policy_expr})
94+
WITH CHECK ({policy_expr})
95+
""",
96+
full_table_name,
97+
)
98+
)
99+
100+
def _assert_user_field_name_allowed(self, field_name: str) -> None:
101+
"""Reject operations on internal/system-managed field names."""
102+
if is_internal_column_name(field_name):
103+
raise ValueError(f"Field {field_name} is reserved for internal use")
104+
53105
@abstractmethod
54106
def _table_definition(self) -> sa.Table:
55107
"""Return the SQLAlchemy Table definition for the workspace table.
@@ -74,6 +126,7 @@ async def initialize_workspace_schema(self) -> None:
74126
await self.session.execute(
75127
CreateTable(self._table_definition(), if_not_exists=True)
76128
)
129+
await self._enable_workspace_rls_for_base_table()
77130
self._schema_initialized = True
78131

79132
async def _ensure_schema_ready(self) -> None:
@@ -108,12 +161,14 @@ async def list_fields(self) -> Sequence[sa.engine.interfaces.ReflectedColumn]:
108161
"""List all custom fields for the workspace."""
109162

110163
await self._ensure_schema_ready()
111-
return await self.editor.get_columns()
164+
columns = await self.editor.get_columns()
165+
return [col for col in columns if not is_internal_column_name(col["name"])]
112166

113167
async def create_field(self, params: CustomFieldCreate) -> None:
114168
"""Create a new custom field column."""
115169

116170
await self._ensure_schema_ready()
171+
self._assert_user_field_name_allowed(params.name)
117172
params.nullable = True # Custom fields remain nullable by default
118173
await self.editor.create_column(params)
119174
await self.session.commit()
@@ -122,6 +177,9 @@ async def update_field(self, field_id: str, params: CustomFieldUpdate) -> None:
122177
"""Update a custom field column."""
123178

124179
await self._ensure_schema_ready()
180+
self._assert_user_field_name_allowed(field_id)
181+
if params.name is not None:
182+
self._assert_user_field_name_allowed(params.name)
125183
await self.editor.update_column(field_id, params)
126184
await self.session.commit()
127185

@@ -136,6 +194,7 @@ async def delete_field(self, field_id: str) -> None:
136194
"""
137195

138196
await self._ensure_schema_ready()
197+
self._assert_user_field_name_allowed(field_id)
139198
if field_id in self._reserved_columns:
140199
raise ValueError(f"Field {field_id} is a reserved field")
141200
await self.editor.delete_column(field_id)

0 commit comments

Comments
 (0)