Skip to content

Commit 2ec31fe

Browse files
committed
Update the WritableStreamSink and adapter to support output locks
1 parent 4182bb0 commit 2ec31fe

File tree

2 files changed

+26
-7
lines changed

2 files changed

+26
-7
lines changed

src/workerd/api/streams/writable-sink-adapter.c++

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@ struct WritableStreamSinkJsAdapter::Active final {
1919
struct Task {
2020
kj::Function<kj::Promise<void>()> task;
2121
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
22-
Task(kj::Function<kj::Promise<void>()> task, kj::Own<kj::PromiseFulfiller<void>> fulfiller)
22+
kj::Maybe<kj::Promise<void>> maybeOutputLock;
23+
24+
Task(kj::Function<kj::Promise<void>()> task,
25+
kj::Own<kj::PromiseFulfiller<void>> fulfiller,
26+
kj::Maybe<kj::Promise<void>> maybeOutputLock = kj::none)
2327
: task(kj::mv(task)),
24-
fulfiller(kj::mv(fulfiller)) {}
28+
fulfiller(kj::mv(fulfiller)),
29+
maybeOutputLock(kj::mv(maybeOutputLock)) {}
2530
KJ_DISALLOW_COPY_AND_MOVE(Task);
2631
};
2732
using TaskQueue = workerd::util::Queue<kj::Own<Task>>;
@@ -80,9 +85,11 @@ struct WritableStreamSinkJsAdapter::Active final {
8085
kj::Promise<void> enqueue(kj::Function<kj::Promise<void>()> task) {
8186
KJ_DASSERT(!aborted, "cannot enqueue tasks on an aborted queue");
8287
auto paf = kj::newPromiseAndFulfiller<void>();
83-
queue.push(kj::heap<Task>(kj::mv(task), kj::mv(paf.fulfiller)));
88+
auto& ioContext = IoContext::current();
89+
queue.push(kj::heap<Task>(
90+
kj::mv(task), kj::mv(paf.fulfiller), ioContext.waitForOutputLocksIfNecessary()));
8491
if (!running) {
85-
IoContext::current().addTask(canceler.wrap(run()));
92+
ioContext.addTask(canceler.wrap(run()));
8693
}
8794
return kj::mv(paf.promise);
8895
}
@@ -103,6 +110,9 @@ struct WritableStreamSinkJsAdapter::Active final {
103110
});
104111
bool taskFailed = false;
105112
try {
113+
KJ_IF_SOME(lock, task->maybeOutputLock) {
114+
co_await lock;
115+
}
106116
co_await task->task();
107117
task->fulfiller->fulfill();
108118
} catch (...) {

src/workerd/api/streams/writable-sink.c++

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,17 +228,26 @@ class IoContextWritableStreamSinkWrapper: public WritableStreamSinkWrapper {
228228

229229
kj::Promise<void> write(kj::ArrayPtr<const byte> buffer) override {
230230
auto pending = ioContext.registerPendingEvent();
231-
return getInner().write(buffer);
231+
KJ_IF_SOME(p, ioContext.waitForOutputLocksIfNecessary()) {
232+
co_await kj::mv(p);
233+
}
234+
co_await getInner().write(buffer);
232235
}
233236

234237
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
235238
auto pending = ioContext.registerPendingEvent();
236-
return getInner().write(pieces);
239+
KJ_IF_SOME(p, ioContext.waitForOutputLocksIfNecessary()) {
240+
co_await kj::mv(p);
241+
}
242+
co_await getInner().write(pieces);
237243
}
238244

239245
kj::Promise<void> end() override {
240246
auto pending = ioContext.registerPendingEvent();
241-
return getInner().end();
247+
KJ_IF_SOME(p, ioContext.waitForOutputLocksIfNecessary()) {
248+
co_await kj::mv(p);
249+
}
250+
co_await getInner().end();
242251
}
243252

244253
private:

0 commit comments

Comments
 (0)