diff --git a/.changeset/alarm-scheduler-spinloop.md b/.changeset/alarm-scheduler-spinloop.md new file mode 100644 index 0000000..df2791a --- /dev/null +++ b/.changeset/alarm-scheduler-spinloop.md @@ -0,0 +1,9 @@ +--- +'@cloudflare/containers': patch +--- + +Fix a spin loop in the alarm scheduler that could saturate the Durable Object event loop and cause WebSocket upgrades to be canceled at 0ms wall-clock time while HTTP traffic to the same DO continued to succeed. The `alarm()` handler previously paired an in-memory `setTimeout` sleep with an unconditional `setAlarm(Date.now())` on exit. Any external call to `scheduleNextAlarm()` during the sleep resolved the internal Promise, and the handler's exit path would then overwrite the caller's future alarm with one scheduled for "now", causing the runtime to re-fire the alarm immediately. Under load (for example, a `startAndWaitForPorts` retry loop or a partysocket reconnect storm), this escalated into a ~300ms alarm cadence matching `INSTANCE_POLL_INTERVAL_MS`. + +The handler is now durable-by-default: instead of sleeping in memory, it completes its work and re-arms the storage alarm for the earliest of the next scheduled task, the `sleepAfter` expiration, or a 3-minute heartbeat, and never sooner than 100ms from now. `scheduleNextAlarm()` is idempotent: it leaves an existing alarm in place when that alarm already fires at or before the requested time, so concurrent callers converge on the earliest requested time instead of clobbering each other via the removed in-memory Promise/timeout coordination. + +No behavior change for activity renewal, connection handling, or `onStart`/`onStop` lifecycle hooks. 
diff --git a/jest.config.js b/jest.config.js index 50712e8..5de43da 100644 --- a/jest.config.js +++ b/jest.config.js @@ -1,13 +1,17 @@ module.exports = { preset: 'ts-jest', testEnvironment: 'node', - testMatch: ['**/tests/**/*.test.ts'], + testMatch: ['**/src/tests/**/*.test.ts'], moduleFileExtensions: ['ts', 'js', 'json'], + moduleNameMapper: { + '^cloudflare:workers$': '/src/tests/__mocks__/cloudflare-workers.ts' + }, transform: { '^.+\\.ts$': ['ts-jest', { tsconfig: 'tsconfig.json' }] }, + clearMocks: true, collectCoverage: true, coverageDirectory: 'coverage', coverageReporters: ['text', 'lcov'], - collectCoverageFrom: ['src/**/*.ts', '!src/**/*.d.ts'] -}; \ No newline at end of file + collectCoverageFrom: ['src/**/*.ts', '!src/**/*.d.ts', '!src/tests/**'] +}; diff --git a/src/lib/container.ts b/src/lib/container.ts index a29adb4..87fcae2 100644 --- a/src/lib/container.ts +++ b/src/lib/container.ts @@ -33,6 +33,9 @@ const OUTBOUND_CONFIGURATION_KEY = 'OUTBOUND_CONFIGURATION'; const MAX_ALARM_RETRIES = 3; const PING_TIMEOUT_MS = 5000; +const MIN_ALARM_REARM_MS = 100; // Floor for alarm re-arm times +const MAX_ALARM_REARM_MS = 3 * 60 * 1000; // Default heartbeat + const DEFAULT_SLEEP_AFTER = '10m'; // Default sleep after inactivity time const INSTANCE_POLL_INTERVAL_MS = 300; // Default interval for polling container state @@ -1842,10 +1845,6 @@ export class Container extends DurableObject { }) .finally(() => { this.monitorSetup = false; - if (this.timeout) { - if (this.resolve) this.resolve(); - clearTimeout(this.timeout); - } }); } @@ -1877,14 +1876,6 @@ export class Container extends DurableObject { return; } - // do not remove this, container DOs ALWAYS need an alarm right now. - // The only way for this DO to stop having alarms is: - // 1. The container is not running anymore. - // 2. Activity expired and it exits. 
- const prevAlarm = Date.now(); - await this.ctx.storage.setAlarm(prevAlarm); - await this.ctx.storage.sync(); - // Get all schedules that should be executed now const result = this.sql<{ id: string; @@ -1895,7 +1886,7 @@ export class Container extends DurableObject { }>` SELECT * FROM container_schedules; `; - let minTime = Date.now() + 3 * 60 * 1000; + let minTime = Date.now() + MAX_ALARM_REARM_MS; const now = Date.now() / 1000; // Process each due schedule @@ -1956,36 +1947,16 @@ export class Container extends DurableObject { await this.onActivityExpired(); // renewActivityTimeout makes sure we don't spam calls here this.renewActivityTimeout(); + await this.ctx.storage.setAlarm(Date.now() + MIN_ALARM_REARM_MS); return; } // Math.min(3m or maxTime, sleepTimeout) minTime = Math.min(minTimeFromSchedules, minTime, this.sleepAfterMs); - const timeout = Math.max(0, minTime - Date.now()); - - // await a sleep for maxTime to keep the DO alive for - // at least this long - await new Promise(resolve => { - this.resolve = resolve; - if (!this.container.running) { - resolve(); - return; - } - - this.timeout = setTimeout(() => { - resolve(); - }, timeout); - }); - - await this.ctx.storage.setAlarm(Date.now()); - - // we exit and we have another alarm, - // the next alarm is the one that decides if it should stop the loop. + const nextAlarm = Math.max(minTime, Date.now() + MIN_ALARM_REARM_MS); + await this.ctx.storage.setAlarm(nextAlarm); } - timeout?: ReturnType; - resolve?: () => void; - // synchronises container state with the container source of truth to process events private async syncPendingStoppedEvents() { const state = await this.state.getState(); @@ -2019,15 +1990,14 @@ export class Container extends DurableObject { } /** - * Schedule the next alarm based on upcoming tasks + * Schedule the next alarm based on upcoming tasks. Idempotent — no-ops + * if an alarm is already set to fire sooner than the requested time. 
*/ public async scheduleNextAlarm(ms = 1000): Promise { - const nextTime = ms + Date.now(); - - // if not already set - if (this.timeout) { - if (this.resolve) this.resolve(); - clearTimeout(this.timeout); + const nextTime = Date.now() + Math.max(ms, MIN_ALARM_REARM_MS); + const existing = await this.ctx.storage.getAlarm(); + if (existing !== null && existing <= nextTime) { + return; } await this.ctx.storage.setAlarm(nextTime); diff --git a/src/tests/__mocks__/cloudflare-workers.ts b/src/tests/__mocks__/cloudflare-workers.ts new file mode 100644 index 0000000..8343e0b --- /dev/null +++ b/src/tests/__mocks__/cloudflare-workers.ts @@ -0,0 +1,24 @@ +/** + * Jest mock for the `cloudflare:workers` virtual module. Provides stub + * implementations of DurableObject and WorkerEntrypoint good enough to + * let Container / ContainerProxy class bodies compile and run under + * Node + ts-jest without a real workerd runtime. + */ + +export class DurableObject { + ctx: unknown; + env: Env; + constructor(ctx: unknown, env: Env) { + this.ctx = ctx; + this.env = env; + } +} + +export class WorkerEntrypoint { + ctx: unknown; + env: Env; + constructor(ctx: unknown, env: Env) { + this.ctx = ctx; + this.env = env; + } +} diff --git a/src/tests/alarm-scheduler.test.ts b/src/tests/alarm-scheduler.test.ts new file mode 100644 index 0000000..49f4be4 --- /dev/null +++ b/src/tests/alarm-scheduler.test.ts @@ -0,0 +1,310 @@ +import { Container } from '../lib/container'; + +jest.mock('node:async_hooks', () => ({ + AsyncLocalStorage: class MockAsyncLocalStorage { + getStore() { + return null; + } + run(_store: unknown, fn: Function) { + return fn(); + } + }, +})); + +function makeMockStorage() { + let alarmTs: number | null = null; + const kv = new Map(); + const rows: Array<{ id: string; callback: string; payload: string; type: string; time: number }> = []; + + const sqlExec = jest.fn((query: string) => { + if (/COUNT\(\*\)/i.test(query)) { + return [{ count: rows.length }]; + } + if (/SELECT 
\* FROM container_schedules/i.test(query)) { + return rows.slice(); + } + return []; + }); + + return { + alarmTs: () => alarmTs, + rowsRef: rows, + resetAlarmState: () => { + alarmTs = null; + }, + storage: { + setAlarm: jest.fn(async (ts: number) => { + alarmTs = ts; + }), + // Yield a microtask before returning so concurrent scheduleNextAlarm + // calls observe each other's setAlarm writes (approximates the DO + // input gate's per-invocation serialization). + getAlarm: jest.fn(async () => { + await Promise.resolve(); + return alarmTs; + }), + deleteAlarm: jest.fn(async () => { + alarmTs = null; + }), + sync: jest.fn(async () => undefined), + put: jest.fn(async (k: string, v: unknown) => { + kv.set(k, v); + }), + get: jest.fn(async (k: string) => kv.get(k)), + kv: { + get: jest.fn((k: string) => kv.get(k)), + put: jest.fn((k: string, v: unknown) => { + kv.set(k, v); + }), + delete: jest.fn((k: string) => kv.delete(k)), + }, + sql: { exec: sqlExec }, + }, + }; +} + +function makeMockCtx(storage: ReturnType['storage'], running: boolean) { + const stubFetcher = { fetch: jest.fn() } as unknown as Fetcher; + return { + storage, + blockConcurrencyWhile: jest.fn(async (fn: Function) => fn()), + container: { + running, + start: jest.fn(), + destroy: jest.fn(), + signal: jest.fn(), + monitor: jest.fn().mockReturnValue(Promise.resolve()), + getTcpPort: jest.fn().mockReturnValue({ + fetch: jest.fn().mockResolvedValue({ status: 200, body: 'ok' }), + }), + interceptOutboundHttp: jest.fn().mockResolvedValue(undefined), + interceptOutboundHttps: jest.fn().mockResolvedValue(undefined), + removeInterceptOutbound: jest.fn().mockResolvedValue(undefined), + }, + id: { toString: () => 'test-id' }, + // Required by applyOutboundInterception, which runs from the + // constructor's blockConcurrencyWhile when container.running is true. + exports: { + ContainerProxy: jest.fn(() => stubFetcher), + }, + }; +} + +/** Test subclass exposing the handful of private fields the tests need. 
*/ +class TestContainer extends Container { + // Widen private `sleepAfterMs` for tests that need to force the + // sleepAfter window to a near-future time. + get sleepAfterMsField(): number { + return (this as unknown as { sleepAfterMs: number }).sleepAfterMs; + } + set sleepAfterMsField(value: number) { + (this as unknown as { sleepAfterMs: number }).sleepAfterMs = value; + } + // Widen private `inflightRequests` for tests simulating an open + // WebSocket / streaming response without the full containerFetch path. + get inflightRequestsField(): number { + return (this as unknown as { inflightRequests: number }).inflightRequests; + } + set inflightRequestsField(value: number) { + (this as unknown as { inflightRequests: number }).inflightRequests = value; + } +} + +async function makeContainer(opts: { running?: boolean } = {}) { + const mock = makeMockStorage(); + const ctx = makeMockCtx(mock.storage, opts.running ?? false); + const container = new TestContainer(ctx as unknown as DurableObjectState, {} as never); + container.defaultPort = 5555; + container.sleepAfter = '1h'; + container.renewActivityTimeout(); + // Drain the constructor's blockConcurrencyWhile callback before tests + // observe state, so initial scheduleNextAlarm + applyOutboundInterception + // have settled. + await container.applyOutboundInterceptionPromise; + mock.resetAlarmState(); + jest.clearAllMocks(); + return { container, ctx, mock }; +} + +describe('scheduleNextAlarm', () => { + test('sets the alarm when none exists', async () => { + const { container, mock } = await makeContainer(); + + await container.scheduleNextAlarm(1000); + + const stored = mock.alarmTs(); + expect(stored).not.toBeNull(); + expect(stored! - Date.now()).toBeGreaterThan(500); + expect(stored! 
- Date.now()).toBeLessThan(1500); + }); + + test('is idempotent — a later request does not push out a sooner alarm', async () => { + const { container, mock } = await makeContainer(); + + await container.scheduleNextAlarm(1000); + const first = mock.alarmTs()!; + + await container.scheduleNextAlarm(5000); + const second = mock.alarmTs()!; + + expect(second).toBe(first); + }); + + test('advances the alarm when an earlier time is requested', async () => { + const { container, mock } = await makeContainer(); + + await container.scheduleNextAlarm(5000); + const first = mock.alarmTs()!; + + await container.scheduleNextAlarm(500); + const second = mock.alarmTs()!; + + expect(second).toBeLessThan(first); + }); + + test('clamps sub-floor requests to MIN_ALARM_REARM_MS', async () => { + const { container, mock } = await makeContainer(); + + await container.scheduleNextAlarm(0); + const stored = mock.alarmTs()!; + expect(stored - Date.now()).toBeGreaterThanOrEqual(50); + }); + + test('concurrent callers settle on the earliest requested time', async () => { + const { container, mock } = await makeContainer(); + + await Promise.all([ + container.scheduleNextAlarm(5000), + container.scheduleNextAlarm(1000), + container.scheduleNextAlarm(3000), + ]); + + const stored = mock.alarmTs()!; + expect(stored - Date.now()).toBeLessThan(2000); + }); +}); + +describe('alarm() handler', () => { + test('re-arms for the default heartbeat when idle', async () => { + const { container, mock } = await makeContainer({ running: true }); + + await container.alarm(); + + const next = mock.alarmTs()!; + const delta = next - Date.now(); + expect(delta).toBeGreaterThan(1000); + expect(delta).toBeLessThanOrEqual(3 * 60 * 1000); + }); + + test('never re-arms for a time in the past', async () => { + const { container, mock } = await makeContainer({ running: true }); + + await container.alarm(); + + const next = mock.alarmTs()!; + expect(next).toBeGreaterThanOrEqual(Date.now()); + }); + + test('honors 
sleepAfterMs when sooner than the default heartbeat', async () => { + const { container, mock } = await makeContainer({ running: true }); + container.sleepAfterMsField = Date.now() + 30_000; + + await container.alarm(); + + const next = mock.alarmTs()!; + const delta = next - Date.now(); + expect(delta).toBeGreaterThan(25_000); + expect(delta).toBeLessThan(35_000); + }); + + test('deletes the alarm when the container is stopped and no schedules remain', async () => { + const { container, mock } = await makeContainer(); + + await container.alarm(); + + expect(mock.storage.deleteAlarm).toHaveBeenCalled(); + expect(mock.alarmTs()).toBeNull(); + }); + + test('re-arms to the next pending schedule when the container is stopped', async () => { + const { container, mock } = await makeContainer(); + mock.rowsRef.push({ + id: 'abc', + callback: 'noop', + payload: '{}', + type: 'scheduled', + time: Math.floor((Date.now() + 10_000) / 1000), + }); + + await container.alarm(); + + expect(mock.storage.deleteAlarm).not.toHaveBeenCalled(); + const next = mock.alarmTs()!; + expect(next).toBeGreaterThan(Date.now() + 5_000); + }); + + test('scheduleNextAlarm during an in-progress alarm does not cause immediate re-fire on exit', async () => { + const { container, mock } = await makeContainer({ running: true }); + + const alarmPromise = container.alarm(); + await container.scheduleNextAlarm(1000); + await alarmPromise; + + const next = mock.alarmTs()!; + const delta = next - Date.now(); + expect(delta).toBeGreaterThan(500); + }); + + test('retry exhaustion does not set an immediate alarm', async () => { + const { container, mock } = await makeContainer({ running: true }); + + await container.alarm({ isRetry: true, retryCount: 10 }); + + const next = mock.alarmTs()!; + expect(next).not.toBeNull(); + expect(next - Date.now()).toBeGreaterThan(50); + }); +}); + +describe('scheduleNextAlarm + alarm cadence invariant', () => { + test('repeated external callers cannot drive alarm to fire in 
the past', async () => { + const { container, mock } = await makeContainer({ running: true }); + + for (let i = 0; i < 20; i++) { + await container.scheduleNextAlarm(1000); + await container.alarm(); + const next = mock.alarmTs()!; + expect(next).toBeGreaterThanOrEqual(Date.now()); + } + }); +}); + +// Regression coverage for cloudflare/containers#147: an open WebSocket +// (tracked via inflightRequests) must keep the container alive even +// after the sleepAfter window has elapsed. +describe('inflight request activity tracking', () => { + test('inflightRequests > 0 prevents onActivityExpired and renews the sleepAfter window', async () => { + const { container } = await makeContainer({ running: true }); + const onExpired = jest.spyOn(container, 'onActivityExpired'); + container.inflightRequestsField = 1; + container.sleepAfterMsField = Date.now() - 1; + + await container.alarm(); + + expect(onExpired).not.toHaveBeenCalled(); + // A fresh sleepAfter window should be at least most of `sleepAfter` + // ahead (test fixture uses 1h). + expect(container.sleepAfterMsField).toBeGreaterThan(Date.now() + 55 * 60 * 1000); + }); + + test('onActivityExpired fires once the counter drops to zero and sleepAfter is past', async () => { + const { container } = await makeContainer({ running: true }); + const onExpired = jest.spyOn(container, 'onActivityExpired').mockResolvedValue(undefined); + container.inflightRequestsField = 0; + container.sleepAfterMsField = Date.now() - 1; + + await container.alarm(); + + expect(onExpired).toHaveBeenCalledTimes(1); + }); +});