@@ -11,7 +11,7 @@ import * as Schema from "../../Schema.ts"
1111import type * as Scope from "../../Scope.ts"
1212import * as ServiceMap from "../../ServiceMap.ts"
1313import * as SqlClient from "../sql/SqlClient.ts"
14- import type * as SqlError from "../sql/SqlError.ts"
14+ import * as SqlError from "../sql/SqlError.ts"
1515import { Entry , EntryId , makeRemoteIdUnsafe , RemoteEntry , type RemoteId } from "./EventJournal.ts"
1616import * as EventLogServerUnencrypted from "./EventLogServerUnencrypted.ts"
1717
@@ -197,17 +197,14 @@ export const makeStorage = (options?: {
197197 Effect . scoped
198198 )
199199
200- const flushPendingPublications = Effect . fnUntraced (
201- function * ( pending : ReadonlyArray < PendingPublication > ) {
202- for ( const publication of pending ) {
203- yield * publishCommitted ( publication )
204- }
205- } ,
206- Effect . scoped
207- )
200+ const flushPendingPublications = Effect . fnUntraced ( function * ( pending : ReadonlyArray < PendingPublication > ) {
201+ for ( const publication of pending ) {
202+ yield * publishCommitted ( publication )
203+ }
204+ } )
208205
209- const withTransaction : EventLogServerUnencrypted . Storage [ "Service" ] [ "withTransaction" ] = ( ( effect ) =>
210- Effect . gen ( function * ( ) {
206+ const withTransaction : EventLogServerUnencrypted . Storage [ "Service" ] [ "withTransaction" ] = Effect . fnUntraced (
207+ function * ( effect ) {
211208 const pendingOption = yield * Effect . serviceOption ( PendingPublications )
212209 if ( Option . isSome ( pendingOption ) ) {
213210 return yield * sql . withTransaction ( effect )
@@ -219,7 +216,9 @@ export const makeStorage = (options?: {
219216 )
220217 yield * flushPendingPublications ( pending )
221218 return result
222- } ) ) as EventLogServerUnencrypted . Storage [ "Service" ] [ "withTransaction" ]
219+ } ,
220+ Effect . catchIf ( SqlError . isSqlError , Effect . die )
221+ )
223222
224223 const ensureStore = ( storeId : string ) =>
225224 sql . onDialectOrElse ( {
@@ -428,8 +427,7 @@ export const makeStorage = (options?: {
428427 Effect . orDie ,
429428 Effect . scoped
430429 ) ,
431- entries : ( storeId , startSequence ) =>
432- selectEntriesAfter ( storeId , startSequence ) . pipe ( Effect . orDie ) ,
430+ entries : ( storeId , startSequence ) => selectEntriesAfter ( storeId , startSequence ) . pipe ( Effect . orDie ) ,
433431 changes : Effect . fnUntraced ( function * ( storeId , startSequence ) {
434432 const queue = yield * Queue . make < RemoteEntry > ( )
435433 const pubsub = yield * RcMap . get ( pubsubs , storeId )
0 commit comments