Skip to content

Commit fee22ce

Browse files
committed
feat: transactions and rls rules
Implements Row-Level Security (RLS) policies and SQL transaction management. Row-Level Security (RLS): - Database function get_current_organization_id() reads app.current_organization_id from session - RLS policies on tables with organization_id filter rows automatically - Schema uses organizationRLSPolicy() helper to enable RLS per table SQL Transactions: - Domain Layer (@domain/shared): SqlClient interface for database operations - Platform Layer (@platform/db-postgres): SqlClientLive with automatic RLS context - App Layer (apps/*): Boundaries provide SqlClientLive with organization context Usage patterns: - Repositories use sqlClient.query() for single operations - Use cases use sqlClient.transaction() for multi-step operations - Routes provide SqlClientLive(client, organizationId) for RLS enforcement Key behaviors: - Every transaction sets app.current_organization_id session variable - Nested transactions share connection (pass-through proxy) - Domain errors propagate; failures trigger automatic rollback
1 parent 2098c83 commit fee22ce

File tree

86 files changed

+2517
-2573
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+2517
-2573
lines changed

AGENTS.md

Lines changed: 131 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,109 @@ Base config: `tsconfig.base.json`
240240
- Weaviate adapter stack lives in `packages/platform/db-weaviate`
241241
- Domain models are independent from table/row shapes
242242
- Mapping from DB rows to domain objects belongs in platform adapters
243-
- **Apps use pool-based connections**: Use `createPostgresPool()` in `apps/*/clients.ts` for direct pool access
243+
- **Apps use SqlClient for all DB access**: Boundaries provide `SqlClientLive` layer with organization context for RLS enforcement
244+
245+
### SqlClient and Row-Level Security (RLS)
246+
247+
All Postgres access flows through `SqlClient`—a domain-level service that abstracts database operations and enforces organization scoping via RLS.
248+
249+
**Architecture:**
250+
- **Domain Layer** (`@domain/shared`): `SqlClient` interface with `transaction()` and `query()` methods
251+
- **Platform Layer** (`@platform/db-postgres`): `SqlClientLive` implementation with automatic RLS context setting
252+
- **App Layer** (`apps/*`): Boundaries provide `SqlClientLive` with the request's organization context
253+
254+
**Key behaviors:**
255+
- Every transaction automatically sets `app.current_organization_id` session variable
256+
- RLS policies filter all queries by this organization ID at the database level
257+
- Nested transactions share the same connection (pass-through proxy—no nested transaction overhead)
258+
- Domain errors propagate through Effect error channel; database errors become `RepositoryError`
259+
260+
**Usage in boundaries (apps):**
261+
262+
```typescript
263+
// apps/api/src/routes/projects.ts
264+
import { SqlClientLive } from "@platform/db-postgres"
265+
import { ProjectRepositoryLive } from "@platform/db-postgres"
266+
267+
app.openapi(createProjectRoute, async (c) => {
268+
const project = await Effect.runPromise(
269+
createProjectUseCase(input).pipe(
270+
Effect.provide(ProjectRepositoryLive),
271+
Effect.provide(SqlClientLive(c.var.postgresClient, c.var.organization.id)),
272+
),
273+
)
274+
return c.json(toProjectResponse(project), 201)
275+
})
276+
```
277+
278+
```typescript
279+
// apps/web/src/domains/projects/projects.functions.ts
280+
import { getPostgresClient } from "../../server/clients.ts"
281+
282+
export const createProject = createServerFn({ method: "POST" })
283+
.handler(async ({ data }) => {
284+
const { organizationId } = await requireSession()
285+
const client = getPostgresClient()
286+
287+
const project = await Effect.runPromise(
288+
createProjectUseCase({...}).pipe(
289+
Effect.provide(ProjectRepositoryLive),
290+
Effect.provide(SqlClientLive(client, organizationId)),
291+
)
292+
)
293+
return toRecord(project)
294+
})
295+
```
296+
297+
**Usage in use-cases (multi-operation transactions):**
298+
299+
```typescript
300+
// packages/domain/auth/src/use-cases/complete-auth-intent.ts
301+
export const completeAuthIntentUseCase = (input) =>
302+
Effect.gen(function* () {
303+
const sqlClient = yield* SqlClient
304+
305+
// Wraps multi-step operation in single transaction with RLS
306+
yield* sqlClient.transaction(handleIntentByType(intent, input.session))
307+
})
308+
309+
const handleSignup = (intent, session) =>
310+
Effect.gen(function* () {
311+
const users = yield* UserRepository
312+
const memberships = yield* MembershipRepository
313+
314+
// All operations share the same transaction + RLS context
315+
const organization = yield* createOrganizationUseCase({...})
316+
yield* memberships.save(createMembership({...}))
317+
yield* users.setNameIfMissing({...})
318+
})
319+
```
320+
321+
**Usage in repositories (single operations):**
322+
323+
```typescript
324+
// packages/platform/db-postgres/src/repositories/project-repository.ts
325+
export const ProjectRepositoryLive = Layer.effect(
326+
ProjectRepository,
327+
Effect.gen(function* () {
328+
const sqlClient = (yield* SqlClient) as SqlClientShape<Operator>
329+
330+
return {
331+
findById: (id) =>
332+
sqlClient
333+
.query((db) => db.select().from(projects).where(eq(projects.id, id)))
334+
.pipe(Effect.flatMap(...)),
335+
336+
save: (project) =>
337+
Effect.gen(function* () {
338+
yield* sqlClient.query((db) =>
339+
db.insert(projects).values(row).onConflictDoUpdate({...})
340+
)
341+
}),
342+
}
343+
})
344+
)
345+
```
244346

245347
### Postgres Management
246348

@@ -610,17 +712,41 @@ The web app uses a **server-centric, query-driven** architecture built on the Ta
610712
**Server Functions** — All data fetching and mutations use `createServerFn` from `@tanstack/react-start`:
611713

612714
```typescript
715+
import { Effect } from "effect"
716+
import { ProjectRepository, createProjectUseCase } from "@domain/projects"
717+
import { ProjectRepositoryLive, SqlClientLive } from "@platform/db-postgres"
718+
import { getPostgresClient } from "../../server/clients.ts"
719+
613720
// Query (GET)
614721
export const listProjects = createServerFn({ method: "GET" }).handler(async () => {
615722
const { organizationId } = await requireSession()
616-
const { db } = getPostgresClient()
617-
// compose domain use-cases...
723+
const client = getPostgresClient()
724+
725+
return await Effect.runPromise(
726+
Effect.gen(function* () {
727+
const repo = yield* ProjectRepository
728+
return yield* repo.findAll()
729+
}).pipe(
730+
Effect.provide(ProjectRepositoryLive),
731+
Effect.provide(SqlClientLive(client, organizationId))
732+
)
733+
)
618734
})
619735

620736
// Mutation (POST) with Zod validation
621737
export const createProject = createServerFn({ method: "POST" })
622-
.inputValidator(z.object({ name: z.string().min(1) }))
623-
.handler(async ({ data }) => { ... })
738+
.inputValidator(createProjectSchema)
739+
.handler(async ({ data }) => {
740+
const { userId, organizationId } = await requireSession()
741+
const client = getPostgresClient()
742+
743+
return await Effect.runPromise(
744+
createProjectUseCase({...}).pipe(
745+
Effect.provide(ProjectRepositoryLive),
746+
Effect.provide(SqlClientLive(client, organizationId))
747+
)
748+
)
749+
})
624750
```
625751

626752
Server functions live in `apps/web/src/domains/*/functions.ts`.

apps/api/src/middleware/auth.ts

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import { ApiKeyRepository } from "@domain/api-keys"
12
import { OrganizationId, UnauthorizedError, UserId } from "@domain/shared"
2-
import { type PostgresDb, createApiKeyPostgresRepository } from "@platform/db-postgres"
3+
import { ApiKeyRepositoryLive, SqlClientLive } from "@platform/db-postgres"
4+
import type { PostgresClient } from "@platform/db-postgres"
35
import { hashToken } from "@repo/utils"
46
import { Effect, Option } from "effect"
57
import type { Context, MiddlewareHandler, Next } from "hono"
@@ -94,51 +96,53 @@ const validateApiKey = (
9496
c: Context,
9597
token: string,
9698
options?: AuthMiddlewareOptions,
97-
): Effect.Effect<{ organizationId: string; keyId: string } | null, never> => {
99+
): Promise<{ organizationId: string; keyId: string } | null> => {
98100
const redis = c.get("redis")
99-
const adminDb = options?.adminDb ?? getAdminPostgresClient().db
100-
const unscopedApiKeyRepository = createApiKeyPostgresRepository(adminDb)
101-
const touchBuffer = createTouchBuffer(adminDb)
101+
const adminClient = options?.adminClient ?? getAdminPostgresClient()
102+
const touchBuffer = createTouchBuffer(adminClient)
102103

103-
return Effect.gen(function* () {
104-
const startTime = Date.now()
105-
const tokenHash = yield* hashToken(token)
104+
return Effect.runPromise(
105+
Effect.gen(function* () {
106+
const startTime = Date.now()
107+
const tokenHash = yield* hashToken(token)
106108

107-
// Try cache first for consistent lookup time (keyed by hash)
108-
const cached = yield* getCachedApiKey(redis, tokenHash)
109+
// Try cache first for consistent lookup time (keyed by hash)
110+
const cached = yield* getCachedApiKey(redis, tokenHash)
109111

110-
if (cached !== undefined) {
111-
// Cache hit - enforce minimum time and return
112-
yield* enforceMinimumTime(startTime, MIN_VALIDATION_TIME_MS)
113-
return cached
114-
}
112+
if (cached !== undefined) {
113+
// Cache hit - enforce minimum time and return
114+
yield* enforceMinimumTime(startTime, MIN_VALIDATION_TIME_MS)
115+
return cached
116+
}
115117

116-
const apiKeyOption = yield* Effect.option(unscopedApiKeyRepository.findByTokenHash(tokenHash))
118+
const apiKeyRepository = yield* ApiKeyRepository
119+
const apiKeyOption = yield* Effect.option(apiKeyRepository.findByTokenHash(tokenHash))
117120

118-
if (Option.isNone(apiKeyOption)) {
119-
yield* cacheApiKeyResult(redis, tokenHash, null, INVALID_KEY_TTL_SECONDS)
120-
yield* enforceMinimumTime(startTime, MIN_VALIDATION_TIME_MS)
121-
return null
122-
}
121+
if (Option.isNone(apiKeyOption)) {
122+
yield* cacheApiKeyResult(redis, tokenHash, null, INVALID_KEY_TTL_SECONDS)
123+
yield* enforceMinimumTime(startTime, MIN_VALIDATION_TIME_MS)
124+
return null
125+
}
123126

124-
const apiKey = apiKeyOption.value
127+
const apiKey = apiKeyOption.value
125128

126-
const result = {
127-
organizationId: apiKey.organizationId,
128-
keyId: apiKey.id,
129-
}
129+
const result = {
130+
organizationId: apiKey.organizationId,
131+
keyId: apiKey.id,
132+
}
130133

131-
// Cache successful validation for 5 minutes (keyed by hash)
132-
yield* cacheApiKeyResult(redis, tokenHash, result, VALID_KEY_TTL_SECONDS)
134+
// Cache successful validation for 5 minutes (keyed by hash)
135+
yield* cacheApiKeyResult(redis, tokenHash, result, VALID_KEY_TTL_SECONDS)
133136

134-
// Use TouchBuffer for batched updates instead of fire-and-forget
135-
// This reduces database writes by 90%+ by batching updates
136-
touchBuffer.touch(apiKey.id)
137+
// Use TouchBuffer for batched updates instead of fire-and-forget
138+
// This reduces database writes by 90%+ by batching updates
139+
touchBuffer.touch(apiKey.id)
137140

138-
// Enforce minimum time before returning
139-
yield* enforceMinimumTime(startTime, MIN_VALIDATION_TIME_MS)
140-
return result
141-
}).pipe(Effect.orDie)
141+
// Enforce minimum time before returning
142+
yield* enforceMinimumTime(startTime, MIN_VALIDATION_TIME_MS)
143+
return result
144+
}).pipe(Effect.provide(ApiKeyRepositoryLive), Effect.provide(SqlClientLive(adminClient)), Effect.orDie),
145+
)
142146
}
143147

144148
/**
@@ -174,7 +178,7 @@ const authenticateWithApiKey = (
174178
options?: AuthMiddlewareOptions,
175179
): Effect.Effect<AuthContext | null, never> => {
176180
return Effect.gen(function* () {
177-
const result = yield* validateApiKey(c, token, options)
181+
const result = yield* Effect.promise(() => validateApiKey(c, token, options))
178182

179183
if (result) {
180184
const authContext: AuthContext = {
@@ -191,7 +195,7 @@ const authenticateWithApiKey = (
191195
}
192196

193197
/**
194-
* Authenticate via API key from the Authorization: Bearer header.
198+
* Main authentication effect that validates API key from Authorization header.
195199
*/
196200
const authenticate = (c: Context, options?: AuthMiddlewareOptions): Effect.Effect<AuthContext, UnauthorizedError> => {
197201
return Effect.gen(function* () {
@@ -219,7 +223,7 @@ const authenticate = (c: Context, options?: AuthMiddlewareOptions): Effect.Effec
219223
* Public routes should be excluded from this middleware.
220224
*/
221225
interface AuthMiddlewareOptions {
222-
readonly adminDb?: PostgresDb
226+
readonly adminClient?: PostgresClient
223227
}
224228

225229
export const createAuthMiddleware = (options?: AuthMiddlewareOptions): MiddlewareHandler => {

apps/api/src/middleware/organization-context.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
import { OrganizationRepository } from "@domain/organizations"
12
import { OrganizationId, PermissionError } from "@domain/shared"
2-
import { createOrganizationPostgresRepository } from "@platform/db-postgres"
3+
import { OrganizationRepositoryLive, SqlClientLive } from "@platform/db-postgres"
34
import { BadRequestError } from "@repo/utils"
45
import { Effect } from "effect"
56
import type { Context, MiddlewareHandler, Next } from "hono"
@@ -26,8 +27,16 @@ export const createOrganizationContextMiddleware = (): MiddlewareHandler => {
2627
})
2728
}
2829

29-
const organizationRepository = createOrganizationPostgresRepository(c.var.db)
30-
const organization = await Effect.runPromise(organizationRepository.findById(OrganizationId(organizationIdParam)))
30+
const organizationId = OrganizationId(organizationIdParam)
31+
const organization = await Effect.runPromise(
32+
Effect.gen(function* () {
33+
const repo = yield* OrganizationRepository
34+
return yield* repo.findById(organizationId)
35+
}).pipe(
36+
Effect.provide(OrganizationRepositoryLive),
37+
Effect.provide(SqlClientLive(c.var.postgresClient, organizationId)),
38+
),
39+
)
3140

3241
c.set("organization", organization)
3342
await next()

apps/api/src/middleware/touch-buffer.ts

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import { ApiKeyRepository } from "@domain/api-keys"
12
import { ApiKeyId } from "@domain/shared"
2-
import { type PostgresDb, createApiKeyPostgresRepository } from "@platform/db-postgres"
3+
import { ApiKeyRepositoryLive, SqlClientLive } from "@platform/db-postgres"
4+
import type { PostgresClient } from "@platform/db-postgres"
35
import { createLogger } from "@repo/observability"
46
import { Effect } from "effect"
57

@@ -40,10 +42,10 @@ class TouchBuffer {
4042
private flushInterval: NodeJS.Timeout | null = null
4143
private readonly intervalMs: number
4244
private readonly maxBufferSize: number
43-
private readonly db: PostgresDb
45+
private readonly client: PostgresClient
4446

45-
constructor(db: PostgresDb, config: TouchBufferConfig = {}) {
46-
this.db = db
47+
constructor(client: PostgresClient, config: TouchBufferConfig = {}) {
48+
this.client = client
4749
this.intervalMs = config.intervalMs ?? 30000
4850
this.maxBufferSize = config.maxBufferSize ?? 10000
4951

@@ -97,11 +99,17 @@ class TouchBuffer {
9799

98100
const startTime = Date.now()
99101

100-
// Use the admin db connection (bypasses RLS) for cross-org batch updates.
101-
const repo = createApiKeyPostgresRepository(this.db)
102+
// Use the live layer pattern for cross-org batch updates (bypasses RLS)
103+
const sqlClientLayer = SqlClientLive(this.client)
104+
const apiKeyRepoLayer = ApiKeyRepositoryLive
102105

103106
try {
104-
await Effect.runPromise(repo.touchBatch(keyIds))
107+
await Effect.runPromise(
108+
Effect.gen(function* () {
109+
const repo = yield* ApiKeyRepository
110+
return yield* repo.touchBatch(keyIds)
111+
}).pipe(Effect.provide(apiKeyRepoLayer), Effect.provide(sqlClientLayer)),
112+
)
105113

106114
const duration = Date.now() - startTime
107115
logger.info(`Flushed ${keyIds.length} touch updates in ${duration}ms`)
@@ -169,9 +177,9 @@ class TouchBuffer {
169177
*/
170178
let touchBufferInstance: TouchBuffer | null = null
171179

172-
export const createTouchBuffer = (db: PostgresDb, config?: TouchBufferConfig): TouchBuffer => {
180+
export const createTouchBuffer = (client: PostgresClient, config?: TouchBufferConfig): TouchBuffer => {
173181
if (!touchBufferInstance) {
174-
touchBufferInstance = new TouchBuffer(db, config)
182+
touchBufferInstance = new TouchBuffer(client, config)
175183
}
176184
return touchBufferInstance
177185
}

0 commit comments

Comments
 (0)