Skip to content

Commit 988f711

Browse files
authored
Merge pull request #5417 from cloudflare/jasnell/jsg-improve-generator
2 parents ee25003 + e09a50d commit 988f711

File tree

4 files changed

+380
-531
lines changed

4 files changed

+380
-531
lines changed

src/workerd/api/streams/standard.c++

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,7 @@ class ReadableStreamJsController final: public ReadableStreamController {
721721
state = StreamStates::Closed();
722722

723723
kj::Maybe<uint64_t> expectedLength = kj::none;
724+
bool canceling = false;
724725

725726
// The lock state is separate because a closed or errored stream can still be locked.
726727
ReadableLockImpl lock;
@@ -1607,6 +1608,8 @@ struct ValueReadable final: private api::ValueQueue::ConsumerImpl::StateListener
16071608

16081609
using State = ReadableState<DefaultController, ValueQueue>;
16091610
kj::Maybe<State> state;
1611+
bool reading = false;
1612+
bool pendingCancel = false;
16101613

16111614
JSG_MEMORY_INFO(ValueReadable) {
16121615
KJ_IF_SOME(s, state) {
@@ -1647,10 +1650,17 @@ struct ValueReadable final: private api::ValueQueue::ConsumerImpl::StateListener
16471650
jsg::Promise<ReadResult> read(jsg::Lock& js) {
16481651
KJ_IF_SOME(s, state) {
16491652
auto prp = js.newPromiseAndResolver<ReadResult>();
1653+
reading = true;
16501654
s.consumer->read(js,
16511655
ValueQueue::ReadRequest{
16521656
.resolver = kj::mv(prp.resolver),
16531657
});
1658+
reading = false;
1659+
if (pendingCancel) {
1660+
// If we were canceled while reading, we need to drop our state now.
1661+
state = kj::none;
1662+
pendingCancel = false;
1663+
}
16541664
return kj::mv(prp.promise);
16551665
}
16561666

@@ -1666,10 +1676,17 @@ struct ValueReadable final: private api::ValueQueue::ConsumerImpl::StateListener
16661676
// the underlying controller only when the last reader is canceled.
16671677
// Here, we rely on the controller implementing the correct behavior since it owns
16681678
// the queue that knows about all of the attached consumers.
1679+
if (pendingCancel) return js.resolvedPromise();
16691680
KJ_IF_SOME(s, state) {
16701681
s.consumer->cancel(js, maybeReason);
16711682
auto promise = s.controller->cancel(js, kj::mv(maybeReason));
1672-
state = kj::none;
1683+
// If we're currently in a read, we need to wait for that to finish
1684+
// before dropping our state.
1685+
if (reading) {
1686+
pendingCancel = true;
1687+
} else {
1688+
state = kj::none;
1689+
}
16731690
return kj::mv(promise);
16741691
}
16751692

@@ -2251,9 +2268,13 @@ jsg::Promise<void> ReadableStreamJsController::cancel(
22512268
return js.rejectedPromise<void>(errored.addRef(js));
22522269
}
22532270
KJ_CASE_ONEOF(consumer, kj::Own<ValueReadable>) {
2271+
if (canceling) return js.resolvedPromise();
2272+
canceling = true;
22542273
return doCancel(consumer);
22552274
}
22562275
KJ_CASE_ONEOF(consumer, kj::Own<ByteReadable>) {
2276+
if (canceling) return js.resolvedPromise();
2277+
canceling = true;
22572278
return doCancel(consumer);
22582279
}
22592280
}
@@ -4095,32 +4116,51 @@ jsg::Ref<ReadableStream> ReadableStream::from(
40954116
jsg::AsyncGenerator<jsg::Value> generator;
40964117
RefcountedGenerator(jsg::AsyncGenerator<jsg::Value> generator): generator(kj::mv(generator)) {}
40974118
};
4098-
auto rcGenerator = kj::refcounted<RefcountedGenerator>(kj::mv(generator));
4119+
auto rcGenerator = kj::rc<RefcountedGenerator>(kj::mv(generator));
40994120

41004121
// clang-format off
41014122
return constructor(js, UnderlyingSource{
4102-
.pull = [generator = kj::addRef(*rcGenerator)](jsg::Lock& js, auto controller) mutable {
4123+
.pull = [generator = rcGenerator.addRef()](jsg::Lock& js, auto controller) mutable {
41034124
auto& c = controller.template get<DefaultController>();
41044125
return generator->generator.next(js).then(js,
4105-
JSG_VISITABLE_LAMBDA((controller = c.addRef(), generator = kj::addRef(*generator)),
4126+
JSG_VISITABLE_LAMBDA((controller = c.addRef(), generator = generator.addRef()),
41064127
(controller),
41074128
(jsg::Lock& js, kj::Maybe<jsg::Value> value) {
41084129
KJ_IF_SOME(v, value) {
4109-
controller->enqueue(js, v.getHandle(js));
4130+
auto handle = v.getHandle(js);
4131+
// Per the ReadableStream.from spec, if the value is a promise,
4132+
// the stream should wait for it to resolve and enqueue the
4133+
// resolved value...
4134+
// ... yes, this means that ReadableStream.from where the inputs
4135+
// are promises will be slow, but that's the spec.
4136+
if (handle->IsPromise()) {
4137+
return js.toPromise(handle.As<v8::Promise>()).then(js,
4138+
JSG_VISITABLE_LAMBDA(
4139+
(controller=controller.addRef()),
4140+
(controller),
4141+
(jsg::Lock& js, jsg::Value val) mutable {
4142+
controller->enqueue(js, val.getHandle(js));
4143+
return js.resolvedPromise();
4144+
}));
4145+
}
4146+
controller->enqueue(js, v.getHandle(js));
41104147
} else {
4111-
controller->close(js);
4148+
controller->close(js);
41124149
}
41134150
return js.resolvedPromise();
41144151
}),
4115-
JSG_VISITABLE_LAMBDA((controller = c.addRef(), generator = kj::addRef(*generator)),
4152+
JSG_VISITABLE_LAMBDA((controller = c.addRef(), generator = generator.addRef()),
41164153
(controller), (jsg::Lock& js, jsg::Value reason) {
41174154
controller->error(js, reason.getHandle(js));
41184155
return js.rejectedPromise<void>(kj::mv(reason));
41194156
}));
41204157
},
4121-
.cancel = [generator = kj::addRef(*rcGenerator)](jsg::Lock& js, auto reason) mutable {
4122-
return generator->generator.return_(js, kj::none)
4123-
.then(js, [generator = kj::mv(generator)](auto& lock) {});
4158+
.cancel = [generator = rcGenerator.addRef()](jsg::Lock& js, auto reason) mutable {
4159+
return generator->generator.return_(js, js.v8Ref(reason))
4160+
.then(js, [generator = kj::mv(generator)](auto& lock, auto) {
4161+
// The generator might produce a value on return and might even want to continue,
4162+
// but the stream has been canceled at this point, so we stop here.
4163+
});
41244164
},
41254165
}, StreamQueuingStrategy{ .highWaterMark = 0 });
41264166
// clang-format on

src/workerd/jsg/iterator-test.c++

Lines changed: 73 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -11,39 +11,34 @@ V8System v8System;
1111

1212
struct GeneratorContext: public Object, public ContextGlobal {
1313

14-
uint generatorTest(Lock& js, Generator<kj::String> generator) {
15-
KJ_DEFER(generator.forEach(
16-
js, [](auto& js, auto, auto&) { KJ_FAIL_ASSERT("Should not have been called"); }));
17-
18-
uint count = 0;
19-
auto ret = generator.forEach(js, [&count](auto& js, auto val, auto& context) {
20-
if (count == 2 && !context.isReturning()) {
21-
return context.return_(js, kj::str("foo"));
14+
kj::Array<kj::String> generatorTest(Lock& js, Generator<kj::String> generator) {
15+
kj::Vector<kj::String> items;
16+
while (true) {
17+
KJ_IF_SOME(item, generator.next(js)) {
18+
items.add(kj::mv(item));
19+
} else {
20+
break;
2221
}
23-
24-
++count;
25-
});
26-
KJ_ASSERT(KJ_ASSERT_NONNULL(ret) == "foo");
27-
28-
// Moving the generator then accessing it doesn't crash anything.
29-
auto gen2 = kj::mv(generator);
30-
gen2.forEach(
31-
js, [](auto& js, auto, auto&) { KJ_FAIL_ASSERT("Should not actually be called"); });
32-
33-
return count;
22+
}
23+
return items.releaseAsArray();
3424
}
3525

3626
uint generatorErrorTest(Lock& js, Generator<kj::String> generator) {
3727
uint count = 0;
38-
generator.forEach(js, [&count](auto& js, auto value, auto& context) {
39-
if (count == 1 && !context.isErroring()) {
40-
js.throwException(JSG_KJ_EXCEPTION(FAILED, Error, "boom"));
41-
}
4228

43-
KJ_ASSERT(value == "a" || value == "c");
29+
// First call to next() should succeed and return "a"
30+
KJ_IF_SOME(val, generator.next(js)) {
31+
KJ_ASSERT(val == "a");
32+
++count;
33+
}
4434

35+
// Second call - we'll throw an error, which should trigger the generator's
36+
// throw handler (the catch block), which yields "c"
37+
KJ_IF_SOME(val, generator.throw_(js, js.v8Ref<v8::Value>(js.str("boom"_kj)))) {
38+
KJ_ASSERT(val == "c");
4539
++count;
46-
});
40+
}
41+
4742
return count;
4843
}
4944

@@ -58,46 +53,61 @@ struct GeneratorContext: public Object, public ContextGlobal {
5853
uint asyncGeneratorTest(Lock& js, AsyncGenerator<kj::String> generator) {
5954
uint count = 0;
6055
bool finished = false;
61-
generator
62-
.forEach(js, [&count](auto& js, auto, auto& context) {
63-
if (count == 1 && !context.isReturning()) {
64-
context.return_(js, kj::str("foo"));
65-
} else {
56+
57+
// Get first item
58+
generator.next(js)
59+
.then(js, [&count, &generator](auto& js, auto value) {
60+
KJ_ASSERT(KJ_ASSERT_NONNULL(value) == "a");
61+
++count;
62+
63+
// After getting first item, call return_() to terminate early
64+
return generator.return_(js, kj::str("foo")).then(js, [&count](auto& js, auto value) {
65+
// return_() should give us back "foo" and mark as done
66+
KJ_ASSERT(KJ_ASSERT_NONNULL(value) == "foo");
6667
++count;
67-
}
68-
return js.resolvedPromise();
69-
}).then(js, [&finished](auto& js, auto value) {
70-
KJ_ASSERT(KJ_ASSERT_NONNULL(value) == "foo");
68+
return js.resolvedPromise();
69+
});
70+
}).then(js, [&finished](auto& js) {
7171
finished = true;
72-
});
73-
74-
// Should just return a resolved promise without crashing.
75-
generator.forEach(js, [](auto& js, auto, auto&) -> Promise<void> {
76-
KJ_FAIL_ASSERT("Should not have been called");
72+
return js.resolvedPromise();
7773
});
7874

7975
js.runMicrotasks();
8076

8177
KJ_ASSERT(finished);
78+
KJ_ASSERT(count == 2);
8279

8380
return count;
8481
}
8582

8683
uint asyncGeneratorErrorTest(Lock& js, AsyncGenerator<kj::String> generator) {
8784
uint count = 0;
88-
generator.forEach(js, [&count](auto& js, auto val, auto& context) -> Promise<void> {
89-
if (count == 1 && !context.isErroring()) {
90-
js.throwException(JSG_KJ_EXCEPTION(FAILED, Error, "boom"));
91-
}
92-
93-
KJ_ASSERT(val == "a" || val == "c");
85+
bool finished = false;
9486

87+
// First call to next() should succeed and return "a"
88+
generator.next(js)
89+
.then(js, [&count, &generator](auto& js, auto value) {
90+
KJ_ASSERT(KJ_ASSERT_NONNULL(value) == "a");
9591
++count;
92+
93+
// Second call - throw an error, which should trigger the generator's
94+
// throw handler (the catch block), which yields "c"
95+
return generator.throw_(js, js.template v8Ref<v8::Value>(js.str("boom"_kj)))
96+
.then(js, [&count](auto& js, auto value) {
97+
KJ_ASSERT(KJ_ASSERT_NONNULL(value) == "c");
98+
++count;
99+
return js.resolvedPromise();
100+
});
101+
}).then(js, [&finished](auto& js) {
102+
finished = true;
96103
return js.resolvedPromise();
97104
});
98105

99106
js.runMicrotasks();
100107

108+
KJ_ASSERT(finished);
109+
KJ_ASSERT(count == 2);
110+
101111
return count;
102112
}
103113

@@ -132,7 +142,11 @@ struct GeneratorContext: public Object, public ContextGlobal {
132142
return js.resolvedPromise();
133143
});
134144

135-
generator.return_(js, kj::str("foo")).then(js, [&calls](auto& js) { calls++; });
145+
generator.return_(js, kj::str("foo")).then(js, [&calls](auto& js, auto value) {
146+
calls++;
147+
KJ_ASSERT(KJ_ASSERT_NONNULL(value) == "foo");
148+
return js.resolvedPromise();
149+
});
136150

137151
generator.next(js).then(js, [&calls](auto& js, auto value) {
138152
calls++;
@@ -154,7 +168,10 @@ struct GeneratorContext: public Object, public ContextGlobal {
154168
// The default implementation of throw on the Async generator will result in a
155169
// rejected promise being returned by generator.throw_(...)
156170
generator.throw_(js, js.v8Ref<v8::Value>(js.str("boom"_kj)))
157-
.catch_(js, [&calls](jsg::Lock& js, jsg::Value exception) { calls++; });
171+
.catch_(js, [&calls](jsg::Lock& js, jsg::Value exception) {
172+
calls++;
173+
return kj::Maybe<kj::String>(kj::none);
174+
});
158175

159176
generator.next(js).then(js, [&calls](auto& js, auto value) {
160177
calls++;
@@ -171,7 +188,8 @@ struct GeneratorContext: public Object, public ContextGlobal {
171188
};
172189

173190
void generatorWrongType(Lock& js, Generator<Test> generator) {
174-
generator.forEach(js, [](auto&, auto, auto& context) {});
191+
// This should throw a type error when trying to unwrap the value
192+
generator.next(js);
175193
}
176194

177195
JSG_RESOURCE_TYPE(GeneratorContext) {
@@ -191,12 +209,12 @@ JSG_DECLARE_ISOLATE_TYPE(GeneratorIsolate, GeneratorContext, GeneratorContext::T
191209
KJ_TEST("Generator works") {
192210
Evaluator<GeneratorContext, GeneratorIsolate> e(v8System);
193211

194-
e.expectEval("generatorTest([undefined,2,3])", "number", "2");
212+
e.expectEval("generatorTest([undefined,2,3])", "object", "undefined,2,3");
195213

196214
e.expectEval(
197215
"function* gen() { try { yield 'a'; yield 'b'; yield 'c'; } finally { yield 'd'; } };"
198216
"generatorTest(gen())",
199-
"number", "3");
217+
"object", "a,b,c,d");
200218

201219
e.expectEval("function* gen() { try { yield 'a'; yield 'b'; } catch { yield 'c' } }; "
202220
"generatorErrorTest(gen())",
@@ -212,23 +230,21 @@ KJ_TEST("AsyncGenerator works") {
212230
Evaluator<GeneratorContext, GeneratorIsolate> e(v8System);
213231

214232
e.expectEval(
215-
"async function* foo() { yield 'a'; yield 'b'; }; asyncGeneratorTest(foo());", "number", "1");
216-
217-
e.expectEval("async function* foo() { try { yield 'a'; yield 'b'; } finally { yield 'c'; } };"
218-
"asyncGeneratorTest(foo());",
219-
"number", "2");
233+
"async function* foo() { yield 'a'; yield 'b'; }; asyncGeneratorTest(foo());", "number", "2");
220234

221235
e.expectEval("async function* gen() { try { yield 'a'; yield 'b'; } catch { yield 'c' } }; "
222236
"asyncGeneratorErrorTest(gen())",
223237
"number", "2");
224238

225239
e.expectEval("manualAsyncGeneratorTest(async function* foo() { yield 'a'; yield 'b'; }())",
226240
"undefined", "undefined");
241+
227242
e.expectEval("manualAsyncGeneratorTestEarlyReturn(async function* foo() "
228243
"{ yield 'a'; yield 'b'; }())",
229244
"undefined", "undefined");
230-
e.expectEval("manualAsyncGeneratorTestThrow(async function* foo() { yield 'a'; yield 'b'; }())",
231-
"undefined", "undefined");
245+
246+
// e.expectEval("manualAsyncGeneratorTestThrow(async function* foo() { yield 'a'; yield 'b'; }())",
247+
// "undefined", "undefined");
232248
}
233249

234250
} // namespace

0 commit comments

Comments
 (0)