Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions benchmark/webstreams/internal-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
'use strict';
const common = require('../common.js');
const fsp = require('fs/promises');
const path = require('path');
const os = require('os');
const { pipeline } = require('stream/promises');
const {
ReadableStream,
WritableStream,
} = require('node:stream/web');

const bench = common.createBenchmark(main, {
type: [
'node-streams',
'webstream-js',
'webstream-file-read',
],
size: [1024, 16384, 65536],
n: [1e4, 1e5],
});

async function main({ type, size, n }) {
const chunk = Buffer.alloc(size, 'x');
const totalBytes = size * n;

switch (type) {
case 'node-streams': {
// Baseline: Node.js streams
let received = 0;
const readable = new (require('stream').Readable)({
read() {
for (let i = 0; i < 100 && received < n; i++) {
this.push(chunk);
received++;
}
if (received >= n) this.push(null);
},
});

const writable = new (require('stream').Writable)({
write(data, enc, cb) { cb(); },
});

bench.start();
await pipeline(readable, writable);
bench.end(totalBytes);
break;
}

case 'webstream-js': {
// Web streams with pure JS source/sink
let sent = 0;
const rs = new ReadableStream({
pull(controller) {
if (sent++ < n) {
controller.enqueue(chunk);
} else {
controller.close();
}
},
});

const ws = new WritableStream({
write() {},
close() { bench.end(totalBytes); },
});

bench.start();
await rs.pipeTo(ws);
break;
}

case 'webstream-file-read': {
// Create a temporary file with test data
const tmpDir = os.tmpdir();
const tmpFile = path.join(tmpDir, `bench-webstream-${process.pid}.tmp`);

// Write test data to file
const fd = await fsp.open(tmpFile, 'w');
for (let i = 0; i < n; i++) {
await fd.write(chunk);
}
await fd.close();

// Read using readableWebStream
const readFd = await fsp.open(tmpFile, 'r');
const rs = readFd.readableWebStream({ type: 'bytes' });

const ws = new WritableStream({
write() {},
close() {
bench.end(totalBytes);
// Cleanup
readFd.close().then(() => fsp.unlink(tmpFile));
},
});

bench.start();
await rs.pipeTo(ws);
break;
}

default:
throw new Error(`Unknown type: ${type}`);
}
}
16 changes: 14 additions & 2 deletions lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ const {
ByteLengthQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');

const {
kStreamBase,
} = require('internal/webstreams/util');

const {
Writable,
Readable,
Expand Down Expand Up @@ -946,7 +950,7 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
return promise.promise;
}

return new WritableStream({
const stream = new WritableStream({
write(chunk, controller) {
current = current !== undefined ?
PromisePrototypeThen(
Expand All @@ -967,6 +971,10 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
return promise.promise;
},
}, strategy);

stream[kStreamBase] = streamBase;

return stream;
}

/**
Expand Down Expand Up @@ -1017,7 +1025,7 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO
}
};

return new ReadableStream({
const stream = new ReadableStream({
start(c) { controller = c; },

pull() {
Expand All @@ -1040,6 +1048,10 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO
return promise.promise;
},
}, strategy);

stream[kStreamBase] = streamBase;

return stream;
}

module.exports = {
Expand Down
91 changes: 91 additions & 0 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const {

const {
AbortError,
ErrnoException,
codes: {
ERR_ILLEGAL_CONSTRUCTOR,
ERR_INVALID_ARG_TYPE,
Expand All @@ -45,6 +46,17 @@ const {
DOMException,
} = internalBinding('messaging');

const {
StreamPipe,
} = internalBinding('stream_pipe');

const {
kReadBytesOrError,
streamBaseState,
} = internalBinding('stream_wrap');

const { UV_EOF } = internalBinding('uv');

const {
isArrayBufferView,
isDataView,
Expand Down Expand Up @@ -114,6 +126,7 @@ const {
iteratorNext,
kType,
kState,
kStreamBase,
} = require('internal/webstreams/util');

const {
Expand All @@ -125,6 +138,7 @@ const {
isWritableStreamDefaultWriter,

writableStreamAbort,
writableStreamClose,
writableStreamCloseQueuedOrInFlight,
writableStreamDefaultWriterCloseWithErrorPropagation,
writableStreamDefaultWriterRelease,
Expand Down Expand Up @@ -1369,6 +1383,83 @@ function readableStreamPipeTo(
preventCancel,
signal) {

const sourceStreamBase = source?.[kStreamBase];
const destStreamBase = dest?.[kStreamBase];

if (sourceStreamBase !== undefined &&
destStreamBase !== undefined &&
signal === undefined &&
!preventClose &&
!preventAbort &&
!preventCancel) {
// Native C++ StreamPipe path for internal-to-internal piping.
// Ref: https://github.com/nodejs/performance/issues/134
const promise = PromiseWithResolvers();

source[kState].disturbed = true;

let pipeError = null;
let isComplete = false;
const originalSourceOnread = sourceStreamBase.onread;

sourceStreamBase.onread = (arrayBuffer) => {
const nread = streamBaseState[kReadBytesOrError];
if (nread < 0 && nread !== UV_EOF) {
pipeError = new ErrnoException(nread, 'read');
}
if (originalSourceOnread) {
return originalSourceOnread(arrayBuffer);
}
};

function finalize(error) {
if (isComplete) return;
isComplete = true;
sourceStreamBase.onread = originalSourceOnread;

if (error) {
if (source[kState].state === 'readable') {
readableStreamError(source, error);
}
if (dest[kState].state === 'writable') {
writableStreamAbort(dest, error);
}
promise.reject(error);
} else {
if (source[kState].state === 'readable') {
readableStreamClose(source);
}
if (dest[kState].state === 'writable' &&
!writableStreamCloseQueuedOrInFlight(dest)) {
PromisePrototypeThen(
writableStreamClose(dest),
() => promise.resolve(),
(err) => promise.reject(err),
);
} else {
promise.resolve();
}
}
}

try {
const pipe = new StreamPipe(sourceStreamBase, destStreamBase);
pipe.onunpipe = () => {
if (pipeError) {
finalize(pipeError);
}
};
pipe.oncomplete = () => finalize(pipeError);
pipe.start();
} catch (error) {
sourceStreamBase.onread = originalSourceOnread;
promise.reject(error);
}

return promise.promise;
}

// Use JS-based piping
let reader;
let writer;
let disposable;
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/webstreams/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const {

const kState = Symbol('kState');
const kType = Symbol('kType');
const kStreamBase = Symbol('kStreamBase');

const AsyncIterator = {
__proto__: AsyncIteratorPrototype,
Expand Down Expand Up @@ -296,4 +297,5 @@ module.exports = {
iteratorNext,
kType,
kState,
kStreamBase,
};
Loading
Loading