Skip to content

Commit 31522bb

Browse files
committed
Divided into files.
1 parent 3333be5 commit 31522bb

File tree

5 files changed

+108
-102
lines changed

5 files changed

+108
-102
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
2+
import type { OutboxEntry } from './objects.ts'
3+
4+
export interface OutboxAccumulator<SupportedEvents extends CommonEventDefinition[]> {
5+
add(outboxEntry: OutboxEntry<SupportedEvents[number]>): Promise<void>
6+
7+
addFailure(outboxEntry: OutboxEntry<SupportedEvents[number]>): Promise<void>
8+
9+
getEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]>
10+
11+
getFailedEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]>
12+
13+
clear(): Promise<void>
14+
}
15+
16+
export class InMemoryOutboxAccumulator<SupportedEvents extends CommonEventDefinition[]>
17+
implements OutboxAccumulator<SupportedEvents>
18+
{
19+
private entries: OutboxEntry<SupportedEvents[number]>[] = []
20+
private failedEntries: OutboxEntry<SupportedEvents[number]>[] = []
21+
22+
public add(outboxEntry: OutboxEntry<SupportedEvents[number]>) {
23+
this.entries = [...this.entries, outboxEntry]
24+
25+
return Promise.resolve()
26+
}
27+
28+
public addFailure(outboxEntry: OutboxEntry<SupportedEvents[number]>) {
29+
this.failedEntries = [...this.failedEntries, outboxEntry]
30+
31+
return Promise.resolve()
32+
}
33+
34+
getEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]> {
35+
return Promise.resolve(this.entries)
36+
}
37+
38+
getFailedEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]> {
39+
return Promise.resolve(this.failedEntries)
40+
}
41+
42+
public clear(): Promise<void> {
43+
this.entries = []
44+
this.failedEntries = []
45+
return Promise.resolve()
46+
}
47+
}

packages/outbox-core/lib/objects.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import type {
2+
CommonEventDefinition,
3+
CommonEventDefinitionPublisherSchemaType,
4+
ConsumerMessageMetadataType,
5+
} from '@message-queue-toolkit/schemas'
6+
7+
/**
8+
* Status of the outbox entry.
9+
* - CREATED - entry was created and is waiting to be processed to publish actual event
10+
* - ACKED - entry was picked up by outbox job and is being processed
11+
* - SUCCESS - entry was successfully processed, event was published
12+
* - FAILED - entry processing failed, it will be retried
13+
*/
14+
export type OutboxEntryStatus = 'CREATED' | 'ACKED' | 'SUCCESS' | 'FAILED'
15+
16+
export type OutboxEntry<SupportedEvent extends CommonEventDefinition> = {
17+
id: string
18+
event: SupportedEvent
19+
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>
20+
precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>
21+
status: OutboxEntryStatus
22+
created: Date
23+
updated?: Date
24+
retryCount: number
25+
}

packages/outbox-core/lib/outbox.spec.ts

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,9 @@ import {
1212
import pino, { type Logger } from 'pino'
1313
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
1414
import { z } from 'zod'
15-
import {
16-
InMemoryOutboxAccumulator,
17-
type OutboxAccumulator,
18-
type OutboxEntry,
19-
OutboxEventEmitter,
20-
OutboxProcessor,
21-
type OutboxStorage,
22-
} from './outbox'
15+
import { InMemoryOutboxAccumulator, type OutboxAccumulator } from './accumulators'
16+
import type { OutboxEntry } from './objects'
17+
import { OutboxEventEmitter, OutboxProcessor, type OutboxStorage } from './outbox'
2318

2419
const TestEvents = {
2520
created: {
@@ -125,8 +120,6 @@ class InMemoryOutboxStorage<SupportedEvents extends CommonEventDefinition[]>
125120
}
126121
return entry
127122
})
128-
129-
outboxAccumulator.clear()
130123
}
131124
}
132125

packages/outbox-core/lib/outbox.ts

Lines changed: 3 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -8,98 +8,8 @@ import type {
88
} from '@message-queue-toolkit/core'
99
import { PromisePool } from '@supercharge/promise-pool'
1010
import { uuidv7 } from 'uuidv7'
11-
12-
/**
13-
* Status of the outbox entry.
14-
* - CREATED - entry was created and is waiting to be processed to publish actual event
15-
* - ACKED - entry was picked up by outbox job and is being processed
16-
* - SUCCESS - entry was successfully processed, event was published
17-
* - FAILED - entry processing failed, it will be retried
18-
*/
19-
export type OutboxEntryStatus = 'CREATED' | 'ACKED' | 'SUCCESS' | 'FAILED'
20-
21-
export type OutboxEntry<SupportedEvent extends CommonEventDefinition> = {
22-
id: string
23-
event: SupportedEvent
24-
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>
25-
precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>
26-
status: OutboxEntryStatus
27-
created: Date
28-
updated?: Date
29-
retryCount: number
30-
}
31-
32-
export interface OutboxAccumulator<SupportedEvents extends CommonEventDefinition[]> {
33-
add(outboxEntry: OutboxEntry<SupportedEvents[number]>): Promise<void>
34-
35-
addFailure(outboxEntry: OutboxEntry<SupportedEvents[number]>): Promise<void>
36-
37-
getEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]>
38-
39-
getFailedEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]>
40-
41-
clear(): Promise<void>
42-
}
43-
44-
export class InMemoryOutboxAccumulator<SupportedEvents extends CommonEventDefinition[]>
45-
implements OutboxAccumulator<SupportedEvents>
46-
{
47-
private entries: OutboxEntry<SupportedEvents[number]>[] = []
48-
private failedEntries: OutboxEntry<SupportedEvents[number]>[] = []
49-
50-
public add(outboxEntry: OutboxEntry<SupportedEvents[number]>) {
51-
this.entries = [...this.entries, outboxEntry]
52-
53-
return Promise.resolve()
54-
}
55-
56-
public addFailure(outboxEntry: OutboxEntry<SupportedEvents[number]>) {
57-
this.failedEntries = [...this.failedEntries, outboxEntry]
58-
59-
return Promise.resolve()
60-
}
61-
62-
getEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]> {
63-
return Promise.resolve(this.entries)
64-
}
65-
66-
getFailedEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]> {
67-
return Promise.resolve(this.failedEntries)
68-
}
69-
70-
public clear(): Promise<void> {
71-
this.entries = []
72-
this.failedEntries = []
73-
return Promise.resolve()
74-
}
75-
}
76-
77-
/**
78-
* Takes care of persisting and retrieving outbox entries.
79-
*
80-
* Implementation is required:
81-
* - in order to fulfill at least once delivery guarantee, persisting entries should be performed inside isolated transaction
82-
* - to return entries in the order they were created (UUID7 is used to create entries in OutboxEventEmitter)
83-
* - returned entries should not include the ones with 'SUCCESS' status
84-
*/
85-
export interface OutboxStorage<SupportedEvents extends CommonEventDefinition[]> {
86-
create(
87-
outboxEntry: OutboxEntry<SupportedEvents[number]>,
88-
): Promise<OutboxEntry<SupportedEvents[number]>>
89-
90-
flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void>
91-
92-
update(
93-
outboxEntry: OutboxEntry<SupportedEvents[number]>,
94-
): Promise<OutboxEntry<SupportedEvents[number]>>
95-
96-
/**
97-
* Returns entries in the order they were created. It doesn't return entries with 'SUCCESS' status. It doesn't return entries that have been retried more than maxRetryCount times.
98-
*
99-
* For example if entry retryCount is 1 and maxRetryCount is 1, entry MUST be returned. If it fails again then retry count is 2, in that case entry MUST NOT be returned.
100-
*/
101-
getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]>
102-
}
11+
import type { OutboxAccumulator } from './accumulators'
12+
import type { OutboxStorage } from './storage'
10313

10414
/**
10515
* Main logic for handling outbox entries.
@@ -132,6 +42,7 @@ export class OutboxProcessor<SupportedEvents extends CommonEventDefinition[]> {
13242
})
13343

13444
await this.outboxStorage.flush(this.outboxAccumulator)
45+
await this.outboxAccumulator.clear()
13546
}
13647
}
13748

packages/outbox-core/lib/storage.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
2+
import type { OutboxAccumulator } from './accumulators'
3+
import type { OutboxEntry } from './objects'
4+
5+
/**
6+
* Takes care of persisting and retrieving outbox entries.
7+
*
8+
* Implementation is required:
9+
* - in order to fulfill at least once delivery guarantee, persisting entries should be performed inside isolated transaction
10+
* - to return entries in the order they were created (UUID7 is used to create entries in OutboxEventEmitter)
11+
* - returned entries should not include the ones with 'SUCCESS' status
12+
*/
13+
export interface OutboxStorage<SupportedEvents extends CommonEventDefinition[]> {
14+
create(
15+
outboxEntry: OutboxEntry<SupportedEvents[number]>,
16+
): Promise<OutboxEntry<SupportedEvents[number]>>
17+
18+
flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void>
19+
20+
update(
21+
outboxEntry: OutboxEntry<SupportedEvents[number]>,
22+
): Promise<OutboxEntry<SupportedEvents[number]>>
23+
24+
/**
25+
* Returns entries in the order they were created. It doesn't return entries with 'SUCCESS' status. It doesn't return entries that have been retried more than maxRetryCount times.
26+
*
27+
* For example if entry retryCount is 1 and maxRetryCount is 1, entry MUST be returned. If it fails again then retry count is 2, in that case entry MUST NOT be returned.
28+
*/
29+
getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]>
30+
}

0 commit comments

Comments
 (0)