Skip to content

Commit f763d9f

Browse files
committed
Updating queue to use state machine impl
1 parent 423fe54 commit f763d9f

File tree

3 files changed

+232
-175
lines changed

3 files changed

+232
-175
lines changed

src/workerd/api/streams/queue-test.c++

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,5 +1199,95 @@ KJ_TEST("ByteQueue with multiple default consumers with atLeast (different rate)
11991199

12001200
#pragma endregion ByteQueue Tests
12011201

1202+
#pragma region Re-entrancy Safety Tests
1203+
1204+
// Test that pushing to a closed consumer doesn't crash.
1205+
// This can happen during QueueImpl::push() iteration when resolving a read
1206+
// on one consumer triggers JavaScript that closes another consumer.
1207+
KJ_TEST("ValueQueue push to closed consumer is safe") {
1208+
preamble([](jsg::Lock& js) {
1209+
ValueQueue queue(2);
1210+
ValueQueue::Consumer consumer1(queue);
1211+
ValueQueue::Consumer consumer2(queue);
1212+
1213+
// Close consumer2
1214+
consumer2.close(js);
1215+
1216+
// Now push to the queue - this pushes to all consumers
1217+
// Before the fix, this would crash when trying to push to closed consumer2
1218+
queue.push(js, getEntry(js, 4));
1219+
1220+
// consumer1 should have received the data
1221+
KJ_ASSERT(consumer1.size() == 4);
1222+
1223+
js.runMicrotasks();
1224+
});
1225+
}
1226+
1227+
// Test that pushing to a cancelled consumer doesn't crash.
1228+
KJ_TEST("ValueQueue push to cancelled consumer is safe") {
1229+
preamble([](jsg::Lock& js) {
1230+
ValueQueue queue(2);
1231+
ValueQueue::Consumer consumer1(queue);
1232+
ValueQueue::Consumer consumer2(queue);
1233+
1234+
// Cancel consumer2
1235+
consumer2.cancel(js, kj::none);
1236+
1237+
// Now push to the queue
1238+
queue.push(js, getEntry(js, 4));
1239+
1240+
// consumer1 should have received the data
1241+
KJ_ASSERT(consumer1.size() == 4);
1242+
1243+
js.runMicrotasks();
1244+
});
1245+
}
1246+
1247+
// Test that pushing to an errored consumer doesn't crash.
1248+
KJ_TEST("ValueQueue push to errored consumer is safe") {
1249+
preamble([](jsg::Lock& js) {
1250+
ValueQueue queue(2);
1251+
ValueQueue::Consumer consumer1(queue);
1252+
ValueQueue::Consumer consumer2(queue);
1253+
1254+
// Error consumer2
1255+
consumer2.error(js, js.v8Ref(js.v8Error("error reason"_kj)));
1256+
1257+
// Now push to the queue
1258+
queue.push(js, getEntry(js, 4));
1259+
1260+
// consumer1 should have received the data
1261+
KJ_ASSERT(consumer1.size() == 4);
1262+
1263+
js.runMicrotasks();
1264+
});
1265+
}
1266+
1267+
// Test ByteQueue version of the safety checks
1268+
KJ_TEST("ByteQueue push to closed consumer is safe") {
1269+
preamble([](jsg::Lock& js) {
1270+
ByteQueue queue(10);
1271+
ByteQueue::Consumer consumer1(queue);
1272+
ByteQueue::Consumer consumer2(queue);
1273+
1274+
// Close consumer2
1275+
consumer2.close(js);
1276+
1277+
// Now push to the queue
1278+
auto store = jsg::BackingStore::alloc(js, 4);
1279+
memset(store.asArrayPtr().begin(), 'A', 4);
1280+
auto entry = kj::rc<ByteQueue::Entry>(jsg::BufferSource(js, kj::mv(store)));
1281+
queue.push(js, kj::mv(entry));
1282+
1283+
// consumer1 should have received the data
1284+
KJ_ASSERT(consumer1.size() == 4);
1285+
1286+
js.runMicrotasks();
1287+
});
1288+
}
1289+
1290+
#pragma endregion Re - entrancy Safety Tests
1291+
12021292
} // namespace
12031293
} // namespace workerd::api

src/workerd/api/streams/queue.c++

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ v8::Local<v8::Uint8Array> ByteQueue::ByobRequest::getView(jsg::Lock& js) {
538538
ByteQueue::ByteQueue(size_t highWaterMark): impl(highWaterMark) {}
539539

540540
void ByteQueue::close(jsg::Lock& js) {
541-
KJ_IF_SOME(ready, impl.state.tryGet<ByteQueue::QueueImpl::Ready>()) {
541+
KJ_IF_SOME(ready, impl.state.tryGetUnsafe<ByteQueue::QueueImpl::Ready>()) {
542542
while (!ready.pendingByobReadRequests.empty()) {
543543
ready.pendingByobReadRequests.front()->invalidate();
544544
ready.pendingByobReadRequests.pop_front();

0 commit comments

Comments
 (0)