Skip to content

Commit 35cda9e

Browse files
yusukebeusualoma
andauthored
fix: handle client disconnection without canceling stream (#258)
* fix: handle disconnection gracefully without canceling stream * Simplify Co-authored-by: Taku Amano <[email protected]> --------- Co-authored-by: Taku Amano <[email protected]>
1 parent 678c740 commit 35cda9e

File tree

2 files changed

+58
-12
lines changed

2 files changed

+58
-12
lines changed

src/utils.ts

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,45 @@ export function writeFromReadableStream(stream: ReadableStream<Uint8Array>, writ
55
if (stream.locked) {
66
throw new TypeError('ReadableStream is locked.')
77
} else if (writable.destroyed) {
8-
stream.cancel()
98
return
109
}
10+
1111
const reader = stream.getReader()
12-
writable.on('close', cancel)
13-
writable.on('error', cancel)
14-
reader.read().then(flow, cancel)
12+
13+
const handleError = () => {
14+
// ignore the error
15+
}
16+
17+
writable.on('error', handleError)
18+
19+
reader.read().then(flow, handleStreamError)
20+
1521
return reader.closed.finally(() => {
16-
writable.off('close', cancel)
17-
writable.off('error', cancel)
22+
writable.off('error', handleError)
1823
})
24+
1925
// eslint-disable-next-line @typescript-eslint/no-explicit-any
20-
function cancel(error?: any) {
21-
reader.cancel(error).catch(() => {})
26+
function handleStreamError(error: any) {
2227
if (error) {
2328
writable.destroy(error)
2429
}
2530
}
31+
2632
function onDrain() {
27-
reader.read().then(flow, cancel)
33+
reader.read().then(flow, handleStreamError)
2834
}
35+
2936
function flow({ done, value }: ReadableStreamReadResult<Uint8Array>): void | Promise<void> {
3037
try {
3138
if (done) {
3239
writable.end()
3340
} else if (!writable.write(value)) {
3441
writable.once('drain', onDrain)
3542
} else {
36-
return reader.read().then(flow, cancel)
43+
return reader.read().then(flow, handleStreamError)
3744
}
3845
} catch (e) {
39-
cancel(e)
46+
handleStreamError(e)
4047
}
4148
}
4249
}

test/utils.test.ts

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { buildOutgoingHttpHeaders } from '../src/utils'
1+
import { Writable } from 'node:stream'
2+
import { buildOutgoingHttpHeaders, writeFromReadableStream } from '../src/utils'
23

34
describe('buildOutgoingHttpHeaders', () => {
45
it('original content-type is preserved', () => {
@@ -71,3 +72,41 @@ describe('buildOutgoingHttpHeaders', () => {
7172
})
7273
})
7374
})
75+
76+
describe('writeFromReadableStream', () => {
77+
it('should handle client disconnection gracefully without canceling stream', async () => {
78+
let enqueueCalled = false
79+
let cancelCalled = false
80+
81+
// Create test ReadableStream
82+
const stream = new ReadableStream({
83+
start(controller) {
84+
setTimeout(() => {
85+
try {
86+
controller.enqueue(new TextEncoder().encode('test'))
87+
enqueueCalled = true
88+
} catch {
89+
// Test should fail if error occurs
90+
}
91+
controller.close()
92+
}, 100)
93+
},
94+
cancel() {
95+
cancelCalled = true
96+
},
97+
})
98+
99+
// Test Writable stream
100+
const writable = new Writable()
101+
102+
// Simulate client disconnection after 50ms
103+
setTimeout(() => {
104+
writable.destroy()
105+
}, 50)
106+
107+
await writeFromReadableStream(stream, writable)
108+
109+
expect(enqueueCalled).toBe(true) // enqueue should succeed
110+
expect(cancelCalled).toBe(false) // cancel should not be called
111+
})
112+
})

0 commit comments

Comments
 (0)