Skip to content

Commit 2a09040

Browse files
authored
NodeJS Compat: Add process.stdin, stdout and stderr (#4480)
1 parent 01bbfae commit 2a09040

20 files changed

+712
-151
lines changed

src/node/internal/internal_fs_streams.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ export declare class ReadStream extends Readable {
368368
[kFs]: RealizedFsOperations;
369369
[kIsPerformingIO]: boolean;
370370
[kHandle]: FileHandle | undefined;
371-
constructor(path: FilePath, options?: ReadStreamOptions);
371+
constructor(path: FilePath | null, options?: ReadStreamOptions);
372372
push(chunk: NodeJS.ArrayBufferView | null): boolean;
373373
close(callback?: ErrorOnlyCallback): void;
374374
}

src/node/internal/public_process.ts

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@ import {
1616
ERR_UNSUPPORTED_OPERATION,
1717
} from 'node-internal:internal_errors';
1818
import processImpl from 'node-internal:process';
19-
import type { Buffer } from 'node:buffer';
19+
import { Buffer } from 'node-internal:internal_buffer';
2020
import { parseEnv } from 'node-internal:internal_utils';
21-
import type * as NodeFS from 'node:fs';
22-
23-
const { compatibilityFlags } = Cloudflare;
24-
21+
import { Writable } from 'node-internal:streams_writable';
22+
import { writeSync } from 'node-internal:internal_fs_sync';
23+
import { ReadStream } from 'node-internal:internal_fs_streams';
2524
import {
2625
platform,
2726
nextTick,
@@ -32,12 +31,63 @@ import {
3231
} from 'node-internal:internal_process';
3332
import { validateString } from 'node-internal:validators';
3433

34+
import type { Readable } from 'node-internal:streams_readable';
35+
import type * as NodeFS from 'node:fs';
36+
37+
const { compatibilityFlags } = Cloudflare;
38+
3539
export { platform, nextTick, emitWarning, env, features };
3640

37-
// TODO(soon): Implement stdio along with TTY streams (and as a requirement for removing experimental).
38-
export const stdin = undefined;
39-
export const stdout = undefined;
40-
export const stderr = undefined;
41+
// For stdin, we emulate `node foo.js < /dev/null`
42+
export const stdin = new ReadStream(null, {
43+
fd: 0,
44+
autoClose: false,
45+
}) as Readable & {
46+
fd: number;
47+
};
48+
stdin.fd = 0;
49+
50+
function chunkToBuffer(
51+
chunk: Buffer | ArrayBufferView | DataView | string,
52+
encoding: BufferEncoding
53+
): Buffer {
54+
if (Buffer.isBuffer(chunk)) {
55+
return chunk;
56+
}
57+
if (typeof chunk === 'string') {
58+
return Buffer.from(chunk, encoding);
59+
}
60+
return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength);
61+
}
62+
63+
// For stdout, we emulate `nohup node foo.js`
64+
class SyncWriteStream extends Writable {
65+
fd: number;
66+
readable: boolean;
67+
_type = 'fs';
68+
_isStdio = true;
69+
constructor(fd: number) {
70+
super({ autoDestroy: true });
71+
this.fd = fd;
72+
this.readable = false;
73+
}
74+
override _write(
75+
chunk: string | Buffer | ArrayBufferView | DataView,
76+
encoding: BufferEncoding,
77+
cb: (error?: Error | null) => void
78+
): void {
79+
try {
80+
writeSync(this.fd, chunkToBuffer(chunk, encoding));
81+
} catch (e: unknown) {
82+
cb(e as Error);
83+
return;
84+
}
85+
cb();
86+
}
87+
}
88+
89+
export const stdout = new SyncWriteStream(1);
90+
export const stderr = new SyncWriteStream(2);
4191

4292
export function chdir(path: string | Buffer | URL): void {
4393
validateString(path, 'directory');
@@ -200,8 +250,6 @@ export function uptime(): number {
200250
return 0;
201251
}
202252

203-
// TODO(soon): Support with proper process.cwd() resolution along with
204-
// test in process-nodejs-test.
205253
export function loadEnvFile(path: string | URL | Buffer = '.env'): void {
206254
if (!compatibilityFlags.experimental || !compatibilityFlags.nodejs_compat) {
207255
throw new ERR_UNSUPPORTED_OPERATION();

src/node/internal/streams_adapters.js

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import {
4141
} from 'node-internal:streams_util';
4242

4343
import { Buffer } from 'node-internal:internal_buffer';
44+
import { nextTick } from 'node-internal:internal_process';
4445

4546
import {
4647
ERR_INVALID_ARG_TYPE,
@@ -53,8 +54,6 @@ import { normalizeEncoding } from 'node-internal:internal_utils';
5354

5455
import { validateBoolean, validateObject } from 'node-internal:validators';
5556

56-
import process from 'node:process';
57-
5857
const encoder = new TextEncoder();
5958

6059
/**
@@ -220,7 +219,7 @@ export function newStreamWritableFromWritableStream(
220219
// thrown we don't want those to cause an unhandled
221220
// rejection. Let's just escape the promise and
222221
// handle it separately.
223-
process.nextTick(() => destroy.call(writable, error));
222+
nextTick(() => destroy.call(writable, error));
224223
}
225224
}
226225

@@ -271,7 +270,7 @@ export function newStreamWritableFromWritableStream(
271270
// thrown we don't want those to cause an unhandled
272271
// rejection. Let's just escape the promise and
273272
// handle it separately.
274-
process.nextTick(() => {
273+
nextTick(() => {
275274
throw error;
276275
});
277276
}
@@ -299,7 +298,7 @@ export function newStreamWritableFromWritableStream(
299298
// thrown we don't want those to cause an unhandled
300299
// rejection. Let's just escape the promise and
301300
// handle it separately.
302-
process.nextTick(() => destroy.call(writable, error));
301+
nextTick(() => destroy.call(writable, error));
303302
}
304303
}
305304

@@ -491,7 +490,7 @@ export function newStreamReadableFromReadableStream(
491490
// thrown we don't want those to cause an unhandled
492491
// rejection. Let's just escape the promise and
493492
// handle it separately.
494-
process.nextTick(() => {
493+
nextTick(() => {
495494
throw error;
496495
});
497496
}
@@ -641,7 +640,7 @@ export function newStreamDuplexFromReadableWritablePair(
641640
// thrown we don't want those to cause an unhandled
642641
// rejection. Let's just escape the promise and
643642
// handle it separately.
644-
process.nextTick(() => destroy.call(duplex, error));
643+
nextTick(() => destroy.call(duplex, error));
645644
}
646645
}
647646

@@ -693,7 +692,7 @@ export function newStreamDuplexFromReadableWritablePair(
693692
// thrown we don't want those to cause an unhandled
694693
// rejection. Let's just escape the promise and
695694
// handle it separately.
696-
process.nextTick(() => destroy.call(duplex, error));
695+
nextTick(() => destroy.call(duplex, error));
697696
}
698697
}
699698

@@ -725,7 +724,7 @@ export function newStreamDuplexFromReadableWritablePair(
725724
// thrown we don't want those to cause an unhandled
726725
// rejection. Let's just escape the promise and
727726
// handle it separately.
728-
process.nextTick(() => {
727+
nextTick(() => {
729728
throw error;
730729
});
731730
}

src/node/internal/streams_duplex.js

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ import {
3434
newStreamDuplexFromReadableWritablePair,
3535
newReadableWritablePairFromDuplex,
3636
} from 'node-internal:streams_adapters';
37-
38-
import process from 'node:process';
37+
import { nextTick } from 'node-internal:internal_process';
3938

4039
import {
4140
addAbortSignal,
@@ -322,9 +321,9 @@ function duplexify(body, name) {
322321
final(async () => {
323322
try {
324323
await promise;
325-
process.nextTick(cb, null);
324+
nextTick(cb, null);
326325
} catch (err) {
327-
process.nextTick(cb, err);
326+
nextTick(cb, err);
328327
}
329328
});
330329
},
@@ -434,7 +433,7 @@ function fromAsyncGen(fn) {
434433
const _promise = promise;
435434
promise = null;
436435
const { chunk, done, cb } = await _promise;
437-
process.nextTick(cb);
436+
nextTick(cb);
438437
if (done) return;
439438
if (signal.aborted)
440439
throw new AbortError(undefined, {
@@ -633,7 +632,7 @@ class DuplexSide extends Duplex {
633632
assert(this.#otherSide !== null);
634633
assert(this.#otherSide[kCallback] === null);
635634
if (chunk.length === 0) {
636-
process.nextTick(callback);
635+
nextTick(callback);
637636
} else {
638637
this.#otherSide.push(chunk);
639638
this.#otherSide[kCallback] = callback;

src/node/internal/streams_pipeline.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import {
3636
destroyer as destroyerImpl,
3737
} from 'node-internal:streams_util';
3838

39-
import process from 'node:process';
39+
import { nextTick } from 'node-internal:internal_process';
4040

4141
import { PassThrough } from 'node-internal:streams_transform';
4242
import { Duplex } from 'node-internal:streams_duplex';
@@ -214,7 +214,7 @@ export function pipelineImpl(streams, callback, opts) {
214214
if (!error) {
215215
lastStreamCleanup.forEach((fn) => fn());
216216
}
217-
process.nextTick(callback, error, value);
217+
nextTick(callback, error, value);
218218
}
219219
}
220220
let ret;
@@ -309,11 +309,11 @@ export function pipelineImpl(streams, callback, opts) {
309309
if (end) {
310310
pt.end();
311311
}
312-
process.nextTick(finish);
312+
nextTick(finish);
313313
},
314314
(err) => {
315315
pt.destroy(err);
316-
process.nextTick(finish, err);
316+
nextTick(finish, err);
317317
}
318318
);
319319
} else if (isIterable(ret, true)) {
@@ -365,7 +365,7 @@ export function pipelineImpl(streams, callback, opts) {
365365
(signal !== null && signal !== undefined && signal.aborted) ||
366366
(outerSignal !== null && outerSignal !== undefined && outerSignal.aborted)
367367
) {
368-
process.nextTick(abort);
368+
nextTick(abort);
369369
}
370370
return ret;
371371
}

src/node/internal/streams_readable.js

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ import {
4242
finished,
4343
kOnConstructed,
4444
} from 'node-internal:streams_util';
45-
46-
import process from 'node:process';
45+
import { nextTick } from 'node-internal:internal_process';
4746

4847
import { EventEmitter } from 'node-internal:events';
4948

@@ -547,7 +546,7 @@ function emitReadable(stream) {
547546
state.needReadable = false;
548547
if (!state.emittedReadable) {
549548
state.emittedReadable = true;
550-
process.nextTick(emitReadable_, stream);
549+
nextTick(emitReadable_, stream);
551550
}
552551
}
553552

@@ -578,7 +577,7 @@ function emitReadable_(stream) {
578577
function maybeReadMore(stream, state) {
579578
if (!state.readingMore && state.constructed) {
580579
state.readingMore = true;
581-
process.nextTick(maybeReadMore_, stream, state);
580+
nextTick(maybeReadMore_, stream, state);
582581
}
583582
}
584583

@@ -643,7 +642,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
643642
state.pipes.push(dest);
644643
const doEnd = !pipeOpts || pipeOpts.end !== false;
645644
const endFn = doEnd ? onend : unpipe;
646-
if (state.endEmitted) process.nextTick(endFn);
645+
if (state.endEmitted) nextTick(endFn);
647646
else src.once('end', endFn);
648647
dest.on('unpipe', onunpipe);
649648
function onunpipe(readable, unpipeInfo) {
@@ -835,7 +834,7 @@ Readable.prototype.on = function (ev, fn) {
835834
if (state.length) {
836835
emitReadable(this);
837836
} else if (!state.reading) {
838-
process.nextTick(nReadingNextTick, this);
837+
nextTick(nReadingNextTick, this);
839838
}
840839
}
841840
}
@@ -851,7 +850,7 @@ Readable.prototype.removeListener = function (ev, fn) {
851850
// support once('readable', fn) cycles. This means that calling
852851
// resume within the same tick will have no
853852
// effect.
854-
process.nextTick(updateReadableListening, this);
853+
nextTick(updateReadableListening, this);
855854
}
856855
return res;
857856
};
@@ -865,7 +864,7 @@ Readable.prototype.removeAllListeners = function (ev) {
865864
// support once('readable', fn) cycles. This means that calling
866865
// resume within the same tick will have no
867866
// effect.
868-
process.nextTick(updateReadableListening, this);
867+
nextTick(updateReadableListening, this);
869868
}
870869
return res;
871870
};
@@ -908,7 +907,7 @@ Readable.prototype.resume = function () {
908907
function resume(stream, state) {
909908
if (!state.resumeScheduled) {
910909
state.resumeScheduled = true;
911-
process.nextTick(resume_, stream, state);
910+
nextTick(resume_, stream, state);
912911
}
913912
}
914913

@@ -1225,7 +1224,7 @@ function endReadable(stream) {
12251224
const state = stream._readableState;
12261225
if (!state.endEmitted) {
12271226
state.ended = true;
1228-
process.nextTick(endReadableNT, state, stream);
1227+
nextTick(endReadableNT, state, stream);
12291228
}
12301229
}
12311230

@@ -1240,7 +1239,7 @@ function endReadableNT(state, stream) {
12401239
state.endEmitted = true;
12411240
stream.emit('end');
12421241
if (stream.writable && stream.allowHalfOpen === false) {
1243-
process.nextTick(endWritableNT, stream);
1242+
nextTick(endWritableNT, stream);
12441243
} else if (state.autoDestroy) {
12451244
// In case of duplex streams we need a way to detect
12461245
// if the writable side is ready for autoDestroy as well.
@@ -1344,8 +1343,8 @@ export function from(Readable, iterable, opts) {
13441343
};
13451344
readable._destroy = function (error, cb) {
13461345
close(error).then(
1347-
() => process.nextTick(cb, error),
1348-
(err) => process.nextTick(cb, err || error)
1346+
() => nextTick(cb, error),
1347+
(err) => nextTick(cb, err || error)
13491348
);
13501349
};
13511350
async function close(error) {

0 commit comments

Comments
 (0)