Skip to content
154 changes: 132 additions & 22 deletions apps/webapp/app/v3/dynamicFlushScheduler.server.ts
Original file line number Diff line number Diff line change
@@ -1,70 +1,180 @@
import { nanoid } from "nanoid";
import pLimit from "p-limit";
import { Gauge } from "prom-client";
import { metricsRegister } from "~/metrics.server";
import { logger } from "~/services/logger.server";

/**
 * Options accepted by the DynamicFlushScheduler constructor.
 */
export type DynamicFlushSchedulerConfig<T> = {
/**
 * Buffered item count at which addToBatch triggers a flush; also the maximum
 * number of items handed to `callback` in a single call.
 */
batchSize: number;
/** Delay passed to setInterval (milliseconds) between periodic flush checks. */
flushInterval: number;
/** Concurrency passed to p-limit for `callback` invocations; falls back to 1 when omitted or 0. */
maxConcurrency?: number;
/** Invoked once per batch with a freshly generated nanoid and the batch items. */
callback: (flushId: string, batch: T[]) => Promise<void>;
};

/**
 * Buffers items in memory and hands them to `callback` in chunks of at most
 * `batchSize` items.
 *
 * A flush happens when either:
 *  - `addToBatch` leaves `batchSize` or more items in the buffer, or
 *  - the periodic timer (every `flushInterval` ms) finds a non-empty buffer, or
 *  - the process receives SIGTERM / SIGINT.
 *
 * Every flush drains the whole buffer. Chunks are dispatched through a p-limit
 * limiter so at most `maxConcurrency` callbacks run at once. A callback that
 * rejects is logged and counted in `failedBatchCount`; the rejection is NOT
 * propagated to the caller and the failed chunk is not retried.
 */
export class DynamicFlushScheduler<T> {
  private currentBatch: T[];
  private readonly BATCH_SIZE: number;
  private readonly FLUSH_INTERVAL: number;
  private readonly MAX_CONCURRENCY: number;
  private readonly concurrencyLimiter: ReturnType<typeof pLimit>;
  private flushTimer: NodeJS.Timeout | null;
  private readonly callback: (flushId: string, batch: T[]) => Promise<void>;
  private isShuttingDown: boolean;
  private failedBatchCount: number;

  /**
   * Creates the scheduler, starts the periodic flush timer and registers the
   * SIGTERM / SIGINT handlers. Outside of vitest (`process.env.VITEST` unset)
   * it also registers two prom-client gauges on `metricsRegister`.
   *
   * @param config - batch size, flush interval, optional concurrency and the flush callback.
   */
  constructor(config: DynamicFlushSchedulerConfig<T>) {
    this.currentBatch = [];
    this.BATCH_SIZE = config.batchSize;
    this.FLUSH_INTERVAL = config.flushInterval;
    // `||` rather than `??` on purpose: a concurrency of 0 must also fall back
    // to 1, since a limiter with no slots could never run a callback.
    this.MAX_CONCURRENCY = config.maxConcurrency || 1;
    this.concurrencyLimiter = pLimit(this.MAX_CONCURRENCY);
    this.flushTimer = null;
    this.callback = config.callback;
    this.isShuttingDown = false;
    this.failedBatchCount = 0;

    logger.info("Initializing DynamicFlushScheduler", {
      batchSize: this.BATCH_SIZE,
      flushInterval: this.FLUSH_INTERVAL,
      maxConcurrency: this.MAX_CONCURRENCY,
    });

    this.startFlushTimer();
    this.setupShutdownHandlers();

    if (!process.env.VITEST) {
      // Inside `collect()` `this` is the Gauge, so the scheduler is captured
      // under a separate name.
      const scheduler = this;
      new Gauge({
        name: "dynamic_flush_scheduler_batch_size",
        help: "Number of items in the current dynamic flush scheduler batch",
        collect() {
          this.set(scheduler.currentBatch.length);
        },
        registers: [metricsRegister],
      });

      new Gauge({
        name: "dynamic_flush_scheduler_failed_batches",
        help: "Number of failed batches",
        collect() {
          this.set(scheduler.failedBatchCount);
        },
        registers: [metricsRegister],
      });
    }
  }

  /**
   * Appends `items` to the buffer and, once the buffer holds `batchSize` or
   * more items, flushes it and restarts the periodic timer.
   *
   * The returned promise settles after that flush (if any) has finished.
   * If you want to fire and forget, don't await this method.
   *
   * @param items - items to buffer; an empty array is a no-op apart from logging.
   */
  async addToBatch(items: T[]): Promise<void> {
    // Pushed one by one instead of `push(...items)`: spreading a very large
    // array can exceed the engine's argument limit and throw a RangeError.
    for (const item of items) {
      this.currentBatch.push(item);
    }
    logger.debug("Adding items to batch", {
      currentBatchSize: this.currentBatch.length,
      itemsAdded: items.length,
    });

    if (this.currentBatch.length >= this.BATCH_SIZE) {
      logger.debug("Batch size threshold reached, initiating flush", {
        batchSize: this.BATCH_SIZE,
        currentSize: this.currentBatch.length,
      });
      await this.flushNextBatch();
      // A size-triggered flush just emptied the buffer, so the interval
      // countdown starts over.
      this.resetFlushTimer();
    }
  }

  /** Starts the interval that periodically flushes a non-empty buffer. */
  private startFlushTimer(): void {
    this.flushTimer = setInterval(() => this.checkAndFlush(), this.FLUSH_INTERVAL);
    logger.debug("Started flush timer", { interval: this.FLUSH_INTERVAL });
  }

  /** Registers `shutdown` as the handler for SIGTERM and SIGINT. */
  private setupShutdownHandlers(): void {
    process.on("SIGTERM", this.shutdown.bind(this));
    process.on("SIGINT", this.shutdown.bind(this));
    logger.debug("Shutdown handlers configured");
  }

  /**
   * Flushes whatever is still buffered and stops the periodic timer.
   * Only the first invocation does any work; later ones return immediately.
   */
  private async shutdown(): Promise<void> {
    if (this.isShuttingDown) return;
    this.isShuttingDown = true;
    logger.info("Initiating shutdown of dynamic flush scheduler", {
      remainingItems: this.currentBatch.length,
    });

    await this.checkAndFlush();
    this.clearTimer();

    logger.info("Dynamic flush scheduler shutdown complete", {
      totalFailedBatches: this.failedBatchCount,
    });
  }

  /** Stops the periodic timer, if one is running. */
  private clearTimer(): void {
    if (this.flushTimer) {
      clearInterval(this.flushTimer);
      // Drop the stale handle so a cleared timer is never cleared again.
      this.flushTimer = null;
      logger.debug("Flush timer cleared");
    }
  }

  /** Restarts the periodic timer so the next tick is a full interval away. */
  private resetFlushTimer(): void {
    this.clearTimer();
    this.startFlushTimer();
    logger.debug("Flush timer reset");
  }

  /** Timer / shutdown entry point: flushes only when something is buffered. */
  private async checkAndFlush(): Promise<void> {
    if (this.currentBatch.length > 0) {
      logger.debug("Periodic flush check triggered", {
        currentBatchSize: this.currentBatch.length,
      });
      await this.flushNextBatch();
    }
  }

  /**
   * Drains the buffer into chunks of at most `BATCH_SIZE` items and runs
   * `callback` for each chunk through the concurrency limiter.
   *
   * Resolves once every chunk has settled. Rejections are logged, added to
   * `failedBatchCount`, and otherwise swallowed.
   */
  private async flushNextBatch(): Promise<void> {
    if (this.currentBatch.length === 0) return;

    // The buffer is emptied synchronously (before the first await), so items
    // added while callbacks are in flight land in a fresh buffer.
    const batches: T[][] = [];
    while (this.currentBatch.length > 0) {
      batches.push(this.currentBatch.splice(0, this.BATCH_SIZE));
    }

    logger.info("Starting batch flush", {
      numberOfBatches: batches.length,
      totalItems: batches.reduce((sum, batch) => sum + batch.length, 0),
    });

    // TODO: report plimit.activeCount and pLimit.pendingCount and pLimit.concurrency to /metrics
    const promises = batches.map((batch) =>
      this.concurrencyLimiter(async () => {
        const batchId = nanoid();
        try {
          await this.callback(batchId, batch);
        } catch (error) {
          logger.error("Error processing batch", {
            batchId,
            error,
            batchSize: batch.length,
            errorMessage: error instanceof Error ? error.message : "Unknown error",
          });
          // Rethrow so Promise.allSettled records this chunk as rejected.
          throw error;
        }
      })
    );

    const results = await Promise.allSettled(promises);

    const failedBatches = results.filter((result) => result.status === "rejected").length;
    this.failedBatchCount += failedBatches;

    logger.info("Batch flush complete", {
      totalBatches: batches.length,
      successfulBatches: batches.length - failedBatches,
      failedBatches,
      totalFailedBatches: this.failedBatchCount,
    });
  }
}
9 changes: 5 additions & 4 deletions apps/webapp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,19 @@
"@codemirror/view": "^6.5.0",
"@conform-to/react": "^0.6.1",
"@conform-to/zod": "^0.6.1",
"@depot/sdk-node": "^1.0.0",
"@depot/cli": "0.0.1-cli.2.80.0",
"@depot/sdk-node": "^1.0.0",
"@electric-sql/react": "^0.3.5",
"@headlessui/react": "^1.7.8",
"@heroicons/react": "^2.0.12",
"@internal/run-engine": "workspace:*",
"@internal/zod-worker": "workspace:*",
"@internal/redis": "workspace:*",
"@internal/redis-worker": "workspace:*",
"@internal/run-engine": "workspace:*",
"@internal/zod-worker": "workspace:*",
"@internationalized/date": "^3.5.1",
"@lezer/highlight": "^1.1.6",
"@opentelemetry/api": "1.9.0",
"@opentelemetry/api-logs": "0.52.1",
"@opentelemetry/core": "1.25.1",
"@opentelemetry/exporter-logs-otlp-http": "0.52.1",
"@opentelemetry/exporter-trace-otlp-http": "0.52.1",
Expand All @@ -69,7 +70,6 @@
"@opentelemetry/sdk-trace-base": "1.25.1",
"@opentelemetry/sdk-trace-node": "1.25.1",
"@opentelemetry/semantic-conventions": "1.25.1",
"@opentelemetry/api-logs": "0.52.1",
"@popperjs/core": "^2.11.8",
"@prisma/instrumentation": "^5.11.0",
"@radix-ui/react-alert-dialog": "^1.0.4",
Expand Down Expand Up @@ -145,6 +145,7 @@
"non.geist": "^1.0.2",
"ohash": "^1.1.3",
"openai": "^4.33.1",
"p-limit": "^6.2.0",
"parse-duration": "^1.1.0",
"posthog-js": "^1.93.3",
"posthog-node": "^3.1.3",
Expand Down
90 changes: 90 additions & 0 deletions apps/webapp/test/dynamicFlushScheduler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { DynamicFlushScheduler } from "../app/v3/dynamicFlushScheduler.server";

// Behavioural tests for DynamicFlushScheduler: size-triggered flushes,
// interval-triggered flushes, chunking, and the SIGTERM flush path.
describe("DynamicFlushScheduler", () => {
  beforeEach(() => {
    vi.useFakeTimers();
    vi.clearAllMocks();
    vi.resetAllMocks();
  });

  afterEach(() => {
    // Every scheduler starts an interval in its constructor and nothing in
    // these tests stops it; drop the pending fake timers and hand the real
    // clock back so nothing leaks into other test files.
    vi.clearAllTimers();
    vi.useRealTimers();
  });

  it("doesn't call callback when there are no items", async () => {
    const callback = vi.fn();
    const dynamicFlushScheduler = new DynamicFlushScheduler({
      batchSize: 3,
      flushInterval: 5000,
      callback,
    });
    await dynamicFlushScheduler.addToBatch([]);

    expect(callback).toBeCalledTimes(0);
  });

  it("calls callback once with batchSize items", async () => {
    const callback = vi.fn();
    const dynamicFlushScheduler = new DynamicFlushScheduler({
      batchSize: 3,
      flushInterval: 5000,
      callback,
    });
    const items = [1, 2, 3];
    await dynamicFlushScheduler.addToBatch(items);

    expect(callback).toBeCalledTimes(1);
    expect(callback).toBeCalledWith(expect.any(String), [1, 2, 3]);
  });

  it("calls callback when flush interval is reached", async () => {
    const callback = vi.fn();
    const dynamicFlushScheduler = new DynamicFlushScheduler({
      batchSize: 100,
      flushInterval: 3000,
      callback,
    });
    const items = [1, 2, 3, 4, 5];
    // Below batchSize, so this resolves without flushing.
    await dynamicFlushScheduler.addToBatch(items);
    expect(callback).toBeCalledTimes(0);

    await vi.advanceTimersByTimeAsync(3000);

    expect(callback).toBeCalledTimes(1);
    expect(callback).toBeCalledWith(expect.any(String), [1, 2, 3, 4, 5]);
  });

  it("calls callback multiple times with the correct batch size", async () => {
    const callback = vi.fn();
    const dynamicFlushScheduler = new DynamicFlushScheduler({
      batchSize: 3,
      flushInterval: 10000,
      callback,
    });
    const items = [1, 2, 3, 4, 5, 6];
    await dynamicFlushScheduler.addToBatch(items);

    expect(callback).toHaveBeenCalledTimes(2);
    expect(callback).toHaveBeenNthCalledWith(1, expect.any(String), [1, 2, 3]);
    expect(callback).toHaveBeenNthCalledWith(2, expect.any(String), [4, 5, 6]);
  });

  it("handles SIGTERM signal correctly", async () => {
    const callback = vi.fn();

    // Capture the handlers the scheduler registers instead of installing them
    // on the real process; the original `process.on` is restored below.
    const originalProcessOn = process.on;
    const processOnMock = vi.fn();
    process.on = processOnMock;

    try {
      const dynamicFlushScheduler = new DynamicFlushScheduler({
        batchSize: 10,
        flushInterval: 5000,
        callback,
      });

      const items = [1, 2, 3, 4, 5, 6];
      await dynamicFlushScheduler.addToBatch(items);

      // Six items is below batchSize, so only the signal can flush them.
      expect(callback).not.toHaveBeenCalled();

      const sigtermCall = processOnMock.mock.calls.find((call) => call[0] === "SIGTERM");
      if (!sigtermCall) {
        throw new Error("DynamicFlushScheduler did not register a SIGTERM handler");
      }
      const sigtermHandler = sigtermCall[1];

      await sigtermHandler();

      expect(callback).toHaveBeenCalledTimes(1);
      expect(callback).toHaveBeenCalledWith(expect.any(String), items);
    } finally {
      process.on = originalProcessOn;
    }
  });
});
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.