Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions apps/webapp/app/eventLoopMonitor.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { env } from "./env.server";
import { context, Context } from "@opentelemetry/api";
import { performance } from "node:perf_hooks";
import { logger } from "./services/logger.server";
import { signalsEmitter } from "./services/signals.server";

const THRESHOLD_NS = env.EVENT_LOOP_MONITOR_THRESHOLD_MS * 1e6;

Expand Down Expand Up @@ -110,6 +111,13 @@ function startEventLoopUtilizationMonitoring() {
lastEventLoopUtilization = currentEventLoopUtilization;
}, env.EVENT_LOOP_MONITOR_UTILIZATION_INTERVAL_MS);

signalsEmitter.on("SIGTERM", () => {
clearInterval(interval);
});
signalsEmitter.on("SIGINT", () => {
clearInterval(interval);
});

return () => {
clearInterval(interval);
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AuthenticatedEnvironment } from "../apiAuth.server";
import { logger } from "../logger.server";
import { signalsEmitter } from "../signals.server";
import { StreamIngestor, StreamResponder } from "./types";
import { LineTransformStream } from "./utils.server";
import { v1RealtimeStreams } from "./v1StreamsGlobal.server";
Expand Down Expand Up @@ -243,12 +244,17 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
}

function initializeRelayRealtimeStreams() {
return new RelayRealtimeStreams({
const service = new RelayRealtimeStreams({
ttl: 1000 * 60 * 5, // 5 minutes
cleanupInterval: 1000 * 60, // 1 minute
fallbackIngestor: v1RealtimeStreams,
fallbackResponder: v1RealtimeStreams,
});

signalsEmitter.on("SIGTERM", service.close.bind(service));
signalsEmitter.on("SIGINT", service.close.bind(service));

return service;
}

export const relayRealtimeStreams = singleton(
Expand Down
6 changes: 3 additions & 3 deletions apps/webapp/app/services/runsReplicationInstance.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import invariant from "tiny-invariant";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
import { provider } from "~/v3/tracer.server";
import { logger } from "./logger.server";
import { RunsReplicationService } from "./runsReplicationService.server";
import { signalsEmitter } from "./signals.server";

export const runsReplicationInstance = singleton(
"runsReplicationInstance",
Expand Down Expand Up @@ -80,8 +80,8 @@ function initializeRunsReplicationInstance() {
});
});

process.on("SIGTERM", service.shutdown.bind(service));
process.on("SIGINT", service.shutdown.bind(service));
signalsEmitter.on("SIGTERM", service.shutdown.bind(service));
signalsEmitter.on("SIGINT", service.shutdown.bind(service));
}

return service;
Expand Down
32 changes: 32 additions & 0 deletions apps/webapp/app/services/signals.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { EventEmitter } from "events";
import { singleton } from "~/utils/singleton";

export type SignalsEvents = {
SIGTERM: [
{
time: Date;
signal: NodeJS.Signals;
}
];
SIGINT: [
{
time: Date;
signal: NodeJS.Signals;
}
];
};

export type SignalsEventArgs<T extends keyof SignalsEvents> = SignalsEvents[T];

export type SignalsEmitter = EventEmitter<SignalsEvents>;

function initializeSignalsEmitter() {
const emitter = new EventEmitter<SignalsEvents>();

process.on("SIGTERM", () => emitter.emit("SIGTERM", { time: new Date(), signal: "SIGTERM" }));
process.on("SIGINT", () => emitter.emit("SIGINT", { time: new Date(), signal: "SIGINT" }));

return emitter;
}

export const signalsEmitter = singleton("signalsEmitter", initializeSignalsEmitter);
28 changes: 24 additions & 4 deletions apps/webapp/app/v3/dynamicFlushScheduler.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Logger } from "@trigger.dev/core/logger";
import { nanoid } from "nanoid";
import pLimit from "p-limit";
import { signalsEmitter } from "~/services/signals.server";

export type DynamicFlushSchedulerConfig<T> = {
batchSize: number;
Expand All @@ -22,6 +23,7 @@ export class DynamicFlushScheduler<T> {
private readonly BATCH_SIZE: number;
private readonly FLUSH_INTERVAL: number;
private flushTimer: NodeJS.Timeout | null;
private metricsReporterTimer: NodeJS.Timeout | undefined;
private readonly callback: (flushId: string, batch: T[]) => Promise<void>;

// New properties for dynamic scaling
Expand All @@ -41,6 +43,7 @@ export class DynamicFlushScheduler<T> {
droppedEvents: 0,
droppedEventsByKind: new Map<string, number>(),
};
private isShuttingDown: boolean = false;

// New properties for load shedding
private readonly loadSheddingThreshold: number;
Expand Down Expand Up @@ -75,6 +78,7 @@ export class DynamicFlushScheduler<T> {

this.startFlushTimer();
this.startMetricsReporter();
this.setupShutdownHandlers();
}

addToBatch(items: T[]): void {
Expand Down Expand Up @@ -119,8 +123,8 @@ export class DynamicFlushScheduler<T> {
this.currentBatch.push(...itemsToAdd);
this.totalQueuedItems += itemsToAdd.length;

// Check if we need to create a batch
if (this.currentBatch.length >= this.currentBatchSize) {
// Check if we need to create a batch (if we are shutting down, create a batch immediately because the flush timer is stopped)
if (this.currentBatch.length >= this.currentBatchSize || this.isShuttingDown) {
this.createBatch();
}

Expand All @@ -137,6 +141,11 @@ export class DynamicFlushScheduler<T> {
this.resetFlushTimer();
}

private setupShutdownHandlers(): void {
signalsEmitter.on("SIGTERM", () => this.shutdown());
signalsEmitter.on("SIGINT", () => this.shutdown());
}

private startFlushTimer(): void {
this.flushTimer = setInterval(() => this.checkAndFlush(), this.FLUSH_INTERVAL);
}
Expand All @@ -145,6 +154,9 @@ export class DynamicFlushScheduler<T> {
if (this.flushTimer) {
clearInterval(this.flushTimer);
}

if (this.isShuttingDown) return;

this.startFlushTimer();
}

Expand Down Expand Up @@ -226,7 +238,7 @@ export class DynamicFlushScheduler<T> {
}

private lastConcurrencyAdjustment: number = Date.now();

private adjustConcurrency(backOff: boolean = false): void {
const currentConcurrency = this.limiter.concurrency;
let newConcurrency = currentConcurrency;
Expand Down Expand Up @@ -281,7 +293,7 @@ export class DynamicFlushScheduler<T> {

private startMetricsReporter(): void {
// Report metrics every 30 seconds
setInterval(() => {
this.metricsReporterTimer = setInterval(() => {
const droppedByKind: Record<string, number> = {};
this.metrics.droppedEventsByKind.forEach((count, kind) => {
droppedByKind[kind] = count;
Expand Down Expand Up @@ -356,10 +368,18 @@ export class DynamicFlushScheduler<T> {

// Graceful shutdown
async shutdown(): Promise<void> {
if (this.isShuttingDown) return;

this.isShuttingDown = true;

if (this.flushTimer) {
clearInterval(this.flushTimer);
}

if (this.metricsReporterTimer) {
clearInterval(this.metricsReporterTimer);
}

// Flush any remaining items
if (this.currentBatch.length > 0) {
this.createBatch();
Expand Down
5 changes: 3 additions & 2 deletions apps/webapp/app/v3/marqs/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import z from "zod";
import { env } from "~/env.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { signalsEmitter } from "~/services/signals.server";
import { singleton } from "~/utils/singleton";
import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
import { concurrencyTracker } from "../services/taskRunConcurrencyTracker.server";
Expand Down Expand Up @@ -151,8 +152,8 @@ export class MarQS {
}

#setupShutdownHandlers() {
process.on("SIGTERM", () => this.shutdown("SIGTERM"));
process.on("SIGINT", () => this.shutdown("SIGINT"));
signalsEmitter.on("SIGTERM", () => this.shutdown("SIGTERM"));
signalsEmitter.on("SIGINT", () => this.shutdown("SIGINT"));
}

async shutdown(signal: NodeJS.Signals) {
Expand Down
33 changes: 7 additions & 26 deletions apps/webapp/app/v3/tracing.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,33 +41,14 @@ export async function startSpanWithEnv<T>(
fn: (span: Span) => Promise<T>,
options?: SpanOptions
): Promise<T> {
return startSpan(
tracer,
name,
async (span) => {
try {
return await fn(span);
} catch (e) {
if (e instanceof Error) {
span.recordException(e);
} else {
span.recordException(new Error(String(e)));
}

throw e;
} finally {
span.end();
}
return startSpan(tracer, name, fn, {
attributes: {
...attributesFromAuthenticatedEnv(env),
...options?.attributes,
},
{
attributes: {
...attributesFromAuthenticatedEnv(env),
...options?.attributes,
},
kind: SpanKind.SERVER,
...options,
}
);
kind: SpanKind.SERVER,
...options,
});
}

export async function emitDebugLog(
Expand Down
Loading
Loading