@@ -17,12 +17,15 @@ export type OutboxDependencies<SupportedEvents extends CommonEventDefinition[]>
17
17
eventEmitter : DomainEventEmitter < SupportedEvents >
18
18
}
19
19
20
- export type OutboxConfiguration = {
20
+ export type OutboxProcessorConfiguration = {
21
21
maxRetryCount : number
22
22
emitBatchSize : number
23
- jobIntervalInMs : number
24
23
}
25
24
25
+ export type OutboxConfiguration = {
26
+ jobIntervalInMs : number
27
+ } & OutboxProcessorConfiguration
28
+
26
29
/**
27
30
* Main logic for handling outbox entries.
28
31
*
@@ -31,17 +34,16 @@ export type OutboxConfiguration = {
31
34
export class OutboxProcessor < SupportedEvents extends CommonEventDefinition [ ] > {
32
35
constructor (
33
36
private readonly outboxDependencies : OutboxDependencies < SupportedEvents > ,
34
- private readonly maxRetryCount : number ,
35
- private readonly emitBatchSize : number ,
37
+ private readonly outboxProcessorConfiguration : OutboxProcessorConfiguration ,
36
38
) { }
37
39
38
40
public async processOutboxEntries ( context : JobExecutionContext ) {
39
41
const { outboxStorage, eventEmitter, outboxAccumulator } = this . outboxDependencies
40
42
41
- const entries = await outboxStorage . getEntries ( this . maxRetryCount )
43
+ const entries = await outboxStorage . getEntries ( this . outboxProcessorConfiguration . maxRetryCount )
42
44
43
45
await PromisePool . for ( entries )
44
- . withConcurrency ( this . emitBatchSize )
46
+ . withConcurrency ( this . outboxProcessorConfiguration . emitBatchSize )
45
47
. process ( async ( entry ) => {
46
48
try {
47
49
await eventEmitter . emit ( entry . event , entry . data , entry . precedingMessageMetadata )
@@ -96,8 +98,7 @@ export class OutboxPeriodicJob<
96
98
97
99
this . outboxProcessor = new OutboxProcessor < SupportedEvents > (
98
100
outboxDependencies ,
99
- outboxConfiguration . maxRetryCount ,
100
- outboxConfiguration . emitBatchSize ,
101
+ outboxConfiguration ,
101
102
)
102
103
}
103
104
0 commit comments