Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cli/tsc/dts/node/fs/promises.d.cts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* @since v10.0.0
*/
declare module "fs/promises" {
import { Abortable } from "node:events";
import { Abortable, EventEmitter } from "node:events";
import { Stream } from "node:stream";
import { ReadableStream } from "node:stream/web";
import {
Expand Down Expand Up @@ -91,8 +91,7 @@ declare module "fs/promises" {
interface ReadableWebStreamOptions {
autoClose?: boolean | undefined;
}
// TODO: Add `EventEmitter` close
interface FileHandle {
interface FileHandle extends EventEmitter {
/**
* The numeric file descriptor managed by the {FileHandle} object.
* @since v10.0.0
Expand Down
85 changes: 84 additions & 1 deletion ext/node/polyfills/internal/fs/handle.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
// Copyright 2018-2026 the Deno authors. MIT license.

// TODO(petamoriken): enable prefer-primordials for node polyfills
// deno-lint-ignore-file prefer-primordials

import { EventEmitter } from "node:events";
import { Buffer } from "node:buffer";
import {
Expand Down Expand Up @@ -34,7 +37,17 @@ import {
CreateWriteStreamOptions,
} from "node:fs/promises";
import assert from "node:assert";
import { denoErrorToNodeError } from "ext:deno_node/internal/errors.ts";
import {
denoErrorToNodeError,
ERR_INVALID_STATE,
} from "ext:deno_node/internal/errors.ts";
import { readableStreamCancel } from "ext:deno_web/06_streams.js";
import {
validateBoolean,
validateObject,
} from "ext:deno_node/internal/validators.mjs";
import { kEmptyObject } from "ext:deno_node/internal/util.mjs";
import process from "node:process";

const {
Error,
Expand All @@ -56,6 +69,7 @@ const kCloseResolve = Symbol("kCloseResolve");
const kCloseReject = Symbol("kCloseReject");
const kRef = Symbol("kRef");
const kUnref = Symbol("kUnref");
const kLocked = Symbol("kLocked");

interface WriteResult {
bytesWritten: number;
Expand All @@ -73,13 +87,15 @@ export class FileHandle extends EventEmitter {
[kClosePromise]?: Promise<void> | null;
[kCloseResolve]?: () => void;
[kCloseReject]?: (err: Error) => void;
[kLocked]: boolean;

constructor(rid: number) {
super();
this.#rid = rid;

this[kRefs] = 1;
this[kClosePromise] = null;
this[kLocked] = false;
}

get fd() {
Expand Down Expand Up @@ -265,6 +281,73 @@ export class FileHandle extends EventEmitter {
});
}

readableWebStream(
options: { autoClose?: boolean; type?: string } = kEmptyObject,
): ReadableStream<Uint8Array> {
if (this.fd === -1) {
throw new ERR_INVALID_STATE("The FileHandle is closed");
}
if (this[kClosePromise]) {
throw new ERR_INVALID_STATE("The FileHandle is closing");
}
if (this[kLocked]) {
throw new ERR_INVALID_STATE("The FileHandle is locked");
}
this[kLocked] = true;

validateObject(options, "options");
const autoClose = options?.autoClose ?? false;
const type = options?.type ?? "bytes";
validateBoolean(autoClose, "options.autoClose");

if (type !== "bytes") {
process.emitWarning(
'A non-"bytes" options.type has no effect. A byte-oriented steam is ' +
"always created.",
"ExperimentalWarning",
);
}

const ondone = async () => {
this[kUnref]();
if (autoClose) {
await this.close().catch(() => {});
}
};

const readable = new ReadableStream({
type: "bytes",
autoAllocateChunkSize: 16384,

pull: async (controller) => {
const view = controller.byobRequest!.view! as Uint8Array;
const { bytesRead } = await this.read(
view,
view.byteOffset,
view.byteLength,
);

if (bytesRead === 0) {
controller.close();
await ondone();
}

controller.byobRequest!.respond(bytesRead);
},

cancel: async () => {
await ondone();
},
});

this[kRef]();
this.once("close", () => {
readableStreamCancel(readable);
});

return readable;
}

[SymbolAsyncDispose]() {
return this.close();
}
Expand Down
1 change: 1 addition & 0 deletions ext/web/06_streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -7261,6 +7261,7 @@ export {
ReadableStream,
ReadableStreamBYOBReader,
ReadableStreamBYOBRequest,
readableStreamCancel,
readableStreamClose,
readableStreamCollectIntoUint8Array,
ReadableStreamDefaultController,
Expand Down
4 changes: 4 additions & 0 deletions ext/web/internal.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ declare module "ext:deno_web/09_file.js" {

declare module "ext:deno_web/06_streams.js" {
const ReadableStream: typeof ReadableStream;
function readableStreamCancel<T>(
stream: ReadableStream<T>,
reason?: string,
): Promise<void>;
function isReadableStreamDisturbed(stream: ReadableStream): boolean;
function createProxy<T>(stream: ReadableStream<T>): ReadableStream<T>;
}
Expand Down
1 change: 1 addition & 0 deletions tests/node_compat/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@
"parallel/test-file-validate-mode-flag.js" = {}
"parallel/test-file-write-stream5.js" = {}
"parallel/test-filehandle-close.js" = {}
"parallel/test-filehandle-readablestream.js" = {}
"parallel/test-finalization-registry-shutdown.js" = {}
"parallel/test-fs-buffertype-writesync.js" = {}
"parallel/test-fs-chmod-mask.js" = {}
Expand Down
164 changes: 164 additions & 0 deletions tests/unit_node/_fs/_fs_handle_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,167 @@ Deno.test(
}
},
);

Deno.test(
"[node/fs filehandle.readableWebStream] Create a readable web stream",
async function () {
const fileHandle = await fs.open(testData);
const webStream = fileHandle.readableWebStream();

assert(webStream instanceof ReadableStream);

const reader = webStream.getReader();
const chunks: Uint8Array[] = [];

while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}

const result = new Uint8Array(
chunks.reduce((acc, chunk) => acc + chunk.length, 0),
);
let offset = 0;
for (const chunk of chunks) {
result.set(chunk, offset);
offset += chunk.length;
}

assertEquals(decoder.decode(result), "hello world");

await fileHandle.close();
},
);

Deno.test(
"[node/fs filehandle.readableWebStream] With autoClose option",
async function () {
const fileHandle = await fs.open(testData);
const { promise: closePromise, resolve: closeResolve } = Promise
.withResolvers<void>();
fileHandle.once("close", closeResolve);

const webStream = fileHandle.readableWebStream({ autoClose: true });

assert(webStream instanceof ReadableStream);

const reader = webStream.getReader();
const chunks: Uint8Array[] = [];

while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}

const result = new Uint8Array(
chunks.reduce((acc, chunk) => acc + chunk.length, 0),
);
let offset = 0;
for (const chunk of chunks) {
result.set(chunk, offset);
offset += chunk.length;
}

assertEquals(decoder.decode(result), "hello world");

// Wait for autoClose to complete by waiting for the close event
await closePromise;

// File should be closed
assertEquals(fileHandle.fd, -1);
},
);

Deno.test(
"[node/fs filehandle.readableWebStream] Throws when called twice",
async function () {
const fileHandle = await fs.open(testData);

// First call should succeed
const stream = fileHandle.readableWebStream();

// Second call should throw
let threw = false;
try {
fileHandle.readableWebStream();
} catch (e) {
threw = true;
assert(e instanceof Error);
assertEquals((e as NodeJS.ErrnoException).code, "ERR_INVALID_STATE");
}

assertEquals(threw, true);

// Cancel the stream to avoid resource leak
await stream.cancel();
await fileHandle.close();
},
);

Deno.test(
"[node/fs filehandle.readableWebStream] cancel without autoClose keeps fd open",
async function () {
const fileHandle = await fs.open(testData);
const stream = fileHandle.readableWebStream({ autoClose: false });

// Cancel the stream
await stream.cancel();

// fd should still be valid (not -1)
assert(fileHandle.fd !== -1);

// FileHandle should still be usable
const stat = await fileHandle.stat();
assertEquals(stat.isFile(), true);

await fileHandle.close();
},
);

Deno.test(
"[node/fs filehandle.readableWebStream] cancel with autoClose closes fd",
async function () {
const fileHandle = await fs.open(testData);
const { promise: closePromise, resolve: closeResolve } = Promise
.withResolvers<void>();
fileHandle.once("close", closeResolve);

const stream = fileHandle.readableWebStream({ autoClose: true });

// Cancel the stream - this should trigger autoClose
await stream.cancel();

// Wait for close event
await closePromise;

// fd should be -1
assertEquals(fileHandle.fd, -1);
},
);

Deno.test(
"[node/fs filehandle.readableWebStream] close FileHandle while reader is active",
async function () {
const fileHandle = await fs.open(testData);
const stream = fileHandle.readableWebStream();
const reader = stream.getReader({ mode: "byob" });

// Close the FileHandle while reader is active
await fileHandle.close();

// fd should be -1
assertEquals(fileHandle.fd, -1);

// Reader should eventually get done or error
const buffer = new ArrayBuffer(1024);
try {
const result = await reader.read(new Uint8Array(buffer));
// If no error, should be done
assertEquals(result.done, true);
} catch {
// Error is also acceptable (FileHandle was closed)
}
},
);
Loading