Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions benchmark/webstreams/internal-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
'use strict';
const common = require('../common.js');
const fsp = require('fs/promises');
const path = require('path');
const os = require('os');
const { pipeline } = require('stream/promises');
const {
ReadableStream,
WritableStream,
} = require('node:stream/web');

const bench = common.createBenchmark(main, {
type: [
'node-streams',
'webstream-js',
'webstream-file-read',
],
size: [1024, 16384, 65536],
n: [1e4, 1e5],
});

async function main({ type, size, n }) {
const chunk = Buffer.alloc(size, 'x');
const totalBytes = size * n;

switch (type) {
case 'node-streams': {
// Baseline: Node.js streams
let received = 0;
const readable = new (require('stream').Readable)({
read() {
for (let i = 0; i < 100 && received < n; i++) {
this.push(chunk);
received++;
}
if (received >= n) this.push(null);
},
});

const writable = new (require('stream').Writable)({
write(data, enc, cb) { cb(); },
});

bench.start();
await pipeline(readable, writable);
bench.end(totalBytes);
break;
}

case 'webstream-js': {
// Web streams with pure JS source/sink
let sent = 0;
const rs = new ReadableStream({
pull(controller) {
if (sent++ < n) {
controller.enqueue(chunk);
} else {
controller.close();
}
},
});

const ws = new WritableStream({
write() {},
close() { bench.end(totalBytes); },
});

bench.start();
await rs.pipeTo(ws);
break;
}

case 'webstream-file-read': {
// Create a temporary file with test data
const tmpDir = os.tmpdir();
const tmpFile = path.join(tmpDir, `bench-webstream-${process.pid}.tmp`);

// Write test data to file
const fd = await fsp.open(tmpFile, 'w');
for (let i = 0; i < n; i++) {
await fd.write(chunk);
}
await fd.close();

// Read using readableWebStream
const readFd = await fsp.open(tmpFile, 'r');
const rs = readFd.readableWebStream({ type: 'bytes' });

const ws = new WritableStream({
write() {},
close() {
bench.end(totalBytes);
// Cleanup
readFd.close().then(() => fsp.unlink(tmpFile));
},
});

bench.start();
await rs.pipeTo(ws);
break;
}

default:
throw new Error(`Unknown type: ${type}`);
}
}
16 changes: 14 additions & 2 deletions lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ const {
ByteLengthQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');

const {
kStreamBase,
} = require('internal/webstreams/util');

const {
Writable,
Readable,
Expand Down Expand Up @@ -946,7 +950,7 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
return promise.promise;
}

return new WritableStream({
const stream = new WritableStream({
write(chunk, controller) {
current = current !== undefined ?
PromisePrototypeThen(
Expand All @@ -967,6 +971,10 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
return promise.promise;
},
}, strategy);

stream[kStreamBase] = streamBase;

return stream;
}

/**
Expand Down Expand Up @@ -1017,7 +1025,7 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO
}
};

return new ReadableStream({
const stream = new ReadableStream({
start(c) { controller = c; },

pull() {
Expand All @@ -1040,6 +1048,10 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO
return promise.promise;
},
}, strategy);

stream[kStreamBase] = streamBase;

return stream;
}

module.exports = {
Expand Down
33 changes: 33 additions & 0 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ const {
DOMException,
} = internalBinding('messaging');

const {
StreamPipe,
} = internalBinding('stream_pipe');

const {
isArrayBufferView,
isDataView,
Expand Down Expand Up @@ -114,6 +118,7 @@ const {
iteratorNext,
kType,
kState,
kStreamBase,
} = require('internal/webstreams/util');

const {
Expand Down Expand Up @@ -1369,6 +1374,34 @@ function readableStreamPipeTo(
preventCancel,
signal) {

const sourceStreamBase = source[kStreamBase];
const destStreamBase = dest[kStreamBase];

if (sourceStreamBase !== undefined &&
destStreamBase !== undefined &&
signal === undefined &&
!preventClose &&
!preventAbort &&
!preventCancel) {
// Use native piping
const promise = PromiseWithResolvers();

source[kState].disturbed = true;

try {
const pipe = new StreamPipe(sourceStreamBase, destStreamBase);
pipe.onunpipe = () => {
promise.resolve();
};
pipe.start();
} catch (error) {
return PromiseReject(error);
}

return promise.promise;
}

// Use JS-based piping
let reader;
let writer;
let disposable;
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/webstreams/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const {

const kState = Symbol('kState');
const kType = Symbol('kType');
const kStreamBase = Symbol('kStreamBase');

const AsyncIterator = {
__proto__: AsyncIteratorPrototype,
Expand Down Expand Up @@ -296,4 +297,5 @@ module.exports = {
iteratorNext,
kType,
kState,
kStreamBase,
};
115 changes: 115 additions & 0 deletions test/parallel/test-whatwg-webstreams-internal-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Flags: --expose-internals --no-warnings
'use strict';

// Tests for the internal StreamBase pipe optimization infrastructure
// described in nodejs/performance#134
//
// Note(mertcanaltin): Full fast-path testing requires real StreamBase implementations
// (like HTTP/2 streams or TCP sockets), not JSStream mocks.
// These tests verify the marker attachment and fallback behavior.

const common = require('../common');

const assert = require('assert');

const {
internalBinding,
} = require('internal/test/binding');

const {
newWritableStreamFromStreamBase,
newReadableStreamFromStreamBase,
} = require('internal/webstreams/adapters');

const {
kStreamBase,
} = require('internal/webstreams/util');

const {
JSStream,
} = internalBinding('js_stream');

// kStreamBase marker is attached to ReadableStream
{
const stream = new JSStream();
const readable = newReadableStreamFromStreamBase(stream);

assert.strictEqual(readable[kStreamBase], stream);

// Cleanup
stream.emitEOF();
}

// kStreamBase marker is attached to WritableStream
{
const stream = new JSStream();
stream.onwrite = common.mustNotCall();
stream.onshutdown = (req) => req.oncomplete();

const writable = newWritableStreamFromStreamBase(stream);

assert.strictEqual(writable[kStreamBase], stream);

// Cleanup
writable.close();
}

// Regular JS streams don't have kStreamBase
{
const { ReadableStream, WritableStream } = require('stream/web');

const rs = new ReadableStream({
pull(controller) {
controller.enqueue('chunk');
controller.close();
},
});

const ws = new WritableStream({
write() {},
});

assert.strictEqual(rs[kStreamBase], undefined);
assert.strictEqual(ws[kStreamBase], undefined);

// Pipe should still work (standard path)
rs.pipeTo(ws).then(common.mustCall());
}

// Mixed streams (one internal, one JS) use standard path
{
const stream = new JSStream();
stream.onshutdown = (req) => req.oncomplete();
const readable = newReadableStreamFromStreamBase(stream);

const { WritableStream } = require('stream/web');
const chunks = [];
const ws = new WritableStream({
write(chunk) {
chunks.push(chunk);
},
});

// Readable has kStreamBase, ws does not - should use standard path
assert.ok(readable[kStreamBase]);
assert.strictEqual(ws[kStreamBase], undefined);

const pipePromise = readable.pipeTo(ws);

stream.readBuffer(Buffer.from('hello'));
stream.emitEOF();

pipePromise.then(common.mustCall(() => {
assert.strictEqual(chunks.length, 1);
}));
}

// Verify kStreamBase is the correct symbol from util
{
const {
kStreamBase: kStreamBase2,
} = require('internal/webstreams/util');

// Should be the same symbol
assert.strictEqual(kStreamBase, kStreamBase2);
}
Loading