diff --git a/src/utils.ts b/src/utils.ts index c704c93..1f4240d 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -5,27 +5,34 @@ export function writeFromReadableStream(stream: ReadableStream, writ if (stream.locked) { throw new TypeError('ReadableStream is locked.') } else if (writable.destroyed) { - stream.cancel() return } + const reader = stream.getReader() - writable.on('close', cancel) - writable.on('error', cancel) - reader.read().then(flow, cancel) + + const handleError = () => { + // ignore the error + } + + writable.on('error', handleError) + + reader.read().then(flow, handleStreamError) + return reader.closed.finally(() => { - writable.off('close', cancel) - writable.off('error', cancel) + writable.off('error', handleError) }) + // eslint-disable-next-line @typescript-eslint/no-explicit-any - function cancel(error?: any) { - reader.cancel(error).catch(() => {}) + function handleStreamError(error: any) { if (error) { writable.destroy(error) } } + function onDrain() { - reader.read().then(flow, cancel) + reader.read().then(flow, handleStreamError) } + function flow({ done, value }: ReadableStreamReadResult): void | Promise { try { if (done) { @@ -33,10 +40,10 @@ export function writeFromReadableStream(stream: ReadableStream, writ } else if (!writable.write(value)) { writable.once('drain', onDrain) } else { - return reader.read().then(flow, cancel) + return reader.read().then(flow, handleStreamError) } } catch (e) { - cancel(e) + handleStreamError(e) } } } diff --git a/test/utils.test.ts b/test/utils.test.ts index 3673c5f..53f6972 100644 --- a/test/utils.test.ts +++ b/test/utils.test.ts @@ -1,4 +1,5 @@ -import { buildOutgoingHttpHeaders } from '../src/utils' +import { Writable } from 'node:stream' +import { buildOutgoingHttpHeaders, writeFromReadableStream } from '../src/utils' describe('buildOutgoingHttpHeaders', () => { it('original content-type is preserved', () => { @@ -71,3 +72,41 @@ describe('buildOutgoingHttpHeaders', () => { }) }) }) + +describe('writeFromReadableStream', () => { + it('should handle client disconnection gracefully without canceling stream', async () => { + let enqueueCalled = false + let cancelCalled = false + + // Create test ReadableStream + const stream = new ReadableStream({ + start(controller) { + setTimeout(() => { + try { + controller.enqueue(new TextEncoder().encode('test')) + enqueueCalled = true + } catch { + // Test should fail if error occurs + } + controller.close() + }, 100) + }, + cancel() { + cancelCalled = true + }, + }) + + // Test Writable stream + const writable = new Writable() + + // Simulate client disconnection after 50ms + setTimeout(() => { + writable.destroy() + }, 50) + + await writeFromReadableStream(stream, writable) + + expect(enqueueCalled).toBe(true) // enqueue should succeed + expect(cancelCalled).toBe(false) // cancel should not be called + }) +})