From c352440684d07d2daef2c30475bca78438c6b127 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 23 Oct 2025 12:03:41 +0100 Subject: [PATCH 01/23] add auto timeout renewal and some health checks --- packages/sandbox/src/sandbox.ts | 125 ++++++++++++++++++++++++++++++-- 1 file changed, 120 insertions(+), 5 deletions(-) diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index f2ec253..c6e0293 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -63,6 +63,10 @@ export class Sandbox extends Container implements ISandbox { private defaultSession: string | null = null; envVars: Record = {}; private logger: ReturnType; + private lastActivityRenewal: number = 0; + private readonly ACTIVITY_RENEWAL_THROTTLE_MS = 5000; // Throttle renewals to once per 5 seconds + private readonly STREAM_HEALTH_CHECK_INTERVAL_MS = 5000; // Check sandbox health every 5 seconds during streaming + private readonly STREAM_READ_TIMEOUT_MS = 300000; // 5 minutes timeout for stream reads (detects hung streams), to reduce the overhead constructor(ctx: DurableObject['ctx'], env: Env) { super(ctx, env); @@ -171,6 +175,21 @@ export class Sandbox extends Container implements ISandbox { this.logger.error('Sandbox error', error instanceof Error ? 
error : new Error(String(error))); } + /** + * Throttled activity timeout renewal, to prevent renewal on every chunk + * Only renews if sufficient time has passed since last renewal + */ + private renewActivityTimeoutThrottled(): void { + const now = Date.now(); + const timeSinceLastRenewal = now - this.lastActivityRenewal; + + // Only renew if throttle period has elapsed + if (timeSinceLastRenewal >= this.ACTIVITY_RENEWAL_THROTTLE_MS) { + this.renewActivityTimeout(); + this.lastActivityRenewal = now; + } + } + // Override fetch to route internal container requests to appropriate ports override async fetch(request: Request): Promise { // Extract or generate trace ID from request @@ -331,6 +350,10 @@ export class Sandbox extends Container implements ISandbox { case 'stdout': case 'stderr': if (event.data) { + // Renew activity timeout on each output event (throttled to reduce overhead) + // This keeps the container alive during active command execution + this.renewActivityTimeoutThrottled(); + // Update accumulated output if (event.type === 'stdout') stdout += event.data; if (event.type === 'stderr') stderr += event.data; @@ -542,8 +565,9 @@ export class Sandbox extends Container implements ISandbox { } const session = await this.ensureDefaultSession(); - // Get the stream from CommandClient - return this.client.commands.executeStream(command, session); + // Get the stream from CommandClient and wrap it with activity renewal + const stream = await this.client.commands.executeStream(command, session); + return this.wrapStreamWithActivityRenewal(stream); } /** @@ -555,7 +579,8 @@ export class Sandbox extends Container implements ISandbox { throw new Error('Operation was aborted'); } - return this.client.commands.executeStream(command, sessionId); + const stream = await this.client.commands.executeStream(command, sessionId); + return this.wrapStreamWithActivityRenewal(stream); } async streamProcessLogs(processId: string, options?: { signal?: AbortSignal }): Promise> { 
@@ -564,7 +589,96 @@ export class Sandbox extends Container implements ISandbox { throw new Error('Operation was aborted'); } - return this.client.processes.streamProcessLogs(processId); + const stream = await this.client.processes.streamProcessLogs(processId); + return this.wrapStreamWithActivityRenewal(stream); + } + + /** + * Wraps a ReadableStream to provide three critical protections: + * 1. Activity renewal - prevents sleepAfter timeout during active streaming + * 2. Health monitoring - detects container crashes mid-stream + * 3. Read timeout - detects hung streams (e.g., Bun connection idle timeout) + * + * This solves both issues from GitHub #101: + * - Container staying alive during long operations (activity renewal) + * - Detecting silent stream failures (health checks + timeouts) + */ + private wrapStreamWithActivityRenewal(stream: ReadableStream): ReadableStream { + const self = this; + + return new ReadableStream({ + async start(controller) { + const reader = stream.getReader(); + + // Set up periodic health monitoring to detect container crashes + let healthCheckInterval: NodeJS.Timeout | undefined; + let isHealthy = true; + + healthCheckInterval = setInterval(async () => { + try { + const state = await self.getState(); + if (state.status !== 'running') { + isHealthy = false; + const error = new Error(`Container became unhealthy during streaming: ${state.status}`); + controller.error(error); + clearInterval(healthCheckInterval); + await reader.cancel(error.message); + } + } catch (error) { + // If getState() fails, container is likely dead + isHealthy = false; + const stateError = new Error(`Failed to check container health: ${error instanceof Error ? 
error.message : String(error)}`); + controller.error(stateError); + clearInterval(healthCheckInterval); + await reader.cancel(stateError.message); + } + }, self.STREAM_HEALTH_CHECK_INTERVAL_MS); + + try { + while (true) { + // Race read against timeout to detect hung streams + // (e.g., Bun closes connection silently after idle timeout) + const readPromise = reader.read(); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => { + reject(new Error(`Stream read timeout after ${self.STREAM_READ_TIMEOUT_MS}ms - connection may be hung`)); + }, self.STREAM_READ_TIMEOUT_MS); + }); + + const { done, value } = await Promise.race([readPromise, timeoutPromise]); + + // Check health status before processing chunk + if (!isHealthy) { + throw new Error('Container became unhealthy during streaming'); + } + + if (done) { + controller.close(); + break; + } + + // This keeps container alive during active streaming + self.renewActivityTimeoutThrottled(); + + controller.enqueue(value); + } + } catch (error) { + controller.error(error); + throw error; + } finally { + // Always cleanup + if (healthCheckInterval) { + clearInterval(healthCheckInterval); + } + await reader.releaseLock(); + } + }, + + cancel(reason) { + // Allow cancellation to propagate to the underlying stream + return stream.cancel(reason); + } + }); } async gitCheckout( @@ -935,7 +1049,8 @@ export class Sandbox extends Container implements ISandbox { } async runCodeStream(code: string, options?: RunCodeOptions): Promise { - return this.codeInterpreter.runCodeStream(code, options); + const stream = await this.codeInterpreter.runCodeStream(code, options); + return this.wrapStreamWithActivityRenewal(stream as ReadableStream); } async listCodeContexts(): Promise { From 6faa69c96c00046cec16a60974edbea1d0f4ee9f Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 23 Oct 2025 12:13:08 +0100 Subject: [PATCH 02/23] cleanup the issues --- packages/sandbox/src/sandbox.ts | 51 ++++++++++++++++++++++++--------- 
1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index c6e0293..422c2ea 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -65,8 +65,8 @@ export class Sandbox extends Container implements ISandbox { private logger: ReturnType; private lastActivityRenewal: number = 0; private readonly ACTIVITY_RENEWAL_THROTTLE_MS = 5000; // Throttle renewals to once per 5 seconds - private readonly STREAM_HEALTH_CHECK_INTERVAL_MS = 5000; // Check sandbox health every 5 seconds during streaming - private readonly STREAM_READ_TIMEOUT_MS = 300000; // 5 minutes timeout for stream reads (detects hung streams), to reduce the overhead + private readonly STREAM_HEALTH_CHECK_INTERVAL_MS = 30000; // Check sandbox health every 30 seconds during streaming + private readonly STREAM_READ_TIMEOUT_MS = 300000; // 5 minutes timeout for stream reads (detects hung streams) constructor(ctx: DurableObject['ctx'], env: Env) { super(ctx, env); @@ -605,23 +605,32 @@ export class Sandbox extends Container implements ISandbox { */ private wrapStreamWithActivityRenewal(stream: ReadableStream): ReadableStream { const self = this; + let healthCheckInterval: NodeJS.Timeout | undefined; + let streamActive = true; return new ReadableStream({ async start(controller) { const reader = stream.getReader(); - - // Set up periodic health monitoring to detect container crashes - let healthCheckInterval: NodeJS.Timeout | undefined; let isHealthy = true; + // Set up periodic health monitoring to detect container crashes healthCheckInterval = setInterval(async () => { + if (!streamActive) { + if (healthCheckInterval) { + clearInterval(healthCheckInterval); + } + return; + } + try { const state = await self.getState(); if (state.status !== 'running') { isHealthy = false; const error = new Error(`Container became unhealthy during streaming: ${state.status}`); controller.error(error); - 
clearInterval(healthCheckInterval); + if (healthCheckInterval) { + clearInterval(healthCheckInterval); + } await reader.cancel(error.message); } } catch (error) { @@ -629,24 +638,27 @@ export class Sandbox extends Container implements ISandbox { isHealthy = false; const stateError = new Error(`Failed to check container health: ${error instanceof Error ? error.message : String(error)}`); controller.error(stateError); - clearInterval(healthCheckInterval); + if (healthCheckInterval) { + clearInterval(healthCheckInterval); + } await reader.cancel(stateError.message); } }, self.STREAM_HEALTH_CHECK_INTERVAL_MS); try { while (true) { - // Race read against timeout to detect hung streams - // (e.g., Bun closes connection silently after idle timeout) + let timeoutHandle: NodeJS.Timeout; const readPromise = reader.read(); const timeoutPromise = new Promise((_, reject) => { - setTimeout(() => { + timeoutHandle = setTimeout(() => { reject(new Error(`Stream read timeout after ${self.STREAM_READ_TIMEOUT_MS}ms - connection may be hung`)); }, self.STREAM_READ_TIMEOUT_MS); }); const { done, value } = await Promise.race([readPromise, timeoutPromise]); + clearTimeout(timeoutHandle!); + // Check health status before processing chunk if (!isHealthy) { throw new Error('Container became unhealthy during streaming'); @@ -657,6 +669,7 @@ export class Sandbox extends Container implements ISandbox { break; } + // Renew activity timeout on each chunk (throttled to reduce overhead) // This keeps container alive during active streaming self.renewActivityTimeoutThrottled(); @@ -664,17 +677,29 @@ export class Sandbox extends Container implements ISandbox { } } catch (error) { controller.error(error); - throw error; } finally { - // Always cleanup + // Mark stream as inactive to stop health checks + streamActive = false; + + // Always cleanup interval if (healthCheckInterval) { clearInterval(healthCheckInterval); + healthCheckInterval = undefined; + } + + try { + reader.releaseLock(); + } catch 
(e) { } - await reader.releaseLock(); } }, cancel(reason) { + streamActive = false; + if (healthCheckInterval) { + clearInterval(healthCheckInterval); + healthCheckInterval = undefined; + } // Allow cancellation to propagate to the underlying stream return stream.cancel(reason); } From 3c45f4910e3c12c1f714515c60cc6e9f660d6c0a Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 23 Oct 2025 12:21:09 +0100 Subject: [PATCH 03/23] fix worker compat for timeout --- packages/sandbox/src/interpreter.ts | 2 +- packages/sandbox/src/sandbox.ts | 11 +++++------ packages/shared/src/types.ts | 4 ++-- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/packages/sandbox/src/interpreter.ts b/packages/sandbox/src/interpreter.ts index ba3ee61..9630b71 100644 --- a/packages/sandbox/src/interpreter.ts +++ b/packages/sandbox/src/interpreter.ts @@ -82,7 +82,7 @@ export class CodeInterpreter { async runCodeStream( code: string, options: RunCodeOptions = {} - ): Promise { + ): Promise> { // Get or create context let context = options.context; if (!context) { diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index 422c2ea..06deee7 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -287,8 +287,7 @@ export class Sandbox extends Container implements ISandbox { const startTime = Date.now(); const timestamp = new Date().toISOString(); - // Handle timeout - let timeoutId: NodeJS.Timeout | undefined; + let timeoutId: ReturnType | undefined; try { // Handle cancellation @@ -605,7 +604,7 @@ export class Sandbox extends Container implements ISandbox { */ private wrapStreamWithActivityRenewal(stream: ReadableStream): ReadableStream { const self = this; - let healthCheckInterval: NodeJS.Timeout | undefined; + let healthCheckInterval: ReturnType | undefined; let streamActive = true; return new ReadableStream({ @@ -647,7 +646,7 @@ export class Sandbox extends Container implements ISandbox { try { while (true) { - let 
timeoutHandle: NodeJS.Timeout; + let timeoutHandle: ReturnType; const readPromise = reader.read(); const timeoutPromise = new Promise((_, reject) => { timeoutHandle = setTimeout(() => { @@ -1073,9 +1072,9 @@ export class Sandbox extends Container implements ISandbox { return execution.toJSON(); } - async runCodeStream(code: string, options?: RunCodeOptions): Promise { + async runCodeStream(code: string, options?: RunCodeOptions): Promise> { const stream = await this.codeInterpreter.runCodeStream(code, options); - return this.wrapStreamWithActivityRenewal(stream as ReadableStream); + return this.wrapStreamWithActivityRenewal(stream); } async listCodeContexts(): Promise { diff --git a/packages/shared/src/types.ts b/packages/shared/src/types.ts index eb7e2bb..8e0e7df 100644 --- a/packages/shared/src/types.ts +++ b/packages/shared/src/types.ts @@ -613,7 +613,7 @@ export interface ExecutionSession { // Code interpreter methods createCodeContext(options?: CreateContextOptions): Promise; runCode(code: string, options?: RunCodeOptions): Promise; - runCodeStream(code: string, options?: RunCodeOptions): Promise; + runCodeStream(code: string, options?: RunCodeOptions): Promise>; listCodeContexts(): Promise; deleteCodeContext(contextId: string): Promise; } @@ -657,7 +657,7 @@ export interface ISandbox { // Code interpreter methods createCodeContext(options?: CreateContextOptions): Promise; runCode(code: string, options?: RunCodeOptions): Promise; - runCodeStream(code: string, options?: RunCodeOptions): Promise; + runCodeStream(code: string, options?: RunCodeOptions): Promise>; listCodeContexts(): Promise; deleteCodeContext(contextId: string): Promise; } From 63fb1afe179302f5d0e4a309c406a8e15bebebc3 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 23 Oct 2025 14:39:48 +0100 Subject: [PATCH 04/23] add more tests --- packages/sandbox/tests/sandbox.test.ts | 233 +++++++++++++++++- .../e2e/streaming-operations-workflow.test.ts | 136 ++++++++++ 2 files changed, 368 
insertions(+), 1 deletion(-) diff --git a/packages/sandbox/tests/sandbox.test.ts b/packages/sandbox/tests/sandbox.test.ts index 479a40e..a8e5412 100644 --- a/packages/sandbox/tests/sandbox.test.ts +++ b/packages/sandbox/tests/sandbox.test.ts @@ -35,7 +35,7 @@ describe('Sandbox - Automatic Session Management', () => { delete: vi.fn().mockResolvedValue(undefined), list: vi.fn().mockResolvedValue(new Map()), } as any, - blockConcurrencyWhile: vi.fn((fn: () => Promise) => fn()), + blockConcurrencyWhile: vi.fn().mockImplementation((callback: () => Promise): Promise => callback()), id: { toString: () => 'test-sandbox-id', equals: vi.fn(), @@ -462,4 +462,235 @@ describe('Sandbox - Automatic Session Management', () => { expect(sandbox.client.ports.exposePort).toHaveBeenCalled(); }); }); + + describe('Activity renewal and health checks', () => { + let renewActivityTimeoutThrottledSpy: any; + + beforeEach(() => { + // Mock Container base class methods + (sandbox as any).renewActivityTimeout = vi.fn(); + (sandbox as any).getState = vi.fn().mockResolvedValue({ + status: 'running', + timestamp: new Date().toISOString(), + }); + + // Spy on private method via prototype + renewActivityTimeoutThrottledSpy = vi.spyOn(sandbox as any, 'renewActivityTimeoutThrottled'); + }); + + it('should throttle activity renewal during streaming', async () => { + // Create a mock stream that emits multiple chunks quickly + const encoder = new TextEncoder(); + const chunks = Array.from({ length: 10 }, (_, i) => encoder.encode(`chunk ${i}\n`)); + + const mockStream = new ReadableStream({ + async start(controller) { + for (const chunk of chunks) { + controller.enqueue(chunk); + } + controller.close(); + } + }); + + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + const stream = await sandbox.execStream('echo test'); + const reader = stream.getReader(); + + // Read all chunks + let chunkCount = 0; + while (true) { + const { done } = await reader.read(); + if (done) 
break; + chunkCount++; + } + + expect(chunkCount).toBe(10); + + // Activity renewal should be called, but throttled (not once per chunk) + // With 5 second throttle and instant reads, should only be called a few times + expect(renewActivityTimeoutThrottledSpy).toHaveBeenCalled(); + expect(renewActivityTimeoutThrottledSpy.mock.calls.length).toBeLessThanOrEqual(chunkCount); + }); + + it('should detect container crashes during streaming', async () => { + vi.useFakeTimers(); + + // Create a stream that will trigger health check + const encoder = new TextEncoder(); + const mockStream = new ReadableStream({ + async start(controller) { + // Enqueue initial chunk + controller.enqueue(encoder.encode('starting\n')); + // Keep stream open + } + }); + + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + // Make getState return unhealthy status + (sandbox as any).getState = vi.fn().mockResolvedValue({ + status: 'stopped', + timestamp: new Date().toISOString(), + }); + + const stream = await sandbox.execStream('echo test'); + const reader = stream.getReader(); + + // Read first chunk + await reader.read(); + + // Advance time to trigger health check (30 seconds) + await vi.advanceTimersByTimeAsync(31000); + + // Verify health check was called + expect((sandbox as any).getState).toHaveBeenCalled(); + + vi.useRealTimers(); + }); + + it('should timeout on hung streams', async () => { + vi.useFakeTimers(); + + // Create a stream that never resolves + const mockStream = new ReadableStream({ + async start(controller) { + // Never enqueue anything - stream hangs + await new Promise(() => {}); // Never resolves + } + }); + + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + const streamPromise = sandbox.execStream('sleep infinity'); + const stream = await streamPromise; + const reader = stream.getReader(); + + // Start reading + const readPromise = reader.read(); + + // Fast-forward past the timeout (5 minutes = 
300000ms) + vi.advanceTimersByTime(300001); + + // Should reject with timeout error + await expect(readPromise).rejects.toThrow(/Stream read timeout/); + + vi.useRealTimers(); + }); + + it('should cleanup intervals on stream completion', async () => { + const clearIntervalSpy = vi.spyOn(global, 'clearInterval'); + + const encoder = new TextEncoder(); + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('test\n')); + controller.close(); + } + }); + + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + const stream = await sandbox.execStream('echo test'); + const reader = stream.getReader(); + + // Read until done + while (true) { + const { done } = await reader.read(); + if (done) break; + } + + // Verify interval was cleared + expect(clearIntervalSpy).toHaveBeenCalled(); + }); + + it('should cleanup intervals on stream cancellation', async () => { + const clearIntervalSpy = vi.spyOn(global, 'clearInterval'); + + const encoder = new TextEncoder(); + let underlyingReader: any; + const mockStream = new ReadableStream({ + async start(controller) { + underlyingReader = { + cancel: async () => { + // Mock cancel + } + }; + // Emit chunks indefinitely + for (let i = 0; i < 100; i++) { + controller.enqueue(encoder.encode(`chunk ${i}\n`)); + await new Promise(resolve => setTimeout(resolve, 10)); + } + controller.close(); + }, + cancel() { + // Mock cancel + } + }); + + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + const stream = await sandbox.execStream('echo test'); + const reader = stream.getReader(); + + // Read one chunk + await reader.read(); + + // Cancel the stream + try { + await reader.cancel('user cancellation'); + } catch (e) { + // Ignore cancellation errors in test + } + + // Verify interval was cleared on cancellation + expect(clearIntervalSpy).toHaveBeenCalled(); + }); + + it('should handle health check failures gracefully', async () => { + 
const encoder = new TextEncoder(); + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('test\n')); + controller.close(); + } + }); + + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + // Make getState throw an error (container is dead) + (sandbox as any).getState = vi.fn().mockRejectedValue(new Error('Container not found')); + + const stream = await sandbox.execStream('echo test'); + const reader = stream.getReader(); + + // Read should still work since we're testing the health check error handling + // The health check runs in background and shouldn't block reads initially + const result = await reader.read(); + expect(result.done).toBe(false); + }); + + it('should work with all streaming APIs', async () => { + const encoder = new TextEncoder(); + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('test\n')); + controller.close(); + } + }); + + // Test execStream + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + const execStream = await sandbox.execStream('echo test'); + expect(execStream).toBeInstanceOf(ReadableStream); + + // Test streamProcessLogs + vi.spyOn(sandbox.client.processes, 'streamProcessLogs').mockResolvedValue(mockStream); + const logsStream = await sandbox.streamProcessLogs('proc-1'); + expect(logsStream).toBeInstanceOf(ReadableStream); + + // Verify both streams have activity renewal applied + expect(renewActivityTimeoutThrottledSpy).toHaveBeenCalled(); + }); + }); }); diff --git a/tests/e2e/streaming-operations-workflow.test.ts b/tests/e2e/streaming-operations-workflow.test.ts index df1b96d..920e2a2 100644 --- a/tests/e2e/streaming-operations-workflow.test.ts +++ b/tests/e2e/streaming-operations-workflow.test.ts @@ -444,5 +444,141 @@ describe('Streaming Operations Workflow', () => { const completeEvent = events.find((e) => e.type === 'complete'); 
expect(completeEvent?.exitCode).toBe(0); }, 90000); + + /** + * Tests for GitHub Issue #101: Container Hibernation During Long Streaming + * + * These tests verify that containers stay alive during long-running streaming + * operations via automatic activity renewal, and that the implementation actually + * works with real containers (not just mocked tests). + */ + + test('should keep container alive during 15+ second streaming command (GitHub #101)', async () => { + currentSandboxId = createSandboxId(); + const headers = createTestHeaders(currentSandboxId); + + console.log('[Test] Starting 15+ second streaming command to verify activity renewal...'); + + // Stream a command that runs for 15+ seconds with output every 2 seconds + // Without activity renewal, this would timeout (default container timeout is ~10s) + const streamResponse = await vi.waitFor( + async () => fetchWithStartup(`${workerUrl}/api/execStream`, { + method: 'POST', + headers, + body: JSON.stringify({ + command: "bash -c 'for i in {1..8}; do echo \"Tick $i at $(date +%s)\"; sleep 2; done; echo \"SUCCESS\"'", + }), + }), + { timeout: 90000, interval: 2000 } + ); + + expect(streamResponse.status).toBe(200); + + const startTime = Date.now(); + const events = await collectSSEEvents(streamResponse, 50); + const duration = Date.now() - startTime; + + console.log(`[Test] Stream completed in ${duration}ms`); + + // Verify command ran for approximately 16 seconds (8 ticks * 2 seconds) + expect(duration).toBeGreaterThan(14000); // At least 14 seconds + expect(duration).toBeLessThan(25000); // But completed (not timed out) + + // Should have received all ticks + const stdoutEvents = events.filter((e) => e.type === 'stdout'); + const output = stdoutEvents.map((e) => e.data).join(''); + + for (let i = 1; i <= 8; i++) { + expect(output).toContain(`Tick ${i}`); + } + expect(output).toContain('SUCCESS'); + + // Most importantly: should complete with exit code 0 (not timeout) + const completeEvent = 
events.find((e) => e.type === 'complete'); + expect(completeEvent).toBeDefined(); + expect(completeEvent?.exitCode).toBe(0); + + console.log('[Test] ✅ Container stayed alive for 16+ seconds - activity renewal working!'); + }, 90000); + + test('should handle high-volume streaming over extended period', async () => { + currentSandboxId = createSandboxId(); + const headers = createTestHeaders(currentSandboxId); + + console.log('[Test] Starting high-volume streaming test...'); + + // Stream command that generates many lines over 10+ seconds + // Tests throttling: renewActivityTimeout shouldn't be called for every chunk + const streamResponse = await vi.waitFor( + async () => fetchWithStartup(`${workerUrl}/api/execStream`, { + method: 'POST', + headers, + body: JSON.stringify({ + command: "bash -c 'for i in {1..100}; do echo \"Line $i: $(date +%s.%N)\"; sleep 0.1; done'", + }), + }), + { timeout: 90000, interval: 2000 } + ); + + expect(streamResponse.status).toBe(200); + + const events = await collectSSEEvents(streamResponse, 150); + + // Should have many stdout events + const stdoutEvents = events.filter((e) => e.type === 'stdout'); + expect(stdoutEvents.length).toBeGreaterThanOrEqual(50); + + // Verify we got output from beginning and end + const output = stdoutEvents.map((e) => e.data).join(''); + expect(output).toContain('Line 1'); + expect(output).toContain('Line 100'); + + // Should complete successfully + const completeEvent = events.find((e) => e.type === 'complete'); + expect(completeEvent).toBeDefined(); + expect(completeEvent?.exitCode).toBe(0); + + console.log('[Test] ✅ High-volume streaming completed successfully'); + }, 90000); + + test('should handle streaming with intermittent output gaps', async () => { + currentSandboxId = createSandboxId(); + const headers = createTestHeaders(currentSandboxId); + + console.log('[Test] Starting intermittent output test...'); + + // Command with gaps between output bursts + // Tests that activity renewal works even when 
output is periodic + const streamResponse = await vi.waitFor( + async () => fetchWithStartup(`${workerUrl}/api/execStream`, { + method: 'POST', + headers, + body: JSON.stringify({ + command: "bash -c 'echo \"Burst 1\"; sleep 3; echo \"Burst 2\"; sleep 3; echo \"Burst 3\"; sleep 3; echo \"Complete\"'", + }), + }), + { timeout: 90000, interval: 2000 } + ); + + expect(streamResponse.status).toBe(200); + + const events = await collectSSEEvents(streamResponse, 30); + + const stdoutEvents = events.filter((e) => e.type === 'stdout'); + const output = stdoutEvents.map((e) => e.data).join(''); + + // All bursts should be received despite gaps + expect(output).toContain('Burst 1'); + expect(output).toContain('Burst 2'); + expect(output).toContain('Burst 3'); + expect(output).toContain('Complete'); + + // Should complete successfully + const completeEvent = events.find((e) => e.type === 'complete'); + expect(completeEvent).toBeDefined(); + expect(completeEvent?.exitCode).toBe(0); + + console.log('[Test] ✅ Intermittent output handled correctly'); + }, 90000); }); }); From 32dbaf5a3bb757d0543f49785ef8a59eb51f999e Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 23 Oct 2025 14:49:54 +0100 Subject: [PATCH 05/23] fix minor nits --- packages/sandbox/src/sandbox.ts | 39 ++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index 06deee7..9f6c8e3 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -606,6 +606,7 @@ export class Sandbox extends Container implements ISandbox { const self = this; let healthCheckInterval: ReturnType | undefined; let streamActive = true; + let errorReported = false; return new ReadableStream({ async start(controller) { @@ -625,28 +626,34 @@ export class Sandbox extends Container implements ISandbox { const state = await self.getState(); if (state.status !== 'running') { isHealthy = false; - const error = new 
Error(`Container became unhealthy during streaming: ${state.status}`); - controller.error(error); - if (healthCheckInterval) { - clearInterval(healthCheckInterval); + if (!errorReported) { + errorReported = true; + const error = new Error(`Container became unhealthy during streaming: ${state.status}`); + controller.error(error); + if (healthCheckInterval) { + clearInterval(healthCheckInterval); + } + await reader.cancel(error.message); } - await reader.cancel(error.message); } } catch (error) { // If getState() fails, container is likely dead isHealthy = false; - const stateError = new Error(`Failed to check container health: ${error instanceof Error ? error.message : String(error)}`); - controller.error(stateError); - if (healthCheckInterval) { - clearInterval(healthCheckInterval); + if (!errorReported) { + errorReported = true; + const stateError = new Error(`Failed to check container health: ${error instanceof Error ? error.message : String(error)}`); + controller.error(stateError); + if (healthCheckInterval) { + clearInterval(healthCheckInterval); + } + await reader.cancel(stateError.message); } - await reader.cancel(stateError.message); } }, self.STREAM_HEALTH_CHECK_INTERVAL_MS); try { while (true) { - let timeoutHandle: ReturnType; + let timeoutHandle: ReturnType | undefined; const readPromise = reader.read(); const timeoutPromise = new Promise((_, reject) => { timeoutHandle = setTimeout(() => { @@ -656,7 +663,9 @@ export class Sandbox extends Container implements ISandbox { const { done, value } = await Promise.race([readPromise, timeoutPromise]); - clearTimeout(timeoutHandle!); + if (timeoutHandle !== undefined) { + clearTimeout(timeoutHandle); + } // Check health status before processing chunk if (!isHealthy) { @@ -675,7 +684,10 @@ export class Sandbox extends Container implements ISandbox { controller.enqueue(value); } } catch (error) { - controller.error(error); + if (!errorReported) { + errorReported = true; + controller.error(error); + } } finally { // 
Mark stream as inactive to stop health checks streamActive = false; @@ -689,6 +701,7 @@ export class Sandbox extends Container implements ISandbox { try { reader.releaseLock(); } catch (e) { + // Safe to ignore: lock is already released or stream is already closed } } }, From 584f23bf2d89cd93b016d77247337fd9ddd311a0 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 23 Oct 2025 15:08:53 +0100 Subject: [PATCH 06/23] more fixes --- packages/sandbox/src/sandbox.ts | 37 +++++++++++++++----------- packages/sandbox/tests/sandbox.test.ts | 17 +++++++----- 2 files changed, 32 insertions(+), 22 deletions(-) diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index 9f6c8e3..08576a2 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -596,22 +596,19 @@ export class Sandbox extends Container implements ISandbox { * Wraps a ReadableStream to provide three critical protections: * 1. Activity renewal - prevents sleepAfter timeout during active streaming * 2. Health monitoring - detects container crashes mid-stream - * 3. Read timeout - detects hung streams (e.g., Bun connection idle timeout) - * - * This solves both issues from GitHub #101: - * - Container staying alive during long operations (activity renewal) - * - Detecting silent stream failures (health checks + timeouts) - */ + * 3. 
Read timeout - detects hung streams (e.g., Bun connection idle timeout)*/ private wrapStreamWithActivityRenewal(stream: ReadableStream): ReadableStream { const self = this; let healthCheckInterval: ReturnType | undefined; let streamActive = true; - let errorReported = false; + let errorReported = false; + let lastActivityRenewal = 0; return new ReadableStream({ async start(controller) { const reader = stream.getReader(); let isHealthy = true; + let currentTimeoutHandle: ReturnType | undefined; // Set up periodic health monitoring to detect container crashes healthCheckInterval = setInterval(async () => { @@ -653,18 +650,19 @@ export class Sandbox extends Container implements ISandbox { try { while (true) { - let timeoutHandle: ReturnType | undefined; const readPromise = reader.read(); const timeoutPromise = new Promise((_, reject) => { - timeoutHandle = setTimeout(() => { + currentTimeoutHandle = setTimeout(() => { reject(new Error(`Stream read timeout after ${self.STREAM_READ_TIMEOUT_MS}ms - connection may be hung`)); }, self.STREAM_READ_TIMEOUT_MS); }); const { done, value } = await Promise.race([readPromise, timeoutPromise]); - if (timeoutHandle !== undefined) { - clearTimeout(timeoutHandle); + // Clear timeout immediately after successful read + if (currentTimeoutHandle !== undefined) { + clearTimeout(currentTimeoutHandle); + currentTimeoutHandle = undefined; } // Check health status before processing chunk @@ -676,14 +674,20 @@ export class Sandbox extends Container implements ISandbox { controller.close(); break; } - - // Renew activity timeout on each chunk (throttled to reduce overhead) - // This keeps container alive during active streaming - self.renewActivityTimeoutThrottled(); + const now = Date.now(); + if (now - lastActivityRenewal >= self.ACTIVITY_RENEWAL_THROTTLE_MS) { + self.renewActivityTimeout(); + lastActivityRenewal = now; + } controller.enqueue(value); } } catch (error) { + if (currentTimeoutHandle !== undefined) { + 
clearTimeout(currentTimeoutHandle); + currentTimeoutHandle = undefined; + } + if (!errorReported) { errorReported = true; controller.error(error); @@ -702,6 +706,9 @@ export class Sandbox extends Container implements ISandbox { reader.releaseLock(); } catch (e) { // Safe to ignore: lock is already released or stream is already closed + // This can happen if the reader was cancelled or errored before we reached finally + const errorMsg = e instanceof Error ? e.message : String(e); + self.logger.debug(`Reader lock release failed (expected if stream already closed): ${errorMsg}`); } } }, diff --git a/packages/sandbox/tests/sandbox.test.ts b/packages/sandbox/tests/sandbox.test.ts index a8e5412..e206875 100644 --- a/packages/sandbox/tests/sandbox.test.ts +++ b/packages/sandbox/tests/sandbox.test.ts @@ -464,7 +464,7 @@ describe('Sandbox - Automatic Session Management', () => { }); describe('Activity renewal and health checks', () => { - let renewActivityTimeoutThrottledSpy: any; + let renewActivityTimeoutSpy: any; beforeEach(() => { // Mock Container base class methods @@ -474,8 +474,8 @@ describe('Sandbox - Automatic Session Management', () => { timestamp: new Date().toISOString(), }); - // Spy on private method via prototype - renewActivityTimeoutThrottledSpy = vi.spyOn(sandbox as any, 'renewActivityTimeoutThrottled'); + // Spy on the base renewActivityTimeout method (called by inline throttling) + renewActivityTimeoutSpy = vi.spyOn(sandbox as any, 'renewActivityTimeout'); }); it('should throttle activity renewal during streaming', async () => { @@ -508,9 +508,12 @@ describe('Sandbox - Automatic Session Management', () => { expect(chunkCount).toBe(10); // Activity renewal should be called, but throttled (not once per chunk) - // With 5 second throttle and instant reads, should only be called a few times - expect(renewActivityTimeoutThrottledSpy).toHaveBeenCalled(); - expect(renewActivityTimeoutThrottledSpy.mock.calls.length).toBeLessThanOrEqual(chunkCount); + // With 5 
second throttle and instant reads, first chunk triggers renewal, + // subsequent chunks are throttled since less than 5s has passed + expect(renewActivityTimeoutSpy).toHaveBeenCalled(); + expect(renewActivityTimeoutSpy.mock.calls.length).toBeLessThanOrEqual(chunkCount); + // Should be called at least once (first chunk) but not for every chunk + expect(renewActivityTimeoutSpy.mock.calls.length).toBeGreaterThanOrEqual(1); }); it('should detect container crashes during streaming', async () => { @@ -690,7 +693,7 @@ describe('Sandbox - Automatic Session Management', () => { expect(logsStream).toBeInstanceOf(ReadableStream); // Verify both streams have activity renewal applied - expect(renewActivityTimeoutThrottledSpy).toHaveBeenCalled(); + expect(renewActivityTimeoutSpy).toHaveBeenCalled(); }); }); }); From 0b0d65a8e2dcdc893ba72f3eec6037ac3450aa52 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 23 Oct 2025 15:12:59 +0100 Subject: [PATCH 07/23] update pkg --- .github/workflows/pkg-pr-new.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/pkg-pr-new.yml b/.github/workflows/pkg-pr-new.yml index 96747b8..a6a600a 100644 --- a/.github/workflows/pkg-pr-new.yml +++ b/.github/workflows/pkg-pr-new.yml @@ -8,7 +8,6 @@ on: pull_request: types: [opened, synchronize, reopened] paths: - - '!**/*.md' - '!.changeset/**' From b3afdc7553dab7b9793f4b56b6f7aef515a24b77 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 23 Oct 2025 15:18:06 +0100 Subject: [PATCH 08/23] fix the workflow --- .github/workflows/pkg-pr-new.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/pkg-pr-new.yml b/.github/workflows/pkg-pr-new.yml index a6a600a..63f05c5 100644 --- a/.github/workflows/pkg-pr-new.yml +++ b/.github/workflows/pkg-pr-new.yml @@ -8,6 +8,7 @@ on: pull_request: types: [opened, synchronize, reopened] paths: + - '**' - '!.changeset/**' From ec5be808910c5e705a930b2d2ead392a284813c6 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 23 Oct 
2025 15:23:34 +0100 Subject: [PATCH 09/23] disable claude reviews (annoying) --- .github/workflows/claude-code-review.yml | 58 ++++++++++++------------ 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/.github/workflows/claude-code-review.yml b/.github/workflows/claude-code-review.yml index ab07e49..7892615 100644 --- a/.github/workflows/claude-code-review.yml +++ b/.github/workflows/claude-code-review.yml @@ -25,33 +25,33 @@ jobs: issues: read id-token: write - steps: - - name: Checkout repository - uses: actions/checkout@v4 - with: - fetch-depth: 1 - - - name: Run Claude Code Review - id: claude-review - uses: anthropics/claude-code-action@v1 - with: - anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }} - prompt: | - REPO: ${{ github.repository }} - PR NUMBER: ${{ github.event.pull_request.number }} - - Please review this pull request and provide feedback on: - - Code quality and best practices - - Potential bugs or issues - - Performance considerations - - Security concerns - - Test coverage - - Use the repository's CLAUDE.md for guidance on style and conventions. Be constructive and helpful in your feedback. - - Use `gh pr comment` with your Bash tool to leave your review as a comment on the PR. 
- - # See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md - # or https://docs.claude.com/en/docs/claude-code/cli-reference for available options - claude_args: '--allowed-tools "Bash(gh issue view:*),Bash(gh search:*),Bash(gh issue list:*),Bash(gh pr comment:*),Bash(gh pr diff:*),Bash(gh pr view:*),Bash(gh pr list:*)"' + # steps: + # - name: Checkout repository + # uses: actions/checkout@v4 + # with: + # fetch-depth: 1 + + # - name: Run Claude Code Review + # id: claude-review + # uses: anthropics/claude-code-action@v1 + # with: + # anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }} + # prompt: | + # REPO: ${{ github.repository }} + # PR NUMBER: ${{ github.event.pull_request.number }} + + # Please review this pull request and provide feedback on: + # - Code quality and best practices + # - Potential bugs or issues + # - Performance considerations + # - Security concerns + # - Test coverage + + # Use the repository's CLAUDE.md for guidance on style and conventions. Be constructive and helpful in your feedback. + + # Use `gh pr comment` with your Bash tool to leave your review as a comment on the PR. 
+ + # # See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md + # # or https://docs.claude.com/en/docs/claude-code/cli-reference for available options + # claude_args: '--allowed-tools "Bash(gh issue view:*),Bash(gh search:*),Bash(gh issue list:*),Bash(gh pr comment:*),Bash(gh pr diff:*),Bash(gh pr view:*),Bash(gh pr list:*)"' From 0affd4adb02a4cb69689bb891ce0b4ad215755f3 Mon Sep 17 00:00:00 2001 From: whoiskatrin Date: Thu, 23 Oct 2025 15:24:16 +0100 Subject: [PATCH 10/23] Create wet-falcons-hang.md --- .changeset/wet-falcons-hang.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/wet-falcons-hang.md diff --git a/.changeset/wet-falcons-hang.md b/.changeset/wet-falcons-hang.md new file mode 100644 index 0000000..7e8afa8 --- /dev/null +++ b/.changeset/wet-falcons-hang.md @@ -0,0 +1,5 @@ +--- +"@cloudflare/sandbox": patch +--- + +add auto timeout renewal and some health checks From 24dbbbeda2dede91effbe52d7588ca8a712502a0 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Fri, 24 Oct 2025 10:39:58 +0100 Subject: [PATCH 11/23] check for any running processes --- packages/sandbox/src/sandbox.ts | 81 +++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index 08576a2..c7f346e 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -67,6 +67,9 @@ export class Sandbox extends Container implements ISandbox { private readonly ACTIVITY_RENEWAL_THROTTLE_MS = 5000; // Throttle renewals to once per 5 seconds private readonly STREAM_HEALTH_CHECK_INTERVAL_MS = 30000; // Check sandbox health every 30 seconds during streaming private readonly STREAM_READ_TIMEOUT_MS = 300000; // 5 minutes timeout for stream reads (detects hung streams) + private readonly PROCESS_MONITOR_INTERVAL_MS = 5000; // Check for running processes every 5 seconds + private processMonitorInterval: ReturnType | null = null; + private 
lastProcessMonitorRenewal: number = 0; constructor(ctx: DurableObject['ctx'], env: Env) { super(ctx, env); @@ -190,6 +193,57 @@ export class Sandbox extends Container implements ISandbox { } } + /** + * Start the global process monitor that keeps the container alive + * while any processes are running, even without active streams. + */ + private startProcessMonitor(): void { + // Don't start if already running + if (this.processMonitorInterval) { + return; + } + + this.logger.debug('Starting global process monitor'); + + this.processMonitorInterval = setInterval(async () => { + try { + const processList = await this.client.processes.listProcesses(); + const runningProcesses = processList.processes.filter( + p => p.status === 'running' || p.status === 'starting' + ); + + if (runningProcesses.length > 0) { + const now = Date.now(); + if (now - this.lastProcessMonitorRenewal >= this.ACTIVITY_RENEWAL_THROTTLE_MS) { + this.renewActivityTimeout(); + this.lastProcessMonitorRenewal = now; + this.logger.debug( + `Global process monitor renewed activity timeout due to ${runningProcesses.length} running process(es): ${runningProcesses.map(p => p.id).join(', ')}` + ); + } + } else { + // No running processes, stop the monitor + this.logger.debug('No running processes found, stopping global process monitor'); + this.stopProcessMonitor(); + } + } catch (error) { + // Non-fatal: if we can't check processes, just log and continue + this.logger.debug(`Global process monitor failed to check running processes: ${error instanceof Error ? 
error.message : String(error)}`); + } + }, this.PROCESS_MONITOR_INTERVAL_MS); + } + + /** + * Stop the global process monitor + */ + private stopProcessMonitor(): void { + if (this.processMonitorInterval) { + clearInterval(this.processMonitorInterval); + this.processMonitorInterval = null; + this.logger.debug('Stopped global process monitor'); + } + } + // Override fetch to route internal container requests to appropriate ports override async fetch(request: Request): Promise { // Extract or generate trace ID from request @@ -475,6 +529,10 @@ export class Sandbox extends Container implements ISandbox { exitCode: undefined }, session); + // Start the global process monitor to keep container alive + // even if no one is streaming + this.startProcessMonitor(); + // Call onStart callback if provided if (options?.onStart) { options.onStart(processObj); @@ -611,6 +669,7 @@ export class Sandbox extends Container implements ISandbox { let currentTimeoutHandle: ReturnType | undefined; // Set up periodic health monitoring to detect container crashes + // AND to keep container alive while processes are running even without output healthCheckInterval = setInterval(async () => { if (!streamActive) { if (healthCheckInterval) { @@ -633,6 +692,28 @@ export class Sandbox extends Container implements ISandbox { await reader.cancel(error.message); } } + + // Check for running processes and renew activity if any exist + try { + const processList = await self.client.processes.listProcesses(); + const runningProcesses = processList.processes.filter( + p => p.status === 'running' || p.status === 'starting' + ); + + if (runningProcesses.length > 0) { + const now = Date.now(); + if (now - lastActivityRenewal >= self.ACTIVITY_RENEWAL_THROTTLE_MS) { + self.renewActivityTimeout(); + lastActivityRenewal = now; + self.logger.debug( + `Renewed activity timeout due to ${runningProcesses.length} running process(es): ${runningProcesses.map(p => p.id).join(', ')}` + ); + } + } + } catch (error) { + 
// Non-fatal: if we can't check processes, just log and continue + self.logger.debug(`Failed to check running processes: ${error instanceof Error ? error.message : String(error)}`); + } } catch (error) { // If getState() fails, container is likely dead isHealthy = false; From b9f917cd89cbb53db30fb307980b22224b8375c8 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Sun, 26 Oct 2025 10:54:15 +0000 Subject: [PATCH 12/23] test long running process --- cf-sandbox-bug | 1 + packages/sandbox/src/sandbox.ts | 125 ++++++++- packages/sandbox/tests/sandbox.test.ts | 265 ++++++++++++++++++ packages/shared/src/types.ts | 12 + .../e2e/streaming-operations-workflow.test.ts | 94 +++++++ 5 files changed, 496 insertions(+), 1 deletion(-) create mode 160000 cf-sandbox-bug diff --git a/cf-sandbox-bug b/cf-sandbox-bug new file mode 160000 index 0000000..704b93c --- /dev/null +++ b/cf-sandbox-bug @@ -0,0 +1 @@ +Subproject commit 704b93c6976ebe52c1fbf4d4fec52bb545f4e8c2 diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index c7f346e..18243b8 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -614,7 +614,24 @@ export class Sandbox extends Container implements ISandbox { } - // Streaming methods - return ReadableStream for RPC compatibility + /** + * Execute a command and return a ReadableStream of SSE events. + * + * ⚠️ **Important**: If calling this method from outside the Sandbox Durable Object (via RPC), + * streams may disconnect after ~40 seconds for long-running commands. For RPC calls, + * use `execStreamWithCallback()` instead which processes the stream internally. 
+ * + * **Use this method when:** + * - You're inside a Worker endpoint and need to proxy/pipe the stream to an HTTP response + * - You're calling this directly from within the Sandbox DO + * + * **Use `execStreamWithCallback()` when:** + * - Calling from another Durable Object via RPC + * - Commands may run longer than 40 seconds + * - You need reliable event delivery + * + * @see execStreamWithCallback for RPC-safe streaming + */ async execStream(command: string, options?: StreamOptions): Promise> { // Check for cancellation if (options?.signal?.aborted) { @@ -640,6 +657,24 @@ export class Sandbox extends Container implements ISandbox { return this.wrapStreamWithActivityRenewal(stream); } + /** + * Stream logs from a background process as a ReadableStream. + * + * ⚠️ **Important**: If calling this method from outside the Sandbox Durable Object (via RPC), + * streams may disconnect after ~40 seconds for long-running processes. For RPC calls, + * use `streamProcessLogsWithCallback()` instead which processes the stream internally. + * + * **Use this method when:** + * - You're inside a Worker endpoint and need to proxy/pipe the stream to an HTTP response + * - You're calling this directly from within the Sandbox DO + * + * **Use `streamProcessLogsWithCallback()` when:** + * - Calling from another Durable Object via RPC + * - Process may run longer than 40 seconds + * - You need reliable event delivery + * + * @see streamProcessLogsWithCallback for RPC-safe streaming + */ async streamProcessLogs(processId: string, options?: { signal?: AbortSignal }): Promise> { // Check for cancellation if (options?.signal?.aborted) { @@ -806,6 +841,94 @@ export class Sandbox extends Container implements ISandbox { }); } + /** + * Execute a command with streaming output handled internally via callback. 
+ * + * @param command - The command to execute + * @param onEvent - Callback function that receives each ExecEvent as it arrives + * @param options - Optional execution options including sessionId and signal + * @returns Promise that resolves when the command completes + */ + async execStreamWithCallback( + command: string, + onEvent: (event: ExecEvent) => void | Promise, + options?: { sessionId?: string; signal?: AbortSignal } + ): Promise { + // Check for cancellation + if (options?.signal?.aborted) { + throw new Error('Operation was aborted'); + } + + const session = options?.sessionId ?? await this.ensureDefaultSession(); + + // Get the stream - this happens INSIDE the Sandbox DO, so it's a direct HTTP fetch + const stream = await this.client.commands.executeStream(command, session); + + // Parse and process the stream internally + try { + for await (const event of parseSSEStream(stream)) { + // Check for cancellation during streaming + if (options?.signal?.aborted) { + throw new Error('Operation was aborted'); + } + + // Renew activity timeout periodically (if available) + if (this.renewActivityTimeout) { + this.renewActivityTimeout(); + } + + // Call the event callback + await onEvent(event); + } + } catch (error) { + this.logger.error('Error in execStreamWithCallback', error instanceof Error ? error : new Error(String(error))); + throw error; + } + } + + /** + * Stream process logs with output handled internally via callback. 
+ * + * @param processId - The ID of the process to stream logs from + * @param onEvent - Callback function that receives each ExecEvent as it arrives + * @param options - Optional signal for cancellation + * @returns Promise that resolves when the stream completes + */ + async streamProcessLogsWithCallback( + processId: string, + onEvent: (event: ExecEvent) => void | Promise, + options?: { signal?: AbortSignal } + ): Promise { + // Check for cancellation + if (options?.signal?.aborted) { + throw new Error('Operation was aborted'); + } + + // Get the stream - this happens INSIDE the Sandbox DO, so it's a direct HTTP fetch + const stream = await this.client.processes.streamProcessLogs(processId); + + // Parse and process the stream internally + try { + for await (const event of parseSSEStream(stream)) { + // Check for cancellation during streaming + if (options?.signal?.aborted) { + throw new Error('Operation was aborted'); + } + + // Renew activity timeout periodically (if available) + if (this.renewActivityTimeout) { + this.renewActivityTimeout(); + } + + // Call the event callback + await onEvent(event); + } + } catch (error) { + this.logger.error('Error in streamProcessLogsWithCallback', error instanceof Error ? 
error : new Error(String(error))); + throw error; + } + } + async gitCheckout( repoUrl: string, options: { branch?: string; targetDir?: string; sessionId?: string } diff --git a/packages/sandbox/tests/sandbox.test.ts b/packages/sandbox/tests/sandbox.test.ts index e206875..21d79b1 100644 --- a/packages/sandbox/tests/sandbox.test.ts +++ b/packages/sandbox/tests/sandbox.test.ts @@ -696,4 +696,269 @@ describe('Sandbox - Automatic Session Management', () => { expect(renewActivityTimeoutSpy).toHaveBeenCalled(); }); }); + + describe('callback-based streaming (avoids RPC stream serialization)', () => { + it('should execute command with callback and process all events', async () => { + const mockEvents = [ + { type: 'start', timestamp: new Date().toISOString(), command: 'echo test' }, + { type: 'stdout', timestamp: new Date().toISOString(), data: 'test\n' }, + { type: 'complete', timestamp: new Date().toISOString(), exitCode: 0 }, + ]; + + // Create a mock readable stream that yields SSE events + const mockStream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + for (const event of mockEvents) { + const sseData = `data: ${JSON.stringify(event)}\n\n`; + controller.enqueue(encoder.encode(sseData)); + } + controller.close(); + } + }); + + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + const receivedEvents: any[] = []; + await sandbox.execStreamWithCallback( + 'echo test', + (event) => { + receivedEvents.push(event); + } + ); + + expect(receivedEvents).toHaveLength(3); + expect(receivedEvents[0].type).toBe('start'); + expect(receivedEvents[1].type).toBe('stdout'); + expect(receivedEvents[1].data).toBe('test\n'); + expect(receivedEvents[2].type).toBe('complete'); + expect(receivedEvents[2].exitCode).toBe(0); + }); + + it('should support async callbacks', async () => { + const mockEvents = [ + { type: 'stdout', timestamp: new Date().toISOString(), data: 'line1\n' }, + { type: 'stdout', timestamp: new 
Date().toISOString(), data: 'line2\n' }, + { type: 'complete', timestamp: new Date().toISOString(), exitCode: 0 }, + ]; + + const mockStream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + for (const event of mockEvents) { + const sseData = `data: ${JSON.stringify(event)}\n\n`; + controller.enqueue(encoder.encode(sseData)); + } + controller.close(); + } + }); + + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + const receivedEvents: any[] = []; + const asyncCallback = async (event: any) => { + // Simulate async work + await new Promise(resolve => setTimeout(resolve, 10)); + receivedEvents.push(event); + }; + + await sandbox.execStreamWithCallback('echo test', asyncCallback); + + expect(receivedEvents).toHaveLength(3); + }); + + it('should stream process logs with callback', async () => { + const mockEvents = [ + { type: 'stdout', timestamp: new Date().toISOString(), data: 'process output\n' }, + { type: 'complete', timestamp: new Date().toISOString(), exitCode: 0 }, + ]; + + const mockStream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + for (const event of mockEvents) { + const sseData = `data: ${JSON.stringify(event)}\n\n`; + controller.enqueue(encoder.encode(sseData)); + } + controller.close(); + } + }); + + vi.spyOn(sandbox.client.processes, 'streamProcessLogs').mockResolvedValue(mockStream); + + const receivedEvents: any[] = []; + await sandbox.streamProcessLogsWithCallback( + 'proc-123', + (event) => { + receivedEvents.push(event); + } + ); + + expect(receivedEvents).toHaveLength(2); + expect(receivedEvents[0].type).toBe('stdout'); + expect(receivedEvents[1].type).toBe('complete'); + }); + + it('should renew activity timeout during streaming', async () => { + const mockEvents = [ + { type: 'stdout', timestamp: new Date().toISOString(), data: 'chunk1\n' }, + { type: 'stdout', timestamp: new Date().toISOString(), data: 'chunk2\n' }, + { type: 
'complete', timestamp: new Date().toISOString(), exitCode: 0 }, + ]; + + const mockStream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + for (const event of mockEvents) { + const sseData = `data: ${JSON.stringify(event)}\n\n`; + controller.enqueue(encoder.encode(sseData)); + } + controller.close(); + } + }); + + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + // Mock renewActivityTimeout before creating the spy + (sandbox as any).renewActivityTimeout = vi.fn(); + const renewSpy = vi.spyOn(sandbox as any, 'renewActivityTimeout'); + + await sandbox.execStreamWithCallback('echo test', () => {}); + + expect(renewSpy).toHaveBeenCalled(); + }); + + it('should handle errors in streaming', async () => { + const mockStream = new ReadableStream({ + start(controller) { + controller.error(new Error('Stream error')); + } + }); + + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + await expect( + sandbox.execStreamWithCallback('echo test', () => {}) + ).rejects.toThrow('Stream error'); + }); + + it('should respect abort signal during streaming', async () => { + const mockEvents = [ + { type: 'stdout', timestamp: new Date().toISOString(), data: 'chunk1\n' }, + { type: 'stdout', timestamp: new Date().toISOString(), data: 'chunk2\n' }, + { type: 'complete', timestamp: new Date().toISOString(), exitCode: 0 }, + ]; + + const mockStream = new ReadableStream({ + async start(controller) { + const encoder = new TextEncoder(); + for (const event of mockEvents) { + await new Promise(resolve => setTimeout(resolve, 100)); + const sseData = `data: ${JSON.stringify(event)}\n\n`; + controller.enqueue(encoder.encode(sseData)); + } + controller.close(); + } + }); + + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + const abortController = new AbortController(); + const receivedEvents: any[] = []; + + // Abort after receiving first event + const 
callbackWithAbort = (event: any) => { + receivedEvents.push(event); + if (receivedEvents.length === 1) { + abortController.abort(); + } + }; + + await expect( + sandbox.execStreamWithCallback('echo test', callbackWithAbort, { + signal: abortController.signal + }) + ).rejects.toThrow('Operation was aborted'); + }); + + it('should use specified sessionId if provided', async () => { + const mockStream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + const event = { type: 'complete', timestamp: new Date().toISOString(), exitCode: 0 }; + controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`)); + controller.close(); + } + }); + + const executeStreamSpy = vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + await sandbox.execStreamWithCallback('echo test', () => {}, { sessionId: 'custom-session' }); + + expect(executeStreamSpy).toHaveBeenCalledWith('echo test', 'custom-session'); + }); + + it('should handle stderr events separately from stdout', async () => { + const mockEvents = [ + { type: 'stdout', timestamp: new Date().toISOString(), data: 'stdout message\n' }, + { type: 'stderr', timestamp: new Date().toISOString(), data: 'stderr message\n' }, + { type: 'complete', timestamp: new Date().toISOString(), exitCode: 0 }, + ]; + + const mockStream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + for (const event of mockEvents) { + const sseData = `data: ${JSON.stringify(event)}\n\n`; + controller.enqueue(encoder.encode(sseData)); + } + controller.close(); + } + }); + + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + const stdoutData: string[] = []; + const stderrData: string[] = []; + + await sandbox.execStreamWithCallback('test command', (event) => { + if (event.type === 'stdout') { + stdoutData.push(event.data || ''); + } else if (event.type === 'stderr') { + stderrData.push(event.data || ''); + } + }); + + 
expect(stdoutData).toEqual(['stdout message\n']); + expect(stderrData).toEqual(['stderr message\n']); + }); + + it('should handle error events from command execution', async () => { + const mockEvents = [ + { type: 'error', timestamp: new Date().toISOString(), error: 'Command not found' }, + ]; + + const mockStream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + for (const event of mockEvents) { + const sseData = `data: ${JSON.stringify(event)}\n\n`; + controller.enqueue(encoder.encode(sseData)); + } + controller.close(); + } + }); + + vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); + + const receivedEvents: any[] = []; + await sandbox.execStreamWithCallback('nonexistent', (event) => { + receivedEvents.push(event); + }); + + expect(receivedEvents).toHaveLength(1); + expect(receivedEvents[0].type).toBe('error'); + expect(receivedEvents[0].error).toBe('Command not found'); + }); + }); }); diff --git a/packages/shared/src/types.ts b/packages/shared/src/types.ts index 8e0e7df..102c58b 100644 --- a/packages/shared/src/types.ts +++ b/packages/shared/src/types.ts @@ -634,6 +634,18 @@ export interface ISandbox { execStream(command: string, options?: StreamOptions): Promise>; streamProcessLogs(processId: string, options?: { signal?: AbortSignal }): Promise>; + // Callback-based streaming + execStreamWithCallback( + command: string, + onEvent: (event: ExecEvent) => void | Promise, + options?: { sessionId?: string; signal?: AbortSignal } + ): Promise; + streamProcessLogsWithCallback( + processId: string, + onEvent: (event: ExecEvent) => void | Promise, + options?: { signal?: AbortSignal } + ): Promise; + // Utility methods cleanupCompletedProcesses(): Promise; getProcessLogs(id: string): Promise<{ stdout: string; stderr: string; processId: string }>; diff --git a/tests/e2e/streaming-operations-workflow.test.ts b/tests/e2e/streaming-operations-workflow.test.ts index 920e2a2..cf0b848 100644 --- 
a/tests/e2e/streaming-operations-workflow.test.ts +++ b/tests/e2e/streaming-operations-workflow.test.ts @@ -580,5 +580,99 @@ describe('Streaming Operations Workflow', () => { console.log('[Test] ✅ Intermittent output handled correctly'); }, 90000); + + /** + * Test for callback-based streaming + * This validates that long-running commands work via callback pattern + */ + test('should handle very long-running commands (60+ seconds) via callback-based streaming', async () => { + currentSandboxId = createSandboxId(); + const headers = createTestHeaders(currentSandboxId); + + console.log('[Test] Starting 60+ second command via callback-based streaming...'); + + // With callback-based streaming, it should complete successfully + const streamResponse = await vi.waitFor( + async () => fetchWithStartup(`${workerUrl}/api/execStream`, { + method: 'POST', + headers, + body: JSON.stringify({ + // Command that runs for 60+ seconds with periodic output + command: "bash -c 'for i in {1..12}; do echo \"Minute mark $i\"; sleep 5; done; echo \"COMPLETED\"'", + }), + }), + { timeout: 90000, interval: 2000 } + ); + + expect(streamResponse.status).toBe(200); + + const startTime = Date.now(); + const events = await collectSSEEvents(streamResponse, 100); + const duration = Date.now() - startTime; + + console.log(`[Test] Very long stream completed in ${duration}ms`); + + // Verify command ran for approximately 60 seconds (12 ticks * 5 seconds) + expect(duration).toBeGreaterThan(55000); // At least 55 seconds + expect(duration).toBeLessThan(75000); // But not timed out (under 75s) + + // Should have received all minute marks + const stdoutEvents = events.filter((e) => e.type === 'stdout'); + const output = stdoutEvents.map((e) => e.data).join(''); + + for (let i = 1; i <= 12; i++) { + expect(output).toContain(`Minute mark ${i}`); + } + expect(output).toContain('COMPLETED'); + + // Most importantly: should complete with exit code 0 (not timeout/disconnect) + const completeEvent = 
events.find((e) => e.type === 'complete'); + expect(completeEvent).toBeDefined(); + expect(completeEvent?.exitCode).toBe(0); + + console.log('[Test] ✅ Very long-running command completed!'); + }, 90000); + + test('should handle command that sleeps for extended period', async () => { + currentSandboxId = createSandboxId(); + const headers = createTestHeaders(currentSandboxId); + + console.log('[Test] Testing sleep 45 && echo "done" pattern...'); + + // This is the exact pattern that was failing before + const streamResponse = await vi.waitFor( + async () => fetchWithStartup(`${workerUrl}/api/execStream`, { + method: 'POST', + headers, + body: JSON.stringify({ + command: 'sleep 45 && echo "done"', + }), + }), + { timeout: 90000, interval: 2000 } + ); + + expect(streamResponse.status).toBe(200); + + const startTime = Date.now(); + const events = await collectSSEEvents(streamResponse, 20); + const duration = Date.now() - startTime; + + console.log(`[Test] Sleep command completed in ${duration}ms`); + + // Should have taken at least 45 seconds + expect(duration).toBeGreaterThan(44000); + + // Should have the output + const stdoutEvents = events.filter((e) => e.type === 'stdout'); + const output = stdoutEvents.map((e) => e.data).join(''); + expect(output).toContain('done'); + + // Should complete successfully + const completeEvent = events.find((e) => e.type === 'complete'); + expect(completeEvent).toBeDefined(); + expect(completeEvent?.exitCode).toBe(0); + + console.log('[Test] ✅ Long sleep command completed without disconnect!'); + }, 90000); }); }); From 42b76c77f2189188f2f87f4c05cf7e9407ab2a9c Mon Sep 17 00:00:00 2001 From: katereznykova Date: Sun, 26 Oct 2025 11:48:22 +0000 Subject: [PATCH 13/23] fix for streaming --- cf-sandbox-bug | 1 - packages/sandbox/src/sandbox.ts | 4 ++ packages/shared/src/types.ts | 14 +++++- .../e2e/streaming-operations-workflow.test.ts | 4 +- tests/e2e/test-worker/index.ts | 45 +++++++++++++++++++ 5 files changed, 64 insertions(+), 4 
deletions(-) delete mode 160000 cf-sandbox-bug diff --git a/cf-sandbox-bug b/cf-sandbox-bug deleted file mode 160000 index 704b93c..0000000 --- a/cf-sandbox-bug +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 704b93c6976ebe52c1fbf4d4fec52bb545f4e8c2 diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index 18243b8..c61a389 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -1227,6 +1227,8 @@ export class Sandbox extends Container implements ISandbox { // Command execution - delegate to internal session-aware methods exec: (command, options) => this.execWithSession(command, sessionId, options), execStream: (command, options) => this.execStreamWithSession(command, sessionId, options), + execStreamWithCallback: (command, onEvent, options) => + this.execStreamWithCallback(command, onEvent, { ...options, sessionId }), // Process management startProcess: (command, options) => this.startProcess(command, options, sessionId), @@ -1237,6 +1239,8 @@ export class Sandbox extends Container implements ISandbox { cleanupCompletedProcesses: () => this.cleanupCompletedProcesses(), getProcessLogs: (id) => this.getProcessLogs(id), streamProcessLogs: (processId, options) => this.streamProcessLogs(processId, options), + streamProcessLogsWithCallback: (processId, onEvent, options) => + this.streamProcessLogsWithCallback(processId, onEvent, options), // File operations - pass sessionId via options or parameter writeFile: (path, content, options) => this.writeFile(path, content, { ...options, sessionId }), diff --git a/packages/shared/src/types.ts b/packages/shared/src/types.ts index 102c58b..3a0566c 100644 --- a/packages/shared/src/types.ts +++ b/packages/shared/src/types.ts @@ -583,7 +583,14 @@ export interface ExecutionSession { // Command execution exec(command: string, options?: ExecOptions): Promise; execStream(command: string, options?: StreamOptions): Promise>; - + + // Callback-based streaming + execStreamWithCallback( 
+ command: string, + onEvent: (event: ExecEvent) => void | Promise, + options?: { signal?: AbortSignal } + ): Promise; + // Background process management startProcess(command: string, options?: ProcessOptions): Promise; listProcesses(): Promise; @@ -593,6 +600,11 @@ export interface ExecutionSession { cleanupCompletedProcesses(): Promise; getProcessLogs(id: string): Promise<{ stdout: string; stderr: string; processId: string }>; streamProcessLogs(processId: string, options?: { signal?: AbortSignal }): Promise>; + streamProcessLogsWithCallback( + processId: string, + onEvent: (event: ExecEvent) => void | Promise, + options?: { signal?: AbortSignal } + ): Promise; // File operations writeFile(path: string, content: string, options?: { encoding?: string }): Promise; diff --git a/tests/e2e/streaming-operations-workflow.test.ts b/tests/e2e/streaming-operations-workflow.test.ts index cf0b848..3c5bede 100644 --- a/tests/e2e/streaming-operations-workflow.test.ts +++ b/tests/e2e/streaming-operations-workflow.test.ts @@ -593,7 +593,7 @@ describe('Streaming Operations Workflow', () => { // With callback-based streaming, it should complete successfully const streamResponse = await vi.waitFor( - async () => fetchWithStartup(`${workerUrl}/api/execStream`, { + async () => fetchWithStartup(`${workerUrl}/api/execStreamCallback`, { method: 'POST', headers, body: JSON.stringify({ @@ -641,7 +641,7 @@ describe('Streaming Operations Workflow', () => { // This is the exact pattern that was failing before const streamResponse = await vi.waitFor( - async () => fetchWithStartup(`${workerUrl}/api/execStream`, { + async () => fetchWithStartup(`${workerUrl}/api/execStreamCallback`, { method: 'POST', headers, body: JSON.stringify({ diff --git a/tests/e2e/test-worker/index.ts b/tests/e2e/test-worker/index.ts index 0c3aa26..7341713 100644 --- a/tests/e2e/test-worker/index.ts +++ b/tests/e2e/test-worker/index.ts @@ -86,6 +86,51 @@ export default { }); } + // Command execution with callback-based 
streaming (for long-running commands) + if (url.pathname === '/api/execStreamCallback' && request.method === 'POST') { + console.log('[TestWorker] execStreamCallback called for command:', body.command); + const startTime = Date.now(); + + // Create a TransformStream to convert callbacks to SSE + const { readable, writable } = new TransformStream(); + const writer = writable.getWriter(); + const encoder = new TextEncoder(); + + // Start streaming in the background using callback-based method + (async () => { + try { + await executor.execStreamWithCallback( + body.command, + async (event) => { + // Convert event to SSE format + const sseData = `data: ${JSON.stringify(event)}\n\n`; + await writer.write(encoder.encode(sseData)); + } + ); + } catch (error: any) { + // Send error event + const errorEvent = { + type: 'error', + timestamp: new Date().toISOString(), + error: error.message, + }; + await writer.write(encoder.encode(`data: ${JSON.stringify(errorEvent)}\n\n`)); + } finally { + await writer.close(); + console.log('[TestWorker] Stream completed in', Date.now() - startTime, 'ms'); + } + })(); + + // Return SSE stream + return new Response(readable, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }, + }); + } + // Git clone if (url.pathname === '/api/git/clone' && request.method === 'POST') { await executor.gitCheckout(body.repoUrl, { From 7eedde9c62751a77189526e20773f10674d7eaea Mon Sep 17 00:00:00 2001 From: katereznykova Date: Sun, 26 Oct 2025 11:51:49 +0000 Subject: [PATCH 14/23] nits --- packages/sandbox/src/interpreter.ts | 2 +- tests/e2e/streaming-operations-workflow.test.ts | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/packages/sandbox/src/interpreter.ts b/packages/sandbox/src/interpreter.ts index 9630b71..ba3ee61 100644 --- a/packages/sandbox/src/interpreter.ts +++ b/packages/sandbox/src/interpreter.ts @@ -82,7 +82,7 @@ export class CodeInterpreter { async 
runCodeStream( code: string, options: RunCodeOptions = {} - ): Promise> { + ): Promise { // Get or create context let context = options.context; if (!context) { diff --git a/tests/e2e/streaming-operations-workflow.test.ts b/tests/e2e/streaming-operations-workflow.test.ts index 3c5bede..26885eb 100644 --- a/tests/e2e/streaming-operations-workflow.test.ts +++ b/tests/e2e/streaming-operations-workflow.test.ts @@ -445,13 +445,6 @@ describe('Streaming Operations Workflow', () => { expect(completeEvent?.exitCode).toBe(0); }, 90000); - /** - * Tests for GitHub Issue #101: Container Hibernation During Long Streaming - * - * These tests verify that containers stay alive during long-running streaming - * operations via automatic activity renewal, and that the implementation actually - * works with real containers (not just mocked tests). - */ test('should keep container alive during 15+ second streaming command (GitHub #101)', async () => { currentSandboxId = createSandboxId(); From 1aa47cde7e5d26917351c9783e3a9df56a7190bf Mon Sep 17 00:00:00 2001 From: katereznykova Date: Mon, 27 Oct 2025 11:16:03 +0000 Subject: [PATCH 15/23] fix and neats --- packages/sandbox/src/sandbox.ts | 65 ++++++++++++++------------------- packages/shared/src/types.ts | 2 +- 2 files changed, 29 insertions(+), 38 deletions(-) diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index 750887d..e594a23 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -58,10 +58,10 @@ export class Sandbox extends Container implements ISandbox { client: SandboxClient; private codeInterpreter: CodeInterpreter; - private sandboxName: string | null = null; - private baseUrl: string | null = null; + private sandboxName: string | undefined = undefined; + private baseUrl: string | undefined = undefined; private portTokens: Map = new Map(); - private defaultSession: string | null = null; + private defaultSession: string | undefined = undefined; envVars: Record = {}; 
private logger: ReturnType; private lastActivityRenewal: number = 0; @@ -69,7 +69,7 @@ export class Sandbox extends Container implements ISandbox { private readonly STREAM_HEALTH_CHECK_INTERVAL_MS = 30000; // Check sandbox health every 30 seconds during streaming private readonly STREAM_READ_TIMEOUT_MS = 300000; // 5 minutes timeout for stream reads (detects hung streams) private readonly PROCESS_MONITOR_INTERVAL_MS = 5000; // Check for running processes every 5 seconds - private processMonitorInterval: ReturnType | null = null; + private processMonitorInterval: ReturnType | undefined = undefined; private lastProcessMonitorRenewal: number = 0; constructor(ctx: DurableObject['ctx'], env: Env) { @@ -101,8 +101,8 @@ export class Sandbox extends Container implements ISandbox { // Load the sandbox name, port tokens, and default session from storage on initialization this.ctx.blockConcurrencyWhile(async () => { - this.sandboxName = await this.ctx.storage.get('sandboxName') || null; - this.defaultSession = await this.ctx.storage.get('defaultSession') || null; + this.sandboxName = await this.ctx.storage.get('sandboxName') || undefined; + this.defaultSession = await this.ctx.storage.get('defaultSession') || undefined; const storedTokens = await this.ctx.storage.get>('portTokens') || {}; // Convert stored tokens back to Map @@ -221,10 +221,12 @@ export class Sandbox extends Container implements ISandbox { override onStop() { this.logger.debug('Sandbox stopped'); + this.stopProcessMonitor(); } override onError(error: unknown) { this.logger.error('Sandbox error', error instanceof Error ? 
error : new Error(String(error))); + this.stopProcessMonitor(); } /** @@ -253,10 +255,13 @@ export class Sandbox extends Container implements ISandbox { } this.logger.debug('Starting global process monitor'); + let consecutiveFailures = 0; + const MAX_CONSECUTIVE_FAILURES = 3; // Stop after 3 consecutive failures (15 seconds) this.processMonitorInterval = setInterval(async () => { try { const processList = await this.client.processes.listProcesses(); + consecutiveFailures = 0; // Reset on success const runningProcesses = processList.processes.filter( p => p.status === 'running' || p.status === 'starting' ); @@ -276,8 +281,19 @@ export class Sandbox extends Container implements ISandbox { this.stopProcessMonitor(); } } catch (error) { - // Non-fatal: if we can't check processes, just log and continue - this.logger.debug(`Global process monitor failed to check running processes: ${error instanceof Error ? error.message : String(error)}`); + consecutiveFailures++; + const errorMsg = error instanceof Error ? error.message : String(error); + + if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) { + this.logger.error( + `Global process monitor failed ${consecutiveFailures} times, stopping monitor. Last error: ${errorMsg}` + ); + this.stopProcessMonitor(); + } else { + this.logger.debug( + `Global process monitor failed to check running processes (${consecutiveFailures}/${MAX_CONSECUTIVE_FAILURES}): ${errorMsg}` + ); + } } }, this.PROCESS_MONITOR_INTERVAL_MS); } @@ -288,7 +304,7 @@ export class Sandbox extends Container implements ISandbox { private stopProcessMonitor(): void { if (this.processMonitorInterval) { clearInterval(this.processMonitorInterval); - this.processMonitorInterval = null; + this.processMonitorInterval = undefined; this.logger.debug('Stopped global process monitor'); } } @@ -735,10 +751,8 @@ export class Sandbox extends Container implements ISandbox { } /** - * Wraps a ReadableStream to provide three critical protections: - * 1. 
Activity renewal - prevents sleepAfter timeout during active streaming - * 2. Health monitoring - detects container crashes mid-stream - * 3. Read timeout - detects hung streams (e.g., Bun connection idle timeout)*/ + * Wraps a ReadableStream + */ private wrapStreamWithActivityRenewal(stream: ReadableStream): ReadableStream { const self = this; let healthCheckInterval: ReturnType | undefined; @@ -753,7 +767,6 @@ export class Sandbox extends Container implements ISandbox { let currentTimeoutHandle: ReturnType | undefined; // Set up periodic health monitoring to detect container crashes - // AND to keep container alive while processes are running even without output healthCheckInterval = setInterval(async () => { if (!streamActive) { if (healthCheckInterval) { @@ -776,30 +789,8 @@ export class Sandbox extends Container implements ISandbox { await reader.cancel(error.message); } } - - // Check for running processes and renew activity if any exist - try { - const processList = await self.client.processes.listProcesses(); - const runningProcesses = processList.processes.filter( - p => p.status === 'running' || p.status === 'starting' - ); - - if (runningProcesses.length > 0) { - const now = Date.now(); - if (now - lastActivityRenewal >= self.ACTIVITY_RENEWAL_THROTTLE_MS) { - self.renewActivityTimeout(); - lastActivityRenewal = now; - self.logger.debug( - `Renewed activity timeout due to ${runningProcesses.length} running process(es): ${runningProcesses.map(p => p.id).join(', ')}` - ); - } - } - } catch (error) { - // Non-fatal: if we can't check processes, just log and continue - self.logger.debug(`Failed to check running processes: ${error instanceof Error ? 
error.message : String(error)}`); - } } catch (error) { - // If getState() fails, container is likely dead + // If getState() fails, container is likely dead - this is a critical failure isHealthy = false; if (!errorReported) { errorReported = true; diff --git a/packages/shared/src/types.ts b/packages/shared/src/types.ts index c67a71f..ffa77ce 100644 --- a/packages/shared/src/types.ts +++ b/packages/shared/src/types.ts @@ -690,7 +690,7 @@ export interface ISandbox { // Code interpreter methods createCodeContext(options?: CreateContextOptions): Promise; runCode(code: string, options?: RunCodeOptions): Promise; - runCodeStream(code: string, options?: RunCodeOptions): Promise>; + runCodeStream(code: string, options?: RunCodeOptions): Promise; listCodeContexts(): Promise; deleteCodeContext(contextId: string): Promise; } From 7d916c78110cbf7a82e9108b1f98f77acfcb9a98 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Mon, 27 Oct 2025 18:24:38 +0000 Subject: [PATCH 16/23] use keepAlive flag --- packages/sandbox/src/sandbox.ts | 132 ++------- packages/sandbox/tests/get-sandbox.test.ts | 39 +++ packages/sandbox/tests/sandbox.test.ts | 78 +---- packages/shared/src/index.ts | 2 +- packages/shared/src/types.ts | 23 ++ tests/e2e/keepalive-workflow.test.ts | 277 ++++++++++++++++++ .../e2e/streaming-operations-workflow.test.ts | 7 +- tests/e2e/test-worker/index.ts | 9 +- 8 files changed, 379 insertions(+), 188 deletions(-) create mode 100644 tests/e2e/keepalive-workflow.test.ts diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index e594a23..1164f8e 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -49,6 +49,10 @@ export function getSandbox( stub.setSleepAfter(options.sleepAfter); } + if (options?.keepAlive !== undefined) { + stub.setKeepAlive(options.keepAlive); + } + return stub; } @@ -64,13 +68,9 @@ export class Sandbox extends Container implements ISandbox { private defaultSession: string | undefined = 
undefined; envVars: Record = {}; private logger: ReturnType; - private lastActivityRenewal: number = 0; - private readonly ACTIVITY_RENEWAL_THROTTLE_MS = 5000; // Throttle renewals to once per 5 seconds private readonly STREAM_HEALTH_CHECK_INTERVAL_MS = 30000; // Check sandbox health every 30 seconds during streaming private readonly STREAM_READ_TIMEOUT_MS = 300000; // 5 minutes timeout for stream reads (detects hung streams) - private readonly PROCESS_MONITOR_INTERVAL_MS = 5000; // Check for running processes every 5 seconds - private processMonitorInterval: ReturnType | undefined = undefined; - private lastProcessMonitorRenewal: number = 0; + private keepAliveEnabled: boolean = false; constructor(ctx: DurableObject['ctx'], env: Env) { super(ctx, env); @@ -138,6 +138,16 @@ export class Sandbox extends Container implements ISandbox { this.sleepAfter = sleepAfter; } + // RPC method to enable keepAlive mode + async setKeepAlive(keepAlive: boolean): Promise { + this.keepAliveEnabled = keepAlive; + if (keepAlive) { + this.logger.info('KeepAlive mode enabled - container will stay alive until explicitly destroyed'); + } else { + this.logger.info('KeepAlive mode disabled - container will timeout normally'); + } + } + // RPC method to set environment variables async setEnvVars(envVars: Record): Promise { // Update local state for new sessions @@ -221,93 +231,27 @@ export class Sandbox extends Container implements ISandbox { override onStop() { this.logger.debug('Sandbox stopped'); - this.stopProcessMonitor(); } override onError(error: unknown) { this.logger.error('Sandbox error', error instanceof Error ? 
error : new Error(String(error))); - this.stopProcessMonitor(); } /** - * Throttled activity timeout renewal, to prevent renewal on every chunk - * Only renews if sufficient time has passed since last renewal + * Override onActivityExpired to prevent automatic shutdown when keepAlive is enabled + * When keepAlive is disabled, calls parent implementation which stops the container */ - private renewActivityTimeoutThrottled(): void { - const now = Date.now(); - const timeSinceLastRenewal = now - this.lastActivityRenewal; - - // Only renew if throttle period has elapsed - if (timeSinceLastRenewal >= this.ACTIVITY_RENEWAL_THROTTLE_MS) { - this.renewActivityTimeout(); - this.lastActivityRenewal = now; + override async onActivityExpired(): Promise { + if (this.keepAliveEnabled) { + this.logger.debug('Activity expired but keepAlive is enabled - container will stay alive'); + // Do nothing - don't call stop(), container stays alive + } else { + // Default behavior: stop the container + this.logger.debug('Activity expired - stopping container'); + await super.onActivityExpired(); } } - /** - * Start the global process monitor that keeps the container alive - * while any processes are running, even without active streams. 
- */ - private startProcessMonitor(): void { - // Don't start if already running - if (this.processMonitorInterval) { - return; - } - - this.logger.debug('Starting global process monitor'); - let consecutiveFailures = 0; - const MAX_CONSECUTIVE_FAILURES = 3; // Stop after 3 consecutive failures (15 seconds) - - this.processMonitorInterval = setInterval(async () => { - try { - const processList = await this.client.processes.listProcesses(); - consecutiveFailures = 0; // Reset on success - const runningProcesses = processList.processes.filter( - p => p.status === 'running' || p.status === 'starting' - ); - - if (runningProcesses.length > 0) { - const now = Date.now(); - if (now - this.lastProcessMonitorRenewal >= this.ACTIVITY_RENEWAL_THROTTLE_MS) { - this.renewActivityTimeout(); - this.lastProcessMonitorRenewal = now; - this.logger.debug( - `Global process monitor renewed activity timeout due to ${runningProcesses.length} running process(es): ${runningProcesses.map(p => p.id).join(', ')}` - ); - } - } else { - // No running processes, stop the monitor - this.logger.debug('No running processes found, stopping global process monitor'); - this.stopProcessMonitor(); - } - } catch (error) { - consecutiveFailures++; - const errorMsg = error instanceof Error ? error.message : String(error); - - if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) { - this.logger.error( - `Global process monitor failed ${consecutiveFailures} times, stopping monitor. 
Last error: ${errorMsg}` - ); - this.stopProcessMonitor(); - } else { - this.logger.debug( - `Global process monitor failed to check running processes (${consecutiveFailures}/${MAX_CONSECUTIVE_FAILURES}): ${errorMsg}` - ); - } - } - }, this.PROCESS_MONITOR_INTERVAL_MS); - } - - /** - * Stop the global process monitor - */ - private stopProcessMonitor(): void { - if (this.processMonitorInterval) { - clearInterval(this.processMonitorInterval); - this.processMonitorInterval = undefined; - this.logger.debug('Stopped global process monitor'); - } - } // Override fetch to route internal container requests to appropriate ports override async fetch(request: Request): Promise { @@ -468,10 +412,6 @@ export class Sandbox extends Container implements ISandbox { case 'stdout': case 'stderr': if (event.data) { - // Renew activity timeout on each output event (throttled to reduce overhead) - // This keeps the container alive during active command execution - this.renewActivityTimeoutThrottled(); - // Update accumulated output if (event.type === 'stdout') stdout += event.data; if (event.type === 'stderr') stderr += event.data; @@ -594,10 +534,6 @@ export class Sandbox extends Container implements ISandbox { exitCode: undefined }, session); - // Start the global process monitor to keep container alive - // even if no one is streaming - this.startProcessMonitor(); - // Call onStart callback if provided if (options?.onStart) { options.onStart(processObj); @@ -751,14 +687,13 @@ export class Sandbox extends Container implements ISandbox { } /** - * Wraps a ReadableStream + * Wraps a ReadableStream with health monitoring */ private wrapStreamWithActivityRenewal(stream: ReadableStream): ReadableStream { const self = this; let healthCheckInterval: ReturnType | undefined; let streamActive = true; let errorReported = false; - let lastActivityRenewal = 0; return new ReadableStream({ async start(controller) { @@ -830,11 +765,6 @@ export class Sandbox extends Container implements ISandbox { 
controller.close(); break; } - const now = Date.now(); - if (now - lastActivityRenewal >= self.ACTIVITY_RENEWAL_THROTTLE_MS) { - self.renewActivityTimeout(); - lastActivityRenewal = now; - } controller.enqueue(value); } @@ -912,11 +842,6 @@ export class Sandbox extends Container implements ISandbox { throw new Error('Operation was aborted'); } - // Renew activity timeout periodically (if available) - if (this.renewActivityTimeout) { - this.renewActivityTimeout(); - } - // Call the event callback await onEvent(event); } @@ -955,11 +880,6 @@ export class Sandbox extends Container implements ISandbox { throw new Error('Operation was aborted'); } - // Renew activity timeout periodically (if available) - if (this.renewActivityTimeout) { - this.renewActivityTimeout(); - } - // Call the event callback await onEvent(event); } diff --git a/packages/sandbox/tests/get-sandbox.test.ts b/packages/sandbox/tests/get-sandbox.test.ts index 6f77f17..15da1fa 100644 --- a/packages/sandbox/tests/get-sandbox.test.ts +++ b/packages/sandbox/tests/get-sandbox.test.ts @@ -30,6 +30,7 @@ describe('getSandbox', () => { setSleepAfter: vi.fn((value: string | number) => { mockStub.sleepAfter = value; }), + setKeepAlive: vi.fn(), }; // Mock getContainer to return our stub @@ -107,4 +108,42 @@ describe('getSandbox', () => { expect(sandbox.sleepAfter).toBe(timeString); } }); + + it('should apply keepAlive option when provided as true', () => { + const mockNamespace = {} as any; + const sandbox = getSandbox(mockNamespace, 'test-sandbox', { + keepAlive: true, + }); + + expect(sandbox.setKeepAlive).toHaveBeenCalledWith(true); + }); + + it('should apply keepAlive option when provided as false', () => { + const mockNamespace = {} as any; + const sandbox = getSandbox(mockNamespace, 'test-sandbox', { + keepAlive: false, + }); + + expect(sandbox.setKeepAlive).toHaveBeenCalledWith(false); + }); + + it('should not call setKeepAlive when keepAlive option not provided', () => { + const mockNamespace = {} as 
any; + getSandbox(mockNamespace, 'test-sandbox'); + + expect(mockStub.setKeepAlive).not.toHaveBeenCalled(); + }); + + it('should apply keepAlive alongside other options', () => { + const mockNamespace = {} as any; + const sandbox = getSandbox(mockNamespace, 'test-sandbox', { + sleepAfter: '5m', + baseUrl: 'https://example.com', + keepAlive: true, + }); + + expect(sandbox.sleepAfter).toBe('5m'); + expect(sandbox.setBaseUrl).toHaveBeenCalledWith('https://example.com'); + expect(sandbox.setKeepAlive).toHaveBeenCalledWith(true); + }); }); diff --git a/packages/sandbox/tests/sandbox.test.ts b/packages/sandbox/tests/sandbox.test.ts index 21d79b1..d7ff678 100644 --- a/packages/sandbox/tests/sandbox.test.ts +++ b/packages/sandbox/tests/sandbox.test.ts @@ -463,58 +463,15 @@ describe('Sandbox - Automatic Session Management', () => { }); }); - describe('Activity renewal and health checks', () => { - let renewActivityTimeoutSpy: any; - + describe('Health checks', () => { beforeEach(() => { - // Mock Container base class methods - (sandbox as any).renewActivityTimeout = vi.fn(); + // Mock Container base class getState method for health checks (sandbox as any).getState = vi.fn().mockResolvedValue({ status: 'running', timestamp: new Date().toISOString(), }); - - // Spy on the base renewActivityTimeout method (called by inline throttling) - renewActivityTimeoutSpy = vi.spyOn(sandbox as any, 'renewActivityTimeout'); }); - it('should throttle activity renewal during streaming', async () => { - // Create a mock stream that emits multiple chunks quickly - const encoder = new TextEncoder(); - const chunks = Array.from({ length: 10 }, (_, i) => encoder.encode(`chunk ${i}\n`)); - - const mockStream = new ReadableStream({ - async start(controller) { - for (const chunk of chunks) { - controller.enqueue(chunk); - } - controller.close(); - } - }); - - vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - - const stream = await sandbox.execStream('echo test'); - 
const reader = stream.getReader(); - - // Read all chunks - let chunkCount = 0; - while (true) { - const { done } = await reader.read(); - if (done) break; - chunkCount++; - } - - expect(chunkCount).toBe(10); - - // Activity renewal should be called, but throttled (not once per chunk) - // With 5 second throttle and instant reads, first chunk triggers renewal, - // subsequent chunks are throttled since less than 5s has passed - expect(renewActivityTimeoutSpy).toHaveBeenCalled(); - expect(renewActivityTimeoutSpy.mock.calls.length).toBeLessThanOrEqual(chunkCount); - // Should be called at least once (first chunk) but not for every chunk - expect(renewActivityTimeoutSpy.mock.calls.length).toBeGreaterThanOrEqual(1); - }); it('should detect container crashes during streaming', async () => { vi.useFakeTimers(); @@ -691,9 +648,6 @@ describe('Sandbox - Automatic Session Management', () => { vi.spyOn(sandbox.client.processes, 'streamProcessLogs').mockResolvedValue(mockStream); const logsStream = await sandbox.streamProcessLogs('proc-1'); expect(logsStream).toBeInstanceOf(ReadableStream); - - // Verify both streams have activity renewal applied - expect(renewActivityTimeoutSpy).toHaveBeenCalled(); }); }); @@ -799,34 +753,6 @@ describe('Sandbox - Automatic Session Management', () => { expect(receivedEvents[1].type).toBe('complete'); }); - it('should renew activity timeout during streaming', async () => { - const mockEvents = [ - { type: 'stdout', timestamp: new Date().toISOString(), data: 'chunk1\n' }, - { type: 'stdout', timestamp: new Date().toISOString(), data: 'chunk2\n' }, - { type: 'complete', timestamp: new Date().toISOString(), exitCode: 0 }, - ]; - - const mockStream = new ReadableStream({ - start(controller) { - const encoder = new TextEncoder(); - for (const event of mockEvents) { - const sseData = `data: ${JSON.stringify(event)}\n\n`; - controller.enqueue(encoder.encode(sseData)); - } - controller.close(); - } - }); - - vi.spyOn(sandbox.client.commands, 
'executeStream').mockResolvedValue(mockStream); - - // Mock renewActivityTimeout before creating the spy - (sandbox as any).renewActivityTimeout = vi.fn(); - const renewSpy = vi.spyOn(sandbox as any, 'renewActivityTimeout'); - - await sandbox.execStreamWithCallback('echo test', () => {}); - - expect(renewSpy).toHaveBeenCalled(); - }); it('should handle errors in streaming', async () => { const mockStream = new ReadableStream({ diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index e691362..693e7ef 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -54,9 +54,9 @@ export type { ExecOptions, ExecResult, ExecutionSession, - FileExistsResult, // File streaming types FileChunk, + FileExistsResult, FileInfo, FileMetadata, FileStreamEvent, diff --git a/packages/shared/src/types.ts b/packages/shared/src/types.ts index ffa77ce..8610b71 100644 --- a/packages/shared/src/types.ts +++ b/packages/shared/src/types.ts @@ -262,6 +262,8 @@ export interface SandboxOptions { * - A string like "30s", "3m", "5m", "1h" (seconds, minutes, or hours) * - A number representing seconds (e.g., 180 for 3 minutes) * Default: "10m" (10 minutes) + * + * Note: Ignored when keepAlive is true */ sleepAfter?: string | number; @@ -269,6 +271,27 @@ export interface SandboxOptions { * Base URL for the sandbox API */ baseUrl?: string; + + /** + * Keep the container alive indefinitely by preventing automatic shutdown + * When true, the container will never auto-timeout and must be explicitly destroyed + * + * How it works: + * - Overrides onActivityExpired() to prevent shutdown when activity timeout expires + * - No active renewal or polling needed - simply prevents the default stop behavior + * - Container stays alive until you explicitly call destroy() + * + * Use cases: + * - Long-lived IDE/notebook sessions + * - WebSocket servers with sporadic message activity + * - Long-running background processes (builds, watchers, etc.) 
+ * - Any scenario where activity can't be automatically detected + * + * Important: You MUST call sandbox.destroy() when done to avoid resource leaks + * + * Default: false + */ + keepAlive?: boolean; } /** diff --git a/tests/e2e/keepalive-workflow.test.ts b/tests/e2e/keepalive-workflow.test.ts new file mode 100644 index 0000000..195e7f5 --- /dev/null +++ b/tests/e2e/keepalive-workflow.test.ts @@ -0,0 +1,277 @@ +import { describe, test, expect, beforeAll, afterAll, afterEach, vi } from 'vitest'; +import { getTestWorkerUrl, WranglerDevRunner } from './helpers/wrangler-runner'; +import { createSandboxId, createTestHeaders, fetchWithStartup, cleanupSandbox } from './helpers/test-fixtures'; + +/** + * KeepAlive Workflow Integration Tests + * + * Tests the keepAlive feature that keeps containers alive indefinitely: + * - Container stays alive with keepAlive: true + * - Container respects normal timeout without keepAlive + * - Long-running processes work with keepAlive + * - Explicit destroy stops keepAlive container + * + * This validates that: + * - The onActivityExpired() override prevents shutdown when the activity timeout expires (no renewal or polling involved) + * - Containers don't auto-timeout when keepAlive is enabled + * - Manual cleanup via destroy() works correctly + */ +describe('KeepAlive Workflow', () => { + describe('local', () => { + let runner: WranglerDevRunner | null = null; + let workerUrl: string; + let currentSandboxId: string | null = null; + + beforeAll(async () => { + const result = await getTestWorkerUrl(); + workerUrl = result.url; + runner = result.runner; + }); + + afterEach(async () => { + // Cleanup sandbox container after each test + if (currentSandboxId) { + await cleanupSandbox(workerUrl, currentSandboxId); + currentSandboxId = null; + } + }); + + afterAll(async () => { + // Only stop runner if we spawned one locally (CI uses deployed worker) + if (runner) { + await runner.stop(); + } + }); + + test('should keep container alive with keepAlive enabled', async () => { + currentSandboxId = 
createSandboxId(); + const headers = createTestHeaders(currentSandboxId); + + // Add keepAlive header to enable keepAlive mode + const keepAliveHeaders = { + ...headers, + 'X-Sandbox-KeepAlive': 'true', + }; + + // Step 1: Initialize sandbox with keepAlive + const initResponse = await vi.waitFor( + async () => fetchWithStartup(`${workerUrl}/api/execute`, { + method: 'POST', + headers: keepAliveHeaders, + body: JSON.stringify({ + command: 'echo "Container initialized with keepAlive"', + }), + }), + { timeout: 90000, interval: 2000 } + ); + + expect(initResponse.status).toBe(200); + const initData = await initResponse.json(); + expect(initData.stdout).toContain('Container initialized with keepAlive'); + + // Step 2: Wait longer than normal activity timeout would allow (15 seconds) + // With keepAlive, container should stay alive + await new Promise((resolve) => setTimeout(resolve, 15000)); + + // Step 3: Execute another command to verify container is still alive + const verifyResponse = await fetch(`${workerUrl}/api/execute`, { + method: 'POST', + headers: keepAliveHeaders, + body: JSON.stringify({ + command: 'echo "Still alive after timeout period"', + }), + }); + + expect(verifyResponse.status).toBe(200); + const verifyData = await verifyResponse.json(); + expect(verifyData.stdout).toContain('Still alive after timeout period'); + }, 120000); + + test('should support long-running processes with keepAlive', async () => { + currentSandboxId = createSandboxId(); + const headers = createTestHeaders(currentSandboxId); + + const keepAliveHeaders = { + ...headers, + 'X-Sandbox-KeepAlive': 'true', + }; + + // Start a long sleep process (30 seconds) + const startResponse = await vi.waitFor( + async () => fetchWithStartup(`${workerUrl}/api/process/start`, { + method: 'POST', + headers: keepAliveHeaders, + body: JSON.stringify({ + command: 'sleep 30', + }), + }), + { timeout: 90000, interval: 2000 } + ); + + expect(startResponse.status).toBe(200); + const startData = await 
startResponse.json(); + expect(startData.status).toBe('running'); + const processId = startData.id; + + // Wait 20 seconds (longer than normal activity timeout) + await new Promise((resolve) => setTimeout(resolve, 20000)); + + // Verify process is still running + const statusResponse = await fetch(`${workerUrl}/api/process/${processId}`, { + method: 'GET', + headers: keepAliveHeaders, + }); + + expect(statusResponse.status).toBe(200); + const statusData = await statusResponse.json(); + expect(statusData.status).toBe('running'); + + // Cleanup - kill the process + await fetch(`${workerUrl}/api/process/${processId}`, { + method: 'DELETE', + headers: keepAliveHeaders, + }); + }, 120000); + + test('should destroy container when explicitly requested', async () => { + currentSandboxId = createSandboxId(); + const headers = createTestHeaders(currentSandboxId); + + const keepAliveHeaders = { + ...headers, + 'X-Sandbox-KeepAlive': 'true', + }; + + // Step 1: Initialize sandbox with keepAlive + await vi.waitFor( + async () => fetchWithStartup(`${workerUrl}/api/execute`, { + method: 'POST', + headers: keepAliveHeaders, + body: JSON.stringify({ + command: 'echo "Testing destroy"', + }), + }), + { timeout: 90000, interval: 2000 } + ); + + // Step 2: Explicitly destroy the container + const destroyResponse = await fetch(`${workerUrl}/api/destroy`, { + method: 'POST', + headers: keepAliveHeaders, + body: JSON.stringify({}), + }); + + expect(destroyResponse.status).toBe(200); + + // Step 3: Verify container was destroyed by trying to execute a command + // This should fail or require re-initialization + await new Promise((resolve) => setTimeout(resolve, 2000)); + + const verifyResponse = await fetch(`${workerUrl}/api/execute`, { + method: 'POST', + headers: keepAliveHeaders, + body: JSON.stringify({ + command: 'echo "After destroy"', + }), + }); + + // Container should be restarted (new container), not the same one + // We can verify by checking that the response is successful but 
it's a fresh container + expect(verifyResponse.status).toBe(200); + + // Mark as null so afterEach doesn't try to clean it up again + currentSandboxId = null; + }, 120000); + + test('should handle multiple commands with keepAlive over time', async () => { + currentSandboxId = createSandboxId(); + const headers = createTestHeaders(currentSandboxId); + + const keepAliveHeaders = { + ...headers, + 'X-Sandbox-KeepAlive': 'true', + }; + + // Initialize + await vi.waitFor( + async () => fetchWithStartup(`${workerUrl}/api/execute`, { + method: 'POST', + headers: keepAliveHeaders, + body: JSON.stringify({ + command: 'echo "Command 1"', + }), + }), + { timeout: 90000, interval: 2000 } + ); + + // Execute multiple commands with delays between them + for (let i = 2; i <= 4; i++) { + // Wait 8 seconds between commands (would timeout without keepAlive) + await new Promise((resolve) => setTimeout(resolve, 8000)); + + const response = await fetch(`${workerUrl}/api/execute`, { + method: 'POST', + headers: keepAliveHeaders, + body: JSON.stringify({ + command: `echo "Command ${i}"`, + }), + }); + + expect(response.status).toBe(200); + const data = await response.json(); + expect(data.stdout).toContain(`Command ${i}`); + } + }, 120000); + + test('should work with file operations while keepAlive is enabled', async () => { + currentSandboxId = createSandboxId(); + const headers = createTestHeaders(currentSandboxId); + + const keepAliveHeaders = { + ...headers, + 'X-Sandbox-KeepAlive': 'true', + }; + + // Initialize + await vi.waitFor( + async () => fetchWithStartup(`${workerUrl}/api/file/write`, { + method: 'POST', + headers: keepAliveHeaders, + body: JSON.stringify({ + path: '/workspace/test.txt', + content: 'Initial content', + }), + }), + { timeout: 90000, interval: 2000 } + ); + + // Wait longer than normal timeout + await new Promise((resolve) => setTimeout(resolve, 15000)); + + // Perform file operations - should still work + const writeResponse = await 
fetch(`${workerUrl}/api/file/write`, { + method: 'POST', + headers: keepAliveHeaders, + body: JSON.stringify({ + path: '/workspace/test.txt', + content: 'Updated content after keepAlive', + }), + }); + + expect(writeResponse.status).toBe(200); + + // Read file to verify + const readResponse = await fetch(`${workerUrl}/api/file/read`, { + method: 'POST', + headers: keepAliveHeaders, + body: JSON.stringify({ + path: '/workspace/test.txt', + }), + }); + + expect(readResponse.status).toBe(200); + const readData = await readResponse.json(); + expect(readData.content).toContain('Updated content after keepAlive'); + }, 120000); + }); +}); diff --git a/tests/e2e/streaming-operations-workflow.test.ts b/tests/e2e/streaming-operations-workflow.test.ts index 26885eb..52740b8 100644 --- a/tests/e2e/streaming-operations-workflow.test.ts +++ b/tests/e2e/streaming-operations-workflow.test.ts @@ -446,14 +446,13 @@ describe('Streaming Operations Workflow', () => { }, 90000); - test('should keep container alive during 15+ second streaming command (GitHub #101)', async () => { + test('should handle 15+ second streaming command', async () => { currentSandboxId = createSandboxId(); const headers = createTestHeaders(currentSandboxId); - console.log('[Test] Starting 15+ second streaming command to verify activity renewal...'); + console.log('[Test] Starting 15+ second streaming command...'); // Stream a command that runs for 15+ seconds with output every 2 seconds - // Without activity renewal, this would timeout (default container timeout is ~10s) const streamResponse = await vi.waitFor( async () => fetchWithStartup(`${workerUrl}/api/execStream`, { method: 'POST', @@ -491,7 +490,7 @@ describe('Streaming Operations Workflow', () => { expect(completeEvent).toBeDefined(); expect(completeEvent?.exitCode).toBe(0); - console.log('[Test] ✅ Container stayed alive for 16+ seconds - activity renewal working!'); + console.log('[Test] ✅ Streaming command completed successfully after 16+ seconds!'); 
}, 90000); test('should handle high-volume streaming over extended period', async () => { diff --git a/tests/e2e/test-worker/index.ts b/tests/e2e/test-worker/index.ts index 70309bb..baffd3b 100644 --- a/tests/e2e/test-worker/index.ts +++ b/tests/e2e/test-worker/index.ts @@ -31,7 +31,14 @@ export default { // Get sandbox ID from header // Sandbox ID determines which container instance (Durable Object) const sandboxId = request.headers.get('X-Sandbox-Id') || 'default-test-sandbox'; - const sandbox = getSandbox(env.Sandbox, sandboxId) as Sandbox; + + // Check if keepAlive is requested + const keepAliveHeader = request.headers.get('X-Sandbox-KeepAlive'); + const keepAlive = keepAliveHeader === 'true'; + + const sandbox = getSandbox(env.Sandbox, sandboxId, { + keepAlive, + }) as Sandbox; // Get session ID from header (optional) // If provided, retrieve the session fresh from the Sandbox DO on each request From e6906280401b823b1490b479ac1c040d034844fb Mon Sep 17 00:00:00 2001 From: katereznykova Date: Mon, 27 Oct 2025 18:35:21 +0000 Subject: [PATCH 17/23] cleanup --- packages/sandbox/src/sandbox.ts | 105 +---------- packages/sandbox/tests/sandbox.test.ts | 237 ------------------------- packages/shared/src/types.ts | 25 +-- 3 files changed, 4 insertions(+), 363 deletions(-) diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index 1164f8e..6bcdf16 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -618,20 +618,10 @@ export class Sandbox extends Container implements ISandbox { /** * Execute a command and return a ReadableStream of SSE events. * - * ⚠️ **Important**: If calling this method from outside the Sandbox Durable Object (via RPC), - * streams may disconnect after ~40 seconds for long-running commands. For RPC calls, - * use `execStreamWithCallback()` instead which processes the stream internally. 
- * + * **For long-running commands** * **Use this method when:** * - You're inside a Worker endpoint and need to proxy/pipe the stream to an HTTP response * - You're calling this directly from within the Sandbox DO - * - * **Use `execStreamWithCallback()` when:** - * - Calling from another Durable Object via RPC - * - Commands may run longer than 40 seconds - * - You need reliable event delivery - * - * @see execStreamWithCallback for RPC-safe streaming */ async execStream(command: string, options?: StreamOptions): Promise> { // Check for cancellation @@ -660,21 +650,12 @@ export class Sandbox extends Container implements ISandbox { /** * Stream logs from a background process as a ReadableStream. - * - * ⚠️ **Important**: If calling this method from outside the Sandbox Durable Object (via RPC), - * streams may disconnect after ~40 seconds for long-running processes. For RPC calls, - * use `streamProcessLogsWithCallback()` instead which processes the stream internally. + + * **For long-running processes** * * **Use this method when:** * - You're inside a Worker endpoint and need to proxy/pipe the stream to an HTTP response * - You're calling this directly from within the Sandbox DO - * - * **Use `streamProcessLogsWithCallback()` when:** - * - Calling from another Durable Object via RPC - * - Process may run longer than 40 seconds - * - You need reliable event delivery - * - * @see streamProcessLogsWithCallback for RPC-safe streaming */ async streamProcessLogs(processId: string, options?: { signal?: AbortSignal }): Promise> { // Check for cancellation @@ -811,83 +792,7 @@ export class Sandbox extends Container implements ISandbox { }); } - /** - * Execute a command with streaming output handled internally via callback. 
- * - * @param command - The command to execute - * @param onEvent - Callback function that receives each ExecEvent as it arrives - * @param options - Optional execution options including sessionId and signal - * @returns Promise that resolves when the command completes - */ - async execStreamWithCallback( - command: string, - onEvent: (event: ExecEvent) => void | Promise, - options?: { sessionId?: string; signal?: AbortSignal } - ): Promise { - // Check for cancellation - if (options?.signal?.aborted) { - throw new Error('Operation was aborted'); - } - - const session = options?.sessionId ?? await this.ensureDefaultSession(); - - // Get the stream - this happens INSIDE the Sandbox DO, so it's a direct HTTP fetch - const stream = await this.client.commands.executeStream(command, session); - - // Parse and process the stream internally - try { - for await (const event of parseSSEStream(stream)) { - // Check for cancellation during streaming - if (options?.signal?.aborted) { - throw new Error('Operation was aborted'); - } - // Call the event callback - await onEvent(event); - } - } catch (error) { - this.logger.error('Error in execStreamWithCallback', error instanceof Error ? error : new Error(String(error))); - throw error; - } - } - - /** - * Stream process logs with output handled internally via callback. 
- * - * @param processId - The ID of the process to stream logs from - * @param onEvent - Callback function that receives each ExecEvent as it arrives - * @param options - Optional signal for cancellation - * @returns Promise that resolves when the stream completes - */ - async streamProcessLogsWithCallback( - processId: string, - onEvent: (event: ExecEvent) => void | Promise, - options?: { signal?: AbortSignal } - ): Promise { - // Check for cancellation - if (options?.signal?.aborted) { - throw new Error('Operation was aborted'); - } - - // Get the stream - this happens INSIDE the Sandbox DO, so it's a direct HTTP fetch - const stream = await this.client.processes.streamProcessLogs(processId); - - // Parse and process the stream internally - try { - for await (const event of parseSSEStream(stream)) { - // Check for cancellation during streaming - if (options?.signal?.aborted) { - throw new Error('Operation was aborted'); - } - - // Call the event callback - await onEvent(event); - } - } catch (error) { - this.logger.error('Error in streamProcessLogsWithCallback', error instanceof Error ? 
error : new Error(String(error))); - throw error; - } - } async gitCheckout( repoUrl: string, @@ -1192,8 +1097,6 @@ export class Sandbox extends Container implements ISandbox { // Command execution - delegate to internal session-aware methods exec: (command, options) => this.execWithSession(command, sessionId, options), execStream: (command, options) => this.execStreamWithSession(command, sessionId, options), - execStreamWithCallback: (command, onEvent, options) => - this.execStreamWithCallback(command, onEvent, { ...options, sessionId }), // Process management startProcess: (command, options) => this.startProcess(command, options, sessionId), @@ -1204,8 +1107,6 @@ export class Sandbox extends Container implements ISandbox { cleanupCompletedProcesses: () => this.cleanupCompletedProcesses(), getProcessLogs: (id) => this.getProcessLogs(id), streamProcessLogs: (processId, options) => this.streamProcessLogs(processId, options), - streamProcessLogsWithCallback: (processId, onEvent, options) => - this.streamProcessLogsWithCallback(processId, onEvent, options), // File operations - pass sessionId via options or parameter writeFile: (path, content, options) => this.writeFile(path, content, { ...options, sessionId }), diff --git a/packages/sandbox/tests/sandbox.test.ts b/packages/sandbox/tests/sandbox.test.ts index d7ff678..a5898dd 100644 --- a/packages/sandbox/tests/sandbox.test.ts +++ b/packages/sandbox/tests/sandbox.test.ts @@ -650,241 +650,4 @@ describe('Sandbox - Automatic Session Management', () => { expect(logsStream).toBeInstanceOf(ReadableStream); }); }); - - describe('callback-based streaming (avoids RPC stream serialization)', () => { - it('should execute command with callback and process all events', async () => { - const mockEvents = [ - { type: 'start', timestamp: new Date().toISOString(), command: 'echo test' }, - { type: 'stdout', timestamp: new Date().toISOString(), data: 'test\n' }, - { type: 'complete', timestamp: new Date().toISOString(), exitCode: 0 }, 
- ]; - - // Create a mock readable stream that yields SSE events - const mockStream = new ReadableStream({ - start(controller) { - const encoder = new TextEncoder(); - for (const event of mockEvents) { - const sseData = `data: ${JSON.stringify(event)}\n\n`; - controller.enqueue(encoder.encode(sseData)); - } - controller.close(); - } - }); - - vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - - const receivedEvents: any[] = []; - await sandbox.execStreamWithCallback( - 'echo test', - (event) => { - receivedEvents.push(event); - } - ); - - expect(receivedEvents).toHaveLength(3); - expect(receivedEvents[0].type).toBe('start'); - expect(receivedEvents[1].type).toBe('stdout'); - expect(receivedEvents[1].data).toBe('test\n'); - expect(receivedEvents[2].type).toBe('complete'); - expect(receivedEvents[2].exitCode).toBe(0); - }); - - it('should support async callbacks', async () => { - const mockEvents = [ - { type: 'stdout', timestamp: new Date().toISOString(), data: 'line1\n' }, - { type: 'stdout', timestamp: new Date().toISOString(), data: 'line2\n' }, - { type: 'complete', timestamp: new Date().toISOString(), exitCode: 0 }, - ]; - - const mockStream = new ReadableStream({ - start(controller) { - const encoder = new TextEncoder(); - for (const event of mockEvents) { - const sseData = `data: ${JSON.stringify(event)}\n\n`; - controller.enqueue(encoder.encode(sseData)); - } - controller.close(); - } - }); - - vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - - const receivedEvents: any[] = []; - const asyncCallback = async (event: any) => { - // Simulate async work - await new Promise(resolve => setTimeout(resolve, 10)); - receivedEvents.push(event); - }; - - await sandbox.execStreamWithCallback('echo test', asyncCallback); - - expect(receivedEvents).toHaveLength(3); - }); - - it('should stream process logs with callback', async () => { - const mockEvents = [ - { type: 'stdout', timestamp: new 
Date().toISOString(), data: 'process output\n' }, - { type: 'complete', timestamp: new Date().toISOString(), exitCode: 0 }, - ]; - - const mockStream = new ReadableStream({ - start(controller) { - const encoder = new TextEncoder(); - for (const event of mockEvents) { - const sseData = `data: ${JSON.stringify(event)}\n\n`; - controller.enqueue(encoder.encode(sseData)); - } - controller.close(); - } - }); - - vi.spyOn(sandbox.client.processes, 'streamProcessLogs').mockResolvedValue(mockStream); - - const receivedEvents: any[] = []; - await sandbox.streamProcessLogsWithCallback( - 'proc-123', - (event) => { - receivedEvents.push(event); - } - ); - - expect(receivedEvents).toHaveLength(2); - expect(receivedEvents[0].type).toBe('stdout'); - expect(receivedEvents[1].type).toBe('complete'); - }); - - - it('should handle errors in streaming', async () => { - const mockStream = new ReadableStream({ - start(controller) { - controller.error(new Error('Stream error')); - } - }); - - vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - - await expect( - sandbox.execStreamWithCallback('echo test', () => {}) - ).rejects.toThrow('Stream error'); - }); - - it('should respect abort signal during streaming', async () => { - const mockEvents = [ - { type: 'stdout', timestamp: new Date().toISOString(), data: 'chunk1\n' }, - { type: 'stdout', timestamp: new Date().toISOString(), data: 'chunk2\n' }, - { type: 'complete', timestamp: new Date().toISOString(), exitCode: 0 }, - ]; - - const mockStream = new ReadableStream({ - async start(controller) { - const encoder = new TextEncoder(); - for (const event of mockEvents) { - await new Promise(resolve => setTimeout(resolve, 100)); - const sseData = `data: ${JSON.stringify(event)}\n\n`; - controller.enqueue(encoder.encode(sseData)); - } - controller.close(); - } - }); - - vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - - const abortController = new AbortController(); - const 
receivedEvents: any[] = []; - - // Abort after receiving first event - const callbackWithAbort = (event: any) => { - receivedEvents.push(event); - if (receivedEvents.length === 1) { - abortController.abort(); - } - }; - - await expect( - sandbox.execStreamWithCallback('echo test', callbackWithAbort, { - signal: abortController.signal - }) - ).rejects.toThrow('Operation was aborted'); - }); - - it('should use specified sessionId if provided', async () => { - const mockStream = new ReadableStream({ - start(controller) { - const encoder = new TextEncoder(); - const event = { type: 'complete', timestamp: new Date().toISOString(), exitCode: 0 }; - controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`)); - controller.close(); - } - }); - - const executeStreamSpy = vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - - await sandbox.execStreamWithCallback('echo test', () => {}, { sessionId: 'custom-session' }); - - expect(executeStreamSpy).toHaveBeenCalledWith('echo test', 'custom-session'); - }); - - it('should handle stderr events separately from stdout', async () => { - const mockEvents = [ - { type: 'stdout', timestamp: new Date().toISOString(), data: 'stdout message\n' }, - { type: 'stderr', timestamp: new Date().toISOString(), data: 'stderr message\n' }, - { type: 'complete', timestamp: new Date().toISOString(), exitCode: 0 }, - ]; - - const mockStream = new ReadableStream({ - start(controller) { - const encoder = new TextEncoder(); - for (const event of mockEvents) { - const sseData = `data: ${JSON.stringify(event)}\n\n`; - controller.enqueue(encoder.encode(sseData)); - } - controller.close(); - } - }); - - vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - - const stdoutData: string[] = []; - const stderrData: string[] = []; - - await sandbox.execStreamWithCallback('test command', (event) => { - if (event.type === 'stdout') { - stdoutData.push(event.data || ''); - } else if 
(event.type === 'stderr') { - stderrData.push(event.data || ''); - } - }); - - expect(stdoutData).toEqual(['stdout message\n']); - expect(stderrData).toEqual(['stderr message\n']); - }); - - it('should handle error events from command execution', async () => { - const mockEvents = [ - { type: 'error', timestamp: new Date().toISOString(), error: 'Command not found' }, - ]; - - const mockStream = new ReadableStream({ - start(controller) { - const encoder = new TextEncoder(); - for (const event of mockEvents) { - const sseData = `data: ${JSON.stringify(event)}\n\n`; - controller.enqueue(encoder.encode(sseData)); - } - controller.close(); - } - }); - - vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - - const receivedEvents: any[] = []; - await sandbox.execStreamWithCallback('nonexistent', (event) => { - receivedEvents.push(event); - }); - - expect(receivedEvents).toHaveLength(1); - expect(receivedEvents[0].type).toBe('error'); - expect(receivedEvents[0].error).toBe('Command not found'); - }); - }); }); diff --git a/packages/shared/src/types.ts b/packages/shared/src/types.ts index 8610b71..a8b182e 100644 --- a/packages/shared/src/types.ts +++ b/packages/shared/src/types.ts @@ -614,13 +614,6 @@ export interface ExecutionSession { exec(command: string, options?: ExecOptions): Promise; execStream(command: string, options?: StreamOptions): Promise>; - // Callback-based streaming - execStreamWithCallback( - command: string, - onEvent: (event: ExecEvent) => void | Promise, - options?: { signal?: AbortSignal } - ): Promise; - // Background process management startProcess(command: string, options?: ProcessOptions): Promise; listProcesses(): Promise; @@ -630,11 +623,7 @@ export interface ExecutionSession { cleanupCompletedProcesses(): Promise; getProcessLogs(id: string): Promise<{ stdout: string; stderr: string; processId: string }>; streamProcessLogs(processId: string, options?: { signal?: AbortSignal }): Promise>; - 
streamProcessLogsWithCallback( - processId: string, - onEvent: (event: ExecEvent) => void | Promise, - options?: { signal?: AbortSignal } - ): Promise; + // File operations writeFile(path: string, content: string, options?: { encoding?: string }): Promise; @@ -677,18 +666,6 @@ export interface ISandbox { execStream(command: string, options?: StreamOptions): Promise>; streamProcessLogs(processId: string, options?: { signal?: AbortSignal }): Promise>; - // Callback-based streaming - execStreamWithCallback( - command: string, - onEvent: (event: ExecEvent) => void | Promise, - options?: { sessionId?: string; signal?: AbortSignal } - ): Promise; - streamProcessLogsWithCallback( - processId: string, - onEvent: (event: ExecEvent) => void | Promise, - options?: { signal?: AbortSignal } - ): Promise; - // Utility methods cleanupCompletedProcesses(): Promise; getProcessLogs(id: string): Promise<{ stdout: string; stderr: string; processId: string }>; From 0d8daeb52997135743a7a0c1d181bbeffafb7da9 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Mon, 27 Oct 2025 18:49:23 +0000 Subject: [PATCH 18/23] more nits --- packages/sandbox/src/sandbox.ts | 159 ++----------------------- packages/sandbox/tests/sandbox.test.ts | 134 --------------------- packages/shared/src/types.ts | 11 -- 3 files changed, 7 insertions(+), 297 deletions(-) diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index 6bcdf16..90add89 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -62,14 +62,12 @@ export class Sandbox extends Container implements ISandbox { client: SandboxClient; private codeInterpreter: CodeInterpreter; - private sandboxName: string | undefined = undefined; - private baseUrl: string | undefined = undefined; + private sandboxName: string | null = null; + private baseUrl: string | null = null; private portTokens: Map = new Map(); - private defaultSession: string | undefined = undefined; + private defaultSession: string | null = 
null; envVars: Record = {}; private logger: ReturnType; - private readonly STREAM_HEALTH_CHECK_INTERVAL_MS = 30000; // Check sandbox health every 30 seconds during streaming - private readonly STREAM_READ_TIMEOUT_MS = 300000; // 5 minutes timeout for stream reads (detects hung streams) private keepAliveEnabled: boolean = false; constructor(ctx: DurableObject['ctx'], env: Env) { @@ -617,11 +615,6 @@ export class Sandbox extends Container implements ISandbox { /** * Execute a command and return a ReadableStream of SSE events. - * - * **For long-running commands** - * **Use this method when:** - * - You're inside a Worker endpoint and need to proxy/pipe the stream to an HTTP response - * - You're calling this directly from within the Sandbox DO */ async execStream(command: string, options?: StreamOptions): Promise> { // Check for cancellation @@ -630,9 +623,7 @@ export class Sandbox extends Container implements ISandbox { } const session = await this.ensureDefaultSession(); - // Get the stream from CommandClient and wrap it with activity renewal - const stream = await this.client.commands.executeStream(command, session); - return this.wrapStreamWithActivityRenewal(stream); + return this.client.commands.executeStream(command, session); } /** @@ -644,18 +635,11 @@ export class Sandbox extends Container implements ISandbox { throw new Error('Operation was aborted'); } - const stream = await this.client.commands.executeStream(command, sessionId); - return this.wrapStreamWithActivityRenewal(stream); + return this.client.commands.executeStream(command, sessionId); } /** * Stream logs from a background process as a ReadableStream. 
- - * **For long-running processes** - * - * **Use this method when:** - * - You're inside a Worker endpoint and need to proxy/pipe the stream to an HTTP response - * - You're calling this directly from within the Sandbox DO */ async streamProcessLogs(processId: string, options?: { signal?: AbortSignal }): Promise> { // Check for cancellation @@ -663,137 +647,9 @@ export class Sandbox extends Container implements ISandbox { throw new Error('Operation was aborted'); } - const stream = await this.client.processes.streamProcessLogs(processId); - return this.wrapStreamWithActivityRenewal(stream); + return this.client.processes.streamProcessLogs(processId); } - /** - * Wraps a ReadableStream with health monitoring - */ - private wrapStreamWithActivityRenewal(stream: ReadableStream): ReadableStream { - const self = this; - let healthCheckInterval: ReturnType | undefined; - let streamActive = true; - let errorReported = false; - - return new ReadableStream({ - async start(controller) { - const reader = stream.getReader(); - let isHealthy = true; - let currentTimeoutHandle: ReturnType | undefined; - - // Set up periodic health monitoring to detect container crashes - healthCheckInterval = setInterval(async () => { - if (!streamActive) { - if (healthCheckInterval) { - clearInterval(healthCheckInterval); - } - return; - } - - try { - const state = await self.getState(); - if (state.status !== 'running') { - isHealthy = false; - if (!errorReported) { - errorReported = true; - const error = new Error(`Container became unhealthy during streaming: ${state.status}`); - controller.error(error); - if (healthCheckInterval) { - clearInterval(healthCheckInterval); - } - await reader.cancel(error.message); - } - } - } catch (error) { - // If getState() fails, container is likely dead - this is a critical failure - isHealthy = false; - if (!errorReported) { - errorReported = true; - const stateError = new Error(`Failed to check container health: ${error instanceof Error ? 
error.message : String(error)}`); - controller.error(stateError); - if (healthCheckInterval) { - clearInterval(healthCheckInterval); - } - await reader.cancel(stateError.message); - } - } - }, self.STREAM_HEALTH_CHECK_INTERVAL_MS); - - try { - while (true) { - const readPromise = reader.read(); - const timeoutPromise = new Promise((_, reject) => { - currentTimeoutHandle = setTimeout(() => { - reject(new Error(`Stream read timeout after ${self.STREAM_READ_TIMEOUT_MS}ms - connection may be hung`)); - }, self.STREAM_READ_TIMEOUT_MS); - }); - - const { done, value } = await Promise.race([readPromise, timeoutPromise]); - - // Clear timeout immediately after successful read - if (currentTimeoutHandle !== undefined) { - clearTimeout(currentTimeoutHandle); - currentTimeoutHandle = undefined; - } - - // Check health status before processing chunk - if (!isHealthy) { - throw new Error('Container became unhealthy during streaming'); - } - - if (done) { - controller.close(); - break; - } - - controller.enqueue(value); - } - } catch (error) { - if (currentTimeoutHandle !== undefined) { - clearTimeout(currentTimeoutHandle); - currentTimeoutHandle = undefined; - } - - if (!errorReported) { - errorReported = true; - controller.error(error); - } - } finally { - // Mark stream as inactive to stop health checks - streamActive = false; - - // Always cleanup interval - if (healthCheckInterval) { - clearInterval(healthCheckInterval); - healthCheckInterval = undefined; - } - - try { - reader.releaseLock(); - } catch (e) { - // Safe to ignore: lock is already released or stream is already closed - // This can happen if the reader was cancelled or errored before we reached finally - const errorMsg = e instanceof Error ? 
e.message : String(e); - self.logger.debug(`Reader lock release failed (expected if stream already closed): ${errorMsg}`); - } - } - }, - - cancel(reason) { - streamActive = false; - if (healthCheckInterval) { - clearInterval(healthCheckInterval); - healthCheckInterval = undefined; - } - // Allow cancellation to propagate to the underlying stream - return stream.cancel(reason); - } - }); - } - - - async gitCheckout( repoUrl: string, options: { branch?: string; targetDir?: string; sessionId?: string } @@ -1168,8 +1024,7 @@ export class Sandbox extends Container implements ISandbox { } async runCodeStream(code: string, options?: RunCodeOptions): Promise> { - const stream = await this.codeInterpreter.runCodeStream(code, options); - return this.wrapStreamWithActivityRenewal(stream); + return this.codeInterpreter.runCodeStream(code, options); } async listCodeContexts(): Promise { diff --git a/packages/sandbox/tests/sandbox.test.ts b/packages/sandbox/tests/sandbox.test.ts index a5898dd..2f74017 100644 --- a/packages/sandbox/tests/sandbox.test.ts +++ b/packages/sandbox/tests/sandbox.test.ts @@ -473,140 +473,6 @@ describe('Sandbox - Automatic Session Management', () => { }); - it('should detect container crashes during streaming', async () => { - vi.useFakeTimers(); - - // Create a stream that will trigger health check - const encoder = new TextEncoder(); - const mockStream = new ReadableStream({ - async start(controller) { - // Enqueue initial chunk - controller.enqueue(encoder.encode('starting\n')); - // Keep stream open - } - }); - - vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - - // Make getState return unhealthy status - (sandbox as any).getState = vi.fn().mockResolvedValue({ - status: 'stopped', - timestamp: new Date().toISOString(), - }); - - const stream = await sandbox.execStream('echo test'); - const reader = stream.getReader(); - - // Read first chunk - await reader.read(); - - // Advance time to trigger health check (30 
seconds) - await vi.advanceTimersByTimeAsync(31000); - - // Verify health check was called - expect((sandbox as any).getState).toHaveBeenCalled(); - - vi.useRealTimers(); - }); - - it('should timeout on hung streams', async () => { - vi.useFakeTimers(); - - // Create a stream that never resolves - const mockStream = new ReadableStream({ - async start(controller) { - // Never enqueue anything - stream hangs - await new Promise(() => {}); // Never resolves - } - }); - - vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - - const streamPromise = sandbox.execStream('sleep infinity'); - const stream = await streamPromise; - const reader = stream.getReader(); - - // Start reading - const readPromise = reader.read(); - - // Fast-forward past the timeout (5 minutes = 300000ms) - vi.advanceTimersByTime(300001); - - // Should reject with timeout error - await expect(readPromise).rejects.toThrow(/Stream read timeout/); - - vi.useRealTimers(); - }); - - it('should cleanup intervals on stream completion', async () => { - const clearIntervalSpy = vi.spyOn(global, 'clearInterval'); - - const encoder = new TextEncoder(); - const mockStream = new ReadableStream({ - start(controller) { - controller.enqueue(encoder.encode('test\n')); - controller.close(); - } - }); - - vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - - const stream = await sandbox.execStream('echo test'); - const reader = stream.getReader(); - - // Read until done - while (true) { - const { done } = await reader.read(); - if (done) break; - } - - // Verify interval was cleared - expect(clearIntervalSpy).toHaveBeenCalled(); - }); - - it('should cleanup intervals on stream cancellation', async () => { - const clearIntervalSpy = vi.spyOn(global, 'clearInterval'); - - const encoder = new TextEncoder(); - let underlyingReader: any; - const mockStream = new ReadableStream({ - async start(controller) { - underlyingReader = { - cancel: async () => { - // Mock 
cancel - } - }; - // Emit chunks indefinitely - for (let i = 0; i < 100; i++) { - controller.enqueue(encoder.encode(`chunk ${i}\n`)); - await new Promise(resolve => setTimeout(resolve, 10)); - } - controller.close(); - }, - cancel() { - // Mock cancel - } - }); - - vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - - const stream = await sandbox.execStream('echo test'); - const reader = stream.getReader(); - - // Read one chunk - await reader.read(); - - // Cancel the stream - try { - await reader.cancel('user cancellation'); - } catch (e) { - // Ignore cancellation errors in test - } - - // Verify interval was cleared on cancellation - expect(clearIntervalSpy).toHaveBeenCalled(); - }); - it('should handle health check failures gracefully', async () => { const encoder = new TextEncoder(); const mockStream = new ReadableStream({ diff --git a/packages/shared/src/types.ts b/packages/shared/src/types.ts index a8b182e..d885425 100644 --- a/packages/shared/src/types.ts +++ b/packages/shared/src/types.ts @@ -275,16 +275,6 @@ export interface SandboxOptions { /** * Keep the container alive indefinitely by preventing automatic shutdown * When true, the container will never auto-timeout and must be explicitly destroyed - * - * How it works: - * - Overrides onActivityExpired() to prevent shutdown when activity timeout expires - * - No active renewal or polling needed - simply prevents the default stop behavior - * - Container stays alive until you explicitly call destroy() - * - * Use cases: - * - Long-lived IDE/notebook sessions - * - WebSocket servers with sporadic message activity - * - Long-running background processes (builds, watchers, etc.) 
* - Any scenario where activity can't be automatically detected * * Important: You MUST call sandbox.destroy() when done to avoid resource leaks @@ -623,7 +613,6 @@ export interface ExecutionSession { cleanupCompletedProcesses(): Promise; getProcessLogs(id: string): Promise<{ stdout: string; stderr: string; processId: string }>; streamProcessLogs(processId: string, options?: { signal?: AbortSignal }): Promise>; - // File operations writeFile(path: string, content: string, options?: { encoding?: string }): Promise; From 6f262f8a602342cfd7c410a51831022c102fc3ea Mon Sep 17 00:00:00 2001 From: katereznykova Date: Mon, 27 Oct 2025 18:52:29 +0000 Subject: [PATCH 19/23] cleanup more --- packages/sandbox/src/sandbox.ts | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index 90add89..122c904 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -99,8 +99,8 @@ export class Sandbox extends Container implements ISandbox { // Load the sandbox name, port tokens, and default session from storage on initialization this.ctx.blockConcurrencyWhile(async () => { - this.sandboxName = await this.ctx.storage.get('sandboxName') || undefined; - this.defaultSession = await this.ctx.storage.get('defaultSession') || undefined; + this.sandboxName = await this.ctx.storage.get('sandboxName') || null; + this.defaultSession = await this.ctx.storage.get('defaultSession') || null; const storedTokens = await this.ctx.storage.get>('portTokens') || {}; // Convert stored tokens back to Map @@ -348,7 +348,7 @@ export class Sandbox extends Container implements ISandbox { const startTime = Date.now(); const timestamp = new Date().toISOString(); - let timeoutId: ReturnType | undefined; + let timeoutId: NodeJS.Timeout | undefined; try { // Handle cancellation @@ -612,10 +612,7 @@ export class Sandbox extends Container implements ISandbox { }; } - - /** - * Execute a command and 
return a ReadableStream of SSE events. - */ +// Streaming methods - return ReadableStream for RPC compatibility async execStream(command: string, options?: StreamOptions): Promise> { // Check for cancellation if (options?.signal?.aborted) { @@ -623,6 +620,7 @@ export class Sandbox extends Container implements ISandbox { } const session = await this.ensureDefaultSession(); + // Get the stream from CommandClient return this.client.commands.executeStream(command, session); } @@ -1023,7 +1021,7 @@ export class Sandbox extends Container implements ISandbox { return execution.toJSON(); } - async runCodeStream(code: string, options?: RunCodeOptions): Promise> { + async runCodeStream(code: string, options?: RunCodeOptions): Promise { return this.codeInterpreter.runCodeStream(code, options); } From 1ea070f38ba36db7e8514a25b60645701ed6c6f6 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Tue, 28 Oct 2025 14:49:12 +0000 Subject: [PATCH 20/23] nits and bits --- .github/workflows/claude-code-review.yml | 71 +++++++++++-------- tests/e2e/keepalive-workflow.test.ts | 3 +- .../e2e/streaming-operations-workflow.test.ts | 30 +++++--- tests/e2e/test-worker/index.ts | 45 ------------ 4 files changed, 64 insertions(+), 85 deletions(-) diff --git a/.github/workflows/claude-code-review.yml b/.github/workflows/claude-code-review.yml index 6fe865a..deeefd5 100644 --- a/.github/workflows/claude-code-review.yml +++ b/.github/workflows/claude-code-review.yml @@ -22,33 +22,46 @@ jobs: issues: read id-token: write - # steps: - # - name: Checkout repository - # uses: actions/checkout@v4 - # with: - # fetch-depth: 1 - - # - name: Run Claude Code Review - # id: claude-review - # uses: anthropics/claude-code-action@v1 - # with: - # anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }} - # prompt: | - # REPO: ${{ github.repository }} - # PR NUMBER: ${{ github.event.pull_request.number }} - - # Please review this pull request and provide feedback on: - # - Code quality and best practices - # - 
Potential bugs or issues - # - Performance considerations - # - Security concerns - # - Test coverage - - # Use the repository's CLAUDE.md for guidance on style and conventions. Be constructive and helpful in your feedback. - - # Use `gh pr comment` with your Bash tool to leave your review as a comment on the PR. - - # # See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md - # # or https://docs.claude.com/en/docs/claude-code/cli-reference for available options - # claude_args: '--allowed-tools "Bash(gh issue view:*),Bash(gh search:*),Bash(gh issue list:*),Bash(gh pr comment:*),Bash(gh pr diff:*),Bash(gh pr view:*),Bash(gh pr list:*)"' + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 1 + + - name: Run Claude Code Review + id: claude-review + uses: anthropics/claude-code-action@v1 + with: + anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }} + prompt: | + Review PR #${{ github.event.pull_request.number }}. + + Read CLAUDE.md for project conventions and architecture patterns. Focus on substantive issues: + - Code quality, potential bugs, architecture alignment + - Testing coverage + + BE CONCISE. Only report actual issues worth addressing - skip generic praise and obvious observations. + If the PR looks good, just say so briefly. + + **Posting your review:** + 1. First, check if you've already posted a review comment on this PR (find the comment ID): + ```bash + COMMENT_ID=$(gh api repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/comments --jq '.[] | select(.user.login == "claude[bot]" or .user.login == "github-actions[bot]") | select(.body | startswith("## Claude Code Review")) | .id' | head -1) + ``` + + 2. If COMMENT_ID exists (non-empty), UPDATE that comment: + ```bash + gh api repos/${{ github.repository }}/issues/comments/$COMMENT_ID -X PATCH -f body="YOUR_REVIEW_CONTENT_HERE" + ``` + + 3. 
If COMMENT_ID is empty, create a new comment: + ```bash + gh pr comment ${{ github.event.pull_request.number }} --body "YOUR_REVIEW_CONTENT_HERE" --repo ${{ github.repository }} + ``` + + Start your review with "## Claude Code Review" heading so it can be identified and updated on subsequent runs. + + # See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md + # or https://docs.claude.com/en/docs/claude-code/cli-reference for available options + claude_args: '--allowed-tools "Bash(gh issue view:*),Bash(gh search:*),Bash(gh issue list:*),Bash(gh pr comment:*),Bash(gh pr diff:*),Bash(gh pr view:*),Bash(gh pr list:*),Bash(gh api:*)"' diff --git a/tests/e2e/keepalive-workflow.test.ts b/tests/e2e/keepalive-workflow.test.ts index 195e7f5..55b117d 100644 --- a/tests/e2e/keepalive-workflow.test.ts +++ b/tests/e2e/keepalive-workflow.test.ts @@ -155,10 +155,9 @@ describe('KeepAlive Workflow', () => { ); // Step 2: Explicitly destroy the container - const destroyResponse = await fetch(`${workerUrl}/api/destroy`, { + const destroyResponse = await fetch(`${workerUrl}/cleanup`, { method: 'POST', headers: keepAliveHeaders, - body: JSON.stringify({}), }); expect(destroyResponse.status).toBe(200); diff --git a/tests/e2e/streaming-operations-workflow.test.ts b/tests/e2e/streaming-operations-workflow.test.ts index 52740b8..9b47a3e 100644 --- a/tests/e2e/streaming-operations-workflow.test.ts +++ b/tests/e2e/streaming-operations-workflow.test.ts @@ -574,20 +574,26 @@ describe('Streaming Operations Workflow', () => { }, 90000); /** - * Test for callback-based streaming - * This validates that long-running commands work via callback pattern + * Test for streaming execution + * This validates that long-running commands work via streaming */ - test('should handle very long-running commands (60+ seconds) via callback-based streaming', async () => { + test('should handle very long-running commands (60+ seconds) via streaming', async () => { currentSandboxId = 
createSandboxId(); const headers = createTestHeaders(currentSandboxId); - console.log('[Test] Starting 60+ second command via callback-based streaming...'); + // Add keepAlive header to keep container alive during long execution + const keepAliveHeaders = { + ...headers, + 'X-Sandbox-KeepAlive': 'true', + }; - // With callback-based streaming, it should complete successfully + console.log('[Test] Starting 60+ second command via streaming...'); + + // With streaming, it should complete successfully const streamResponse = await vi.waitFor( - async () => fetchWithStartup(`${workerUrl}/api/execStreamCallback`, { + async () => fetchWithStartup(`${workerUrl}/api/execStream`, { method: 'POST', - headers, + headers: keepAliveHeaders, body: JSON.stringify({ // Command that runs for 60+ seconds with periodic output command: "bash -c 'for i in {1..12}; do echo \"Minute mark $i\"; sleep 5; done; echo \"COMPLETED\"'", @@ -629,13 +635,19 @@ describe('Streaming Operations Workflow', () => { currentSandboxId = createSandboxId(); const headers = createTestHeaders(currentSandboxId); + // Add keepAlive header to keep container alive during long sleep + const keepAliveHeaders = { + ...headers, + 'X-Sandbox-KeepAlive': 'true', + }; + console.log('[Test] Testing sleep 45 && echo "done" pattern...'); // This is the exact pattern that was failing before const streamResponse = await vi.waitFor( - async () => fetchWithStartup(`${workerUrl}/api/execStreamCallback`, { + async () => fetchWithStartup(`${workerUrl}/api/execStream`, { method: 'POST', - headers, + headers: keepAliveHeaders, body: JSON.stringify({ command: 'sleep 45 && echo "done"', }), diff --git a/tests/e2e/test-worker/index.ts b/tests/e2e/test-worker/index.ts index baffd3b..3e13e3e 100644 --- a/tests/e2e/test-worker/index.ts +++ b/tests/e2e/test-worker/index.ts @@ -93,51 +93,6 @@ export default { }); } - // Command execution with callback-based streaming (for long-running commands) - if (url.pathname === 
'/api/execStreamCallback' && request.method === 'POST') { - console.log('[TestWorker] execStreamCallback called for command:', body.command); - const startTime = Date.now(); - - // Create a TransformStream to convert callbacks to SSE - const { readable, writable } = new TransformStream(); - const writer = writable.getWriter(); - const encoder = new TextEncoder(); - - // Start streaming in the background using callback-based method - (async () => { - try { - await executor.execStreamWithCallback( - body.command, - async (event) => { - // Convert event to SSE format - const sseData = `data: ${JSON.stringify(event)}\n\n`; - await writer.write(encoder.encode(sseData)); - } - ); - } catch (error: any) { - // Send error event - const errorEvent = { - type: 'error', - timestamp: new Date().toISOString(), - error: error.message, - }; - await writer.write(encoder.encode(`data: ${JSON.stringify(errorEvent)}\n\n`)); - } finally { - await writer.close(); - console.log('[TestWorker] Stream completed in', Date.now() - startTime, 'ms'); - } - })(); - - // Return SSE stream - return new Response(readable, { - headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - }, - }); - } - // Git clone if (url.pathname === '/api/git/clone' && request.method === 'POST') { await executor.gitCheckout(body.repoUrl, { From 9eb0892ed790596cf8e2ca5ea31a4757daea072f Mon Sep 17 00:00:00 2001 From: katereznykova Date: Tue, 28 Oct 2025 15:02:15 +0000 Subject: [PATCH 21/23] more nits and bits --- packages/sandbox/src/request-handler.ts | 2 +- packages/sandbox/tests/sandbox.test.ts | 53 +------------------------ 2 files changed, 2 insertions(+), 53 deletions(-) diff --git a/packages/sandbox/src/request-handler.ts b/packages/sandbox/src/request-handler.ts index 9b7a176..bc0a9aa 100644 --- a/packages/sandbox/src/request-handler.ts +++ b/packages/sandbox/src/request-handler.ts @@ -1,5 +1,5 @@ -import { createLogger, type LogContext, TraceContext } 
from "@repo/shared"; import { switchPort } from "@cloudflare/containers"; +import { createLogger, type LogContext, TraceContext } from "@repo/shared"; import { getSandbox, type Sandbox } from "./sandbox"; import { sanitizeSandboxId, diff --git a/packages/sandbox/tests/sandbox.test.ts b/packages/sandbox/tests/sandbox.test.ts index e603b98..4995856 100644 --- a/packages/sandbox/tests/sandbox.test.ts +++ b/packages/sandbox/tests/sandbox.test.ts @@ -1,7 +1,7 @@ +import { Container } from '@cloudflare/containers'; import type { DurableObjectState } from '@cloudflare/workers-types'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { Sandbox } from '../src/sandbox'; -import { Container } from '@cloudflare/containers'; // Mock dependencies before imports vi.mock('./interpreter', () => ({ @@ -476,57 +476,6 @@ describe('Sandbox - Automatic Session Management', () => { }); }); - describe('Health checks', () => { - beforeEach(() => { - // Mock Container base class getState method for health checks - (sandbox as any).getState = vi.fn().mockResolvedValue({ - status: 'running', - timestamp: new Date().toISOString(), - }); - }); - - - it('should handle health check failures gracefully', async () => { - const encoder = new TextEncoder(); - const mockStream = new ReadableStream({ - start(controller) { - controller.enqueue(encoder.encode('test\n')); - controller.close(); - } - }); - - vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - - // Make getState throw an error (container is dead) - (sandbox as any).getState = vi.fn().mockRejectedValue(new Error('Container not found')); - - const stream = await sandbox.execStream('echo test'); - const reader = stream.getReader(); - - // Read should still work since we're testing the health check error handling - // The health check runs in background and shouldn't block reads initially - const result = await reader.read(); - expect(result.done).toBe(false); - }); - - it('should 
work with all streaming APIs', async () => { - const encoder = new TextEncoder(); - const mockStream = new ReadableStream({ - start(controller) { - controller.enqueue(encoder.encode('test\n')); - controller.close(); - } - }); - - // Test execStream - vi.spyOn(sandbox.client.commands, 'executeStream').mockResolvedValue(mockStream); - const execStream = await sandbox.execStream('echo test'); - expect(execStream).toBeInstanceOf(ReadableStream); - - // Test streamProcessLogs - vi.spyOn(sandbox.client.processes, 'streamProcessLogs').mockResolvedValue(mockStream); - const logsStream = await sandbox.streamProcessLogs('proc-1'); - expect(logsStream).toBeInstanceOf(ReadableStream); describe('fetch() override - WebSocket detection', () => { let superFetchSpy: any; From 9f1d5eccc357aaa93007694a233edabc8e57ad5a Mon Sep 17 00:00:00 2001 From: katereznykova Date: Tue, 28 Oct 2025 15:05:42 +0000 Subject: [PATCH 22/23] fix pkg workflow --- .github/workflows/pkg-pr-new.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/pkg-pr-new.yml b/.github/workflows/pkg-pr-new.yml index 63f05c5..f057bdd 100644 --- a/.github/workflows/pkg-pr-new.yml +++ b/.github/workflows/pkg-pr-new.yml @@ -9,6 +9,7 @@ on: types: [opened, synchronize, reopened] paths: - '**' + - '!**/*.md' - '!.changeset/**' From f0424197921b81c5dfe1d9ca5db1069385104736 Mon Sep 17 00:00:00 2001 From: whoiskatrin Date: Tue, 28 Oct 2025 15:08:00 +0000 Subject: [PATCH 23/23] Add keepAlive flag to prevent container shutdown --- .changeset/wet-falcons-hang.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/wet-falcons-hang.md b/.changeset/wet-falcons-hang.md index 7e8afa8..2c520a5 100644 --- a/.changeset/wet-falcons-hang.md +++ b/.changeset/wet-falcons-hang.md @@ -2,4 +2,4 @@ "@cloudflare/sandbox": patch --- -add auto timeout renewal and some health checks +add keepAlive flag to prevent containers from shutting down