Skip to content

Commit 7c09b6e

Browse files
committed
Implement load shedding for TaskEvent records
This change introduces load shedding mechanisms to manage TaskEvent records — particularly those of kind LOG — when the system experiences high volumes and is unable to flush to the database in a timely manner. The addition aims to prevent overwhelming the system and to ensure critical tasks are prioritized.

- Added configuration options for `loadSheddingThreshold` and `loadSheddingEnabled` in multiple modules to activate load shedding.
- Introduced an `isDroppableEvent` function to allow specific events to be dropped when load shedding is enabled.
- Ensured metrics are updated to reflect dropped events and load shedding status, providing visibility into system performance during high-load conditions.
- Updated loggers to report load shedding state changes, ensuring timely awareness of load management activity.
1 parent a34f03f commit 7c09b6e

File tree

3 files changed

+136
-4
lines changed

3 files changed

+136
-4
lines changed

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,8 @@ const EnvironmentSchema = z.object({
256256
EVENTS_MAX_CONCURRENCY: z.coerce.number().int().default(10),
257257
EVENTS_MAX_BATCH_SIZE: z.coerce.number().int().default(500),
258258
EVENTS_MEMORY_PRESSURE_THRESHOLD: z.coerce.number().int().default(2000),
259+
EVENTS_LOAD_SHEDDING_THRESHOLD: z.coerce.number().int().default(5000),
260+
EVENTS_LOAD_SHEDDING_ENABLED: z.coerce.boolean().default(true),
259261
SHARED_QUEUE_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10),
260262
SHARED_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(100),
261263
SHARED_QUEUE_CONSUMER_NEXT_TICK_INTERVAL_MS: z.coerce.number().int().default(100),

apps/webapp/app/v3/dynamicFlushScheduler.server.ts

Lines changed: 104 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { nanoid } from "nanoid";
22
import pLimit from "p-limit";
33
import { logger } from "~/services/logger.server";
4+
import { TaskEventKind } from "@trigger.dev/database";
45

56
export type DynamicFlushSchedulerConfig<T> = {
67
batchSize: number;
@@ -11,6 +12,9 @@ export type DynamicFlushSchedulerConfig<T> = {
1112
maxConcurrency?: number;
1213
maxBatchSize?: number;
1314
memoryPressureThreshold?: number; // Number of items that triggers increased concurrency
15+
loadSheddingThreshold?: number; // Number of items that triggers load shedding
16+
loadSheddingEnabled?: boolean;
17+
isDroppableEvent?: (item: T) => boolean; // Function to determine if an event can be dropped
1418
};
1519

1620
export class DynamicFlushScheduler<T> {
@@ -35,8 +39,16 @@ export class DynamicFlushScheduler<T> {
3539
flushedBatches: 0,
3640
failedBatches: 0,
3741
totalItemsFlushed: 0,
42+
droppedEvents: 0,
43+
droppedEventsByKind: new Map<string, number>(),
3844
};
3945

46+
// New properties for load shedding
47+
private readonly loadSheddingThreshold: number;
48+
private readonly loadSheddingEnabled: boolean;
49+
private readonly isDroppableEvent?: (item: T) => boolean;
50+
private isLoadShedding: boolean = false;
51+
4052
constructor(config: DynamicFlushSchedulerConfig<T>) {
4153
this.batchQueue = [];
4254
this.currentBatch = [];
@@ -52,6 +64,11 @@ export class DynamicFlushScheduler<T> {
5264
this.maxBatchSize = config.maxBatchSize ?? config.batchSize * 5;
5365
this.memoryPressureThreshold = config.memoryPressureThreshold ?? config.batchSize * 20;
5466

67+
// Initialize load shedding parameters
68+
this.loadSheddingThreshold = config.loadSheddingThreshold ?? config.batchSize * 50;
69+
this.loadSheddingEnabled = config.loadSheddingEnabled ?? true;
70+
this.isDroppableEvent = config.isDroppableEvent;
71+
5572
// Start with minimum concurrency
5673
this.limiter = pLimit(this.minConcurrency);
5774

@@ -60,8 +77,45 @@ export class DynamicFlushScheduler<T> {
6077
}
6178

6279
addToBatch(items: T[]): void {
63-
this.currentBatch.push(...items);
64-
this.totalQueuedItems += items.length;
80+
let itemsToAdd = items;
81+
82+
// Apply load shedding if enabled and we're over the threshold
83+
if (this.loadSheddingEnabled && this.totalQueuedItems >= this.loadSheddingThreshold) {
84+
const { kept, dropped } = this.applyLoadShedding(items);
85+
itemsToAdd = kept;
86+
87+
if (dropped.length > 0) {
88+
this.metrics.droppedEvents += dropped.length;
89+
90+
// Track dropped events by kind if possible
91+
dropped.forEach((item) => {
92+
const kind = this.getEventKind(item);
93+
if (kind) {
94+
const currentCount = this.metrics.droppedEventsByKind.get(kind) || 0;
95+
this.metrics.droppedEventsByKind.set(kind, currentCount + 1);
96+
}
97+
});
98+
99+
if (!this.isLoadShedding) {
100+
this.isLoadShedding = true;
101+
logger.warn("Load shedding activated", {
102+
totalQueuedItems: this.totalQueuedItems,
103+
threshold: this.loadSheddingThreshold,
104+
droppedCount: dropped.length,
105+
});
106+
}
107+
}
108+
} else if (this.isLoadShedding && this.totalQueuedItems < this.loadSheddingThreshold * 0.8) {
109+
this.isLoadShedding = false;
110+
logger.info("Load shedding deactivated", {
111+
totalQueuedItems: this.totalQueuedItems,
112+
threshold: this.loadSheddingThreshold,
113+
totalDropped: this.metrics.droppedEvents,
114+
});
115+
}
116+
117+
this.currentBatch.push(...itemsToAdd);
118+
this.totalQueuedItems += itemsToAdd.length;
65119

66120
// Check if we need to create a batch
67121
if (this.currentBatch.length >= this.currentBatchSize) {
@@ -217,6 +271,11 @@ export class DynamicFlushScheduler<T> {
217271
private startMetricsReporter(): void {
218272
// Report metrics every 30 seconds
219273
setInterval(() => {
274+
const droppedByKind: Record<string, number> = {};
275+
this.metrics.droppedEventsByKind.forEach((count, kind) => {
276+
droppedByKind[kind] = count;
277+
});
278+
220279
logger.info("DynamicFlushScheduler metrics", {
221280
totalQueuedItems: this.totalQueuedItems,
222281
batchQueueLength: this.batchQueue.length,
@@ -225,21 +284,62 @@ export class DynamicFlushScheduler<T> {
225284
activeConcurrent: this.limiter.activeCount,
226285
pendingConcurrent: this.limiter.pendingCount,
227286
currentBatchSize: this.currentBatchSize,
228-
metrics: this.metrics,
287+
isLoadShedding: this.isLoadShedding,
288+
metrics: {
289+
...this.metrics,
290+
droppedEventsByKind,
291+
},
229292
});
230293
}, 30000);
231294
}
232295

296+
private applyLoadShedding(items: T[]): { kept: T[]; dropped: T[] } {
297+
if (!this.isDroppableEvent) {
298+
// If no function provided to determine droppable events, keep all
299+
return { kept: items, dropped: [] };
300+
}
301+
302+
const kept: T[] = [];
303+
const dropped: T[] = [];
304+
305+
for (const item of items) {
306+
if (this.isDroppableEvent(item)) {
307+
dropped.push(item);
308+
} else {
309+
kept.push(item);
310+
}
311+
}
312+
313+
return { kept, dropped };
314+
}
315+
316+
private getEventKind(item: T): string | undefined {
317+
// Try to extract the kind from the event if it has one
318+
if (item && typeof item === 'object' && 'kind' in item) {
319+
return String(item.kind);
320+
}
321+
return undefined;
322+
}
323+
233324
// Method to get current status
234325
getStatus() {
326+
const droppedByKind: Record<string, number> = {};
327+
this.metrics.droppedEventsByKind.forEach((count, kind) => {
328+
droppedByKind[kind] = count;
329+
});
330+
235331
return {
236332
queuedItems: this.totalQueuedItems,
237333
batchQueueLength: this.batchQueue.length,
238334
currentBatchSize: this.currentBatch.length,
239335
concurrency: this.limiter.concurrency,
240336
activeFlushes: this.limiter.activeCount,
241337
pendingFlushes: this.limiter.pendingCount,
242-
metrics: { ...this.metrics },
338+
isLoadShedding: this.isLoadShedding,
339+
metrics: {
340+
...this.metrics,
341+
droppedEventsByKind: droppedByKind,
342+
},
243343
};
244344
}
245345

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ export type EventRepoConfig = {
111111
maxConcurrency?: number;
112112
maxBatchSize?: number;
113113
memoryPressureThreshold?: number;
114+
loadSheddingThreshold?: number;
115+
loadSheddingEnabled?: boolean;
114116
};
115117

116118
export type QueryOptions = Prisma.TaskEventWhereInput;
@@ -220,6 +222,12 @@ export class EventRepository {
220222
maxConcurrency: _config.maxConcurrency,
221223
maxBatchSize: _config.maxBatchSize,
222224
memoryPressureThreshold: _config.memoryPressureThreshold,
225+
loadSheddingThreshold: _config.loadSheddingThreshold,
226+
loadSheddingEnabled: _config.loadSheddingEnabled,
227+
isDroppableEvent: (event: CreatableEvent) => {
228+
// Only drop LOG events during load shedding
229+
return event.kind === TaskEventKind.LOG;
230+
},
223231
});
224232

225233
this._redisPublishClient = createRedisClient("trigger:eventRepoPublisher", this._config.redis);
@@ -1340,6 +1348,8 @@ function initializeEventRepo() {
13401348
maxConcurrency: env.EVENTS_MAX_CONCURRENCY,
13411349
maxBatchSize: env.EVENTS_MAX_BATCH_SIZE,
13421350
memoryPressureThreshold: env.EVENTS_MEMORY_PRESSURE_THRESHOLD,
1351+
loadSheddingThreshold: env.EVENTS_LOAD_SHEDDING_THRESHOLD,
1352+
loadSheddingEnabled: env.EVENTS_LOAD_SHEDDING_ENABLED,
13431353
redis: {
13441354
port: env.PUBSUB_REDIS_PORT,
13451355
host: env.PUBSUB_REDIS_HOST,
@@ -1400,6 +1410,26 @@ function initializeEventRepo() {
14001410
registers: [metricsRegister],
14011411
});
14021412

1413+
new Gauge({
1414+
name: "event_flush_scheduler_dropped_events",
1415+
help: "Total number of events dropped due to load shedding",
1416+
collect() {
1417+
const status = repo.flushSchedulerStatus;
1418+
this.set(status.metrics.droppedEvents);
1419+
},
1420+
registers: [metricsRegister],
1421+
});
1422+
1423+
new Gauge({
1424+
name: "event_flush_scheduler_is_load_shedding",
1425+
help: "Whether load shedding is currently active (1 = active, 0 = inactive)",
1426+
collect() {
1427+
const status = repo.flushSchedulerStatus;
1428+
this.set(status.isLoadShedding ? 1 : 0);
1429+
},
1430+
registers: [metricsRegister],
1431+
});
1432+
14031433
return repo;
14041434
}
14051435

0 commit comments

Comments (0)