Skip to content

Commit bad4f64

Browse files
geclosclaude
andcommitted
feat(testkit): add PGlite in-memory Postgres adapter and concurrent tx detection
- Add `createInMemoryPostgres` / `closeInMemoryPostgres` to testkit using PGlite so tests can run without a real Postgres instance - Export from testkit index; add `@electric-sql/pglite` and `drizzle-orm` as deps to testkit - Add `vitest.config.ts` to `apps/web` wiring up the vitest preset - Harden `SqlClientLive`: track `txOpening` flag so concurrent `transaction()` calls on the same instance fail fast via `Effect.die` instead of silently corrupting connections; add tests for both the error and the sequential-OK path Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent d75ab8a commit bad4f64

File tree

8 files changed

+170
-3
lines changed

8 files changed

+170
-3
lines changed

apps/web/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
"zod": "catalog:"
4848
},
4949
"devDependencies": {
50+
"@repo/vitest-config": "workspace:*",
5051
"@tailwindcss/postcss": "^4.0.0",
5152
"@tailwindcss/vite": "^4.0.0",
5253
"@types/papaparse": "^5.5.2",

apps/web/vitest.config.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import sharedConfig from "@repo/vitest-config"
2+
import { defineConfig, mergeConfig } from "vitest/config"
3+
4+
export default mergeConfig(
5+
sharedConfig,
6+
defineConfig({
7+
test: {
8+
globals: true,
9+
environment: "node",
10+
},
11+
}),
12+
)

packages/platform/db-postgres/src/sql-client.test.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,46 @@ describe("SqlClientLive", () => {
252252
})
253253
})
254254

255+
describe("concurrent transaction detection", () => {
256+
it("dies when two transaction() calls are started concurrently on the same SqlClient", async () => {
257+
const client = createMockPostgresClient(state)
258+
const orgId = OrganizationId("org-concurrent")
259+
260+
// Run two transaction() calls with concurrency: 2 on the same SqlClient.
261+
// The second fiber sees txOpening === true and should die.
262+
const layer = SqlClientLive(client, orgId)
263+
const effect = Effect.gen(function* () {
264+
const sqlClient = yield* SqlClient
265+
const shape = sqlClient as import("@domain/shared").SqlClientShape<Operator>
266+
return yield* Effect.all([shape.transaction(Effect.succeed("a")), shape.transaction(Effect.succeed("b"))], {
267+
concurrency: 2,
268+
})
269+
}).pipe(Effect.provide(layer))
270+
271+
await expect(Effect.runPromise(effect)).rejects.toThrow("concurrent transaction() calls detected")
272+
})
273+
274+
it("allows sequential transaction() calls on the same SqlClient after one completes", async () => {
275+
const client = createMockPostgresClient(state)
276+
const orgId = OrganizationId("org-sequential")
277+
278+
// Run two transactions sequentially — this is fine; txOpening is cleared
279+
// after each transaction completes.
280+
const layer = SqlClientLive(client, orgId)
281+
const effect = Effect.gen(function* () {
282+
const sqlClient = yield* SqlClient
283+
const shape = sqlClient as import("@domain/shared").SqlClientShape<Operator>
284+
const a = yield* shape.transaction(Effect.succeed(1))
285+
const b = yield* shape.transaction(Effect.succeed(2))
286+
return a + b
287+
}).pipe(Effect.provide(layer))
288+
289+
const result = await Effect.runPromise(effect)
290+
expect(result).toBe(3)
291+
expect(state.transactionCallCount).toBe(2)
292+
})
293+
})
294+
255295
describe("failure and rollback", () => {
256296
it("propagates failure from inner effect and does not commit", async () => {
257297
const client = createMockPostgresClient(state)

packages/platform/db-postgres/src/sql-client.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import type { Operator, PostgresClient } from "./client.ts"
2121
* concurrency > 1). `activeTx` has no fiber identity, so concurrent
2222
* transactions will overwrite each other's operator and corrupt both
2323
* connections. Use separate `SqlClientLive` layer instances instead.
24+
* A concurrent call is detected at runtime and killed with Effect.die.
2425
*/
2526
const setRlsContext = (tx: Operator, organizationId: OrganizationId) => {
2627
if (organizationId === "system") return Promise.resolve()
@@ -32,15 +33,24 @@ export const SqlClientLive = (client: PostgresClient, organizationId: Organizati
3233
SqlClient,
3334
Effect.gen(function* () {
3435
let activeTx: Operator | null = null
36+
let txOpening = false
3537

3638
return {
3739
organizationId,
3840

3941
transaction: <A, E, R>(effect: Effect.Effect<A, E, R>) => {
40-
if (activeTx) {
41-
return effect
42+
if (activeTx) return effect
43+
if (txOpening) {
44+
return Effect.die(
45+
new Error(
46+
"SqlClient: concurrent transaction() calls detected on the same instance. " +
47+
"Use separate SqlClientLive layer instances for parallel transactions.",
48+
),
49+
)
4250
}
4351

52+
txOpening = true
53+
4454
return Effect.gen(function* () {
4555
let resolveTxReady!: (tx: Operator) => void
4656
let resolveEffectDone!: (result: { ok: true; value: A } | { ok: false; error: unknown }) => void
@@ -62,10 +72,17 @@ export const SqlClientLive = (client: PostgresClient, organizationId: Organizati
6272

6373
const tx = yield* Effect.tryPromise({
6474
try: () => txReady,
65-
catch: (e) => toRepositoryError(e, "transaction"),
75+
catch: (e) => {
76+
txOpening = false
77+
return toRepositoryError(e, "transaction")
78+
},
6679
})
6780

81+
// activeTx is set — nested transaction() calls now use pass-through.
82+
// Release txOpening so that sequential (non-concurrent) calls after
83+
// this transaction completes can open their own transactions.
6884
activeTx = tx
85+
txOpening = false
6986
const exit = yield* Effect.exit(effect)
7087
activeTx = null
7188

packages/platform/testkit/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
},
1616
"dependencies": {
1717
"@domain/shared": "workspace:*",
18+
"@electric-sql/pglite": "0.3.15",
1819
"@platform/db-postgres": "workspace:*",
1920
"@platform/cache-redis": "workspace:*",
2021
"@repo/utils": "workspace:*",
22+
"drizzle-orm": "catalog:",
2123
"effect": "catalog:",
2224
"hono": "catalog:",
2325
"ioredis": "catalog:"
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import { fileURLToPath } from "node:url"
2+
import { PGlite } from "@electric-sql/pglite"
3+
import { type PostgresClient, type PostgresDb, postgresSchema } from "@platform/db-postgres"
4+
import { sql } from "drizzle-orm"
5+
import { drizzle } from "drizzle-orm/pglite"
6+
import { migrate } from "drizzle-orm/pglite/migrator"
7+
8+
const MIGRATIONS_FOLDER = fileURLToPath(new URL("../../../db-postgres/drizzle", import.meta.url))
9+
10+
const unsafeCast = <T>(value: unknown): T => value as T
11+
12+
export interface InMemoryPostgres {
13+
readonly client: PGlite
14+
readonly db: ReturnType<typeof drizzle>
15+
readonly postgresDb: PostgresDb
16+
/** Admin client — runs as the table owner (superuser). RLS is bypassed. */
17+
readonly adminPostgresClient: PostgresClient
18+
/**
19+
* App-role client whose transactions run under the `latitude_app` role so
20+
* that Postgres RLS policies are enforced. The role switch is injected via
21+
* drizzle's `tx.execute()` (transaction-scoped) before each query batch.
22+
*/
23+
readonly appPostgresClient: PostgresClient
24+
}
25+
26+
const createPostgresClientFromDb = (postgresDb: PostgresDb): PostgresClient => {
27+
const transaction = <T>(fn: (txDb: PostgresDb) => Promise<T>): Promise<T> =>
28+
(postgresDb as unknown as { transaction: (fn: (tx: unknown) => Promise<T>) => Promise<T> }).transaction(
29+
async (tx) => fn(tx as PostgresDb),
30+
)
31+
return unsafeCast<PostgresClient>({ db: postgresDb, transaction })
32+
}
33+
34+
/**
35+
* Create an app-role client that switches to `latitude_app` for each
36+
* transaction so that RLS policies are enforced.
37+
*
38+
* The role switch uses `SET LOCAL ROLE` executed through drizzle's transaction
39+
* `tx` handle (not via a raw `pglite.exec` call) to avoid interleaving raw
40+
* PGlite I/O with drizzle's serialized connection queue, which would deadlock
41+
* on the single-connection PGlite instance.
42+
*/
43+
const createAppRoleClient = (postgresDb: PostgresDb): PostgresClient => {
44+
const transaction = <T>(fn: (txDb: PostgresDb) => Promise<T>): Promise<T> =>
45+
(postgresDb as unknown as { transaction: (fn: (tx: unknown) => Promise<T>) => Promise<T> }).transaction(
46+
async (tx) => {
47+
// Switch to the runtime role so RLS policies apply.
48+
// SET LOCAL is transaction-scoped and reverts automatically on
49+
// commit/rollback — no cleanup needed.
50+
await (tx as PostgresDb).execute(sql`SET LOCAL ROLE latitude_app`)
51+
return fn(tx as PostgresDb)
52+
},
53+
)
54+
return unsafeCast<PostgresClient>({ db: postgresDb, transaction })
55+
}
56+
57+
export const createInMemoryPostgres = async (): Promise<InMemoryPostgres> => {
58+
const client = new PGlite()
59+
60+
// Create the runtime role before migrations so the grant migration finds it.
61+
await client.exec("CREATE ROLE latitude_app NOLOGIN")
62+
63+
const db = drizzle({ client, schema: postgresSchema })
64+
await migrate(db, { migrationsFolder: MIGRATIONS_FOLDER })
65+
66+
const postgresDb = unsafeCast<PostgresDb>(db)
67+
68+
return {
69+
client,
70+
db,
71+
postgresDb,
72+
adminPostgresClient: createPostgresClientFromDb(postgresDb),
73+
appPostgresClient: createAppRoleClient(postgresDb),
74+
}
75+
}
76+
77+
export const closeInMemoryPostgres = async (database: InMemoryPostgres): Promise<void> => {
78+
await database.client.close()
79+
}

packages/platform/testkit/src/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
// In-memory PGlite exports
2+
export {
3+
closeInMemoryPostgres,
4+
createInMemoryPostgres,
5+
type InMemoryPostgres,
6+
} from "./database/pglite.ts"
7+
18
// Database exports
29
export {
310
closeTestDatabase,

pnpm-lock.yaml

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)