Skip to content

Commit b6ceaf5

Browse files
committed
update node.js streams implementation
1 parent 48cfcb1 commit b6ceaf5

21 files changed

+2121
-1267
lines changed

src/node/internal/events.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,10 @@ export class EventEmitterAsyncResource
146146
}
147147
}
148148

149-
export function addAbortListener(signal: AbortSignal, listener: any) {
149+
export function addAbortListener(
150+
signal: AbortSignal | undefined,
151+
listener: any
152+
) {
150153
if (signal === undefined) {
151154
throw new ERR_INVALID_ARG_TYPE('signal', 'AbortSignal', signal);
152155
}

src/node/internal/internal_http_client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import {
4343
import { OutgoingMessage } from 'node-internal:internal_http_outgoing';
4444
import { Agent, globalAgent } from 'node-internal:internal_http_agent';
4545
import type { IncomingMessageCallback } from 'node-internal:internal_http_util';
46-
import type { Socket } from 'net';
46+
import type { Socket } from 'node:net';
4747

4848
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/;
4949

@@ -168,7 +168,7 @@ export class ClientRequest extends OutgoingMessage implements _ClientRequest {
168168

169169
const signal = options.signal;
170170
if (signal) {
171-
addAbortSignal(signal, this);
171+
addAbortSignal(signal, this as unknown as Writable);
172172
}
173173
let method = options.method;
174174
const methodIsString = typeof method === 'string';

src/node/internal/process.d.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ export const platform: string;
88

99
declare global {
1010
const Cloudflare: {
11-
readonly compatibilityFlags: Record<string, boolean>;
11+
readonly compatibilityFlags: Record<string, boolean> & {
12+
enable_streams_nodejs_v24_compat: boolean;
13+
};
1214
};
1315
}
1416

src/node/internal/public_process.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,12 @@ function chunkToBuffer(
6161
// For stdout, we emulate `nohup node foo.js`
6262
class SyncWriteStream extends Writable {
6363
fd: number;
64-
override readable: boolean;
64+
override readable: boolean = false;
6565
_type = 'fs';
6666
_isStdio = true;
6767
constructor(fd: number) {
6868
super({ autoDestroy: true });
6969
this.fd = fd;
70-
this.readable = false;
7170
}
7271
override _write(
7372
chunk: string | Buffer | ArrayBufferView | DataView,

src/node/internal/streams_add_abort_signal.ts

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,41 +23,85 @@
2323
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
2424
// USE OR OTHER DEALINGS IN THE SOFTWARE.
2525

26-
import { validateAbortSignal } from 'node-internal:validators';
27-
import { isNodeStream } from 'node-internal:streams_util';
28-
import { eos } from 'node-internal:streams_end_of_stream';
2926
import {
3027
AbortError,
3128
ERR_INVALID_ARG_TYPE,
3229
} from 'node-internal:internal_errors';
33-
import { addAbortListener } from 'node-internal:events';
3430

31+
import {
32+
isNodeStream,
33+
isWebStream,
34+
kControllerErrorFunction,
35+
} from 'node-internal:streams_util';
36+
37+
import { eos } from 'node-internal:streams_end_of_stream';
3538
import type { Readable } from 'node-internal:streams_readable';
3639
import type { Writable } from 'node-internal:streams_writable';
3740
import type { Transform } from 'node-internal:streams_transform';
41+
import { addAbortListener } from 'node-internal:events';
42+
43+
// This method is inlined here for readable-stream
44+
// It also does not allow for signal to not exist on the stream
45+
// https://github.com/nodejs/node/pull/36061#discussion_r533718029
46+
function validateAbortSignal(
47+
signal: unknown,
48+
name: string
49+
): asserts signal is AbortSignal {
50+
if (signal == null || typeof signal !== 'object' || !('aborted' in signal)) {
51+
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
52+
}
53+
}
3854

39-
type NodeStream = Readable | Writable | Transform;
55+
type StreamType =
56+
| Readable
57+
| Writable
58+
| Transform
59+
| ReadableStream
60+
| WritableStream;
4061

41-
export function addAbortSignal<T extends { destroy: (err: Error) => void }>(
62+
export function addAbortSignal<T extends StreamType>(
4263
signal: unknown,
4364
stream: T
4465
): T {
4566
validateAbortSignal(signal, 'signal');
46-
if (!isNodeStream(stream)) {
47-
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
48-
}
49-
const onAbort = (): void => {
50-
stream.destroy(
51-
new AbortError(undefined, {
52-
cause: signal.reason,
53-
})
67+
if (!isNodeStream(stream) && !isWebStream(stream)) {
68+
throw new ERR_INVALID_ARG_TYPE(
69+
'stream',
70+
['ReadableStream', 'WritableStream', 'Stream'],
71+
stream
5472
);
55-
};
73+
}
74+
return addAbortSignalNoValidate(signal, stream);
75+
}
76+
77+
export function addAbortSignalNoValidate<T extends StreamType>(
78+
signal: AbortSignal | null | undefined,
79+
stream: T
80+
): T {
81+
if (signal == null || typeof signal !== 'object' || !('aborted' in signal)) {
82+
return stream;
83+
}
84+
const onAbort = isNodeStream(stream)
85+
? (): void => {
86+
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
87+
}
88+
: (): void => {
89+
(
90+
stream as ReadableStream & {
91+
[kControllerErrorFunction]: (err: Error) => void;
92+
}
93+
)[kControllerErrorFunction](
94+
new AbortError(undefined, { cause: signal.reason })
95+
);
96+
};
5697
if (signal.aborted) {
5798
onAbort();
5899
} else {
59100
const disposable = addAbortListener(signal, onAbort);
60-
eos(stream as NodeStream, disposable[Symbol.dispose]);
101+
eos(
102+
stream as Readable | Writable | Transform,
103+
disposable[Symbol.dispose] as () => void
104+
);
61105
}
62106
return stream;
63107
}

0 commit comments

Comments
 (0)