@@ -11,38 +11,50 @@ import { uuidv7 } from 'uuidv7'
11
11
import type { OutboxAccumulator } from './accumulators'
12
12
import type { OutboxStorage } from './storage'
13
13
14
+ export type OutboxDependencies < SupportedEvents extends CommonEventDefinition [ ] > = {
15
+ outboxStorage : OutboxStorage < SupportedEvents >
16
+ outboxAccumulator : OutboxAccumulator < SupportedEvents >
17
+ eventEmitter : DomainEventEmitter < SupportedEvents >
18
+ }
19
+
20
+ export type OutboxConfiguration = {
21
+ maxRetryCount : number
22
+ emitBatchSize : number
23
+ jobIntervalInMs : number
24
+ }
25
+
14
26
/**
15
27
* Main logic for handling outbox entries.
16
28
*
17
29
* If entry is rejected, it is NOT going to be handled during the same execution. Next execution will pick it up.
18
30
*/
19
31
export class OutboxProcessor < SupportedEvents extends CommonEventDefinition [ ] > {
20
32
constructor (
21
- private readonly outboxStorage : OutboxStorage < SupportedEvents > ,
22
- private readonly outboxAccumulator : OutboxAccumulator < SupportedEvents > ,
23
- private readonly eventEmitter : DomainEventEmitter < SupportedEvents > ,
33
+ private readonly outboxDependencies : OutboxDependencies < SupportedEvents > ,
24
34
private readonly maxRetryCount : number ,
25
35
private readonly emitBatchSize : number ,
26
36
) { }
27
37
28
38
public async processOutboxEntries ( context : JobExecutionContext ) {
29
- const entries = await this . outboxStorage . getEntries ( this . maxRetryCount )
39
+ const { outboxStorage, eventEmitter, outboxAccumulator } = this . outboxDependencies
40
+
41
+ const entries = await outboxStorage . getEntries ( this . maxRetryCount )
30
42
31
43
await PromisePool . for ( entries )
32
44
. withConcurrency ( this . emitBatchSize )
33
45
. process ( async ( entry ) => {
34
46
try {
35
- await this . eventEmitter . emit ( entry . event , entry . data , entry . precedingMessageMetadata )
36
- await this . outboxAccumulator . add ( entry )
47
+ await eventEmitter . emit ( entry . event , entry . data , entry . precedingMessageMetadata )
48
+ await outboxAccumulator . add ( entry )
37
49
} catch ( e ) {
38
50
context . logger . error ( { error : e } , 'Failed to process outbox entry.' )
39
51
40
- await this . outboxAccumulator . addFailure ( entry )
52
+ await outboxAccumulator . addFailure ( entry )
41
53
}
42
54
} )
43
55
44
- await this . outboxStorage . flush ( this . outboxAccumulator )
45
- await this . outboxAccumulator . clear ( )
56
+ await outboxStorage . flush ( outboxAccumulator )
57
+ await outboxAccumulator . clear ( )
46
58
}
47
59
}
48
60
@@ -59,19 +71,15 @@ export class OutboxPeriodicJob<
59
71
private readonly outboxProcessor : OutboxProcessor < SupportedEvents >
60
72
61
73
constructor (
62
- outboxStorage : OutboxStorage < SupportedEvents > ,
63
- outboxAccumulator : OutboxAccumulator < SupportedEvents > ,
64
- eventEmitter : DomainEventEmitter < SupportedEvents > ,
74
+ outboxDependencies : OutboxDependencies < SupportedEvents > ,
75
+ outboxConfiguration : OutboxConfiguration ,
65
76
dependencies : PeriodicJobDependencies ,
66
- maxRetryCount : number ,
67
- emitBatchSize : number ,
68
- intervalInMs : number ,
69
77
) {
70
78
super (
71
79
{
72
80
jobId : 'OutboxJob' ,
73
81
schedule : {
74
- intervalInMs : intervalInMs ,
82
+ intervalInMs : outboxConfiguration . jobIntervalInMs ,
75
83
} ,
76
84
singleConsumerMode : {
77
85
enabled : true ,
@@ -87,11 +95,9 @@ export class OutboxPeriodicJob<
87
95
)
88
96
89
97
this . outboxProcessor = new OutboxProcessor < SupportedEvents > (
90
- outboxStorage ,
91
- outboxAccumulator ,
92
- eventEmitter ,
93
- maxRetryCount ,
94
- emitBatchSize ,
98
+ outboxDependencies ,
99
+ outboxConfiguration . maxRetryCount ,
100
+ outboxConfiguration . emitBatchSize ,
95
101
)
96
102
}
97
103
0 commit comments