@@ -111,6 +111,8 @@ class InMemoryOutboxStorage<SupportedEvents extends CommonEventDefinition[]>
111
111
}
112
112
}
113
113
114
+ const MAX_RETRY_COUNT = 2
115
+
114
116
describe ( 'outbox' , ( ) => {
115
117
let outboxProcessor : OutboxProcessor < TestEventsType >
116
118
let eventEmitter : DomainEventEmitter < TestEventsType >
@@ -129,7 +131,11 @@ describe('outbox', () => {
129
131
130
132
outboxStorage = new InMemoryOutboxStorage < TestEventsType > ( )
131
133
outboxEventEmitter = new OutboxEventEmitter < TestEventsType > ( outboxStorage )
132
- outboxProcessor = new OutboxProcessor < TestEventsType > ( outboxStorage , eventEmitter , 2 )
134
+ outboxProcessor = new OutboxProcessor < TestEventsType > (
135
+ outboxStorage ,
136
+ eventEmitter ,
137
+ MAX_RETRY_COUNT ,
138
+ )
133
139
} )
134
140
135
141
afterEach ( ( ) => {
@@ -141,7 +147,7 @@ describe('outbox', () => {
141
147
correlationId : randomUUID ( ) ,
142
148
} )
143
149
144
- const entries = await outboxStorage . getEntries ( 2 )
150
+ const entries = await outboxStorage . getEntries ( MAX_RETRY_COUNT )
145
151
146
152
expect ( entries ) . toHaveLength ( 1 )
147
153
} )
@@ -157,7 +163,7 @@ describe('outbox', () => {
157
163
executorId : randomUUID ( ) ,
158
164
} )
159
165
160
- const entries = await outboxStorage . getEntries ( 2 )
166
+ const entries = await outboxStorage . getEntries ( MAX_RETRY_COUNT )
161
167
162
168
expect ( entries ) . toHaveLength ( 0 )
163
169
expect ( outboxStorage . entries ) . toMatchObject ( [
@@ -168,10 +174,6 @@ describe('outbox', () => {
168
174
} )
169
175
170
176
it ( 'saves outbox entry and process it with error and retries' , async ( ) => {
171
- await outboxEventEmitter . emit ( TestEvents . created , createdEventPayload , {
172
- correlationId : randomUUID ( ) ,
173
- } )
174
-
175
177
const mockedEventEmitter = vi . spyOn ( eventEmitter , 'emit' )
176
178
mockedEventEmitter . mockImplementationOnce ( ( ) => {
177
179
throw new Error ( 'Could not emit event.' )
@@ -190,13 +192,17 @@ describe('outbox', () => {
190
192
} ) ,
191
193
)
192
194
195
+ await outboxEventEmitter . emit ( TestEvents . created , createdEventPayload , {
196
+ correlationId : randomUUID ( ) ,
197
+ } )
198
+
193
199
await outboxProcessor . processOutboxEntries ( {
194
200
logger : TestLogger ,
195
201
reqId : randomUUID ( ) ,
196
202
executorId : randomUUID ( ) ,
197
203
} )
198
204
199
- let entries = await outboxStorage . getEntries ( 2 )
205
+ let entries = await outboxStorage . getEntries ( MAX_RETRY_COUNT )
200
206
expect ( entries ) . toHaveLength ( 1 )
201
207
expect ( outboxStorage . entries ) . toMatchObject ( [
202
208
{
@@ -212,7 +218,7 @@ describe('outbox', () => {
212
218
executorId : randomUUID ( ) ,
213
219
} )
214
220
215
- entries = await outboxStorage . getEntries ( 2 )
221
+ entries = await outboxStorage . getEntries ( MAX_RETRY_COUNT )
216
222
expect ( entries ) . toHaveLength ( 0 ) //Nothing to process anymore
217
223
expect ( outboxStorage . entries ) . toMatchObject ( [
218
224
{
@@ -221,4 +227,54 @@ describe('outbox', () => {
221
227
} ,
222
228
] )
223
229
} )
230
+
231
+ it ( 'no longer processes the event if exceeded retry count' , async ( ) => {
232
+ //Let's always fail the event
233
+ const mockedEventEmitter = vi . spyOn ( eventEmitter , 'emit' )
234
+ mockedEventEmitter . mockImplementation ( ( ) => {
235
+ throw new Error ( 'Could not emit event.' )
236
+ } )
237
+
238
+ //Persist the event
239
+ await outboxEventEmitter . emit ( TestEvents . created , createdEventPayload , {
240
+ correlationId : randomUUID ( ) ,
241
+ } )
242
+
243
+ //Initially event is present in outbox storage.
244
+ expect ( await outboxStorage . getEntries ( MAX_RETRY_COUNT ) ) . toHaveLength ( 1 )
245
+
246
+ //Retry +1
247
+ await outboxProcessor . processOutboxEntries ( {
248
+ logger : TestLogger ,
249
+ reqId : randomUUID ( ) ,
250
+ executorId : randomUUID ( ) ,
251
+ } )
252
+ //Still present
253
+ expect ( await outboxStorage . getEntries ( MAX_RETRY_COUNT ) ) . toHaveLength ( 1 )
254
+
255
+ //Retry +2
256
+ await outboxProcessor . processOutboxEntries ( {
257
+ logger : TestLogger ,
258
+ reqId : randomUUID ( ) ,
259
+ executorId : randomUUID ( ) ,
260
+ } )
261
+ //Stil present
262
+ expect ( await outboxStorage . getEntries ( MAX_RETRY_COUNT ) ) . toHaveLength ( 1 )
263
+
264
+ //Retry +3
265
+ await outboxProcessor . processOutboxEntries ( {
266
+ logger : TestLogger ,
267
+ reqId : randomUUID ( ) ,
268
+ executorId : randomUUID ( ) ,
269
+ } )
270
+ //Now it's gone, we no longer try to process it
271
+ expect ( await outboxStorage . getEntries ( MAX_RETRY_COUNT ) ) . toHaveLength ( 0 )
272
+
273
+ expect ( outboxStorage . entries ) . toMatchObject ( [
274
+ {
275
+ status : 'FAILED' ,
276
+ retryCount : 3 ,
277
+ } ,
278
+ ] )
279
+ } )
224
280
} )
0 commit comments