1
1
import { Logger } from "@trigger.dev/core/logger" ;
2
2
import { nanoid } from "nanoid" ;
3
3
import pLimit from "p-limit" ;
4
+ import { signalsEmitter } from "~/services/signals.server" ;
4
5
5
6
export type DynamicFlushSchedulerConfig < T > = {
6
7
batchSize : number ;
@@ -22,6 +23,7 @@ export class DynamicFlushScheduler<T> {
22
23
private readonly BATCH_SIZE : number ;
23
24
private readonly FLUSH_INTERVAL : number ;
24
25
private flushTimer : NodeJS . Timeout | null ;
26
+ private metricsReporterTimer : NodeJS . Timeout | undefined ;
25
27
private readonly callback : ( flushId : string , batch : T [ ] ) => Promise < void > ;
26
28
27
29
// New properties for dynamic scaling
@@ -41,6 +43,7 @@ export class DynamicFlushScheduler<T> {
41
43
droppedEvents : 0 ,
42
44
droppedEventsByKind : new Map < string , number > ( ) ,
43
45
} ;
46
+ private isShuttingDown : boolean = false ;
44
47
45
48
// New properties for load shedding
46
49
private readonly loadSheddingThreshold : number ;
@@ -75,6 +78,7 @@ export class DynamicFlushScheduler<T> {
75
78
76
79
this . startFlushTimer ( ) ;
77
80
this . startMetricsReporter ( ) ;
81
+ this . setupShutdownHandlers ( ) ;
78
82
}
79
83
80
84
addToBatch ( items : T [ ] ) : void {
@@ -119,8 +123,8 @@ export class DynamicFlushScheduler<T> {
119
123
this . currentBatch . push ( ...itemsToAdd ) ;
120
124
this . totalQueuedItems += itemsToAdd . length ;
121
125
122
- // Check if we need to create a batch
123
- if ( this . currentBatch . length >= this . currentBatchSize ) {
126
+ // Check if we need to create a batch (if we are shutting down, create a batch immediately because the flush timer is stopped)
127
+ if ( this . currentBatch . length >= this . currentBatchSize || this . isShuttingDown ) {
124
128
this . createBatch ( ) ;
125
129
}
126
130
@@ -137,6 +141,23 @@ export class DynamicFlushScheduler<T> {
137
141
this . resetFlushTimer ( ) ;
138
142
}
139
143
144
+ private setupShutdownHandlers ( ) : void {
145
+ signalsEmitter . on ( "SIGTERM" , ( ) =>
146
+ this . shutdown ( ) . catch ( ( error ) => {
147
+ this . logger . error ( "Error shutting down dynamic flush scheduler" , {
148
+ error,
149
+ } ) ;
150
+ } )
151
+ ) ;
152
+ signalsEmitter . on ( "SIGINT" , ( ) =>
153
+ this . shutdown ( ) . catch ( ( error ) => {
154
+ this . logger . error ( "Error shutting down dynamic flush scheduler" , {
155
+ error,
156
+ } ) ;
157
+ } )
158
+ ) ;
159
+ }
160
+
140
161
private startFlushTimer ( ) : void {
141
162
this . flushTimer = setInterval ( ( ) => this . checkAndFlush ( ) , this . FLUSH_INTERVAL ) ;
142
163
}
@@ -145,6 +166,9 @@ export class DynamicFlushScheduler<T> {
145
166
if ( this . flushTimer ) {
146
167
clearInterval ( this . flushTimer ) ;
147
168
}
169
+
170
+ if ( this . isShuttingDown ) return ;
171
+
148
172
this . startFlushTimer ( ) ;
149
173
}
150
174
@@ -226,7 +250,7 @@ export class DynamicFlushScheduler<T> {
226
250
}
227
251
228
252
private lastConcurrencyAdjustment : number = Date . now ( ) ;
229
-
253
+
230
254
private adjustConcurrency ( backOff : boolean = false ) : void {
231
255
const currentConcurrency = this . limiter . concurrency ;
232
256
let newConcurrency = currentConcurrency ;
@@ -281,7 +305,7 @@ export class DynamicFlushScheduler<T> {
281
305
282
306
private startMetricsReporter ( ) : void {
283
307
// Report metrics every 30 seconds
284
- setInterval ( ( ) => {
308
+ this . metricsReporterTimer = setInterval ( ( ) => {
285
309
const droppedByKind : Record < string , number > = { } ;
286
310
this . metrics . droppedEventsByKind . forEach ( ( count , kind ) => {
287
311
droppedByKind [ kind ] = count ;
@@ -356,10 +380,18 @@ export class DynamicFlushScheduler<T> {
356
380
357
381
// Graceful shutdown
358
382
async shutdown ( ) : Promise < void > {
383
+ if ( this . isShuttingDown ) return ;
384
+
385
+ this . isShuttingDown = true ;
386
+
359
387
if ( this . flushTimer ) {
360
388
clearInterval ( this . flushTimer ) ;
361
389
}
362
390
391
+ if ( this . metricsReporterTimer ) {
392
+ clearInterval ( this . metricsReporterTimer ) ;
393
+ }
394
+
363
395
// Flush any remaining items
364
396
if ( this . currentBatch . length > 0 ) {
365
397
this . createBatch ( ) ;
0 commit comments