Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions lib/scope-internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export function createScopeInternal(
parent?: Scope,
): [ScopeInternal, () => Operation<void>] {
let destructors = new Set<() => Operation<void>>();
let finalizer: (() => Operation<void>) | undefined = undefined;

let contexts: Record<string, unknown> = Object.create(
parent ? (parent as ScopeInternal).contexts : null,
Expand Down Expand Up @@ -55,6 +56,12 @@ export function createScopeInternal(
destructors.add(op);
return () => destructors.delete(op);
},

// Set a finalizer that runs BEFORE regular destructors.
// Used by tasks to close their delimiter before child tasks are destroyed.
setFinalizer(op: () => Operation<void>): void {
finalizer = op;
},
});

scope.set(Priority, scope.expect(Priority) + 1);
Expand All @@ -74,6 +81,17 @@ export function createScopeInternal(
unbind();
let outcome = Ok();
try {
// Run the finalizer first (e.g., close the task's delimiter).
// This ensures the task's finally block runs while children are still alive.
if (finalizer) {
try {
yield* finalizer();
} catch (error) {
outcome = Err(error as Error);
}
finalizer = undefined;
}
// Then run regular destructors in reverse order (children first)
for (let destructor of [...destructors].reverse()) {
try {
destructors.delete(destructor);
Expand All @@ -99,4 +117,5 @@ export function createScopeInternal(
export interface ScopeInternal extends Scope {
contexts: Record<string, unknown>;
ensure(op: () => Operation<void>): () => void;
setFinalizer(op: () => Operation<void>): void;
}
4 changes: 3 additions & 1 deletion lib/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ export function createTask<T>(options: TaskOptions<T>): NewTask<T> {
let boundary = owner.expect(ErrorContext);
scope.set(ErrorContext, top);

scope.ensure(function* () {
// Use setFinalizer to ensure the delimiter closes BEFORE child destructors run.
// This allows the task's finally block to communicate with spawned children.
scope.setFinalizer(function* () {
try {
yield* top.close();
} finally {
Expand Down
107 changes: 107 additions & 0 deletions test/cleanup-regression.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { describe, expect, it } from "./suite.ts";
import { resource, run, spawn, suspend, withResolvers } from "../mod.ts";

// Helper to add timeout to async operations
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
let timeoutId: number;
return Promise.race([
promise.finally(() => clearTimeout(timeoutId)),
new Promise<T>((_, reject) => {
timeoutId = setTimeout(
() => reject(new Error(`Timeout after ${ms}ms - cleanup deadlocked`)),
ms,
);
}),
]);
}

describe("cleanup regression (4.0.1)", () => {
// This pattern mirrors @effectionx/websocket's useWebSocket:
// 1. Resource with a spawned consumer task
// 2. Finally block signals the consumer to close (like socket.close())
// 3. Finally block waits for consumer to confirm it processed the close
//
// This worked in 4.0.0 but deadlocks in 4.0.1 due to changes in
// task finalization order (PRs #1081 and #1085)

it("finally block can signal spawned task and wait for response (sleep)", async () => {
let cleanupCompleted = false;
let consumerSawClose = false;

let task = run(function* () {
yield* resource(function* (provide) {
let { resolve: closeStream, operation: streamClosed } = withResolvers<
void
>();
let { resolve: confirmClosed, operation: closed } = withResolvers<
void
>();

// Spawned consumer task - like the message consumer in useWebSocket
yield* spawn(function* () {
// Simulate draining a stream until it closes
yield* streamClosed;
consumerSawClose = true;
confirmClosed();
});

try {
yield* provide(undefined);
} finally {
// Close the stream (like socket.close() in useWebSocket)
closeStream();
// Wait for consumer to confirm it saw the close
yield* closed;
cleanupCompleted = true;
}
});

yield* suspend();
});

// Timing gap before halt - this is important to reproduce the bug
await new Promise((r) => setTimeout(r, 50));
await withTimeout(task.halt(), 1000);

expect(consumerSawClose).toBe(true);
expect(cleanupCompleted).toBe(true);
});

it("finally block can signal spawned task and wait for response (no sleep)", async () => {
let cleanupCompleted = false;
let consumerSawClose = false;

let task = run(function* () {
yield* resource(function* (provide) {
let { resolve: closeStream, operation: streamClosed } = withResolvers<
void
>();
let { resolve: confirmClosed, operation: closed } = withResolvers<
void
>();

yield* spawn(function* () {
yield* streamClosed;
consumerSawClose = true;
confirmClosed();
});

try {
yield* provide(undefined);
} finally {
closeStream();
yield* closed;
cleanupCompleted = true;
}
});

yield* suspend();
});

// No timing gap - halt immediately
await withTimeout(task.halt(), 1000);

expect(consumerSawClose).toBe(true);
expect(cleanupCompleted).toBe(true);
});
});
Loading