@@ -14,6 +14,7 @@ export interface BullQueueConfig {
14
14
port : number ;
15
15
username ?: string ;
16
16
password ?: string ;
17
+ db ?: number ;
17
18
} ;
18
19
retryAttempts ?: number ;
19
20
}
@@ -25,17 +26,41 @@ export class BullQueue
25
26
extends SequencerModule < BullQueueConfig >
26
27
implements TaskQueue
27
28
{
29
+ private activePromise ?: Promise < void > ;
30
+
28
31
public createWorker (
29
32
name : string ,
30
33
executor : ( data : TaskPayload ) => Promise < TaskPayload > ,
31
34
options ?: { concurrency ?: number }
32
35
) : Closeable {
33
36
const worker = new Worker < TaskPayload , TaskPayload > (
34
37
name ,
35
- async ( job ) => await executor ( job . data ) ,
38
+ async ( job ) => {
39
+ // This weird promise logic is needed to make sure the worker is not proving in parallel
40
+ // This is by far not optimal - since it still picks up 1 task per queue but waits until
41
+ // computing them, so that leads to bad performance over multiple workers.
42
+ // For that we need to restructure tasks to be flowing through a single queue however
43
+ while ( this . activePromise !== undefined ) {
44
+ // eslint-disable-next-line no-await-in-loop
45
+ await this . activePromise ;
46
+ }
47
+ let resOutside : ( ) => void = ( ) => { } ;
48
+ const promise = new Promise < void > ( ( res ) => {
49
+ resOutside = res ;
50
+ } ) ;
51
+ this . activePromise = promise ;
52
+
53
+ const result = await executor ( job . data ) ;
54
+ this . activePromise = undefined ;
55
+ void resOutside ( ) ;
56
+
57
+ return result ;
58
+ } ,
36
59
{
37
60
concurrency : options ?. concurrency ?? 1 ,
38
61
connection : this . config . redis ,
62
+ stalledInterval : 60000 , // 1 minute
63
+ lockDuration : 60000 , // 1 minute
39
64
40
65
metrics : { maxDataPoints : MetricsTime . ONE_HOUR * 24 } ,
41
66
}
@@ -68,6 +93,7 @@ export class BullQueue
68
93
name : queueName ,
69
94
70
95
async addTask ( payload : TaskPayload ) : Promise < { taskId : string } > {
96
+ log . debug ( "Adding task: " , payload ) ;
71
97
const job = await queue . add ( queueName , payload , {
72
98
attempts : retryAttempts ?? 2 ,
73
99
} ) ;
@@ -76,14 +102,25 @@ export class BullQueue
76
102
77
103
async onCompleted ( listener : ( payload : TaskPayload ) => Promise < void > ) {
78
104
events . on ( "completed" , async ( result ) => {
79
- // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
80
- await listener ( JSON . parse ( result . returnvalue ) as TaskPayload ) ;
105
+ log . debug ( "Completed task: " , result ) ;
106
+ try {
107
+ // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
108
+ await listener ( result . returnvalue as unknown as TaskPayload ) ;
109
+ } catch ( e ) {
110
+ // Catch error explicitly since this promise is dangling,
111
+ // therefore any error will be voided as well
112
+ log . error ( e ) ;
113
+ }
114
+ } ) ;
115
+ events . on ( "error" , async ( error ) => {
116
+ log . error ( "Error in worker" , error ) ;
81
117
} ) ;
82
118
await events . waitUntilReady ( ) ;
83
119
} ,
84
120
85
121
async close ( ) : Promise < void > {
86
122
await events . close ( ) ;
123
+ await queue . drain ( ) ;
87
124
await queue . close ( ) ;
88
125
} ,
89
126
} ;
0 commit comments