Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 131 additions & 5 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,109 @@ Base config: `tsconfig.base.json`
- Weaviate adapter stack lives in `packages/platform/db-weaviate`
- Domain models are independent from table/row shapes
- Mapping from DB rows to domain objects belongs in platform adapters
- **Apps use pool-based connections**: Use `createPostgresPool()` in `apps/*/clients.ts` for direct pool access
- **Apps use SqlClient for all DB access**: Boundaries provide `SqlClientLive` layer with organization context for RLS enforcement

### SqlClient and Row-Level Security (RLS)

All Postgres access flows through `SqlClient`—a domain-level service that abstracts database operations and enforces organization scoping via RLS.

**Architecture:**
- **Domain Layer** (`@domain/shared`): `SqlClient` interface with `transaction()` and `query()` methods
- **Platform Layer** (`@platform/db-postgres`): `SqlClientLive` implementation with automatic RLS context setting
- **App Layer** (`apps/*`): Boundaries provide `SqlClientLive` with the request's organization context

**Key behaviors:**
- Every transaction automatically sets `app.current_organization_id` session variable
- RLS policies filter all queries by this organization ID at the database level
- Nested transactions share the same connection (pass-through proxy—no nested transaction overhead)
- Domain errors propagate through Effect error channel; database errors become `RepositoryError`

**Usage in boundaries (apps):**

```typescript
// apps/api/src/routes/projects.ts
import { SqlClientLive } from "@platform/db-postgres"
import { ProjectRepositoryLive } from "@platform/db-postgres"

app.openapi(createProjectRoute, async (c) => {
const project = await Effect.runPromise(
createProjectUseCase(input).pipe(
Effect.provide(ProjectRepositoryLive),
Effect.provide(SqlClientLive(c.var.postgresClient, c.var.organization.id)),
),
)
return c.json(toProjectResponse(project), 201)
})
```

```typescript
// apps/web/src/domains/projects/projects.functions.ts
import { getPostgresClient } from "../../server/clients.ts"

export const createProject = createServerFn({ method: "POST" })
.handler(async ({ data }) => {
const { organizationId } = await requireSession()
const client = getPostgresClient()

const project = await Effect.runPromise(
createProjectUseCase({...}).pipe(
Effect.provide(ProjectRepositoryLive),
Effect.provide(SqlClientLive(client, organizationId)),
)
)
return toRecord(project)
})
```

**Usage in use-cases (multi-operation transactions):**

```typescript
// packages/domain/auth/src/use-cases/complete-auth-intent.ts
export const completeAuthIntentUseCase = (input) =>
Effect.gen(function* () {
const sqlClient = yield* SqlClient

// Wraps multi-step operation in single transaction with RLS
yield* sqlClient.transaction(handleIntentByType(intent, input.session))
})

const handleSignup = (intent, session) =>
Effect.gen(function* () {
const users = yield* UserRepository
const memberships = yield* MembershipRepository

// All operations share the same transaction + RLS context
const organization = yield* createOrganizationUseCase({...})
yield* memberships.save(createMembership({...}))
yield* users.setNameIfMissing({...})
})
```

**Usage in repositories (single operations):**

```typescript
// packages/platform/db-postgres/src/repositories/project-repository.ts
export const ProjectRepositoryLive = Layer.effect(
ProjectRepository,
Effect.gen(function* () {
const sqlClient = (yield* SqlClient) as SqlClientShape<Operator>

return {
findById: (id) =>
sqlClient
.query((db) => db.select().from(projects).where(eq(projects.id, id)))
.pipe(Effect.flatMap(...)),

save: (project) =>
Effect.gen(function* () {
yield* sqlClient.query((db) =>
db.insert(projects).values(row).onConflictDoUpdate({...})
)
}),
}
})
)
```

### Postgres Management

Expand Down Expand Up @@ -610,17 +712,41 @@ The web app uses a **server-centric, query-driven** architecture built on the Ta
**Server Functions** — All data fetching and mutations use `createServerFn` from `@tanstack/react-start`:

```typescript
import { Effect } from "effect"
import { ProjectRepository, createProjectUseCase } from "@domain/projects"
import { ProjectRepositoryLive, SqlClientLive } from "@platform/db-postgres"
import { getPostgresClient } from "../../server/clients.ts"

// Query (GET)
export const listProjects = createServerFn({ method: "GET" }).handler(async () => {
const { organizationId } = await requireSession()
const { db } = getPostgresClient()
// compose domain use-cases...
const client = getPostgresClient()

return await Effect.runPromise(
Effect.gen(function* () {
const repo = yield* ProjectRepository
return yield* repo.findAll()
}).pipe(
Effect.provide(ProjectRepositoryLive),
Effect.provide(SqlClientLive(client, organizationId))
)
)
})

// Mutation (POST) with Zod validation
export const createProject = createServerFn({ method: "POST" })
.inputValidator(z.object({ name: z.string().min(1) }))
.handler(async ({ data }) => { ... })
.inputValidator(createProjectSchema)
.handler(async ({ data }) => {
const { userId, organizationId } = await requireSession()
const client = getPostgresClient()

return await Effect.runPromise(
createProjectUseCase({...}).pipe(
Effect.provide(ProjectRepositoryLive),
Effect.provide(SqlClientLive(client, organizationId))
)
)
})
```

Server functions live in `apps/web/src/domains/*/functions.ts`.
Expand Down
18 changes: 10 additions & 8 deletions apps/api/src/middleware/auth.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { ApiKeyRepository } from "@domain/api-keys"
import { OrganizationId, UnauthorizedError, UserId } from "@domain/shared"
import { type PostgresDb, createApiKeyPostgresRepository } from "@platform/db-postgres"
import { ApiKeyRepositoryLive, SqlClientLive } from "@platform/db-postgres"
import type { PostgresClient } from "@platform/db-postgres"
import { hashToken } from "@repo/utils"
import { Effect, Option } from "effect"
import type { Context, MiddlewareHandler, Next } from "hono"
Expand Down Expand Up @@ -96,9 +98,8 @@ const validateApiKey = (
options?: AuthMiddlewareOptions,
): Effect.Effect<{ organizationId: string; keyId: string } | null, never> => {
const redis = c.get("redis")
const adminDb = options?.adminDb ?? getAdminPostgresClient().db
const unscopedApiKeyRepository = createApiKeyPostgresRepository(adminDb)
const touchBuffer = createTouchBuffer(adminDb)
const adminClient = options?.adminClient ?? getAdminPostgresClient()
const touchBuffer = createTouchBuffer(adminClient)

return Effect.gen(function* () {
const startTime = Date.now()
Expand All @@ -113,7 +114,8 @@ const validateApiKey = (
return cached
}

const apiKeyOption = yield* Effect.option(unscopedApiKeyRepository.findByTokenHash(tokenHash))
const apiKeyRepository = yield* ApiKeyRepository
const apiKeyOption = yield* Effect.option(apiKeyRepository.findByTokenHash(tokenHash))

if (Option.isNone(apiKeyOption)) {
yield* cacheApiKeyResult(redis, tokenHash, null, INVALID_KEY_TTL_SECONDS)
Expand All @@ -138,7 +140,7 @@ const validateApiKey = (
// Enforce minimum time before returning
yield* enforceMinimumTime(startTime, MIN_VALIDATION_TIME_MS)
return result
}).pipe(Effect.orDie)
}).pipe(Effect.provide(ApiKeyRepositoryLive), Effect.provide(SqlClientLive(adminClient)), Effect.orDie)
}

/**
Expand Down Expand Up @@ -191,7 +193,7 @@ const authenticateWithApiKey = (
}

/**
* Authenticate via API key from the Authorization: Bearer header.
* Main authentication effect that validates API key from Authorization header.
*/
const authenticate = (c: Context, options?: AuthMiddlewareOptions): Effect.Effect<AuthContext, UnauthorizedError> => {
return Effect.gen(function* () {
Expand Down Expand Up @@ -219,7 +221,7 @@ const authenticate = (c: Context, options?: AuthMiddlewareOptions): Effect.Effec
* Public routes should be excluded from this middleware.
*/
interface AuthMiddlewareOptions {
readonly adminDb?: PostgresDb
readonly adminClient?: PostgresClient
}

export const createAuthMiddleware = (options?: AuthMiddlewareOptions): MiddlewareHandler => {
Expand Down
15 changes: 12 additions & 3 deletions apps/api/src/middleware/organization-context.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { OrganizationRepository } from "@domain/organizations"
import { OrganizationId, PermissionError } from "@domain/shared"
import { createOrganizationPostgresRepository } from "@platform/db-postgres"
import { OrganizationRepositoryLive, SqlClientLive } from "@platform/db-postgres"
import { BadRequestError } from "@repo/utils"
import { Effect } from "effect"
import type { Context, MiddlewareHandler, Next } from "hono"
Expand All @@ -26,8 +27,16 @@ export const createOrganizationContextMiddleware = (): MiddlewareHandler => {
})
}

const organizationRepository = createOrganizationPostgresRepository(c.var.db)
const organization = await Effect.runPromise(organizationRepository.findById(OrganizationId(organizationIdParam)))
const organizationId = OrganizationId(organizationIdParam)
const organization = await Effect.runPromise(
Effect.gen(function* () {
const repo = yield* OrganizationRepository
return yield* repo.findById(organizationId)
}).pipe(
Effect.provide(OrganizationRepositoryLive),
Effect.provide(SqlClientLive(c.var.postgresClient, organizationId)),
),
)

c.set("organization", organization)
await next()
Expand Down
26 changes: 17 additions & 9 deletions apps/api/src/middleware/touch-buffer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { ApiKeyRepository } from "@domain/api-keys"
import { ApiKeyId } from "@domain/shared"
import { type PostgresDb, createApiKeyPostgresRepository } from "@platform/db-postgres"
import { ApiKeyRepositoryLive, SqlClientLive } from "@platform/db-postgres"
import type { PostgresClient } from "@platform/db-postgres"
import { createLogger } from "@repo/observability"
import { Effect } from "effect"

Expand Down Expand Up @@ -40,10 +42,10 @@ class TouchBuffer {
private flushInterval: NodeJS.Timeout | null = null
private readonly intervalMs: number
private readonly maxBufferSize: number
private readonly db: PostgresDb
private readonly client: PostgresClient

constructor(db: PostgresDb, config: TouchBufferConfig = {}) {
this.db = db
constructor(client: PostgresClient, config: TouchBufferConfig = {}) {
this.client = client
this.intervalMs = config.intervalMs ?? 30000
this.maxBufferSize = config.maxBufferSize ?? 10000

Expand Down Expand Up @@ -97,11 +99,17 @@ class TouchBuffer {

const startTime = Date.now()

// Use the admin db connection (bypasses RLS) for cross-org batch updates.
const repo = createApiKeyPostgresRepository(this.db)
// Use the live layer pattern for cross-org batch updates (bypasses RLS)
const sqlClientLayer = SqlClientLive(this.client)
const apiKeyRepoLayer = ApiKeyRepositoryLive

try {
await Effect.runPromise(repo.touchBatch(keyIds))
await Effect.runPromise(
Effect.gen(function* () {
const repo = yield* ApiKeyRepository
return yield* repo.touchBatch(keyIds)
}).pipe(Effect.provide(apiKeyRepoLayer), Effect.provide(sqlClientLayer)),
)

const duration = Date.now() - startTime
logger.info(`Flushed ${keyIds.length} touch updates in ${duration}ms`)
Expand Down Expand Up @@ -169,9 +177,9 @@ class TouchBuffer {
*/
let touchBufferInstance: TouchBuffer | null = null

export const createTouchBuffer = (db: PostgresDb, config?: TouchBufferConfig): TouchBuffer => {
export const createTouchBuffer = (client: PostgresClient, config?: TouchBufferConfig): TouchBuffer => {
if (!touchBufferInstance) {
touchBufferInstance = new TouchBuffer(db, config)
touchBufferInstance = new TouchBuffer(client, config)
}
return touchBufferInstance
}
Expand Down
28 changes: 14 additions & 14 deletions apps/api/src/routes/api-keys.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ interface ApiKeysRoutesTestContext extends TestContext {
database: InMemoryPostgres
}

const createApiKeyRecord = async (db: InMemoryPostgres["db"], organizationId: string, name: string) => {
const createApiKeyRecord = async (database: InMemoryPostgres, organizationId: string, name: string) => {
const token = crypto.randomUUID()
const id = generateId()
const tokenHash = await Effect.runPromise(hashToken(token))
const encryptedToken = await Effect.runPromise(encrypt(token, TEST_ENCRYPTION_KEY))

await db.insert(postgresSchema.apiKeys).values({
await database.db.insert(postgresSchema.apiKeys).values({
id,
organizationId,
token: await Effect.runPromise(encrypt(token, TEST_ENCRYPTION_KEY)),
token: encryptedToken,
tokenHash,
name,
})
Expand Down Expand Up @@ -69,11 +70,11 @@ describe("API Keys Routes Integration", () => {
app,
database,
}) => {
const tenantA = await createTenantSetup(database.db)
const tenantB = await createTenantSetup(database.db)
const tenantA = await createTenantSetup(database)
const tenantB = await createTenantSetup(database)

const tenantAKey = await createApiKeyRecord(database.db, tenantA.organizationId, "tenant-a-key")
const tenantBKey = await createApiKeyRecord(database.db, tenantB.organizationId, "tenant-b-key")
const tenantAKey = await createApiKeyRecord(database, tenantA.organizationId, "tenant-a-key")
const tenantBKey = await createApiKeyRecord(database, tenantB.organizationId, "tenant-b-key")

const response = await app.fetch(
new Request(`http://localhost/v1/organizations/${tenantA.organizationId}/api-keys`, {
Expand All @@ -95,9 +96,9 @@ describe("API Keys Routes Integration", () => {
app,
database,
}) => {
const tenantA = await createTenantSetup(database.db)
const tenantB = await createTenantSetup(database.db)
const tenantBKey = await createApiKeyRecord(database.db, tenantB.organizationId, "tenant-b-key")
const tenantA = await createTenantSetup(database)
const tenantB = await createTenantSetup(database)
const tenantBKey = await createApiKeyRecord(database, tenantB.organizationId, "tenant-b-key")

const response = await app.fetch(
new Request(`http://localhost/v1/organizations/${tenantA.organizationId}/api-keys/${tenantBKey.id}`, {
Expand All @@ -108,13 +109,12 @@ describe("API Keys Routes Integration", () => {

expect(response.status).toBe(404)

const [stillActive] = await database.db
const rows = await database.db
.select({ deletedAt: postgresSchema.apiKeys.deletedAt })
.from(postgresSchema.apiKeys)
.where(eq(postgresSchema.apiKeys.id, tenantBKey.id))
.limit(1)

expect(stillActive).toBeDefined()
expect(stillActive?.deletedAt).toBeNull()
expect(rows.length).toBe(1)
expect(rows[0].deletedAt).toBeNull()
})
})
Loading
Loading