@@ -3,18 +3,27 @@ import type { OutboxStorage } from '@message-queue-toolkit/outbox-core/dist/lib/
3
3
import { type CommonEventDefinition , getMessageType } from '@message-queue-toolkit/schemas'
4
4
import type { PrismaClient } from '@prisma/client'
5
5
6
- export class OutboxPrismaAdapter < SupportedEvents extends CommonEventDefinition [ ] >
7
- implements OutboxStorage < SupportedEvents >
6
+ type ModelDelegate = {
7
+ create : ( args : any ) => Promise < any >
8
+ findMany : ( args : any ) => Promise < any >
9
+ createMany : ( args : any ) => Promise < any >
10
+ updateMany : ( args : any ) => Promise < any >
11
+ }
12
+
13
+ export class OutboxPrismaAdapter <
14
+ SupportedEvents extends CommonEventDefinition [ ] ,
15
+ ModelName extends keyof PrismaClient & string ,
16
+ > implements OutboxStorage < SupportedEvents >
8
17
{
9
18
constructor (
10
19
private readonly prisma : PrismaClient ,
11
- private readonly modelName : string ,
20
+ private readonly modelName : ModelName ,
12
21
) { }
13
22
14
23
createEntry (
15
24
outboxEntry : OutboxEntry < SupportedEvents [ number ] > ,
16
25
) : Promise < OutboxEntry < SupportedEvents [ number ] > > {
17
- const prismaModel : PrismaClient [ typeof this . modelName ] = this . prisma [ this . modelName ]
26
+ const prismaModel = this . prisma [ this . modelName ] as unknown as ModelDelegate
18
27
19
28
const messageType = getMessageType ( outboxEntry . event )
20
29
return prismaModel . create ( {
@@ -33,7 +42,7 @@ export class OutboxPrismaAdapter<SupportedEvents extends CommonEventDefinition[]
33
42
async flush ( outboxAccumulator : OutboxAccumulator < SupportedEvents > ) : Promise < void > {
34
43
const entries = await outboxAccumulator . getEntries ( )
35
44
const failedEntries = await outboxAccumulator . getFailedEntries ( )
36
- const prismaModel : PrismaClient [ typeof this . modelName ] = this . prisma [ this . modelName ]
45
+ const prismaModel = this . prisma [ this . modelName ] as unknown as ModelDelegate
37
46
38
47
const existingEntries = await prismaModel . findMany ( {
39
48
where : {
@@ -43,12 +52,15 @@ export class OutboxPrismaAdapter<SupportedEvents extends CommonEventDefinition[]
43
52
} ,
44
53
} )
45
54
46
- await this . handleSuccesses ( prismaModel , entries , existingEntries )
47
- await this . handleFailures ( prismaModel , failedEntries , existingEntries )
55
+ await this . prisma . $transaction ( async ( prisma ) => {
56
+ const prismaModel = prisma [ this . modelName ] as ModelDelegate
57
+ await this . handleSuccesses ( prismaModel , entries , existingEntries )
58
+ await this . handleFailures ( prismaModel , failedEntries , existingEntries )
59
+ } )
48
60
}
49
61
50
62
private async handleSuccesses (
51
- prismaModel : PrismaClient [ typeof this . modelName ] ,
63
+ prismaModel : ModelDelegate ,
52
64
entries : OutboxEntry < SupportedEvents [ number ] > [ ] ,
53
65
existingEntries : OutboxEntry < SupportedEvents [ number ] > [ ] ,
54
66
) {
@@ -88,7 +100,7 @@ export class OutboxPrismaAdapter<SupportedEvents extends CommonEventDefinition[]
88
100
}
89
101
90
102
private async handleFailures (
91
- prismaModel : PrismaClient [ typeof this . modelName ] ,
103
+ prismaModel : ModelDelegate ,
92
104
entries : OutboxEntry < SupportedEvents [ number ] > [ ] ,
93
105
existingEntries : OutboxEntry < SupportedEvents [ number ] > [ ] ,
94
106
) {
@@ -132,7 +144,9 @@ export class OutboxPrismaAdapter<SupportedEvents extends CommonEventDefinition[]
132
144
}
133
145
134
146
getEntries ( maxRetryCount : number ) : Promise < OutboxEntry < SupportedEvents [ number ] > [ ] > {
135
- return this . prisma [ this . modelName ] . findMany ( {
147
+ const prismaModel = this . prisma [ this . modelName ] as unknown as ModelDelegate
148
+
149
+ return prismaModel . findMany ( {
136
150
where : {
137
151
retryCount : {
138
152
lte : maxRetryCount ,
0 commit comments