Skip to content

Commit 32704cb

Browse files
committed
wip
1 parent 5ad4255 commit 32704cb

File tree

1 file changed

+12
-14
lines changed

1 file changed

+12
-14
lines changed

packages/effect/src/unstable/eventlog/SqlEventLogServerUnencrypted.ts

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import * as Schema from "../../Schema.ts"
1111
import type * as Scope from "../../Scope.ts"
1212
import * as ServiceMap from "../../ServiceMap.ts"
1313
import * as SqlClient from "../sql/SqlClient.ts"
14-
import type * as SqlError from "../sql/SqlError.ts"
14+
import * as SqlError from "../sql/SqlError.js"
1515
import { Entry, EntryId, makeRemoteIdUnsafe, RemoteEntry, type RemoteId } from "./EventJournal.ts"
1616
import * as EventLogServerUnencrypted from "./EventLogServerUnencrypted.ts"
1717

@@ -197,17 +197,14 @@ export const makeStorage = (options?: {
197197
Effect.scoped
198198
)
199199

200-
const flushPendingPublications = Effect.fnUntraced(
201-
function*(pending: ReadonlyArray<PendingPublication>) {
202-
for (const publication of pending) {
203-
yield* publishCommitted(publication)
204-
}
205-
},
206-
Effect.scoped
207-
)
200+
const flushPendingPublications = Effect.fnUntraced(function*(pending: ReadonlyArray<PendingPublication>) {
201+
for (const publication of pending) {
202+
yield* publishCommitted(publication)
203+
}
204+
})
208205

209-
const withTransaction: EventLogServerUnencrypted.Storage["Service"]["withTransaction"] = ((effect) =>
210-
Effect.gen(function*() {
206+
const withTransaction: EventLogServerUnencrypted.Storage["Service"]["withTransaction"] = Effect.fnUntraced(
207+
function*(effect) {
211208
const pendingOption = yield* Effect.serviceOption(PendingPublications)
212209
if (Option.isSome(pendingOption)) {
213210
return yield* sql.withTransaction(effect)
@@ -219,7 +216,9 @@ export const makeStorage = (options?: {
219216
)
220217
yield* flushPendingPublications(pending)
221218
return result
222-
})) as EventLogServerUnencrypted.Storage["Service"]["withTransaction"]
219+
},
220+
Effect.catchIf(SqlError.isSqlError, Effect.die)
221+
)
223222

224223
const ensureStore = (storeId: string) =>
225224
sql.onDialectOrElse({
@@ -428,8 +427,7 @@ export const makeStorage = (options?: {
428427
Effect.orDie,
429428
Effect.scoped
430429
),
431-
entries: (storeId, startSequence) =>
432-
selectEntriesAfter(storeId, startSequence).pipe(Effect.orDie),
430+
entries: (storeId, startSequence) => selectEntriesAfter(storeId, startSequence).pipe(Effect.orDie),
433431
changes: Effect.fnUntraced(function*(storeId, startSequence) {
434432
const queue = yield* Queue.make<RemoteEntry>()
435433
const pubsub = yield* RcMap.get(pubsubs, storeId)

0 commit comments

Comments
 (0)