Skip to content

Commit 6792331

Browse files
committed
Dedicated directory for tests.
1 parent 23d380c commit 6792331

File tree

2 files changed

+77
-74
lines changed

2 files changed

+77
-74
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
2+
import type { OutboxAccumulator } from '../lib/accumulators'
3+
import type { OutboxEntry } from '../lib/objects'
4+
import type { OutboxStorage } from '../lib/storage'
5+
6+
export class InMemoryOutboxStorage<SupportedEvents extends CommonEventDefinition[]>
7+
implements OutboxStorage<SupportedEvents>
8+
{
9+
public entries: OutboxEntry<SupportedEvents[number]>[] = []
10+
11+
createEntry(
12+
outboxEntry: OutboxEntry<SupportedEvents[number]>,
13+
): Promise<OutboxEntry<SupportedEvents[number]>> {
14+
this.entries = [...this.entries, outboxEntry]
15+
16+
return Promise.resolve(outboxEntry)
17+
}
18+
19+
getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]> {
20+
const entries = this.entries.filter((entry) => {
21+
return entry.status !== 'SUCCESS' && entry.retryCount <= maxRetryCount
22+
})
23+
24+
return Promise.resolve(entries)
25+
}
26+
27+
update(
28+
outboxEntry: OutboxEntry<SupportedEvents[number]>,
29+
): Promise<OutboxEntry<SupportedEvents[number]>> {
30+
this.entries = this.entries.map((entry) => {
31+
if (entry.id === outboxEntry.id) {
32+
return outboxEntry
33+
}
34+
return entry
35+
})
36+
37+
return Promise.resolve(outboxEntry)
38+
}
39+
40+
public async flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void> {
41+
let successEntries = await outboxAccumulator.getEntries()
42+
successEntries = successEntries.map((entry) => {
43+
return {
44+
...entry,
45+
status: 'SUCCESS',
46+
updateAt: new Date(),
47+
}
48+
})
49+
this.entries = this.entries.map((entry) => {
50+
const foundEntry = successEntries.find((successEntry) => successEntry.id === entry.id)
51+
if (foundEntry) {
52+
return foundEntry
53+
}
54+
return entry
55+
})
56+
57+
let failedEntries = await outboxAccumulator.getFailedEntries()
58+
failedEntries = failedEntries.map((entry) => {
59+
return {
60+
...entry,
61+
status: 'FAILED',
62+
updateAt: new Date(),
63+
retryCount: entry.retryCount + 1,
64+
}
65+
})
66+
this.entries = this.entries.map((entry) => {
67+
const foundEntry = failedEntries.find((failedEntry) => failedEntry.id === entry.id)
68+
if (foundEntry) {
69+
return foundEntry
70+
}
71+
return entry
72+
})
73+
}
74+
}

packages/outbox-core/lib/outbox.spec.ts renamed to packages/outbox-core/test/outbox.spec.ts

Lines changed: 3 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@ import {
1212
import pino, { type Logger } from 'pino'
1313
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
1414
import { z } from 'zod'
15-
import { InMemoryOutboxAccumulator, type OutboxAccumulator } from './accumulators'
16-
import type { OutboxEntry } from './objects'
17-
import { type OutboxDependencies, OutboxEventEmitter, OutboxProcessor } from './outbox'
18-
import type { OutboxStorage } from './storage'
15+
import { InMemoryOutboxAccumulator } from '../lib/accumulators'
16+
import { type OutboxDependencies, OutboxEventEmitter, OutboxProcessor } from '../lib/outbox'
17+
import { InMemoryOutboxStorage } from './InMemoryOutboxStorage'
1918

2019
const TestEvents = {
2120
created: {
@@ -54,76 +53,6 @@ const createdEventPayload: CommonEventDefinitionPublisherSchemaType<typeof TestE
5453

5554
const TestLogger: Logger = pino()
5655

57-
class InMemoryOutboxStorage<SupportedEvents extends CommonEventDefinition[]>
58-
implements OutboxStorage<SupportedEvents>
59-
{
60-
public entries: OutboxEntry<SupportedEvents[number]>[] = []
61-
62-
createEntry(
63-
outboxEntry: OutboxEntry<SupportedEvents[number]>,
64-
): Promise<OutboxEntry<SupportedEvents[number]>> {
65-
this.entries = [...this.entries, outboxEntry]
66-
67-
return Promise.resolve(outboxEntry)
68-
}
69-
70-
getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]> {
71-
const entries = this.entries.filter((entry) => {
72-
return entry.status !== 'SUCCESS' && entry.retryCount <= maxRetryCount
73-
})
74-
75-
return Promise.resolve(entries)
76-
}
77-
78-
update(
79-
outboxEntry: OutboxEntry<SupportedEvents[number]>,
80-
): Promise<OutboxEntry<SupportedEvents[number]>> {
81-
this.entries = this.entries.map((entry) => {
82-
if (entry.id === outboxEntry.id) {
83-
return outboxEntry
84-
}
85-
return entry
86-
})
87-
88-
return Promise.resolve(outboxEntry)
89-
}
90-
91-
public async flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void> {
92-
let successEntries = await outboxAccumulator.getEntries()
93-
successEntries = successEntries.map((entry) => {
94-
return {
95-
...entry,
96-
status: 'SUCCESS',
97-
updateAt: new Date(),
98-
}
99-
})
100-
this.entries = this.entries.map((entry) => {
101-
const foundEntry = successEntries.find((successEntry) => successEntry.id === entry.id)
102-
if (foundEntry) {
103-
return foundEntry
104-
}
105-
return entry
106-
})
107-
108-
let failedEntries = await outboxAccumulator.getFailedEntries()
109-
failedEntries = failedEntries.map((entry) => {
110-
return {
111-
...entry,
112-
status: 'FAILED',
113-
updateAt: new Date(),
114-
retryCount: entry.retryCount + 1,
115-
}
116-
})
117-
this.entries = this.entries.map((entry) => {
118-
const foundEntry = failedEntries.find((failedEntry) => failedEntry.id === entry.id)
119-
if (foundEntry) {
120-
return foundEntry
121-
}
122-
return entry
123-
})
124-
}
125-
}
126-
12756
const MAX_RETRY_COUNT = 2
12857

12958
describe('outbox', () => {

0 commit comments

Comments
 (0)