Skip to content

Commit f7d7d13

Browse files
committed
streams: forward errors correctly for duplexPair endpoints
fix the duplexPair implementation so that when one side is destroyed with an error, the other side also receives the error or a close event as appropriate. previous behavior caused sideA to never emit an 'error' or 'close' when sideB errored, which prevented users from observing or handling the paired stream failure. Fixes: #61015
1 parent 05f8772 commit f7d7d13

File tree

2 files changed

+56
-1
lines changed

2 files changed

+56
-1
lines changed

lib/internal/streams/duplexpair.js

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,29 @@ class DuplexSide extends Duplex {
5050
this.#otherSide.on('end', callback);
5151
this.#otherSide.push(null);
5252
}
53+
54+
55+
_destroy(err, callback) {
56+
57+
if (err) {
58+
// Error case: tell the other side to also destroy with that error.
59+
this.#otherSide.destroy(err);
60+
} else {
61+
// Graceful close case (destroy() without error):
62+
// send an EOF to the other side's readable end if it hasn't already closed.
63+
if (this.#otherSide && !this.#otherSide.destroyed) {
64+
this.#otherSide.push(null);
65+
}
66+
}
67+
callback(err);
68+
}
5369
}
5470

5571
function duplexPair(options) {
5672
const side0 = new DuplexSide(options);
5773
const side1 = new DuplexSide(options);
5874
side0[kInitOtherSide](side1);
5975
side1[kInitOtherSide](side0);
60-
return [ side0, side1 ];
76+
return [side0, side1];
6177
}
6278
module.exports = duplexPair;

test/parallel/test-duplex-error.js

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
'use strict';
2+
3+
const assert = require('assert');
4+
const { duplexPair } = require('stream');
5+
6+
const [sideA, sideB] = duplexPair();
7+
8+
let sideAErrorReceived = false;
9+
let sideBErrorReceived = false;
10+
11+
// Add error handlers
12+
sideA.on('error', (err) => {
13+
sideAErrorReceived = true;
14+
});
15+
sideB.on('error', (err) => {
16+
sideBErrorReceived = true;
17+
});
18+
19+
// Ensure the streams are flowing
20+
sideA.resume();
21+
sideB.resume();
22+
23+
// Destroy sideB with an error
24+
sideB.destroy(new Error('Simulated error'));
25+
26+
// Wait for event loop to process error events
27+
setImmediate(() => {
28+
assert.strictEqual(
29+
sideAErrorReceived,
30+
true,
31+
'sideA should receive the error when sideB is destroyed with an error'
32+
);
33+
assert.strictEqual(
34+
sideBErrorReceived,
35+
true,
36+
'sideB should emit its own error when destroyed'
37+
);
38+
});
39+

0 commit comments

Comments
 (0)