Skip to content

Commit 955e22f

Browse files
authored
Merge pull request #5337 from cloudflare/jasnell/optimize-readablestream-tee-handling
2 parents 71a0056 + 8529515 commit 955e22f

File tree

5 files changed

+584
-6
lines changed

5 files changed

+584
-6
lines changed

src/workerd/api/streams/queue.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88

99
#include <workerd/jsg/jsg.h>
1010
#include <workerd/util/ring-buffer.h>
11+
#include <workerd/util/small-set.h>
1112

1213
#include <list>
13-
#include <set>
1414

1515
namespace workerd::api {
1616

@@ -164,7 +164,7 @@ class QueueImpl final {
164164
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
165165
// We copy the list of consumers in case the consumers remove themselves
166166
// from the queue during the close callback, invalidating the iterator.
167-
auto consumers = ready.consumers;
167+
auto consumers = ready.consumers.snapshot();
168168
for (auto consumer: consumers) {
169169
consumer->close(js);
170170
}
@@ -188,7 +188,7 @@ class QueueImpl final {
188188
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
189189
// We copy the list of consumers in case the consumers remove themselves
190190
// from the queue during the error callback, invalidating the iterator.
191-
auto consumers = ready.consumers;
191+
auto consumers = ready.consumers.snapshot();
192192
for (auto consumer: consumers) {
193193
consumer->error(js, reason.addRef(js));
194194
}
@@ -284,7 +284,10 @@ class QueueImpl final {
284284
using Errored = jsg::Value;
285285

286286
struct Ready final: public State {
287-
std::set<ConsumerImpl*> consumers;
287+
// The set of consumers attached to this queue. In the typical case this
288+
// will be a very small number (often just one or two), so we use SmallSet to
289+
// optimize for that.
290+
SmallSet<ConsumerImpl*> consumers;
288291
};
289292

290293
size_t highWaterMark;
@@ -293,13 +296,13 @@ class QueueImpl final {
293296

294297
void addConsumer(ConsumerImpl* consumer) {
295298
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
296-
ready.consumers.insert(consumer);
299+
ready.consumers.add(consumer);
297300
}
298301
}
299302

300303
void removeConsumer(ConsumerImpl* consumer) {
301304
KJ_IF_SOME(ready, state.template tryGet<Ready>()) {
302-
ready.consumers.erase(consumer);
305+
ready.consumers.remove(consumer);
303306
maybeUpdateBackpressure();
304307
}
305308
}

src/workerd/io/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ wd_cc_library(
122122
"//src/workerd/util:completion-membrane",
123123
"//src/workerd/util:exception",
124124
"//src/workerd/util:ring-buffer",
125+
"//src/workerd/util:small-set",
125126
"//src/workerd/util:sqlite",
126127
"//src/workerd/util:strong-bool",
127128
"//src/workerd/util:thread-scopes",

src/workerd/util/BUILD.bazel

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,15 @@ wd_cc_library(
8282
deps = ["@capnp-cpp//src/kj"],
8383
)
8484

85+
wd_cc_library(
86+
name = "small-set",
87+
hdrs = ["small-set.h"],
88+
visibility = ["//visibility:public"],
89+
deps = [
90+
"@capnp-cpp//src/kj",
91+
],
92+
)
93+
8594
wd_cc_library(
8695
name = "mimetype",
8796
srcs = ["mimetype.c++"],
@@ -345,3 +354,8 @@ kj_test(
345354
src = "ring-buffer-test.c++",
346355
deps = [":ring-buffer"],
347356
)
357+
358+
kj_test(
359+
src = "small-set-test.c++",
360+
deps = [":small-set"],
361+
)
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
// Copyright (c) 2017-2022 Cloudflare, Inc.
2+
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
3+
// https://opensource.org/licenses/Apache-2.0
4+
5+
#include "small-set.h"
6+
7+
#include <kj/test.h>
8+
9+
namespace workerd {
10+
namespace {
11+
12+
KJ_TEST("SmallSet: empty set") {
13+
SmallSet<int*> set;
14+
KJ_EXPECT(set.empty());
15+
KJ_EXPECT(set.size() == 0);
16+
17+
int dummy;
18+
KJ_EXPECT(!set.contains(&dummy));
19+
}
20+
21+
KJ_TEST("SmallSet: add and remove single item") {
22+
SmallSet<int*> set;
23+
int a = 1;
24+
25+
KJ_EXPECT(set.add(&a));
26+
KJ_EXPECT(!set.empty());
27+
KJ_EXPECT(set.size() == 1);
28+
KJ_EXPECT(set.contains(&a));
29+
30+
// Adding the same item should return false
31+
KJ_EXPECT(!set.add(&a));
32+
KJ_EXPECT(set.size() == 1);
33+
34+
KJ_EXPECT(set.remove(&a));
35+
KJ_EXPECT(set.empty());
36+
KJ_EXPECT(set.size() == 0);
37+
KJ_EXPECT(!set.contains(&a));
38+
39+
// Removing again should return false
40+
KJ_EXPECT(!set.remove(&a));
41+
}
42+
43+
KJ_TEST("SmallSet: add and remove two items") {
44+
SmallSet<int*> set;
45+
int a = 1, b = 2;
46+
47+
KJ_EXPECT(set.add(&a));
48+
KJ_EXPECT(set.add(&b));
49+
KJ_EXPECT(set.size() == 2);
50+
KJ_EXPECT(set.contains(&a));
51+
KJ_EXPECT(set.contains(&b));
52+
53+
// Adding duplicates should return false
54+
KJ_EXPECT(!set.add(&a));
55+
KJ_EXPECT(!set.add(&b));
56+
KJ_EXPECT(set.size() == 2);
57+
58+
KJ_EXPECT(set.remove(&a));
59+
KJ_EXPECT(set.size() == 1);
60+
KJ_EXPECT(!set.contains(&a));
61+
KJ_EXPECT(set.contains(&b));
62+
63+
KJ_EXPECT(set.remove(&b));
64+
KJ_EXPECT(set.empty());
65+
}
66+
67+
KJ_TEST("SmallSet: add and remove multiple items") {
68+
SmallSet<int*> set;
69+
int a = 1, b = 2, c = 3, d = 4;
70+
71+
KJ_EXPECT(set.add(&a));
72+
KJ_EXPECT(set.add(&b));
73+
KJ_EXPECT(set.add(&c));
74+
KJ_EXPECT(set.add(&d));
75+
KJ_EXPECT(set.size() == 4);
76+
77+
KJ_EXPECT(set.contains(&a));
78+
KJ_EXPECT(set.contains(&b));
79+
KJ_EXPECT(set.contains(&c));
80+
KJ_EXPECT(set.contains(&d));
81+
82+
KJ_EXPECT(set.remove(&b));
83+
KJ_EXPECT(set.size() == 3);
84+
KJ_EXPECT(!set.contains(&b));
85+
86+
KJ_EXPECT(set.remove(&c));
87+
KJ_EXPECT(set.size() == 2);
88+
KJ_EXPECT(set.contains(&a));
89+
KJ_EXPECT(set.contains(&d));
90+
91+
KJ_EXPECT(set.remove(&a));
92+
KJ_EXPECT(set.size() == 1);
93+
KJ_EXPECT(set.contains(&d));
94+
95+
KJ_EXPECT(set.remove(&d));
96+
KJ_EXPECT(set.empty());
97+
}
98+
99+
KJ_TEST("SmallSet: state transitions") {
100+
SmallSet<int*> set;
101+
int a = 1, b = 2, c = 3, d = 4;
102+
103+
// None -> Single
104+
KJ_EXPECT(set.add(&a));
105+
KJ_EXPECT(set.size() == 1);
106+
107+
// Single -> Double
108+
KJ_EXPECT(set.add(&b));
109+
KJ_EXPECT(set.size() == 2);
110+
111+
// Double -> Multiple
112+
KJ_EXPECT(set.add(&c));
113+
KJ_EXPECT(set.size() == 3);
114+
115+
// Multiple stays Multiple
116+
KJ_EXPECT(set.add(&d));
117+
KJ_EXPECT(set.size() == 4);
118+
119+
// Multiple -> Multiple (one less)
120+
KJ_EXPECT(set.remove(&d));
121+
KJ_EXPECT(set.size() == 3);
122+
123+
// Multiple -> Double
124+
KJ_EXPECT(set.remove(&c));
125+
KJ_EXPECT(set.size() == 2);
126+
127+
// Double -> Single
128+
KJ_EXPECT(set.remove(&b));
129+
KJ_EXPECT(set.size() == 1);
130+
131+
// Single -> None
132+
KJ_EXPECT(set.remove(&a));
133+
KJ_EXPECT(set.size() == 0);
134+
}
135+
136+
KJ_TEST("SmallSet: iteration") {
137+
SmallSet<int*> set;
138+
int a = 1, b = 2, c = 3;
139+
140+
// Empty iteration
141+
int count = 0;
142+
for (auto item: set) {
143+
(void)item;
144+
count++;
145+
}
146+
KJ_EXPECT(count == 0);
147+
148+
// Single item
149+
set.add(&a);
150+
count = 0;
151+
for (auto item: set) {
152+
KJ_EXPECT(item == &a);
153+
count++;
154+
}
155+
KJ_EXPECT(count == 1);
156+
157+
// Two items
158+
set.add(&b);
159+
count = 0;
160+
kj::Vector<int*> found;
161+
for (auto item: set) {
162+
found.add(item);
163+
count++;
164+
}
165+
KJ_EXPECT(count == 2);
166+
KJ_EXPECT(found.size() == 2);
167+
168+
// Multiple items
169+
set.add(&c);
170+
count = 0;
171+
found.clear();
172+
for (auto item: set) {
173+
found.add(item);
174+
count++;
175+
}
176+
KJ_EXPECT(count == 3);
177+
KJ_EXPECT(found.size() == 3);
178+
}
179+
180+
KJ_TEST("SmallSet: clear") {
181+
SmallSet<int*> set;
182+
int a = 1, b = 2, c = 3;
183+
184+
set.add(&a);
185+
set.add(&b);
186+
set.add(&c);
187+
KJ_EXPECT(set.size() == 3);
188+
189+
set.clear();
190+
KJ_EXPECT(set.empty());
191+
KJ_EXPECT(set.size() == 0);
192+
KJ_EXPECT(!set.contains(&a));
193+
KJ_EXPECT(!set.contains(&b));
194+
KJ_EXPECT(!set.contains(&c));
195+
}
196+
197+
KJ_TEST("SmallSet: snapshot for safe iteration during self-removal") {
198+
// This simulates the queue.h use case where consumers remove themselves
199+
// during close/error callbacks
200+
201+
struct RemovableThing {
202+
SmallSet<RemovableThing*>* owner;
203+
int value;
204+
205+
void removeSelf() {
206+
owner->remove(this);
207+
}
208+
};
209+
210+
SmallSet<RemovableThing*> set;
211+
RemovableThing a{&set, 1}, b{&set, 2}, c{&set, 3};
212+
213+
set.add(&a);
214+
set.add(&b);
215+
set.add(&c);
216+
KJ_EXPECT(set.size() == 3);
217+
218+
// Without snapshot, this would cause iterator invalidation
219+
// With snapshot, it's safe
220+
auto snapshot = set.snapshot();
221+
for (auto* item: snapshot) {
222+
item->removeSelf();
223+
}
224+
225+
KJ_EXPECT(set.empty());
226+
KJ_EXPECT(set.size() == 0);
227+
}
228+
229+
KJ_TEST("SmallSet: snapshot from single state") {
230+
SmallSet<int*> set;
231+
int a = 1;
232+
233+
set.add(&a);
234+
auto snapshot = set.snapshot();
235+
KJ_EXPECT(snapshot.size() == 1);
236+
KJ_EXPECT(snapshot[0] == &a);
237+
}
238+
239+
KJ_TEST("SmallSet: snapshot from double state") {
240+
SmallSet<int*> set;
241+
int a = 1, b = 2;
242+
243+
set.add(&a);
244+
set.add(&b);
245+
auto snapshot = set.snapshot();
246+
KJ_EXPECT(snapshot.size() == 2);
247+
// Order doesn't matter for set semantics
248+
KJ_EXPECT((snapshot[0] == &a && snapshot[1] == &b) || (snapshot[0] == &b && snapshot[1] == &a));
249+
}
250+
251+
KJ_TEST("SmallSet: snapshot from empty state") {
252+
SmallSet<int*> set;
253+
auto snapshot = set.snapshot();
254+
KJ_EXPECT(snapshot.size() == 0);
255+
}
256+
257+
} // namespace
258+
} // namespace workerd

0 commit comments

Comments
 (0)