Skip to content

Commit 7444fab

Browse files
committed
failing test for updating.
1 parent 963014a commit 7444fab

File tree

3 files changed

+149
-3
lines changed

3 files changed

+149
-3
lines changed

packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,37 @@ export class OutboxPrismaAdapter<SupportedEvents extends CommonEventDefinition[]
2222
id: outboxEntry.id,
2323
type: messageType,
2424
created: outboxEntry.created,
25+
updated: outboxEntry.updated,
2526
data: outboxEntry.data,
2627
status: outboxEntry.status,
2728
},
2829
})
2930
}
3031

31-
flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void> {
32-
return Promise.resolve(undefined)
32+
async flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void> {
33+
const entries = await outboxAccumulator.getEntries()
34+
35+
const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName]
36+
37+
for (const entry of entries) {
38+
await prismaModel.upsert({
39+
where: {
40+
id: entry.id,
41+
},
42+
update: {
43+
status: 'SUCCESS',
44+
updated: new Date(),
45+
},
46+
create: {
47+
id: entry.id,
48+
type: getMessageType(entry.event),
49+
created: entry.created,
50+
updated: new Date(),
51+
data: entry.data,
52+
status: 'SUCCESS',
53+
},
54+
})
55+
}
3356
}
3457

3558
getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]> {

packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts

Lines changed: 123 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { OutboxEntry } from '@message-queue-toolkit/outbox-core'
1+
import { InMemoryOutboxAccumulator, type OutboxEntry } from '@message-queue-toolkit/outbox-core'
22
import {
33
type CommonEventDefinition,
44
enrichMessageSchemaWithBase,
@@ -37,6 +37,7 @@ describe('outbox-prisma-adapter', () => {
3737
id UUID PRIMARY KEY,
3838
type TEXT NOT NULL,
3939
created TIMESTAMP NOT NULL,
40+
updated TIMESTAMP,
4041
retry_count INT NOT NULL DEFAULT 0,
4142
data JSONB NOT NULL,
4243
status TEXT NOT NULL
@@ -74,6 +75,7 @@ describe('outbox-prisma-adapter', () => {
7475
id: expect.any(String),
7576
type: 'entity.created',
7677
created: expect.any(Date),
78+
updated: expect.any(Date),
7779
retryCount: 0,
7880
data: {
7981
id: expect.any(String),
@@ -87,4 +89,124 @@ describe('outbox-prisma-adapter', () => {
8789
},
8890
])
8991
})
92+
93+
it('should insert successful entries from accumulator', async () => {
94+
const accumulator = new InMemoryOutboxAccumulator<SupportedEvents>()
95+
96+
const entry1 = {
97+
id: uuidv7(),
98+
event: events.created,
99+
status: 'CREATED',
100+
data: {
101+
id: uuidv7(),
102+
payload: {
103+
message: 'TEST EVENT',
104+
},
105+
metadata: {},
106+
timestamp: new Date().toISOString(),
107+
},
108+
retryCount: 0,
109+
created: new Date(),
110+
} satisfies OutboxEntry<SupportedEvents[number]>
111+
accumulator.add(entry1)
112+
113+
const entry2 = {
114+
id: uuidv7(),
115+
event: events.created,
116+
status: 'CREATED',
117+
data: {
118+
id: uuidv7(),
119+
payload: {
120+
message: 'TEST EVENT 2',
121+
},
122+
metadata: {},
123+
timestamp: new Date().toISOString(),
124+
},
125+
retryCount: 0,
126+
created: new Date(),
127+
} satisfies OutboxEntry<SupportedEvents[number]>
128+
accumulator.add(entry2)
129+
130+
await outboxPrismaAdapter.flush(accumulator)
131+
132+
const entriesAfterFlush = await outboxPrismaAdapter.getEntries(10)
133+
134+
expect(entriesAfterFlush).toMatchObject([
135+
{
136+
id: entry1.id,
137+
status: 'SUCCESS',
138+
},
139+
{
140+
id: entry2.id,
141+
status: 'SUCCESS',
142+
},
143+
])
144+
})
145+
146+
it("should update successful entries' status to 'SUCCESS'", async () => {
147+
const accumulator = new InMemoryOutboxAccumulator<SupportedEvents>()
148+
149+
const entry1 = {
150+
id: uuidv7(),
151+
event: events.created,
152+
status: 'CREATED',
153+
data: {
154+
id: uuidv7(),
155+
payload: {
156+
message: 'TEST EVENT',
157+
},
158+
metadata: {},
159+
timestamp: new Date().toISOString(),
160+
},
161+
retryCount: 0,
162+
created: new Date(),
163+
} satisfies OutboxEntry<SupportedEvents[number]>
164+
accumulator.add(entry1)
165+
166+
const entry2 = {
167+
id: uuidv7(),
168+
event: events.created,
169+
status: 'CREATED',
170+
data: {
171+
id: uuidv7(),
172+
payload: {
173+
message: 'TEST EVENT 2',
174+
},
175+
metadata: {},
176+
timestamp: new Date().toISOString(),
177+
},
178+
retryCount: 0,
179+
created: new Date(),
180+
} satisfies OutboxEntry<SupportedEvents[number]>
181+
accumulator.add(entry2)
182+
183+
await outboxPrismaAdapter.createEntry(entry1)
184+
await outboxPrismaAdapter.createEntry(entry2)
185+
186+
const beforeFlush = await outboxPrismaAdapter.getEntries(10)
187+
expect(beforeFlush).toMatchObject([
188+
{
189+
id: entry1.id,
190+
status: 'CREATED',
191+
},
192+
{
193+
id: entry2.id,
194+
status: 'CREATED',
195+
},
196+
])
197+
198+
outboxPrismaAdapter.flush(accumulator)
199+
200+
const afterFlush = await outboxPrismaAdapter.getEntries(10)
201+
expect(afterFlush).toMatchObject([
202+
{
203+
id: entry1.id,
204+
status: 'SUCCESS',
205+
},
206+
{
207+
id: entry2.id,
208+
status: 'SUCCESS',
209+
},
210+
])
211+
})
90212
})

packages/outbox-prisma-adapter/test/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ datasource db {
66
model OutboxEntry {
77
id String @id @default(uuid()) @db.Uuid
88
created DateTime @default(now())
9+
updated DateTime @default(now()) @updatedAt
910
type String
1011
retryCount Int @default(0) @map("retry_count")
1112
data Json

0 commit comments

Comments
 (0)