diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index 3a791bd018d..f7782395ee5 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -372,6 +372,12 @@ wd_test( data = ["streams-respond-test.js"], ) +wd_test( + src = "streams-js-test.wd-test", + args = ["--experimental"], + data = ["streams-js-test.js"], +) + wd_test( src = "unsafe-test.wd-test", args = ["--experimental"], diff --git a/src/workerd/api/tests/streams-js-test.js b/src/workerd/api/tests/streams-js-test.js new file mode 100644 index 00000000000..a02ac7c1d89 --- /dev/null +++ b/src/workerd/api/tests/streams-js-test.js @@ -0,0 +1,257 @@ +// Copyright (c) 2025 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +// Tests for JavaScript-backed streams (ReadableStream and WritableStream constructors) +// Ported from edgeworker streams-js.ew-test + +import { strictEqual, ok, throws, rejects } from 'node:assert'; + +// Test that JS streams globals exist +export const userStreamsGlobalsExist = { + test() { + ok(ReadableStreamDefaultController !== undefined); + ok(ReadableByteStreamController !== undefined); + ok(ReadableStreamBYOBRequest !== undefined); + ok(WritableStreamDefaultController !== undefined); + }, +}; + +// Test that JS streams objects are not directly constructable +export const jsStreamsObjectsNotConstructable = { + test() { + throws(() => new ReadableStreamDefaultController(), TypeError); + throws(() => new ReadableByteStreamController(), TypeError); + throws(() => new ReadableStreamBYOBRequest(), TypeError); + throws(() => new WritableStreamDefaultController(), TypeError); + }, +}; + +// Test new ReadableStream() works +export const newReadableStream = { + test() { + new ReadableStream(); + new ReadableStream({ type: 'bytes' }); + }, +}; + +// Test that underlying source algorithms are called +export const newReadableStreamAlgorithms = { + async test() { + // Sync algorithms + { + let started = false; + let pulled = false; + let canceled = false; + const rs = new ReadableStream({ + start() { + started = true; + }, + pull() { + pulled = true; + }, + cancel() { + canceled = true; + }, + }); + ok(started); + + await scheduler.wait(1); + + rs.cancel(); + + ok(pulled); + ok(canceled); + } + + // Byte stream sync algorithms + { + let started = false; + let pulled = false; + let canceled = false; + const rs = new ReadableStream( + { + type: 'bytes', + start() { + started = true; + }, + pull() { + pulled = true; + }, + cancel() { + canceled = true; + }, + }, + { highWaterMark: 1 } + ); + + ok(started); + await scheduler.wait(1); + + rs.cancel(); + + ok(pulled); + ok(canceled); + } + + // Async algorithms for value stream + { + let onStarted, onPulled, onCanceled; + let started = new Promise((resolve) => (onStarted = resolve)); + let pulled = new Promise((resolve) => (onPulled = resolve)); + let canceled = new Promise((resolve) => (onCanceled = resolve)); + + const rs = new ReadableStream({ + async start() { + await scheduler.wait(1); + onStarted(); + }, + async pull() { + await scheduler.wait(1); + onPulled(); + }, + async cancel() { + onCanceled(); + }, + }); + + await Promise.allSettled([started, pulled]); + await scheduler.wait(1); + await Promise.allSettled([rs.cancel(), canceled]); + } + + // Async algorithms for byte stream + { + let onStarted, onPulled, onCanceled; + let started = new Promise((resolve) => (onStarted = resolve)); + let pulled = new Promise((resolve) => (onPulled = resolve)); + let canceled = new Promise((resolve) => (onCanceled = resolve)); + + const rs = new ReadableStream( + { + type: 'bytes', + async start() { + await scheduler.wait(1); + onStarted(); + }, + async pull() { + await scheduler.wait(1); + onPulled(); + }, + async cancel() { + onCanceled(); + }, + }, + { highWaterMark: 1 } + ); + + await Promise.allSettled([started, pulled]); + await scheduler.wait(1); + await Promise.allSettled([rs.cancel(), canceled]); + } + }, +}; + +// Test that new ReadableStream creates the right kind of controller +export const newReadableStreamControllerType = { + test() { + new ReadableStream({ + start(c) { + ok(c instanceof ReadableStreamDefaultController); + }, + pull(c) { + ok(c instanceof ReadableStreamDefaultController); + }, + }); + + new ReadableStream({ + type: 'bytes', + start(c) { + ok(c instanceof ReadableByteStreamController); + }, + pull(c) { + ok(c instanceof ReadableByteStreamController); + const byobRequest = c.byobRequest; + ok(byobRequest != null); + ok(byobRequest === c.byobRequest); + ok(byobRequest instanceof ReadableStreamBYOBRequest); + ok(byobRequest.view instanceof Uint8Array); + }, + }); + }, +}; + +// Test sync algorithm errors are handled properly +export const newReadableStreamSyncAlgorithmErrorsHandled = { + async test() { + // Start error + { + const rs = new ReadableStream({ + start() { + throw new Error('boom'); + }, + }); + + await rejects(rs.getReader().read(), { message: 'boom' }); + } + + // Pull error + { + const rs = new ReadableStream({ + pull() { + throw new Error('boom'); + }, + }); + + await rejects(rs.getReader().read(), { message: 'boom' }); + } + + // Cancel error + { + const rs = new ReadableStream({ + cancel() { + throw new Error('boom'); + }, + }); + await rejects(rs.cancel(), { message: 'boom' }); + } + }, +}; + +// Test async algorithm errors are handled properly +export const newReadableStreamAsyncAlgorithmErrorsHandled = { + async test() { + // Async start error + { + const rs = new ReadableStream({ + async start() { + throw new Error('boom'); + }, + }); + + await rejects(rs.getReader().read(), { message: 'boom' }); + } + + // Async pull error + { + const rs = new ReadableStream({ + async pull() { + throw new Error('boom'); + }, + }); + + await rejects(rs.getReader().read(), { message: 'boom' }); + } + + // Async cancel error + { + const rs = new ReadableStream({ + async cancel() { + throw new Error('boom'); + }, + }); + + await rejects(rs.cancel(), { message: 'boom' }); + } + }, +}; diff --git a/src/workerd/api/tests/streams-js-test.wd-test b/src/workerd/api/tests/streams-js-test.wd-test new file mode 100644 index 00000000000..acd6591bb50 --- /dev/null +++ b/src/workerd/api/tests/streams-js-test.wd-test @@ -0,0 +1,17 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "streams-js-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "streams-js-test.js") + ], + compatibilityFlags = [ + "nodejs_compat", + "streams_enable_constructors", + ] + ) + ), + ], +); diff --git a/src/workerd/api/tests/streams-test.js b/src/workerd/api/tests/streams-test.js index aa55071cd8e..206cd9d9839 100644 --- a/src/workerd/api/tests/streams-test.js +++ b/src/workerd/api/tests/streams-test.js @@ -763,8 +763,96 @@ export const fixedLengthStreamPreconditions = { }, }; +// Test non-standard readAtLeast() extension with default reader (should throw) +export const readAtLeastDefaultReaderThrows = { + async test() { + const rs = new ReadableStream({ + type: 'bytes', + pull(c) { + c.enqueue(enc.encode('hello')); + c.close(); + }, + }); + + const reader = rs.getReader(); + throws(() => reader.readAtLeast(1), TypeError); + reader.releaseLock(); + + // Consume the stream to clean up + for await (const _ of rs) { + } + }, +}; + +// Test non-standard readAtLeast() extension with BYOB reader +// Note: The original ew-test expected value=undefined on done, which was the legacy +// behavior of internal streams. With `internal_stream_byob_return_view` compat flag +// (enabled since 2024-05-13), the spec-compliant behavior returns an empty view. +export const readAtLeastByobReader = { + async test(ctrl, env) { + // Use service binding to get chunked response + const response = await env.subrequest.fetch('http://test/chunked'); + const reader = response.body.getReader({ mode: 'byob' }); + + // First readAtLeast: request min 4 bytes + // Server sends: 'foo' (3) + 'bar' (3) = 6 bytes, first chunk 'foo' only 3 bytes + // so readAtLeast(4) should wait for more data + let result = await reader.readAtLeast(4, new Uint8Array(20)); + let value = new TextDecoder().decode(result.value); + strictEqual(result.done, false); + strictEqual(value.length, 6); + strictEqual(value, 'foobar'); + + // Regular read + result = await reader.read(new Uint8Array(20)); + value = new TextDecoder().decode(result.value); + strictEqual(value.length, 1); + strictEqual(value, 'b'); + strictEqual(result.done, false); + + // Second readAtLeast: request min 4 bytes, only 'az' (2 bytes) remain + // Server sends: 'a' (1) + 'z' (1) = 2 bytes, then closes + result = await reader.readAtLeast(4, new Uint8Array(20)); + value = new TextDecoder().decode(result.value); + strictEqual(value.length, 2); + strictEqual(value, 'az'); + strictEqual(result.done, false); + + // Final read should be done - spec requires empty view, not undefined + result = await reader.readAtLeast(4, new Uint8Array(20)); + strictEqual(result.done, true); + ok(result.value instanceof Uint8Array); + strictEqual(result.value.byteLength, 0); + }, +}; + export default { async fetch(request, env) { + const url = new URL(request.url); + + // Endpoint for chunked data for readAtLeast tests + if (url.pathname === '/chunked') { + const rs = new ReadableStream({ + type: 'bytes', + async pull(controller) { + // Simulate chunked input: foo, bar, b, a, z + const chunks = [ + enc.encode('foo'), + enc.encode('bar'), + enc.encode('b'), + enc.encode('a'), + enc.encode('z'), + ]; + for (const chunk of chunks) { + controller.enqueue(chunk); + await scheduler.wait(1); + } + controller.close(); + }, + }); + return new Response(rs); + } + strictEqual(request.headers.get('content-length'), '10'); return new Response(request.body); }, diff --git a/src/workerd/api/tests/transform-streams-test.js b/src/workerd/api/tests/transform-streams-test.js index e54b89b0b4a..58983705655 100644 --- a/src/workerd/api/tests/transform-streams-test.js +++ b/src/workerd/api/tests/transform-streams-test.js @@ -432,3 +432,155 @@ export const transformRoundtrip = { strictEqual(output, testData); }, }; + +// Regression test: iterating over globalThis properties should not crash. +// This tests that constructing and inspecting various global objects works correctly. +// +// Note: We skip `process` in the iteration because when nodejs_compat is enabled, +// the iteration logic may call process.exit() which terminates the test. +export const transformCrashRegression = { + test() { + function iterate(obj, depth = 0, results = {}, originalKey) { + for (const key in obj) { + if (depth > 100) return results; + if ( + key === 'parent' || + key === 'globalThis' || + key === 'self' || + key === 'ServiceWorkerGlobalScope' || + key === 'global' || + key === 'process' + ) + continue; + if (typeof obj[key] === 'object') { + results[key] = iterate(obj[key], ++depth, results[key] ?? {}, key); + } else { + const properties = new Set([ + ...Object.getOwnPropertyNames(obj[key]), + ...Object.getOwnPropertySymbols(obj[key]), + ...Object.keys(obj[key]), + ]); + for (const prop in obj[key]) { + properties.add(prop); + } + if (properties.size > 0) { + results[key] = { + __proto: typeof obj[key].__proto__, + }; + for (const property of properties) { + if ( + property === 'caller' || + property === 'callee' || + property === 'arguments' || + property === 'constructor' + ) + continue; + let writeProp = + property === 'prototype' ? '_prototype' : property; + try { + if (typeof obj[key][property] === 'object') { + results[key][writeProp] = iterate( + obj[key][property], + ++depth, + results[key][property] ?? {}, + key + ); + } else { + results[key][writeProp] = typeof obj[key][property]; + } + } catch (err) {} + let instance; + try { + if (property === 'prototype') { + try { + instance = new obj[key](); + } catch (internalConstructError) { + if ( + internalConstructError.message.includes( + 'Failed to construct' + ) || + internalConstructError.message.includes( + 'Illegal constructor' + ) + ) { + instance = new obj[key](''); + } else { + throw internalConstructError; + } + } + } else { + instance = new obj[key][property](); + } + const instanceProperties = new Set([ + ...Object.getOwnPropertyNames(instance), + ...Object.getOwnPropertySymbols(instance), + ...Object.keys(instance), + ...(instance.constructor + ? Object.getOwnPropertyNames(instance.constructor) + : []), + ...(instance.constructor + ? Object.getOwnPropertySymbols(instance.constructor) + : []), + ...(instance.constructor + ? Object.keys(instance.constructor) + : []), + ]); + for (const prop in instance) { + instanceProperties.add(prop); + } + if (instanceProperties.size > 0) { + if (results[key][writeProp] === undefined) { + results[key][writeProp] = {}; + } + for (const instanceProperty of instanceProperties) { + try { + if ( + results[key][writeProp][instanceProperty] === undefined + ) { + results[key][writeProp][instanceProperty] = + typeof instance[instanceProperty]; + } + } catch {} + } + } else { + if (results[key][writeProp] === undefined) { + results[key][writeProp] = typeof instance; + } + } + } catch (e) { + if (results[key][writeProp] === undefined) { + results[key][writeProp] = typeof obj[key][property]; + } + continue; + } + } + } else { + results[key] = typeof obj[key]; + } + } + } + return results; + } + + function order(obj) { + if (typeof obj === 'object') { + const ordered = {}; + Object.keys(obj) + .sort() + .forEach(function (key) { + ordered[key] = order(obj[key]); + }); + return ordered; + } else { + return obj; + } + } + + const iterated = iterate(globalThis); + const ordered = order(iterated); + const toWrite = JSON.stringify(ordered); + + // Test passes if we get here without crashing + ok(toWrite.length > 0); + }, +};