diff --git a/cli/tsc/dts/node/fs/promises.d.cts b/cli/tsc/dts/node/fs/promises.d.cts index b5ac815d6bbf40..2afb66d606f082 100644 --- a/cli/tsc/dts/node/fs/promises.d.cts +++ b/cli/tsc/dts/node/fs/promises.d.cts @@ -9,7 +9,7 @@ * @since v10.0.0 */ declare module "fs/promises" { - import { Abortable } from "node:events"; + import { Abortable, EventEmitter } from "node:events"; import { Stream } from "node:stream"; import { ReadableStream } from "node:stream/web"; import { @@ -91,8 +91,7 @@ declare module "fs/promises" { interface ReadableWebStreamOptions { autoClose?: boolean | undefined; } - // TODO: Add `EventEmitter` close - interface FileHandle { + interface FileHandle extends EventEmitter { /** * The numeric file descriptor managed by the {FileHandle} object. * @since v10.0.0 diff --git a/ext/node/polyfills/internal/fs/handle.ts b/ext/node/polyfills/internal/fs/handle.ts index 844475ea430b0a..8dec3d514e489d 100644 --- a/ext/node/polyfills/internal/fs/handle.ts +++ b/ext/node/polyfills/internal/fs/handle.ts @@ -1,5 +1,8 @@ // Copyright 2018-2026 the Deno authors. MIT license. +// TODO(petamoriken): enable prefer-primordials for node polyfills +// deno-lint-ignore-file prefer-primordials + import { EventEmitter } from "node:events"; import { Buffer } from "node:buffer"; import { @@ -34,7 +37,17 @@ import { CreateWriteStreamOptions, } from "node:fs/promises"; import assert from "node:assert"; -import { denoErrorToNodeError } from "ext:deno_node/internal/errors.ts"; +import { + denoErrorToNodeError, + ERR_INVALID_STATE, +} from "ext:deno_node/internal/errors.ts"; +import { readableStreamCancel } from "ext:deno_web/06_streams.js"; +import { + validateBoolean, + validateObject, +} from "ext:deno_node/internal/validators.mjs"; +import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; +import process from "node:process"; const { Error, @@ -56,6 +69,7 @@ const kCloseResolve = Symbol("kCloseResolve"); const kCloseReject = Symbol("kCloseReject"); const kRef = Symbol("kRef"); const kUnref = Symbol("kUnref"); +const kLocked = Symbol("kLocked"); interface WriteResult { bytesWritten: number; @@ -73,6 +87,7 @@ export class FileHandle extends EventEmitter { [kClosePromise]?: Promise | null; [kCloseResolve]?: () => void; [kCloseReject]?: (err: Error) => void; + [kLocked]: boolean; constructor(rid: number) { super(); @@ -80,6 +95,7 @@ export class FileHandle extends EventEmitter { this[kRefs] = 1; this[kClosePromise] = null; + this[kLocked] = false; } get fd() { @@ -265,6 +281,73 @@ export class FileHandle extends EventEmitter { }); } + readableWebStream( + options: { autoClose?: boolean; type?: string } = kEmptyObject, + ): ReadableStream { + if (this.fd === -1) { + throw new ERR_INVALID_STATE("The FileHandle is closed"); + } + if (this[kClosePromise]) { + throw new ERR_INVALID_STATE("The FileHandle is closing"); + } + if (this[kLocked]) { + throw new ERR_INVALID_STATE("The FileHandle is locked"); + } + this[kLocked] = true; + + validateObject(options, "options"); + const autoClose = options?.autoClose ?? false; + const type = options?.type ?? "bytes"; + validateBoolean(autoClose, "options.autoClose"); + + if (type !== "bytes") { + process.emitWarning( + 'A non-"bytes" options.type has no effect. A byte-oriented steam is ' + + "always created.", + "ExperimentalWarning", + ); + } + + const ondone = async () => { + this[kUnref](); + if (autoClose) { + await this.close().catch(() => {}); + } + }; + + const readable = new ReadableStream({ + type: "bytes", + autoAllocateChunkSize: 16384, + + pull: async (controller) => { + const view = controller.byobRequest!.view! as Uint8Array; + const { bytesRead } = await this.read( + view, + view.byteOffset, + view.byteLength, + ); + + if (bytesRead === 0) { + controller.close(); + await ondone(); + } + + controller.byobRequest!.respond(bytesRead); + }, + + cancel: async () => { + await ondone(); + }, + }); + + this[kRef](); + this.once("close", () => { + readableStreamCancel(readable); + }); + + return readable; + } + [SymbolAsyncDispose]() { return this.close(); } diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 95026069e40d68..dc329c6be0a7ab 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -7261,6 +7261,7 @@ export { ReadableStream, ReadableStreamBYOBReader, ReadableStreamBYOBRequest, + readableStreamCancel, readableStreamClose, readableStreamCollectIntoUint8Array, ReadableStreamDefaultController, diff --git a/ext/web/internal.d.ts b/ext/web/internal.d.ts index f1024459598532..e4e5bb2689cb93 100644 --- a/ext/web/internal.d.ts +++ b/ext/web/internal.d.ts @@ -95,6 +95,10 @@ declare module "ext:deno_web/09_file.js" { declare module "ext:deno_web/06_streams.js" { const ReadableStream: typeof ReadableStream; + function readableStreamCancel( + stream: ReadableStream, + reason?: string, + ): Promise; function isReadableStreamDisturbed(stream: ReadableStream): boolean; function createProxy(stream: ReadableStream): ReadableStream; } diff --git a/tests/node_compat/config.toml b/tests/node_compat/config.toml index 47bdb87c52d0be..9b7901b391e01b 100644 --- a/tests/node_compat/config.toml +++ b/tests/node_compat/config.toml @@ -432,6 +432,7 @@ "parallel/test-file-validate-mode-flag.js" = {} "parallel/test-file-write-stream5.js" = {} "parallel/test-filehandle-close.js" = {} +"parallel/test-filehandle-readablestream.js" = {} "parallel/test-finalization-registry-shutdown.js" = {} "parallel/test-fs-buffertype-writesync.js" = {} "parallel/test-fs-chmod-mask.js" = {} diff --git a/tests/unit_node/_fs/_fs_handle_test.ts b/tests/unit_node/_fs/_fs_handle_test.ts index 379fefa84d8e9e..01cf1845d62bd4 100644 --- a/tests/unit_node/_fs/_fs_handle_test.ts +++ b/tests/unit_node/_fs/_fs_handle_test.ts @@ -369,3 +369,167 @@ Deno.test( } }, ); + +Deno.test( + "[node/fs filehandle.readableWebStream] Create a readable web stream", + async function () { + const fileHandle = await fs.open(testData); + const webStream = fileHandle.readableWebStream(); + + assert(webStream instanceof ReadableStream); + + const reader = webStream.getReader(); + const chunks: Uint8Array[] = []; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + + const result = new Uint8Array( + chunks.reduce((acc, chunk) => acc + chunk.length, 0), + ); + let offset = 0; + for (const chunk of chunks) { + result.set(chunk, offset); + offset += chunk.length; + } + + assertEquals(decoder.decode(result), "hello world"); + + await fileHandle.close(); + }, +); + +Deno.test( + "[node/fs filehandle.readableWebStream] With autoClose option", + async function () { + const fileHandle = await fs.open(testData); + const { promise: closePromise, resolve: closeResolve } = Promise + .withResolvers(); + fileHandle.once("close", closeResolve); + + const webStream = fileHandle.readableWebStream({ autoClose: true }); + + assert(webStream instanceof ReadableStream); + + const reader = webStream.getReader(); + const chunks: Uint8Array[] = []; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + + const result = new Uint8Array( + chunks.reduce((acc, chunk) => acc + chunk.length, 0), + ); + let offset = 0; + for (const chunk of chunks) { + result.set(chunk, offset); + offset += chunk.length; + } + + assertEquals(decoder.decode(result), "hello world"); + + // Wait for autoClose to complete by waiting for the close event + await closePromise; + + // File should be closed + assertEquals(fileHandle.fd, -1); + }, +); + +Deno.test( + "[node/fs filehandle.readableWebStream] Throws when called twice", + async function () { + const fileHandle = await fs.open(testData); + + // First call should succeed + const stream = fileHandle.readableWebStream(); + + // Second call should throw + let threw = false; + try { + fileHandle.readableWebStream(); + } catch (e) { + threw = true; + assert(e instanceof Error); + assertEquals((e as NodeJS.ErrnoException).code, "ERR_INVALID_STATE"); + } + + assertEquals(threw, true); + + // Cancel the stream to avoid resource leak + await stream.cancel(); + await fileHandle.close(); + }, +); + +Deno.test( + "[node/fs filehandle.readableWebStream] cancel without autoClose keeps fd open", + async function () { + const fileHandle = await fs.open(testData); + const stream = fileHandle.readableWebStream({ autoClose: false }); + + // Cancel the stream + await stream.cancel(); + + // fd should still be valid (not -1) + assert(fileHandle.fd !== -1); + + // FileHandle should still be usable + const stat = await fileHandle.stat(); + assertEquals(stat.isFile(), true); + + await fileHandle.close(); + }, +); + +Deno.test( + "[node/fs filehandle.readableWebStream] cancel with autoClose closes fd", + async function () { + const fileHandle = await fs.open(testData); + const { promise: closePromise, resolve: closeResolve } = Promise + .withResolvers(); + fileHandle.once("close", closeResolve); + + const stream = fileHandle.readableWebStream({ autoClose: true }); + + // Cancel the stream - this should trigger autoClose + await stream.cancel(); + + // Wait for close event + await closePromise; + + // fd should be -1 + assertEquals(fileHandle.fd, -1); + }, +); + +Deno.test( + "[node/fs filehandle.readableWebStream] close FileHandle while reader is active", + async function () { + const fileHandle = await fs.open(testData); + const stream = fileHandle.readableWebStream(); + const reader = stream.getReader({ mode: "byob" }); + + // Close the FileHandle while reader is active + await fileHandle.close(); + + // fd should be -1 + assertEquals(fileHandle.fd, -1); + + // Reader should eventually get done or error + const buffer = new ArrayBuffer(1024); + try { + const result = await reader.read(new Uint8Array(buffer)); + // If no error, should be done + assertEquals(result.done, true); + } catch { + // Error is also acceptable (FileHandle was closed) + } + }, +);