Skip to content

Commit 49feab0

Browse files
anonrigjasnell
andauthored
make sure the close algorithm always run async (#5779)
Co-authored-by: James M Snell <[email protected]>
1 parent b4f8424 commit 49feab0

File tree

2 files changed

+57
-7
lines changed

2 files changed

+57
-7
lines changed

src/workerd/api/streams/standard.c++

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,51 @@ jsg::Promise<void> maybeRunAlgorithm(
537537
return js.resolvedPromise();
538538
}
539539

540+
jsg::Promise<void> maybeRunAlgorithmAsync(
541+
jsg::Lock& js, auto& maybeAlgorithm, auto&& onSuccess, auto&& onFailure, auto&&... args) {
542+
// The algorithm is a JavaScript function mapped through jsg::Function.
543+
// It is expected to return a Promise mapped via jsg::Promise. If the
544+
// function returns synchronously, the jsg::Promise wrapper ensures
545+
// that it is properly mapped to a jsg::Promise, but if the Promise
546+
// throws synchronously, we have to convert that synchronous throw
547+
// into a proper rejected jsg::Promise.
548+
KJ_IF_SOME(algorithm, maybeAlgorithm) {
549+
// We need two layers of tryCatch here, unfortunately. The inner layer
550+
// covers the algorithm implementation itself and is our typical error
551+
// handling path. It ensures that if the algorithm throws an exception,
552+
// that is properly converted in to a rejected promise that is *then*
553+
// handled by the onFailure handler that is passed in. The outer tryCatch
554+
// handles the rare and generally unexpected failure of the calls to
555+
// .then() itself, which can throw JS exceptions synchronously in certain
556+
// rare cases. For those we return a rejected promise but do not call the
557+
// onFailure case since such errors are generally indicative of a fatal
558+
// condition in the isolate (e.g. out of memory, other fatal exception, etc).
559+
return js.tryCatch([&] {
560+
KJ_IF_SOME(ioContext, IoContext::tryCurrent()) {
561+
return js
562+
.tryCatch([&] { return algorithm(js, kj::fwd<decltype(args)>(args)...); },
563+
[&](jsg::Value&& exception) { return js.rejectedPromise<void>(kj::mv(exception)); })
564+
.then(js, ioContext.addFunctor(kj::mv(onSuccess)),
565+
ioContext.addFunctor(kj::mv(onFailure)));
566+
} else {
567+
return js
568+
.tryCatch([&] { return algorithm(js, kj::fwd<decltype(args)>(args)...); },
569+
[&](jsg::Value&& exception) {
570+
return js.rejectedPromise<void>(kj::mv(exception));
571+
}).then(js, kj::mv(onSuccess), kj::mv(onFailure));
572+
}
573+
}, [&](jsg::Value&& exception) { return js.rejectedPromise<void>(kj::mv(exception)); });
574+
}
575+
576+
// If the algorithm does not exist, we handle it as a success but ensure
577+
// it runs asynchronously by scheduling via a resolved promise.
578+
KJ_IF_SOME(ioContext, IoContext::tryCurrent()) {
579+
return js.resolvedPromise().then(js, ioContext.addFunctor(kj::mv(onSuccess)));
580+
} else {
581+
return js.resolvedPromise().then(js, kj::mv(onSuccess));
582+
}
583+
}
584+
540585
int getHighWaterMark(
541586
const UnderlyingSource& underlyingSource, const StreamQueuingStrategy& queuingStrategy) {
542587
bool isBytes = underlyingSource.type.map([](auto& s) { return s == "bytes"; }).orDefault(false);
@@ -1218,7 +1263,17 @@ void WritableImpl<Self>::advanceQueueIfNeeded(jsg::Lock& js, jsg::Ref<Self> self
12181263
finishInFlightClose(js, kj::mv(self), reason.getHandle(js));
12191264
});
12201265

1221-
maybeRunAlgorithm(js, algorithms.close, kj::mv(onSuccess), kj::mv(onFailure));
1266+
// Per the spec, the close algorithm should always run asynchronously, even if
1267+
// there's no user-provided close handler. This ensures that releaseLock() can
1268+
// reject the closed promise before the close completes.
1269+
// The original maybeRunAlgorithm would call the onSuccess continuation
1270+
// synchronously if algorithms.close is not specified. maybeRunAlgorithmAsync
1271+
// always defers to a microtask.
1272+
if (FeatureFlags::get(js).getPedanticWpt()) {
1273+
maybeRunAlgorithmAsync(js, algorithms.close, kj::mv(onSuccess), kj::mv(onFailure));
1274+
} else {
1275+
maybeRunAlgorithm(js, algorithms.close, kj::mv(onSuccess), kj::mv(onFailure));
1276+
}
12221277
}
12231278
return;
12241279
}

src/wpt/streams-test.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -728,12 +728,7 @@ export default {
728728
],
729729
},
730730
'writable-streams/byte-length-queuing-strategy.any.js': {},
731-
'writable-streams/close.any.js': {
732-
comment: 'To be investigated',
733-
expectedFailures: [
734-
'releaseLock() should not change the result of sync close()',
735-
],
736-
},
731+
'writable-streams/close.any.js': {},
737732
'writable-streams/constructor.any.js': {
738733
comment: 'These are mostly about validation of params',
739734
expectedFailures:

0 commit comments

Comments
 (0)