Skip to content

Commit 423fe54

Browse files
committed
Fixup pipe state abort handling
1 parent dcd0151 commit 423fe54

File tree

3 files changed

+280
-70
lines changed

3 files changed

+280
-70
lines changed

src/workerd/api/streams/internal-test.c++

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include "internal.h"
66
#include "readable.h"
7+
#include "standard.h"
78
#include "writable.h"
89

910
#include <workerd/jsg/jsg-test.h>
@@ -352,5 +353,105 @@ KJ_TEST("WritableStreamInternalController observability") {
352353
KJ_ASSERT(observer.queueSizeBytes == 0);
353354
}
354355

356+
// Test for use-after-free fix in pipeLoop when abort is called during pending read.
357+
// This tests the scenario where:
358+
// 1. A JavaScript-backed ReadableStream is piped to an internal WritableStream
359+
// 2. The pipeLoop is waiting for a read from the JS stream
360+
// 3. abort() is called on the writable stream, which triggers drain()
361+
// 4. drain() destroys the Pipe object
362+
// 5. The pending read callback must not access the freed Pipe
363+
//
364+
// The fix ensures the Pipe::State is ref-counted and survives until all callbacks complete.
365+
KJ_TEST("WritableStreamInternalController pipeLoop abort during pending read") {
366+
capnp::MallocMessageBuilder message;
367+
auto flags = message.initRoot<CompatibilityFlags>();
368+
flags.setNodeJsCompat(true);
369+
flags.setWorkerdExperimental(true);
370+
flags.setStreamsJavaScriptControllers(true);
371+
// Enable the flag that causes abort to call drain() immediately
372+
flags.setInternalWritableStreamAbortClearsQueue(true);
373+
374+
TestFixture fixture({.featureFlags = flags.asReader()});
375+
376+
class MySink final: public WritableStreamSink {
377+
public:
378+
kj::Promise<void> write(kj::ArrayPtr<const byte> buffer) override {
379+
return kj::READY_NOW;
380+
}
381+
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
382+
return kj::READY_NOW;
383+
}
384+
kj::Promise<void> end() override {
385+
return kj::READY_NOW;
386+
}
387+
void abort(kj::Exception reason) override {}
388+
};
389+
390+
fixture.runInIoContext([&](const TestFixture::Environment& env) {
391+
// Create a JavaScript-backed ReadableStream.
392+
// The pull function will be called when the pipe tries to read.
393+
// We use a JS-backed stream so that pipeLoop is used (not the kj pipe path).
394+
//
395+
// We need to simulate:
396+
// 1. First read succeeds with some data
397+
// 2. Second read is pending (the promise from pull is not resolved)
398+
// 3. While pending, we abort the writable stream
399+
//
400+
// Using an UnderlyingSource with a pull callback that enqueues data once,
401+
// then on the second call returns without enqueuing (leaving the read pending).
402+
403+
int pullCount = 0;
404+
jsg::Ref<ReadableStream> source = ReadableStream::constructor(env.js,
405+
UnderlyingSource{.pull =
406+
[&pullCount](jsg::Lock& js, UnderlyingSource::Controller controller) {
407+
pullCount++;
408+
auto& c = KJ_ASSERT_NONNULL(controller.tryGet<jsg::Ref<ReadableStreamDefaultController>>());
409+
if (pullCount == 1) {
410+
// First pull: enqueue some data so the pipe loop can make progress
411+
auto data = js.bytes(kj::heapArray<kj::byte>({1, 2, 3, 4}));
412+
c->enqueue(js, data.getHandle(js));
413+
}
414+
// Second pull onwards: don't enqueue anything, leaving the read pending.
415+
// This simulates an async data source that hasn't received data yet.
416+
// The promise returned by read() will be pending.
417+
return js.resolvedPromise();
418+
}},
419+
kj::none);
420+
421+
jsg::Ref<WritableStream> sink =
422+
env.js.alloc<WritableStream>(env.context, kj::heap<MySink>(), kj::none);
423+
424+
// Start the pipe. This will:
425+
// 1. Call pull() which enqueues data
426+
// 2. pipeLoop reads the data and writes it to the sink
427+
// 3. pipeLoop calls read() again, which calls pull()
428+
// 4. pull() returns without enqueuing, so read() returns a pending promise
429+
// 5. pipeLoop's callback is now waiting for that promise
430+
auto pipeTo = source->pipeTo(env.js, sink.addRef(), PipeToOptions{});
431+
pipeTo.markAsHandled(env.js);
432+
433+
// Run microtasks to let the pipe make progress (first read/write cycle)
434+
env.js.runMicrotasks();
435+
436+
// At this point, pipeLoop should be waiting for the second read.
437+
// Now abort the writable stream. This should:
438+
// 1. Call doAbort() which calls drain()
439+
// 2. drain() destroys the Pipe (setting state->aborted = true)
440+
// 3. The pending read callback should check aborted and bail out safely
441+
442+
// Before the fix, this would cause a use-after-free when the pending callback
443+
// tried to access the freed Pipe.
444+
auto abortPromise = sink->getController().abort(env.js, env.js.v8TypeError("Test abort"_kj));
445+
abortPromise.markAsHandled(env.js);
446+
447+
// Run microtasks to process the abort and any pending callbacks
448+
env.js.runMicrotasks();
449+
450+
// If we get here without crashing, the test passes.
451+
// The fix ensures that the Pipe::State survives until all callbacks complete.
452+
KJ_ASSERT(pullCount >= 1); // Verify pull was called at least once
453+
});
454+
}
455+
355456
} // namespace
356457
} // namespace workerd::api

0 commit comments

Comments
 (0)