Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/workerd/api/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,12 @@ wd_test(
data = ["streams-respond-test.js"],
)

wd_test(
src = "streams-js-test.wd-test",
args = ["--experimental"],
data = ["streams-js-test.js"],
)

wd_test(
src = "unsafe-test.wd-test",
args = ["--experimental"],
Expand Down
257 changes: 257 additions & 0 deletions src/workerd/api/tests/streams-js-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
// Copyright (c) 2025 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

// Tests for JavaScript-backed streams (ReadableStream and WritableStream constructors)
// Ported from edgeworker streams-js.ew-test

import { strictEqual, ok, throws, rejects } from 'node:assert';

// Test that JS streams globals exist
export const userStreamsGlobalsExist = {
test() {
ok(ReadableStreamDefaultController !== undefined);
ok(ReadableByteStreamController !== undefined);
ok(ReadableStreamBYOBRequest !== undefined);
ok(WritableStreamDefaultController !== undefined);
},
};

// Test that JS streams objects are not directly constructable
export const jsStreamsObjectsNotConstructable = {
test() {
throws(() => new ReadableStreamDefaultController(), TypeError);
throws(() => new ReadableByteStreamController(), TypeError);
throws(() => new ReadableStreamBYOBRequest(), TypeError);
throws(() => new WritableStreamDefaultController(), TypeError);
},
};

// Test new ReadableStream() works
export const newReadableStream = {
test() {
new ReadableStream();
new ReadableStream({ type: 'bytes' });
},
};

// Test that underlying source algorithms are called
export const newReadableStreamAlgorithms = {
async test() {
// Sync algorithms
{
let started = false;
let pulled = false;
let canceled = false;
const rs = new ReadableStream({
start() {
started = true;
},
pull() {
pulled = true;
},
cancel() {
canceled = true;
},
});
ok(started);

await scheduler.wait(1);

rs.cancel();

ok(pulled);
ok(canceled);
}

// Byte stream sync algorithms
{
let started = false;
let pulled = false;
let canceled = false;
const rs = new ReadableStream(
{
type: 'bytes',
start() {
started = true;
},
pull() {
pulled = true;
},
cancel() {
canceled = true;
},
},
{ highWaterMark: 1 }
);

ok(started);
await scheduler.wait(1);

rs.cancel();

ok(pulled);
ok(canceled);
}

// Async algorithms for value stream
{
let onStarted, onPulled, onCanceled;
let started = new Promise((resolve) => (onStarted = resolve));
let pulled = new Promise((resolve) => (onPulled = resolve));
let canceled = new Promise((resolve) => (onCanceled = resolve));

const rs = new ReadableStream({
async start() {
await scheduler.wait(1);
onStarted();
},
async pull() {
await scheduler.wait(1);
onPulled();
},
async cancel() {
onCanceled();
},
});

await Promise.allSettled([started, pulled]);
await scheduler.wait(1);
await Promise.allSettled([rs.cancel(), canceled]);
}

// Async algorithms for byte stream
{
let onStarted, onPulled, onCanceled;
let started = new Promise((resolve) => (onStarted = resolve));
let pulled = new Promise((resolve) => (onPulled = resolve));
let canceled = new Promise((resolve) => (onCanceled = resolve));

const rs = new ReadableStream(
{
type: 'bytes',
async start() {
await scheduler.wait(1);
onStarted();
},
async pull() {
await scheduler.wait(1);
onPulled();
},
async cancel() {
onCanceled();
},
},
{ highWaterMark: 1 }
);

await Promise.allSettled([started, pulled]);
await scheduler.wait(1);
await Promise.allSettled([rs.cancel(), canceled]);
}
},
};

// Test that new ReadableStream creates the right kind of controller
export const newReadableStreamControllerType = {
test() {
new ReadableStream({
start(c) {
ok(c instanceof ReadableStreamDefaultController);
},
pull(c) {
ok(c instanceof ReadableStreamDefaultController);
},
});

new ReadableStream({
type: 'bytes',
start(c) {
ok(c instanceof ReadableByteStreamController);
},
pull(c) {
ok(c instanceof ReadableByteStreamController);
const byobRequest = c.byobRequest;
ok(byobRequest != null);
ok(byobRequest === c.byobRequest);
ok(byobRequest instanceof ReadableStreamBYOBRequest);
ok(byobRequest.view instanceof Uint8Array);
},
});
},
};

// Test sync algorithm errors are handled properly
export const newReadableStreamSyncAlgorithmErrorsHandled = {
async test() {
// Start error
{
const rs = new ReadableStream({
start() {
throw new Error('boom');
},
});

await rejects(rs.getReader().read(), { message: 'boom' });
}

// Pull error
{
const rs = new ReadableStream({
pull() {
throw new Error('boom');
},
});

await rejects(rs.getReader().read(), { message: 'boom' });
}

// Cancel error
{
const rs = new ReadableStream({
cancel() {
throw new Error('boom');
},
});
await rejects(rs.cancel(), { message: 'boom' });
}
},
};

// Test async algorithm errors are handled properly
export const newReadableStreamAsyncAlgorithmErrorsHandled = {
async test() {
// Async start error
{
const rs = new ReadableStream({
async start() {
throw new Error('boom');
},
});

await rejects(rs.getReader().read(), { message: 'boom' });
}

// Async pull error
{
const rs = new ReadableStream({
async pull() {
throw new Error('boom');
},
});

await rejects(rs.getReader().read(), { message: 'boom' });
}

// Async cancel error
{
const rs = new ReadableStream({
async cancel() {
throw new Error('boom');
},
});

await rejects(rs.cancel(), { message: 'boom' });
}
},
};
17 changes: 17 additions & 0 deletions src/workerd/api/tests/streams-js-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "streams-js-test",
worker = (
modules = [
(name = "worker", esModule = embed "streams-js-test.js")
],
compatibilityFlags = [
"nodejs_compat",
"streams_enable_constructors",
]
)
),
],
);
88 changes: 88 additions & 0 deletions src/workerd/api/tests/streams-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,96 @@ export const fixedLengthStreamPreconditions = {
},
};

// Test non-standard readAtLeast() extension with default reader (should throw)
export const readAtLeastDefaultReaderThrows = {
async test() {
const rs = new ReadableStream({
type: 'bytes',
pull(c) {
c.enqueue(enc.encode('hello'));
c.close();
},
});

const reader = rs.getReader();
throws(() => reader.readAtLeast(1), TypeError);
reader.releaseLock();

// Consume the stream to clean up
for await (const _ of rs) {
}
},
};

// Test non-standard readAtLeast() extension with BYOB reader
// Note: The original ew-test expected value=undefined on done, which was the legacy
// behavior of internal streams. With `internal_stream_byob_return_view` compat flag
// (enabled since 2024-05-13), the spec-compliant behavior returns an empty view.
export const readAtLeastByobReader = {
async test(ctrl, env) {
// Use service binding to get chunked response
const response = await env.subrequest.fetch('http://test/chunked');
const reader = response.body.getReader({ mode: 'byob' });

// First readAtLeast: request min 4 bytes
// Server sends: 'foo' (3) + 'bar' (3) = 6 bytes, first chunk 'foo' only 3 bytes
// so readAtLeast(4) should wait for more data
let result = await reader.readAtLeast(4, new Uint8Array(20));
let value = new TextDecoder().decode(result.value);
strictEqual(result.done, false);
strictEqual(value.length, 6);
strictEqual(value, 'foobar');

// Regular read
result = await reader.read(new Uint8Array(20));
value = new TextDecoder().decode(result.value);
strictEqual(value.length, 1);
strictEqual(value, 'b');
strictEqual(result.done, false);

// Second readAtLeast: request min 4 bytes, only 'az' (2 bytes) remain
// Server sends: 'a' (1) + 'z' (1) = 2 bytes, then closes
result = await reader.readAtLeast(4, new Uint8Array(20));
value = new TextDecoder().decode(result.value);
strictEqual(value.length, 2);
strictEqual(value, 'az');
strictEqual(result.done, false);

// Final read should be done - spec requires empty view, not undefined
result = await reader.readAtLeast(4, new Uint8Array(20));
strictEqual(result.done, true);
ok(result.value instanceof Uint8Array);
strictEqual(result.value.byteLength, 0);
},
};

export default {
async fetch(request, env) {
const url = new URL(request.url);

// Endpoint for chunked data for readAtLeast tests
if (url.pathname === '/chunked') {
const rs = new ReadableStream({
type: 'bytes',
async pull(controller) {
// Simulate chunked input: foo, bar, b, a, z
const chunks = [
enc.encode('foo'),
enc.encode('bar'),
enc.encode('b'),
enc.encode('a'),
enc.encode('z'),
];
for (const chunk of chunks) {
controller.enqueue(chunk);
await scheduler.wait(1);
}
controller.close();
},
});
return new Response(rs);
}

strictEqual(request.headers.get('content-length'), '10');
return new Response(request.body);
},
Expand Down
Loading