Skip to content

Commit 5ad155f

Browse files
committed
Failed entries handling.
1 parent 9c971fc commit 5ad155f

File tree

2 files changed

+116
-22
lines changed

2 files changed

+116
-22
lines changed

packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts

Lines changed: 80 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,52 +25,110 @@ export class OutboxPrismaAdapter<SupportedEvents extends CommonEventDefinition[]
2525
updated: outboxEntry.updated,
2626
data: outboxEntry.data,
2727
status: outboxEntry.status,
28+
retryCount: outboxEntry.retryCount,
2829
},
2930
})
3031
}
3132

3233
async flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void> {
3334
const entries = await outboxAccumulator.getEntries()
34-
35+
const failedEntries = await outboxAccumulator.getFailedEntries()
3536
const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName]
3637

3738
const existingEntries = await prismaModel.findMany({
3839
where: {
3940
id: {
40-
in: entries.map((entry) => entry.id),
41+
in: [...entries.map((entry) => entry.id), ...failedEntries.map((entry) => entry.id)],
4142
},
4243
},
4344
})
4445

46+
await this.handleSuccesses(prismaModel, entries, existingEntries)
47+
await this.handleFailures(prismaModel, failedEntries, existingEntries)
48+
}
49+
50+
private async handleSuccesses(
51+
prismaModel: PrismaClient[typeof this.modelName],
52+
entries: OutboxEntry<SupportedEvents[number]>[],
53+
existingEntries: OutboxEntry<SupportedEvents[number]>[],
54+
) {
4555
const toCreate = entries.filter(
4656
(entry) => !existingEntries.some((existingEntry) => existingEntry.id === entry.id),
4757
)
4858
const toUpdate = entries.filter((entry) =>
4959
existingEntries.some((existingEntry) => existingEntry.id === entry.id),
5060
)
5161

52-
await prismaModel.createMany({
53-
data: toCreate.map((entry) => ({
54-
id: entry.id,
55-
type: getMessageType(entry.event),
56-
created: entry.created,
57-
updated: new Date(),
58-
data: entry.data,
59-
status: 'SUCCESS',
60-
})),
61-
})
62+
if (toCreate.length > 0) {
63+
await prismaModel.createMany({
64+
data: toCreate.map((entry) => ({
65+
id: entry.id,
66+
type: getMessageType(entry.event),
67+
created: entry.created,
68+
updated: new Date(),
69+
data: entry.data,
70+
status: 'SUCCESS',
71+
})),
72+
})
73+
}
6274

63-
await prismaModel.updateMany({
64-
where: {
65-
id: {
66-
in: toUpdate.map((entry) => entry.id),
75+
if (toUpdate.length > 0) {
76+
await prismaModel.updateMany({
77+
where: {
78+
id: {
79+
in: toUpdate.map((entry) => entry.id),
80+
},
6781
},
68-
},
69-
data: {
70-
status: 'SUCCESS',
71-
updated: new Date(),
72-
},
73-
})
82+
data: {
83+
status: 'SUCCESS',
84+
updated: new Date(),
85+
},
86+
})
87+
}
88+
}
89+
90+
private async handleFailures(
91+
prismaModel: PrismaClient[typeof this.modelName],
92+
entries: OutboxEntry<SupportedEvents[number]>[],
93+
existingEntries: OutboxEntry<SupportedEvents[number]>[],
94+
) {
95+
const toCreate = entries.filter(
96+
(entry) => !existingEntries.some((existingEntry) => existingEntry.id === entry.id),
97+
)
98+
const toUpdate = entries.filter((entry) =>
99+
existingEntries.some((existingEntry) => existingEntry.id === entry.id),
100+
)
101+
102+
if (toCreate.length > 0) {
103+
await prismaModel.createMany({
104+
data: toCreate.map((entry) => ({
105+
id: entry.id,
106+
type: getMessageType(entry.event),
107+
created: entry.created,
108+
updated: new Date(),
109+
data: entry.data,
110+
status: 'FAILED',
111+
retryCount: 1,
112+
})),
113+
})
114+
}
115+
116+
if (toUpdate.length > 0) {
117+
await prismaModel.updateMany({
118+
where: {
119+
id: {
120+
in: toUpdate.map((entry) => entry.id),
121+
},
122+
},
123+
data: {
124+
status: 'FAILED',
125+
updated: new Date(),
126+
retryCount: {
127+
increment: 1,
128+
},
129+
},
130+
})
131+
}
74132
}
75133

76134
getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]> {

packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,4 +206,40 @@ describe('outbox-prisma-adapter', () => {
206206
},
207207
])
208208
})
209+
210+
it("should change failed entries' status to 'FAILED' and increment retry count", async () => {
211+
const accumulator = new InMemoryOutboxAccumulator<SupportedEvents>()
212+
accumulator.addFailure(ENTRY_1)
213+
214+
await outboxPrismaAdapter.flush(accumulator)
215+
216+
const afterFirstFlush = await outboxPrismaAdapter.getEntries(10)
217+
expect(afterFirstFlush).toMatchObject([
218+
{
219+
id: ENTRY_1.id,
220+
status: 'FAILED',
221+
retryCount: 1,
222+
},
223+
])
224+
})
225+
226+
it('should change failed EXISTING entries status to FAILED and increment retry count', async () => {
227+
const accumulator = new InMemoryOutboxAccumulator<SupportedEvents>()
228+
const failedEntry = { ...ENTRY_1, retryCount: 3, status: 'FAILED' } satisfies OutboxEntry<
229+
SupportedEvents[number]
230+
>
231+
accumulator.addFailure(failedEntry)
232+
233+
await outboxPrismaAdapter.createEntry(failedEntry)
234+
await outboxPrismaAdapter.flush(accumulator)
235+
236+
const afterFirstFlush = await outboxPrismaAdapter.getEntries(10)
237+
expect(afterFirstFlush).toMatchObject([
238+
{
239+
id: failedEntry.id,
240+
status: 'FAILED',
241+
retryCount: 4,
242+
},
243+
])
244+
})
209245
})

0 commit comments

Comments
 (0)