diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index b72857e04f..eba67df32d 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -252,6 +252,12 @@ const EnvironmentSchema = z.object({
   EVENTS_BATCH_SIZE: z.coerce.number().int().default(100),
   EVENTS_BATCH_INTERVAL: z.coerce.number().int().default(1000),
   EVENTS_DEFAULT_LOG_RETENTION: z.coerce.number().int().default(7),
+  EVENTS_MIN_CONCURRENCY: z.coerce.number().int().default(1),
+  EVENTS_MAX_CONCURRENCY: z.coerce.number().int().default(10),
+  EVENTS_MAX_BATCH_SIZE: z.coerce.number().int().default(500),
+  EVENTS_MEMORY_PRESSURE_THRESHOLD: z.coerce.number().int().default(5000),
+  EVENTS_LOAD_SHEDDING_THRESHOLD: z.coerce.number().int().default(100000),
+  EVENTS_LOAD_SHEDDING_ENABLED: z.string().default("1"),
   SHARED_QUEUE_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10),
   SHARED_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(100),
   SHARED_QUEUE_CONSUMER_NEXT_TICK_INTERVAL_MS: z.coerce.number().int().default(100),
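The six new `EVENTS_*` flags above feed the flush scheduler changes below. A minimal standalone sketch of how they resolve, assuming the same zod conventions as the rest of `EnvironmentSchema` (the mini-schema and values here are illustrative, not part of the patch):

```ts
import { z } from "zod";

// Hypothetical mini-schema mirroring the entries added above.
const EventsEnv = z.object({
  EVENTS_MIN_CONCURRENCY: z.coerce.number().int().default(1),
  EVENTS_MAX_CONCURRENCY: z.coerce.number().int().default(10),
  EVENTS_LOAD_SHEDDING_ENABLED: z.string().default("1"),
});

// process.env values arrive as strings; z.coerce parses them into integers,
// and missing keys fall back to the defaults above.
const env = EventsEnv.parse({ EVENTS_MAX_CONCURRENCY: "16" });
console.log(env.EVENTS_MIN_CONCURRENCY, env.EVENTS_MAX_CONCURRENCY); // 1 16

// The enabled flag stays a string and is compared against "1" downstream,
// matching the existing TASK_EVENT_PARTITIONING_ENABLED convention.
const loadSheddingEnabled = env.EVENTS_LOAD_SHEDDING_ENABLED === "1";
```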
diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts
index f87c2a143b..88e6a10248 100644
--- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts
+++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts
@@ -1,38 +1,140 @@
+import { Logger } from "@trigger.dev/core/logger";
 import { nanoid } from "nanoid";
+import pLimit from "p-limit";
 
 export type DynamicFlushSchedulerConfig<T> = {
   batchSize: number;
   flushInterval: number;
   callback: (flushId: string, batch: T[]) => Promise<void>;
+  // New configuration options
+  minConcurrency?: number;
+  maxConcurrency?: number;
+  maxBatchSize?: number;
+  memoryPressureThreshold?: number; // Number of items that triggers increased concurrency
+  loadSheddingThreshold?: number; // Number of items that triggers load shedding
+  loadSheddingEnabled?: boolean;
+  isDroppableEvent?: (item: T) => boolean; // Function to determine if an event can be dropped
 };
 
 export class DynamicFlushScheduler<T> {
-  private batchQueue: T[][]; // Adjust the type according to your data structure
-  private currentBatch: T[]; // Adjust the type according to your data structure
+  private batchQueue: T[][];
+  private currentBatch: T[];
   private readonly BATCH_SIZE: number;
   private readonly FLUSH_INTERVAL: number;
   private flushTimer: NodeJS.Timeout | null;
   private readonly callback: (flushId: string, batch: T[]) => Promise<void>;
+  // New properties for dynamic scaling
+  private readonly minConcurrency: number;
+  private readonly maxConcurrency: number;
+  private readonly maxBatchSize: number;
+  private readonly memoryPressureThreshold: number;
+  private limiter: ReturnType<typeof pLimit>;
+  private currentBatchSize: number;
+  private totalQueuedItems: number = 0;
+  private consecutiveFlushFailures: number = 0;
+  private lastFlushTime: number = Date.now();
+  private metrics = {
+    flushedBatches: 0,
+    failedBatches: 0,
+    totalItemsFlushed: 0,
+    droppedEvents: 0,
+    droppedEventsByKind: new Map<string, number>(),
+  };
+
+  // New properties for load shedding
+  private readonly loadSheddingThreshold: number;
+  private readonly loadSheddingEnabled: boolean;
+  private readonly isDroppableEvent?: (item: T) => boolean;
+  private isLoadShedding: boolean = false;
+
+  private readonly logger: Logger = new Logger("EventRepo.DynamicFlushScheduler", "debug");
+
   constructor(config: DynamicFlushSchedulerConfig<T>) {
     this.batchQueue = [];
     this.currentBatch = [];
     this.BATCH_SIZE = config.batchSize;
+    this.currentBatchSize = config.batchSize;
     this.FLUSH_INTERVAL = config.flushInterval;
     this.callback = config.callback;
     this.flushTimer = null;
+
+    // Initialize dynamic scaling parameters
+    this.minConcurrency = config.minConcurrency ?? 1;
+    this.maxConcurrency = config.maxConcurrency ?? 10;
+    this.maxBatchSize = config.maxBatchSize ?? config.batchSize * 5;
+    this.memoryPressureThreshold = config.memoryPressureThreshold ?? config.batchSize * 20;
+
+    // Initialize load shedding parameters
+    this.loadSheddingThreshold = config.loadSheddingThreshold ?? config.batchSize * 50;
+    this.loadSheddingEnabled = config.loadSheddingEnabled ?? true;
+    this.isDroppableEvent = config.isDroppableEvent;
+
+    // Start with minimum concurrency
+    this.limiter = pLimit(this.minConcurrency);
+
     this.startFlushTimer();
+    this.startMetricsReporter();
   }
 
   addToBatch(items: T[]): void {
-    this.currentBatch.push(...items);
+    let itemsToAdd = items;
+
+    // Apply load shedding if enabled and we're over the threshold
+    if (this.loadSheddingEnabled && this.totalQueuedItems >= this.loadSheddingThreshold) {
+      const { kept, dropped } = this.applyLoadShedding(items);
+      itemsToAdd = kept;
+
+      if (dropped.length > 0) {
+        this.metrics.droppedEvents += dropped.length;
 
-    if (this.currentBatch.length >= this.BATCH_SIZE) {
-      this.batchQueue.push(this.currentBatch);
-      this.currentBatch = [];
-      this.flushNextBatch();
-      this.resetFlushTimer();
+        // Track dropped events by kind if possible
+        dropped.forEach((item) => {
+          const kind = this.getEventKind(item);
+          if (kind) {
+            const currentCount = this.metrics.droppedEventsByKind.get(kind) || 0;
+            this.metrics.droppedEventsByKind.set(kind, currentCount + 1);
+          }
+        });
+
+        if (!this.isLoadShedding) {
+          this.isLoadShedding = true;
+        }
+
+        this.logger.warn("Load shedding", {
+          totalQueuedItems: this.totalQueuedItems,
+          threshold: this.loadSheddingThreshold,
+          droppedCount: dropped.length,
+        });
+      }
+    } else if (this.isLoadShedding && this.totalQueuedItems < this.loadSheddingThreshold * 0.8) {
+      this.isLoadShedding = false;
+      this.logger.info("Load shedding deactivated", {
+        totalQueuedItems: this.totalQueuedItems,
+        threshold: this.loadSheddingThreshold,
+        totalDropped: this.metrics.droppedEvents,
+      });
     }
+
+    this.currentBatch.push(...itemsToAdd);
+    this.totalQueuedItems += itemsToAdd.length;
+
+    // Check if we need to create a batch
+    if (this.currentBatch.length >= this.currentBatchSize) {
+      this.createBatch();
+    }
+
+    // Adjust concurrency based on queue pressure
+    this.adjustConcurrency();
+  }
+
+  private createBatch(): void {
+    if (this.currentBatch.length === 0) return;
+
+    this.batchQueue.push(this.currentBatch);
+    this.currentBatch = [];
+    this.flushBatches();
+    this.resetFlushTimer();
   }
 
   private startFlushTimer(): void {
@@ -48,23 +150,224 @@ export class DynamicFlushScheduler<T> {
 
   private checkAndFlush(): void {
     if (this.currentBatch.length > 0) {
-      this.batchQueue.push(this.currentBatch);
-      this.currentBatch = [];
+      this.createBatch();
     }
-    this.flushNextBatch();
+    this.flushBatches();
   }
 
-  private async flushNextBatch(): Promise<void> {
-    if (this.batchQueue.length === 0) return;
+  private async flushBatches(): Promise<void> {
+    const batchesToFlush: T[][] = [];
+
+    // Dequeue all available batches up to current concurrency limit
+    while (this.batchQueue.length > 0 && batchesToFlush.length < this.limiter.concurrency) {
+      const batch = this.batchQueue.shift();
+      if (batch) {
+        batchesToFlush.push(batch);
+      }
+    }
+
+    if (batchesToFlush.length === 0) return;
+
+    // Schedule all batches for concurrent processing
+    const flushPromises = batchesToFlush.map((batch) =>
+      this.limiter(async () => {
+        const flushId = nanoid();
+        const itemCount = batch.length;
+
+        try {
+          const startTime = Date.now();
+          await this.callback(flushId, batch);
+
+          const duration = Date.now() - startTime;
+          this.totalQueuedItems -= itemCount;
+          this.consecutiveFlushFailures = 0;
+          this.lastFlushTime = Date.now();
+          this.metrics.flushedBatches++;
+          this.metrics.totalItemsFlushed += itemCount;
 
-    const batchToFlush = this.batchQueue.shift();
-    try {
-      await this.callback(nanoid(), batchToFlush!);
+          this.logger.debug("Batch flushed successfully", {
+            flushId,
+            itemCount,
+            duration,
+            remainingQueueDepth: this.totalQueuedItems,
+            activeConcurrency: this.limiter.activeCount,
+            pendingConcurrency: this.limiter.pendingCount,
+          });
+        } catch (error) {
+          this.consecutiveFlushFailures++;
+          this.metrics.failedBatches++;
+
+          this.logger.error("Error flushing batch", {
+            flushId,
+            itemCount,
+            error,
+            consecutiveFailures: this.consecutiveFlushFailures,
+          });
+
+          // Re-queue the batch at the front if it fails
+          this.batchQueue.unshift(batch);
+          this.totalQueuedItems += itemCount;
+
+          // Back off on failures
+          if (this.consecutiveFlushFailures > 3) {
+            this.adjustConcurrency(true);
+          }
+        }
+      })
+    );
+
+    // Don't await here - let them run concurrently
+    Promise.allSettled(flushPromises).then(() => {
+      // After flush completes, check if we need to flush more
       if (this.batchQueue.length > 0) {
-        this.flushNextBatch();
+        this.flushBatches();
+      }
+    });
+  }
+
+  private lastConcurrencyAdjustment: number = Date.now();
+
+  private adjustConcurrency(backOff: boolean = false): void {
+    const currentConcurrency = this.limiter.concurrency;
+    let newConcurrency = currentConcurrency;
+
+    // Calculate pressure metrics
+    const queuePressure = this.totalQueuedItems / this.memoryPressureThreshold;
+    const timeSinceLastFlush = Date.now() - this.lastFlushTime;
+    const timeSinceLastAdjustment = Date.now() - this.lastConcurrencyAdjustment;
+
+    // Don't adjust too frequently (except for backoff)
+    if (!backOff && timeSinceLastAdjustment < 1000) {
+      return;
+    }
+
+    if (backOff) {
+      // Reduce concurrency on failures
+      newConcurrency = Math.max(this.minConcurrency, Math.floor(currentConcurrency * 0.75));
+    } else {
+      if (queuePressure > 0.8 || timeSinceLastFlush > this.FLUSH_INTERVAL * 2) {
+        // High pressure - increase concurrency
+        newConcurrency = Math.min(this.maxConcurrency, currentConcurrency + 2);
+      } else if (queuePressure < 0.2 && currentConcurrency > this.minConcurrency) {
+        // Low pressure - decrease concurrency
+        newConcurrency = Math.max(this.minConcurrency, currentConcurrency - 1);
+      }
+    }
+
+    // Adjust batch size based on pressure
+    if (this.totalQueuedItems > this.memoryPressureThreshold) {
+      this.currentBatchSize = Math.min(
+        this.maxBatchSize,
+        Math.floor(this.BATCH_SIZE * (1 + queuePressure))
+      );
+    } else {
+      this.currentBatchSize = this.BATCH_SIZE;
+    }
+
+    // Update concurrency if changed
+    if (newConcurrency !== currentConcurrency) {
+      this.limiter = pLimit(newConcurrency);
+      // Record the adjustment time so the 1s rate limit above takes effect
+      this.lastConcurrencyAdjustment = Date.now();
+
+      this.logger.info("Adjusted flush concurrency", {
+        previousConcurrency: currentConcurrency,
+        newConcurrency,
+        queuePressure,
+        totalQueuedItems: this.totalQueuedItems,
+        currentBatchSize: this.currentBatchSize,
+        memoryPressureThreshold: this.memoryPressureThreshold,
+      });
+    }
+  }
+
+  private startMetricsReporter(): void {
+    // Report metrics every 30 seconds
+    setInterval(() => {
+      const droppedByKind: Record<string, number> = {};
+      this.metrics.droppedEventsByKind.forEach((count, kind) => {
+        droppedByKind[kind] = count;
+      });
+
+      this.logger.info("DynamicFlushScheduler metrics", {
+        totalQueuedItems: this.totalQueuedItems,
+        batchQueueLength: this.batchQueue.length,
+        currentBatchLength: this.currentBatch.length,
+        currentConcurrency: this.limiter.concurrency,
+        activeConcurrent: this.limiter.activeCount,
+        pendingConcurrent: this.limiter.pendingCount,
+        currentBatchSize: this.currentBatchSize,
+        isLoadShedding: this.isLoadShedding,
+        metrics: {
+          ...this.metrics,
+          droppedByKind,
+        },
+      });
+    }, 30000);
+  }
+
+  private applyLoadShedding(items: T[]): { kept: T[]; dropped: T[] } {
+    if (!this.isDroppableEvent) {
+      // If no function provided to determine droppable events, keep all
+      return { kept: items, dropped: [] };
+    }
+
+    const kept: T[] = [];
+    const dropped: T[] = [];
+
+    for (const item of items) {
+      if (this.isDroppableEvent(item)) {
+        dropped.push(item);
+      } else {
+        kept.push(item);
       }
-    } catch (error) {
-      console.error("Error inserting batch:", error);
+    }
+
+    return { kept, dropped };
+  }
+
+  private getEventKind(item: T): string | undefined {
+    // Try to extract the kind from the event if it has one
+    if (item && typeof item === "object" && "kind" in item) {
+      return String(item.kind);
+    }
+    return undefined;
+  }
+
+  // Method to get current status
+  getStatus() {
+    const droppedByKind: Record<string, number> = {};
+    this.metrics.droppedEventsByKind.forEach((count, kind) => {
+      droppedByKind[kind] = count;
+    });
+
+    return {
+      queuedItems: this.totalQueuedItems,
+      batchQueueLength: this.batchQueue.length,
+      currentBatchSize: this.currentBatch.length,
+      concurrency: this.limiter.concurrency,
+      activeFlushes: this.limiter.activeCount,
+      pendingFlushes: this.limiter.pendingCount,
+      isLoadShedding: this.isLoadShedding,
+      metrics: {
+        ...this.metrics,
+        droppedEventsByKind: droppedByKind,
+      },
+    };
+  }
+
+  // Graceful shutdown
+  async shutdown(): Promise<void> {
+    if (this.flushTimer) {
+      clearInterval(this.flushTimer);
+    }
+
+    // Flush any remaining items
+    if (this.currentBatch.length > 0) {
+      this.createBatch();
+    }
+
+    // Wait for all pending flushes to complete
+    while (this.batchQueue.length > 0 || this.limiter.activeCount > 0) {
+      await new Promise((resolve) => setTimeout(resolve, 100));
     }
   }
-}
+}
\ No newline at end of file
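Taken together, the scheduler above batches incoming items, flushes batches through a `p-limit` limiter whose concurrency scales between `minConcurrency` and `maxConcurrency` with queue pressure, and drops only events its `isDroppableEvent` predicate accepts once `loadSheddingThreshold` is crossed. A hedged usage sketch of the API added above (the `Event` shape and numbers are invented; the real caller is `EventRepository` below):

```ts
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";

// Illustrative event shape; the webapp passes CreatableEvent instead.
type Event = { kind: "LOG" | "SPAN"; message: string };

const scheduler = new DynamicFlushScheduler<Event>({
  batchSize: 100,
  flushInterval: 1000,
  minConcurrency: 1,
  maxConcurrency: 10,
  loadSheddingThreshold: 100_000,
  loadSheddingEnabled: true,
  // Under sustained pressure, only events this predicate accepts are dropped.
  isDroppableEvent: (event) => event.kind === "LOG",
  callback: async (flushId, batch) => {
    // Persist the batch; a thrown error re-queues it at the front of the
    // queue and, after repeated failures, backs off the concurrency.
    console.log(`flush ${flushId}: ${batch.length} events`);
  },
});

scheduler.addToBatch([{ kind: "LOG", message: "hello" }]);
await scheduler.shutdown(); // drains remaining batches and waits for in-flight flushes
```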
diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts
index cfca700209..1fb88e969f 100644
--- a/apps/webapp/app/v3/eventRepository.server.ts
+++ b/apps/webapp/app/v3/eventRepository.server.ts
@@ -107,6 +107,12 @@ export type EventRepoConfig = {
   retentionInDays: number;
   partitioningEnabled: boolean;
   tracer?: Tracer;
+  minConcurrency?: number;
+  maxConcurrency?: number;
+  maxBatchSize?: number;
+  memoryPressureThreshold?: number;
+  loadSheddingThreshold?: number;
+  loadSheddingEnabled?: boolean;
 };
 
 export type QueryOptions = Prisma.TaskEventWhereInput;
@@ -199,6 +205,10 @@ export class EventRepository {
     return this._subscriberCount;
   }
 
+  get flushSchedulerStatus() {
+    return this._flushScheduler.getStatus();
+  }
+
   constructor(
     db: PrismaClient = prisma,
     readReplica: PrismaReplicaClient = $replica,
@@ -208,6 +218,16 @@ export class EventRepository {
       batchSize: _config.batchSize,
       flushInterval: _config.batchInterval,
      callback: this.#flushBatch.bind(this),
+      minConcurrency: _config.minConcurrency,
+      maxConcurrency: _config.maxConcurrency,
+      maxBatchSize: _config.maxBatchSize,
+      memoryPressureThreshold: _config.memoryPressureThreshold,
+      loadSheddingThreshold: _config.loadSheddingThreshold,
+      loadSheddingEnabled: _config.loadSheddingEnabled,
+      isDroppableEvent: (event: CreatableEvent) => {
+        // Only drop LOG events during load shedding
+        return event.kind === TaskEventKind.LOG;
+      },
     });
 
     this._redisPublishClient = createRedisClient("trigger:eventRepoPublisher", this._config.redis);
@@ -1324,6 +1344,12 @@ function initializeEventRepo() {
     batchInterval: env.EVENTS_BATCH_INTERVAL,
     retentionInDays: env.EVENTS_DEFAULT_LOG_RETENTION,
     partitioningEnabled: env.TASK_EVENT_PARTITIONING_ENABLED === "1",
+    minConcurrency: env.EVENTS_MIN_CONCURRENCY,
+    maxConcurrency: env.EVENTS_MAX_CONCURRENCY,
+    maxBatchSize: env.EVENTS_MAX_BATCH_SIZE,
+    memoryPressureThreshold: env.EVENTS_MEMORY_PRESSURE_THRESHOLD,
+    loadSheddingThreshold: env.EVENTS_LOAD_SHEDDING_THRESHOLD,
+    loadSheddingEnabled: env.EVENTS_LOAD_SHEDDING_ENABLED === "1",
     redis: {
       port: env.PUBSUB_REDIS_PORT,
       host: env.PUBSUB_REDIS_HOST,
@@ -1343,6 +1369,67 @@ function initializeEventRepo() {
     registers: [metricsRegister],
   });
 
+  // Add metrics for flush scheduler
+  new Gauge({
+    name: "event_flush_scheduler_queued_items",
+    help: "Total number of items queued in the flush scheduler",
+    collect() {
+      const status = repo.flushSchedulerStatus;
+      this.set(status.queuedItems);
+    },
+    registers: [metricsRegister],
+  });
+
+  new Gauge({
+    name: "event_flush_scheduler_batch_queue_length",
+    help: "Number of batches waiting to be flushed",
+    collect() {
+      const status = repo.flushSchedulerStatus;
+      this.set(status.batchQueueLength);
+    },
+    registers: [metricsRegister],
+  });
+
+  new Gauge({
+    name: "event_flush_scheduler_concurrency",
+    help: "Current concurrency level of the flush scheduler",
+    collect() {
+      const status = repo.flushSchedulerStatus;
+      this.set(status.concurrency);
+    },
+    registers: [metricsRegister],
+  });
+
+  new Gauge({
+    name: "event_flush_scheduler_active_flushes",
+    help: "Number of active flush operations",
+    collect() {
+      const status = repo.flushSchedulerStatus;
+      this.set(status.activeFlushes);
+    },
+    registers: [metricsRegister],
+  });
+
+  new Gauge({
+    name: "event_flush_scheduler_dropped_events",
+    help: "Total number of events dropped due to load shedding",
+    collect() {
+      const status = repo.flushSchedulerStatus;
+      this.set(status.metrics.droppedEvents);
+    },
+    registers: [metricsRegister],
+  });
+
+  new Gauge({
+    name: "event_flush_scheduler_is_load_shedding",
+    help: "Whether load shedding is currently active (1 = active, 0 = inactive)",
+    collect() {
+      const status = repo.flushSchedulerStatus;
+      this.set(status.isLoadShedding ? 1 : 0);
+    },
+    registers: [metricsRegister],
+  });
+
   return repo;
 }
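Each gauge registered above relies on prom-client's `collect()` hook, which runs at scrape time rather than on a timer, so the metrics endpoint always reports the scheduler's current state. A self-contained sketch of that pattern (metric name and value are illustrative):

```ts
import { Gauge, Registry } from "prom-client";

const registry = new Registry();
let queueDepth = 0; // stand-in for repo.flushSchedulerStatus.queuedItems

new Gauge({
  name: "example_flush_scheduler_queued_items",
  help: "Example gauge sampled lazily at scrape time",
  collect() {
    // Runs on every scrape of the metrics endpoint, so no background
    // interval is needed to keep the value fresh.
    this.set(queueDepth);
  },
  registers: [registry],
});

queueDepth = 1234;
registry.metrics().then(console.log); // the exposition output reports 1234
```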
diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts
index 6d1622df28..8759953d28 100644
--- a/references/hello-world/src/trigger/example.ts
+++ b/references/hello-world/src/trigger/example.ts
@@ -373,3 +373,68 @@ export const largeAttributesTask = task({
     });
   },
 });
+
+export const lotsOfLogsParentTask = task({
+  id: "lots-of-logs-parent",
+  run: async (payload: { count: number }, { ctx }) => {
+    logger.info("Hello, world from the lots of logs parent task", { count: payload.count });
+    await lotsOfLogsTask.batchTriggerAndWait(
+      Array.from({ length: 20 }, (_, i) => ({
+        payload: { count: payload.count },
+      }))
+    );
+  },
+});
+
+export const lotsOfLogsTask = task({
+  id: "lots-of-logs",
+  run: async (payload: { count: number }, { ctx }) => {
+    logger.info("Hello, world from the lots of logs task", { count: payload.count });
+
+    for (let i = 0; i < payload.count; i++) {
+      logger.info("Hello, world from the lots of logs task", { count: i });
+    }
+
+    await setTimeout(1000);
+
+    for (let i = 0; i < payload.count; i++) {
+      logger.info("Hello, world from the lots of logs task", { count: i });
+    }
+
+    await setTimeout(1000);
+
+    for (let i = 0; i < payload.count; i++) {
+      logger.info("Hello, world from the lots of logs task", { count: i });
+    }
+
+    await setTimeout(1000);
+
+    for (let i = 0; i < payload.count; i++) {
+      logger.info("Hello, world from the lots of logs task", { count: i });
+    }
+
+    await setTimeout(1000);
+
+    for (let i = 0; i < payload.count; i++) {
+      logger.info("Hello, world from the lots of logs task", { count: i });
+    }
+
+    await setTimeout(1000);
+
+    for (let i = 0; i < payload.count; i++) {
+      logger.info("Hello, world from the lots of logs task", { count: i });
+    }
+
+    await setTimeout(1000);
+
+    for (let i = 0; i < payload.count; i++) {
+      logger.info("Hello, world from the lots of logs task", { count: i });
+    }
+
+    await setTimeout(1000);
+
+    for (let i = 0; i < payload.count; i++) {
+      logger.info("Hello, world from the lots of logs task", { count: i });
+    }
+  },
+});
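To exercise the new load-shedding path end to end, the parent task above can be triggered from a client script; a sketch assuming the v3 SDK's `tasks.trigger` API and a configured project (task id from the diff, count chosen arbitrarily):

```ts
import { tasks } from "@trigger.dev/sdk/v3";

// Fans out 20 child runs, each emitting several thousand LOG events, which
// should push event_flush_scheduler_queued_items toward EVENTS_LOAD_SHEDDING_THRESHOLD.
await tasks.trigger("lots-of-logs-parent", { count: 5000 });
```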