Skip to content

Commit 9c971fc

Browse files
committed
Bulk update + insert.
1 parent 7444fab commit 9c971fc

File tree

2 files changed

+112
-97
lines changed

2 files changed

+112
-97
lines changed

packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,25 +34,43 @@ export class OutboxPrismaAdapter<SupportedEvents extends CommonEventDefinition[]
3434

3535
const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName]
3636

37-
for (const entry of entries) {
38-
await prismaModel.upsert({
39-
where: {
40-
id: entry.id,
41-
},
42-
update: {
43-
status: 'SUCCESS',
44-
updated: new Date(),
37+
const existingEntries = await prismaModel.findMany({
38+
where: {
39+
id: {
40+
in: entries.map((entry) => entry.id),
4541
},
46-
create: {
47-
id: entry.id,
48-
type: getMessageType(entry.event),
49-
created: entry.created,
50-
updated: new Date(),
51-
data: entry.data,
52-
status: 'SUCCESS',
42+
},
43+
})
44+
45+
const toCreate = entries.filter(
46+
(entry) => !existingEntries.some((existingEntry) => existingEntry.id === entry.id),
47+
)
48+
const toUpdate = entries.filter((entry) =>
49+
existingEntries.some((existingEntry) => existingEntry.id === entry.id),
50+
)
51+
52+
await prismaModel.createMany({
53+
data: toCreate.map((entry) => ({
54+
id: entry.id,
55+
type: getMessageType(entry.event),
56+
created: entry.created,
57+
updated: new Date(),
58+
data: entry.data,
59+
status: 'SUCCESS',
60+
})),
61+
})
62+
63+
await prismaModel.updateMany({
64+
where: {
65+
id: {
66+
in: toUpdate.map((entry) => entry.id),
5367
},
54-
})
55-
}
68+
},
69+
data: {
70+
status: 'SUCCESS',
71+
updated: new Date(),
72+
},
73+
})
5674
}
5775

5876
getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]> {

packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts

Lines changed: 77 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {
55
} from '@message-queue-toolkit/schemas'
66
import { PrismaClient } from '@prisma/client'
77
import { uuidv7 } from 'uuidv7'
8-
import { afterAll, beforeAll, describe, expect, it } from 'vitest'
8+
import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest'
99
import { z } from 'zod'
1010
import { OutboxPrismaAdapter } from '../lib/outbox-prisma-adapter'
1111

@@ -26,8 +26,42 @@ describe('outbox-prisma-adapter', () => {
2626
let prisma: PrismaClient
2727
let outboxPrismaAdapter: OutboxPrismaAdapter<SupportedEvents>
2828

29+
const ENTRY_1 = {
30+
id: uuidv7(),
31+
event: events.created,
32+
status: 'CREATED',
33+
data: {
34+
id: uuidv7(),
35+
payload: {
36+
message: 'TEST EVENT',
37+
},
38+
metadata: {},
39+
timestamp: new Date().toISOString(),
40+
},
41+
retryCount: 0,
42+
created: new Date(),
43+
} satisfies OutboxEntry<SupportedEvents[number]>
44+
45+
const ENTRY_2 = {
46+
id: uuidv7(),
47+
event: events.created,
48+
status: 'CREATED',
49+
data: {
50+
id: uuidv7(),
51+
payload: {
52+
message: 'TEST EVENT 2',
53+
},
54+
metadata: {},
55+
timestamp: new Date().toISOString(),
56+
},
57+
retryCount: 0,
58+
created: new Date(),
59+
} satisfies OutboxEntry<SupportedEvents[number]>
60+
2961
beforeAll(async () => {
30-
prisma = new PrismaClient()
62+
prisma = new PrismaClient({
63+
log: ['query'],
64+
})
3165

3266
outboxPrismaAdapter = new OutboxPrismaAdapter<SupportedEvents>(prisma, 'OutboxEntry')
3367

@@ -45,6 +79,10 @@ describe('outbox-prisma-adapter', () => {
4579
`
4680
})
4781

82+
beforeEach(async () => {
83+
await prisma.$queryRaw`DELETE FROM prisma.outbox_entry;`
84+
})
85+
4886
afterAll(async () => {
4987
await prisma.$queryRaw`DROP TABLE prisma.outbox_entry;`
5088
await prisma.$queryRaw`DROP SCHEMA prisma;`
@@ -92,119 +130,78 @@ describe('outbox-prisma-adapter', () => {
92130

93131
it('should insert successful entries from accumulator', async () => {
94132
const accumulator = new InMemoryOutboxAccumulator<SupportedEvents>()
95-
96-
const entry1 = {
97-
id: uuidv7(),
98-
event: events.created,
99-
status: 'CREATED',
100-
data: {
101-
id: uuidv7(),
102-
payload: {
103-
message: 'TEST EVENT',
104-
},
105-
metadata: {},
106-
timestamp: new Date().toISOString(),
107-
},
108-
retryCount: 0,
109-
created: new Date(),
110-
} satisfies OutboxEntry<SupportedEvents[number]>
111-
accumulator.add(entry1)
112-
113-
const entry2 = {
114-
id: uuidv7(),
115-
event: events.created,
116-
status: 'CREATED',
117-
data: {
118-
id: uuidv7(),
119-
payload: {
120-
message: 'TEST EVENT 2',
121-
},
122-
metadata: {},
123-
timestamp: new Date().toISOString(),
124-
},
125-
retryCount: 0,
126-
created: new Date(),
127-
} satisfies OutboxEntry<SupportedEvents[number]>
128-
accumulator.add(entry2)
133+
accumulator.add(ENTRY_1)
134+
accumulator.add(ENTRY_2)
129135

130136
await outboxPrismaAdapter.flush(accumulator)
131137

132138
const entriesAfterFlush = await outboxPrismaAdapter.getEntries(10)
133139

134140
expect(entriesAfterFlush).toMatchObject([
135141
{
136-
id: entry1.id,
142+
id: ENTRY_1.id,
137143
status: 'SUCCESS',
138144
},
139145
{
140-
id: entry2.id,
146+
id: ENTRY_2.id,
141147
status: 'SUCCESS',
142148
},
143149
])
144150
})
145151

146-
it("should update successful entries' status to 'SUCCESS'", async () => {
152+
it("should update existing entries' status to 'SUCCESS'", async () => {
147153
const accumulator = new InMemoryOutboxAccumulator<SupportedEvents>()
154+
accumulator.add(ENTRY_1)
155+
accumulator.add(ENTRY_2)
148156

149-
const entry1 = {
150-
id: uuidv7(),
151-
event: events.created,
152-
status: 'CREATED',
153-
data: {
154-
id: uuidv7(),
155-
payload: {
156-
message: 'TEST EVENT',
157-
},
158-
metadata: {},
159-
timestamp: new Date().toISOString(),
160-
},
161-
retryCount: 0,
162-
created: new Date(),
163-
} satisfies OutboxEntry<SupportedEvents[number]>
164-
accumulator.add(entry1)
165-
166-
const entry2 = {
167-
id: uuidv7(),
168-
event: events.created,
169-
status: 'CREATED',
170-
data: {
171-
id: uuidv7(),
172-
payload: {
173-
message: 'TEST EVENT 2',
174-
},
175-
metadata: {},
176-
timestamp: new Date().toISOString(),
177-
},
178-
retryCount: 0,
179-
created: new Date(),
180-
} satisfies OutboxEntry<SupportedEvents[number]>
181-
accumulator.add(entry2)
182-
183-
await outboxPrismaAdapter.createEntry(entry1)
184-
await outboxPrismaAdapter.createEntry(entry2)
157+
await outboxPrismaAdapter.createEntry(ENTRY_1)
158+
await outboxPrismaAdapter.createEntry(ENTRY_2)
185159

186160
const beforeFlush = await outboxPrismaAdapter.getEntries(10)
187161
expect(beforeFlush).toMatchObject([
188162
{
189-
id: entry1.id,
163+
id: ENTRY_1.id,
190164
status: 'CREATED',
191165
},
192166
{
193-
id: entry2.id,
167+
id: ENTRY_2.id,
194168
status: 'CREATED',
195169
},
196170
])
197171

198-
outboxPrismaAdapter.flush(accumulator)
172+
await outboxPrismaAdapter.flush(accumulator)
199173

200174
const afterFlush = await outboxPrismaAdapter.getEntries(10)
201175
expect(afterFlush).toMatchObject([
202176
{
203-
id: entry1.id,
177+
id: ENTRY_1.id,
178+
status: 'SUCCESS',
179+
},
180+
{
181+
id: ENTRY_2.id,
182+
status: 'SUCCESS',
183+
},
184+
])
185+
})
186+
187+
it('should handle mix of entries, non existing and existing, and change their status to SUCCESS', async () => {
188+
const accumulator = new InMemoryOutboxAccumulator<SupportedEvents>()
189+
accumulator.add(ENTRY_1)
190+
accumulator.add(ENTRY_2)
191+
192+
//Only one exists in DB.
193+
await outboxPrismaAdapter.createEntry(ENTRY_2)
194+
195+
await outboxPrismaAdapter.flush(accumulator)
196+
197+
const afterFirstFlush = await outboxPrismaAdapter.getEntries(10)
198+
expect(afterFirstFlush).toMatchObject([
199+
{
200+
id: ENTRY_1.id,
204201
status: 'SUCCESS',
205202
},
206203
{
207-
id: entry2.id,
204+
id: ENTRY_2.id,
208205
status: 'SUCCESS',
209206
},
210207
])

0 commit comments

Comments
 (0)