Skip to content

Commit cc7ba2b

Browse files
committed
Add shared SQL eventlog storage integration tests
1 parent 325dffd commit cc7ba2b

File tree

6 files changed

+345
-134
lines changed

6 files changed

+345
-134
lines changed

.specs/sql-eventlog-server-unencrypted.md

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,25 @@ Scope:
319319

320320
Why this is one task: these behaviors depend on one another and should land together so the service semantics remain internally consistent and type-safe.
321321

322-
### [ ] Task 3 — Add reusable integration coverage and driver runners
322+
### [x] Task 3 — Add reusable integration coverage and driver runners
323+
324+
Status:
325+
326+
- completed in this change
327+
- added `packages/effect/test/unstable/eventlog/SqlEventLogServerUnencryptedStorageTest.ts` as the shared SQL integration suite and switched sqlite-node to the same runner style used by the persistence suites
328+
- important discovery: to verify remote-id persistence across multiple storage instances against the same database, the shared suite needs a shared `SqlClient` layer rather than a prebuilt `Storage` layer, so each test can instantiate multiple storages over the same backing database handle
329+
- each test now uses unique SQL table names so the suite can safely share one driver client layer across all test cases without cross-test data leakage
330+
- added postgres and mysql2 runner files alongside the sqlite-node runner, all pointing at the shared suite
331+
- the suite covers remote-id reuse, per-store sequencing, in-call and cross-call idempotency, strict `entries(..., startSequence)` semantics, backlog + live `changes`, a startup-race stress case, transactional commit / rollback behavior with sequence reuse, store isolation, and allowing the same `EntryId` in different stores
332+
333+
Validation result after this task:
334+
335+
- `pnpm lint-fix`
336+
- `pnpm test packages/sql/sqlite-node/test/SqlEventLogServerUnencrypted.test.ts`
337+
- `pnpm test packages/sql/pg/test/SqlEventLogServerUnencrypted.test.ts`
338+
- `pnpm test packages/sql/mysql2/test/SqlEventLogServerUnencrypted.test.ts`
339+
- `pnpm check:tsgo`
340+
- `pnpm docgen`
323341

324342
Validation target after this task:
325343

@@ -338,7 +356,23 @@ Scope:
338356

339357
Why this is one task: the reusable suite and the driver runners need to land together so validations exercise real SQL backends immediately. Splitting them would either leave dead test code with no runner or runner files with missing shared coverage.
340358

341-
### [ ] Task 4 — Add release metadata and perform the final validation pass
359+
### [x] Task 4 — Add release metadata and perform the final validation pass
360+
361+
Status:
362+
363+
- completed in this change
364+
- reused the existing scaffold changeset at `.changeset/sql-eventlog-server-unencrypted-scaffold.md`; no additional changeset file was required for this follow-up task
365+
- important discovery from cross-driver validation: PostgreSQL and MySQL return `BIGINT` result columns as strings in this setup, so SQL eventlog storage decoding now accepts both `number` and `NumberFromString` for persisted `sequence` / `next_sequence` values
366+
- final validation pass completed cleanly across lint, the three SQL integration runners, typechecking, and doc generation
367+
368+
Validation result after this task:
369+
370+
- `pnpm lint-fix`
371+
- `pnpm test packages/sql/sqlite-node/test/SqlEventLogServerUnencrypted.test.ts`
372+
- `pnpm test packages/sql/pg/test/SqlEventLogServerUnencrypted.test.ts`
373+
- `pnpm test packages/sql/mysql2/test/SqlEventLogServerUnencrypted.test.ts`
374+
- `pnpm check:tsgo`
375+
- `pnpm docgen`
342376

343377
Validation target after this task:
344378

packages/effect/src/unstable/eventlog/SqlEventLogServerUnencrypted.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -510,15 +510,17 @@ const EntrySql = Schema.Struct({
510510

511511
type EntrySql = Schema.Schema.Type<typeof EntrySql>
512512

513+
const SqlNumber = Schema.Union([Schema.Number, Schema.NumberFromString])
514+
513515
const RemoteEntrySql = Schema.Struct({
514516
...EntrySql.fields,
515-
sequence: Schema.Number
517+
sequence: SqlNumber
516518
})
517519

518520
type RemoteEntrySql = Schema.Schema.Type<typeof RemoteEntrySql>
519521

520522
const StoreSequenceSql = Schema.Struct({
521-
next_sequence: Schema.Number
523+
next_sequence: SqlNumber
522524
})
523525

524526
const decodeRemoteEntryRows = Schema.decodeUnknownEffect(Schema.Array(RemoteEntrySql))
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
import { assert, it } from "@effect/vitest"
2+
import { Effect, Exit, Fiber, Layer, Option, Queue } from "effect"
3+
import * as EventJournal from "effect/unstable/eventlog/EventJournal"
4+
import type * as EventLogServerUnencrypted from "effect/unstable/eventlog/EventLogServerUnencrypted"
5+
import * as SqlEventLogServerUnencrypted from "effect/unstable/eventlog/SqlEventLogServerUnencrypted"
6+
import { Reactivity } from "effect/unstable/reactivity"
7+
import type * as SqlClient from "effect/unstable/sql/SqlClient"
8+
9+
let nextNamespace = 0
10+
11+
const uniqueNamespace = (prefix: string) => `${prefix}_${++nextNamespace}`
12+
13+
const makeOptions = (prefix: string) => {
14+
const namespace = uniqueNamespace(prefix)
15+
return {
16+
entryTablePrefix: `effect_events_${namespace}`,
17+
remoteIdTable: `effect_remote_id_${namespace}`,
18+
insertBatchSize: 2
19+
}
20+
}
21+
22+
const makeStoreId = (prefix: string) => `${uniqueNamespace(prefix)}_store` as EventLogServerUnencrypted.StoreId
23+
24+
const makeEntry = (
25+
name: string,
26+
options: {
27+
readonly id?: EventJournal.EntryId | undefined
28+
readonly primaryKey?: string | undefined
29+
} = {}
30+
) =>
31+
new EventJournal.Entry({
32+
id: options.id ?? EventJournal.makeEntryIdUnsafe(),
33+
event: "UserNameSet",
34+
primaryKey: options.primaryKey ?? "user-1",
35+
payload: new TextEncoder().encode(name)
36+
}, { disableChecks: true })
37+
38+
const makeStorage = (options: {
39+
readonly entryTablePrefix?: string
40+
readonly remoteIdTable?: string
41+
readonly insertBatchSize?: number
42+
}) =>
43+
SqlEventLogServerUnencrypted.makeStorage(options).pipe(
44+
Effect.orDie
45+
)
46+
47+
export const suite = (name: string, layer: Layer.Layer<SqlClient.SqlClient, unknown>) =>
48+
it.layer(
49+
Layer.mergeAll(Reactivity.layer, layer),
50+
{ timeout: "30 seconds" }
51+
)(`SqlEventLogServerUnencrypted (${name})`, (it) => {
52+
it.effect("persists remote id across storage instances", () =>
53+
Effect.gen(function*() {
54+
const options = makeOptions("remote_id")
55+
const storageA = yield* makeStorage(options)
56+
const storageB = yield* makeStorage(options)
57+
58+
const idA = yield* storageA.getId
59+
const idB = yield* storageB.getId
60+
61+
assert.deepStrictEqual(idA, idB)
62+
}))
63+
64+
it.effect("keeps per-store sequence counters independent", () =>
65+
Effect.gen(function*() {
66+
const storage = yield* makeStorage(makeOptions("per_store_sequences"))
67+
const storeA = makeStoreId("sequence_a")
68+
const storeB = makeStoreId("sequence_b")
69+
const entryA1 = makeEntry("Ada")
70+
const entryA2 = makeEntry("Grace")
71+
const entryB1 = makeEntry("Margaret")
72+
73+
const firstA = yield* storage.write(storeA, [entryA1])
74+
const firstB = yield* storage.write(storeB, [entryB1])
75+
const secondA = yield* storage.write(storeA, [entryA2])
76+
77+
assert.deepStrictEqual(firstA.sequenceNumbers, [1])
78+
assert.deepStrictEqual(firstB.sequenceNumbers, [1])
79+
assert.deepStrictEqual(secondA.sequenceNumbers, [2])
80+
assert.deepStrictEqual((yield* storage.entries(storeA, 0)).map((entry) => entry.remoteSequence), [1, 2])
81+
assert.deepStrictEqual((yield* storage.entries(storeB, 0)).map((entry) => entry.remoteSequence), [1])
82+
}))
83+
84+
it.effect("deduplicates writes within one call and across repeated calls", () =>
85+
Effect.gen(function*() {
86+
const storage = yield* makeStorage(makeOptions("idempotent_writes"))
87+
const storeId = makeStoreId("idempotent")
88+
const sharedId = EventJournal.makeEntryIdUnsafe()
89+
const entryA = makeEntry("Ada", { id: sharedId })
90+
const entryB = makeEntry("Grace")
91+
92+
const first = yield* storage.write(storeId, [entryA, entryA, entryB, entryA])
93+
const second = yield* storage.write(storeId, [entryB, entryA, entryB])
94+
95+
assert.deepStrictEqual(first.sequenceNumbers, [1, 1, 2, 1])
96+
assert.deepStrictEqual(first.committed.map((entry) => entry.remoteSequence), [1, 2])
97+
assert.deepStrictEqual(second.sequenceNumbers, [2, 1, 2])
98+
assert.deepStrictEqual(second.committed, [])
99+
assert.deepStrictEqual((yield* storage.entries(storeId, 0)).map((entry) => entry.remoteSequence), [1, 2])
100+
}))
101+
102+
it.effect("uses strict > semantics for entries", () =>
103+
Effect.gen(function*() {
104+
const storage = yield* makeStorage(makeOptions("entries_strict_gt"))
105+
const storeId = makeStoreId("entries")
106+
const missingStoreId = makeStoreId("entries_missing")
107+
const entryA = makeEntry("Ada")
108+
const entryB = makeEntry("Grace")
109+
const entryC = makeEntry("Margaret")
110+
111+
yield* storage.write(storeId, [entryA, entryB, entryC])
112+
113+
assert.deepStrictEqual((yield* storage.entries(storeId, 0)).map((entry) => entry.remoteSequence), [1, 2, 3])
114+
assert.deepStrictEqual((yield* storage.entries(storeId, 1)).map((entry) => entry.remoteSequence), [2, 3])
115+
assert.deepStrictEqual((yield* storage.entries(storeId, 3)).map((entry) => entry.remoteSequence), [])
116+
assert.deepStrictEqual(yield* storage.entries(missingStoreId, 0), [])
117+
}))
118+
119+
it.effect("replays backlog and then streams live changes without startup duplication", () =>
120+
Effect.gen(function*() {
121+
const storage = yield* makeStorage(makeOptions("changes_backlog_then_live"))
122+
const storeId = makeStoreId("changes")
123+
const entryA = makeEntry("Ada")
124+
const entryB = makeEntry("Grace")
125+
const entryC = makeEntry("Margaret")
126+
127+
yield* storage.write(storeId, [entryA, entryB])
128+
129+
const changes = yield* storage.changes(storeId, 0)
130+
const replayed = yield* Queue.takeAll(changes)
131+
132+
assert.deepStrictEqual(replayed.map((entry) => entry.remoteSequence), [1, 2])
133+
assert.deepStrictEqual(replayed.map((entry) => entry.entry.idString), [entryA.idString, entryB.idString])
134+
135+
yield* storage.write(storeId, [entryC])
136+
137+
const next = yield* Queue.take(changes)
138+
assert.strictEqual(next.remoteSequence, 3)
139+
assert.strictEqual(next.entry.idString, entryC.idString)
140+
141+
yield* Effect.yieldNow
142+
assert.strictEqual(Option.isNone(yield* Queue.poll(changes)), true)
143+
}))
144+
145+
it.effect("handles the changes startup race without losing or duplicating rows", () =>
146+
Effect.gen(function*() {
147+
const storage = yield* makeStorage(makeOptions("changes_startup_race"))
148+
149+
for (let iteration = 0; iteration < 5; iteration++) {
150+
const storeId = makeStoreId(`startup_race_${iteration}`)
151+
const backlogEntry = makeEntry(`Ada_${iteration}`)
152+
const racedEntry = makeEntry(`Grace_${iteration}`)
153+
154+
yield* storage.write(storeId, [backlogEntry])
155+
156+
const changesFiber = yield* storage.changes(storeId, 0).pipe(Effect.forkScoped)
157+
yield* storage.write(storeId, [racedEntry])
158+
const changes = yield* Fiber.join(changesFiber)
159+
160+
const first = yield* Queue.take(changes)
161+
const second = yield* Queue.take(changes)
162+
163+
assert.deepStrictEqual(
164+
[first.remoteSequence, second.remoteSequence],
165+
[1, 2],
166+
`iteration ${iteration} should deliver exactly the backlog row and the raced row`
167+
)
168+
assert.deepStrictEqual(
169+
[first.entry.idString, second.entry.idString],
170+
[backlogEntry.idString, racedEntry.idString]
171+
)
172+
173+
yield* Effect.yieldNow
174+
assert.strictEqual(Option.isNone(yield* Queue.poll(changes)), true)
175+
}
176+
}))
177+
178+
it.effect("commits and rolls back transactions, reusing sequences after rollback", () =>
179+
Effect.gen(function*() {
180+
const storage = yield* makeStorage(makeOptions("transactions"))
181+
const storeId = makeStoreId("transactions")
182+
const changes = yield* storage.changes(storeId, 0)
183+
const committedEntry = makeEntry("Committed")
184+
185+
const committed = yield* storage.withTransaction(
186+
Effect.gen(function*() {
187+
const written = yield* storage.write(storeId, [committedEntry])
188+
189+
assert.deepStrictEqual(written.sequenceNumbers, [1])
190+
assert.deepStrictEqual(written.committed.map((entry) => entry.remoteSequence), [1])
191+
assert.strictEqual(Option.isNone(yield* Queue.poll(changes)), true)
192+
193+
return written
194+
})
195+
)
196+
197+
assert.deepStrictEqual(committed.sequenceNumbers, [1])
198+
const firstDelivered = yield* Queue.take(changes)
199+
assert.strictEqual(firstDelivered.remoteSequence, 1)
200+
assert.strictEqual(firstDelivered.entry.idString, committedEntry.idString)
201+
202+
const rolledBackEntry = makeEntry("RolledBack")
203+
const rolledBack = yield* storage.withTransaction(
204+
Effect.gen(function*() {
205+
const written = yield* storage.write(storeId, [rolledBackEntry])
206+
207+
assert.deepStrictEqual(written.sequenceNumbers, [2])
208+
assert.strictEqual(Option.isNone(yield* Queue.poll(changes)), true)
209+
210+
return yield* Effect.fail("boom")
211+
})
212+
).pipe(Effect.exit)
213+
214+
assert.strictEqual(Exit.isFailure(rolledBack), true)
215+
assert.deepStrictEqual((yield* storage.entries(storeId, 1)).map((entry) => entry.remoteSequence), [])
216+
assert.strictEqual(Option.isNone(yield* Queue.poll(changes)), true)
217+
218+
const afterRollbackEntry = makeEntry("AfterRollback")
219+
const afterRollback = yield* storage.write(storeId, [afterRollbackEntry])
220+
221+
assert.deepStrictEqual(afterRollback.sequenceNumbers, [2])
222+
assert.deepStrictEqual(afterRollback.committed.map((entry) => entry.remoteSequence), [2])
223+
224+
const secondDelivered = yield* Queue.take(changes)
225+
assert.strictEqual(secondDelivered.remoteSequence, 2)
226+
assert.strictEqual(secondDelivered.entry.idString, afterRollbackEntry.idString)
227+
}))
228+
229+
it.effect("isolates reads and streams between stores", () =>
230+
Effect.gen(function*() {
231+
const storage = yield* makeStorage(makeOptions("store_isolation"))
232+
const storeA = makeStoreId("isolation_a")
233+
const storeB = makeStoreId("isolation_b")
234+
const entryA1 = makeEntry("Ada")
235+
const entryB1 = makeEntry("Grace")
236+
const entryA2 = makeEntry("Margaret")
237+
const entryB2 = makeEntry("Linus")
238+
239+
yield* storage.write(storeA, [entryA1])
240+
yield* storage.write(storeB, [entryB1])
241+
242+
assert.deepStrictEqual((yield* storage.entries(storeA, 0)).map((entry) => entry.entry.idString), [
243+
entryA1.idString
244+
])
245+
assert.deepStrictEqual((yield* storage.entries(storeB, 0)).map((entry) => entry.entry.idString), [
246+
entryB1.idString
247+
])
248+
249+
const changesA = yield* storage.changes(storeA, 0)
250+
const backlogA = yield* Queue.takeAll(changesA)
251+
assert.deepStrictEqual(backlogA.map((entry) => entry.entry.idString), [entryA1.idString])
252+
253+
yield* storage.write(storeB, [entryB2])
254+
yield* Effect.yieldNow
255+
assert.strictEqual(Option.isNone(yield* Queue.poll(changesA)), true)
256+
257+
yield* storage.write(storeA, [entryA2])
258+
const nextA = yield* Queue.take(changesA)
259+
260+
assert.strictEqual(nextA.remoteSequence, 2)
261+
assert.strictEqual(nextA.entry.idString, entryA2.idString)
262+
}))
263+
264+
it.effect("allows the same EntryId to be committed in different stores", () =>
265+
Effect.gen(function*() {
266+
const storage = yield* makeStorage(makeOptions("shared_entry_id"))
267+
const storeA = makeStoreId("shared_id_a")
268+
const storeB = makeStoreId("shared_id_b")
269+
const sharedId = EventJournal.makeEntryIdUnsafe()
270+
const entryA = makeEntry("Ada", { id: sharedId, primaryKey: "user-a" })
271+
const entryB = makeEntry("Grace", { id: sharedId, primaryKey: "user-b" })
272+
273+
const writtenA = yield* storage.write(storeA, [entryA])
274+
const writtenB = yield* storage.write(storeB, [entryB])
275+
276+
assert.deepStrictEqual(writtenA.sequenceNumbers, [1])
277+
assert.deepStrictEqual(writtenB.sequenceNumbers, [1])
278+
assert.deepStrictEqual(writtenA.committed.map((entry) => entry.remoteSequence), [1])
279+
assert.deepStrictEqual(writtenB.committed.map((entry) => entry.remoteSequence), [1])
280+
assert.strictEqual((yield* storage.entries(storeA, 0))[0]?.entry.idString, entryA.idString)
281+
assert.strictEqual((yield* storage.entries(storeB, 0))[0]?.entry.idString, entryB.idString)
282+
}))
283+
})
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import * as SqlEventLogServerUnencryptedStorageTest from "effect-test/unstable/eventlog/SqlEventLogServerUnencryptedStorageTest"
2+
import { MysqlContainer } from "./utils.ts"
3+
4+
SqlEventLogServerUnencryptedStorageTest.suite(
5+
"sql-mysql2",
6+
MysqlContainer.layerClient
7+
)
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import * as SqlEventLogServerUnencryptedStorageTest from "effect-test/unstable/eventlog/SqlEventLogServerUnencryptedStorageTest"
2+
import { PgContainer } from "./utils.ts"
3+
4+
SqlEventLogServerUnencryptedStorageTest.suite(
5+
"sql-pg",
6+
PgContainer.layerClient
7+
)

0 commit comments

Comments
 (0)