Skip to content

Commit 4bd3b24

Browse files
committed
Pass Spans to InputGate wait()
1 parent 377d49e commit 4bd3b24

File tree

4 files changed

+52
-49
lines changed

4 files changed

+52
-49
lines changed

src/workerd/io/io-context.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,8 +1161,9 @@ template <typename Func>
11611161
kj::PromiseForResult<Func, Worker::Lock&> IoContext::run(
11621162
Func&& func, kj::Maybe<kj::Own<InputGate::CriticalSection>> criticalSection) {
11631163
KJ_IF_SOME(cs, criticalSection) {
1164-
return cs.get()->wait().then(
1165-
[this, func = kj::fwd<Func>(func)](InputGate::Lock&& inputLock) mutable {
1164+
return cs.get()
1165+
->wait(getCurrentTraceSpan())
1166+
.then([this, func = kj::fwd<Func>(func)](InputGate::Lock&& inputLock) mutable {
11661167
return run(kj::fwd<Func>(func), kj::mv(inputLock));
11671168
});
11681169
} else {
@@ -1182,8 +1183,9 @@ kj::PromiseForResult<Func, Worker::Lock&> IoContext::run(
11821183
kj::Promise<Worker::AsyncLock> asyncLockPromise = nullptr;
11831184
KJ_IF_SOME(a, actor) {
11841185
if (inputLock == kj::none) {
1185-
return a.getInputGate().wait().then(
1186-
[this, func = kj::fwd<Func>(func)](InputGate::Lock&& inputLock) mutable {
1186+
return a.getInputGate()
1187+
.wait(getCurrentTraceSpan())
1188+
.then([this, func = kj::fwd<Func>(func)](InputGate::Lock&& inputLock) mutable {
11871189
return run(kj::fwd<Func>(func), kj::mv(inputLock));
11881190
});
11891191
}
@@ -1597,7 +1599,7 @@ jsg::PromiseForResult<Func, void, true> IoContext::blockConcurrencyWhile(
15971599
auto [result, resolver] = js.newPromiseAndResolver<T>();
15981600

15991601
addTask(
1600-
cs->wait()
1602+
cs->wait(getCurrentTraceSpan())
16011603
.then([this, callback = kj::mv(callback),
16021604
maybeAsyncContext = jsg::AsyncContextFrame::currentRef(js)](
16031605
InputGate::Lock inputLock) mutable {

src/workerd/io/io-gate-test.c++

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ KJ_TEST("InputGate basics") {
1515

1616
InputGate gate;
1717

18-
kj::Promise<InputGate::Lock> promise1 = gate.wait();
19-
kj::Promise<InputGate::Lock> promise2 = gate.wait();
20-
kj::Promise<InputGate::Lock> promise3 = gate.wait();
18+
kj::Promise<InputGate::Lock> promise1 = gate.wait(nullptr);
19+
kj::Promise<InputGate::Lock> promise2 = gate.wait(nullptr);
20+
kj::Promise<InputGate::Lock> promise3 = gate.wait(nullptr);
2121

2222
KJ_ASSERT(promise1.poll(ws));
2323
KJ_EXPECT(!promise2.poll(ws));
@@ -51,17 +51,17 @@ KJ_TEST("InputGate critical section") {
5151
kj::Own<InputGate::CriticalSection> cs;
5252

5353
{
54-
auto lock = gate.wait().wait(ws);
54+
auto lock = gate.wait(nullptr).wait(ws);
5555
cs = lock.startCriticalSection();
5656
}
5757

5858
{
5959
// Take the first lock.
60-
auto firstLock = cs->wait().wait(ws);
60+
auto firstLock = cs->wait(nullptr).wait(ws);
6161

6262
// Other locks are blocked.
63-
auto wait1 = cs->wait();
64-
auto wait2 = cs->wait();
63+
auto wait1 = cs->wait(nullptr);
64+
auto wait2 = cs->wait(nullptr);
6565
KJ_EXPECT(!wait1.poll(ws));
6666
KJ_EXPECT(!wait2.poll(ws));
6767

@@ -77,11 +77,11 @@ KJ_TEST("InputGate critical section") {
7777
}
7878

7979
// Can't lock the top-level gate while CriticalSection still exists.
80-
auto outerWait = gate.wait();
80+
auto outerWait = gate.wait(nullptr);
8181
KJ_EXPECT(!outerWait.poll(ws));
8282

8383
{
84-
auto lock = cs->wait().wait(ws);
84+
auto lock = cs->wait(nullptr).wait(ws);
8585
cs->succeeded();
8686
KJ_EXPECT(!outerWait.poll(ws));
8787
}
@@ -99,16 +99,16 @@ KJ_TEST("InputGate multiple critical sections start together") {
9999
kj::Own<InputGate::CriticalSection> cs2;
100100

101101
{
102-
auto lock = gate.wait().wait(ws);
102+
auto lock = gate.wait(nullptr).wait(ws);
103103
cs1 = lock.startCriticalSection();
104104
cs2 = lock.startCriticalSection();
105105
}
106106

107107
// Start cs1.
108-
cs1->wait().wait(ws);
108+
cs1->wait(nullptr).wait(ws);
109109

110110
// Can't start cs2 yet.
111-
auto cs2Wait = cs2->wait();
111+
auto cs2Wait = cs2->wait(nullptr);
112112
KJ_EXPECT(!cs2Wait.poll(ws));
113113

114114
cs1->succeeded();
@@ -126,20 +126,20 @@ KJ_TEST("InputGate nested critical sections") {
126126
kj::Own<InputGate::CriticalSection> cs2;
127127

128128
{
129-
auto lock = gate.wait().wait(ws);
129+
auto lock = gate.wait(nullptr).wait(ws);
130130
cs1 = lock.startCriticalSection();
131131
}
132132

133133
{
134-
auto lock = cs1->wait().wait(ws);
134+
auto lock = cs1->wait(nullptr).wait(ws);
135135
cs2 = lock.startCriticalSection();
136136
}
137137

138138
// Start cs2.
139-
cs2->wait().wait(ws);
139+
cs2->wait(nullptr).wait(ws);
140140

141141
// Can't start new tasks in cs1 until cs2 finishes.
142-
auto cs1Wait = cs1->wait();
142+
auto cs1Wait = cs1->wait(nullptr);
143143
KJ_EXPECT(!cs1Wait.poll(ws));
144144

145145
cs2->succeeded();
@@ -157,25 +157,25 @@ KJ_TEST("InputGate nested critical section outlives parent") {
157157
kj::Own<InputGate::CriticalSection> cs2;
158158

159159
{
160-
auto lock = gate.wait().wait(ws);
160+
auto lock = gate.wait(nullptr).wait(ws);
161161
cs1 = lock.startCriticalSection();
162162
}
163163

164164
{
165-
auto lock = cs1->wait().wait(ws);
165+
auto lock = cs1->wait(nullptr).wait(ws);
166166
cs2 = lock.startCriticalSection();
167167
}
168168

169169
// Start cs2.
170-
cs2->wait().wait(ws);
170+
cs2->wait(nullptr).wait(ws);
171171

172172
// Mark cs1 done. (Note that, in a real program, this probably can't happen like this, because a
173173
// lock would be taken on cs1 before marking it done, and that lock would wait for cs2 to
174174
// finish. But I want to make sure it works anyway.)
175175
cs1->succeeded();
176176

177177
// Can't start new tasks in at root until cs2 finishes.
178-
auto rootWait = gate.wait();
178+
auto rootWait = gate.wait(nullptr);
179179
KJ_EXPECT(!rootWait.poll(ws));
180180

181181
cs2->succeeded();
@@ -195,32 +195,32 @@ KJ_TEST("InputGate deeply nested critical sections") {
195195
kj::Own<InputGate::CriticalSection> cs4;
196196

197197
{
198-
auto lock = gate.wait().wait(ws);
198+
auto lock = gate.wait(nullptr).wait(ws);
199199
cs1 = lock.startCriticalSection();
200200
}
201201

202202
{
203-
auto lock = cs1->wait().wait(ws);
203+
auto lock = cs1->wait(nullptr).wait(ws);
204204
cs2 = lock.startCriticalSection();
205205
}
206206

207207
{
208-
auto lock = cs2->wait().wait(ws);
208+
auto lock = cs2->wait(nullptr).wait(ws);
209209
cs3 = lock.startCriticalSection();
210210
cs4 = lock.startCriticalSection();
211211
}
212212

213213
// Start cs2
214-
cs2->wait().wait(ws);
214+
cs2->wait(nullptr).wait(ws);
215215

216216
// Add some waiters to cs2, some of which are waiting to start more nested critical sections
217-
auto lock = cs2->wait().wait(ws);
218-
auto waiter1 = cs2->wait();
219-
auto waiter2 = cs2->wait();
217+
auto lock = cs2->wait(nullptr).wait(ws);
218+
auto waiter1 = cs2->wait(nullptr);
219+
auto waiter2 = cs2->wait(nullptr);
220220

221221
// Both of these wait on cs2 indirectly, as they are nested under cs2
222-
auto waiter3 = cs3->wait();
223-
auto waiter4 = cs4->wait();
222+
auto waiter3 = cs3->wait(nullptr);
223+
auto waiter4 = cs4->wait(nullptr);
224224

225225
KJ_EXPECT(!waiter1.poll(ws));
226226
KJ_EXPECT(!waiter2.poll(ws));
@@ -250,12 +250,12 @@ KJ_TEST("InputGate deeply nested critical sections") {
250250
auto lock2 = waiter3.wait(ws);
251251

252252
// Add a waiter on cs3
253-
auto waiter5 = cs3->wait();
253+
auto waiter5 = cs3->wait(nullptr);
254254
KJ_ASSERT(!waiter5.poll(ws));
255255

256256
// Can't start new tasks on the root until both cs1 and cs3 have succeeded, and all outstanding
257257
// tasks have either been dropped or completed.
258-
auto waiter6 = gate.wait();
258+
auto waiter6 = gate.wait(nullptr);
259259
KJ_ASSERT(!waiter6.poll(ws));
260260

261261
cs1->succeeded();
@@ -281,12 +281,12 @@ KJ_TEST("InputGate critical section lock outlives critical section") {
281281
kj::Own<InputGate::CriticalSection> cs;
282282

283283
{
284-
auto lock = gate.wait().wait(ws);
284+
auto lock = gate.wait(nullptr).wait(ws);
285285
cs = lock.startCriticalSection();
286286
}
287287

288288
// Start critical section.
289-
auto lock = cs->wait().wait(ws);
289+
auto lock = cs->wait(nullptr).wait(ws);
290290
KJ_ASSERT(lock.isFor(gate));
291291

292292
// Mark it done, even though a lock is still outstanding.
@@ -302,7 +302,7 @@ KJ_TEST("InputGate critical section lock outlives critical section") {
302302
lock.addRef(nullptr);
303303

304304
// The gate should still be locked
305-
auto waiter = gate.wait();
305+
auto waiter = gate.wait(nullptr);
306306
KJ_EXPECT(!waiter.poll(ws));
307307

308308
// Drop the outstanding lock
@@ -326,34 +326,34 @@ KJ_TEST("InputGate broken") {
326326
kj::Own<InputGate::CriticalSection> cs3;
327327

328328
{
329-
auto lock = gate.wait().wait(ws);
329+
auto lock = gate.wait(nullptr).wait(ws);
330330
cs1 = lock.startCriticalSection();
331331
cs3 = lock.startCriticalSection();
332332
}
333333

334334
{
335-
auto lock = cs1->wait().wait(ws);
335+
auto lock = cs1->wait(nullptr).wait(ws);
336336
cs2 = lock.startCriticalSection();
337337
}
338338

339339
// start cs2
340-
cs2->wait().wait(ws);
340+
cs2->wait(nullptr).wait(ws);
341341

342-
auto cs1Wait = cs1->wait();
342+
auto cs1Wait = cs1->wait(nullptr);
343343
KJ_EXPECT(!cs1Wait.poll(ws));
344344

345-
auto cs3Wait = cs3->wait();
345+
auto cs3Wait = cs3->wait(nullptr);
346346
KJ_EXPECT(!cs3Wait.poll(ws));
347347

348-
auto rootWait = gate.wait();
348+
auto rootWait = gate.wait(nullptr);
349349
KJ_EXPECT(!rootWait.poll(ws));
350350

351351
cs2->failed(KJ_EXCEPTION(FAILED, "foobar"));
352352

353353
KJ_EXPECT_THROW_MESSAGE("foobar", cs1Wait.wait(ws));
354354
KJ_EXPECT_THROW_MESSAGE("foobar", cs3Wait.wait(ws));
355355
KJ_EXPECT_THROW_MESSAGE("foobar", rootWait.wait(ws));
356-
KJ_EXPECT_THROW_MESSAGE("foobar", cs2->wait().wait(ws));
356+
KJ_EXPECT_THROW_MESSAGE("foobar", cs2->wait(nullptr).wait(ws));
357357
KJ_EXPECT_THROW_MESSAGE("foobar", brokenPromise.wait(ws));
358358
KJ_EXPECT_THROW_MESSAGE("foobar", gate.onBroken().wait(ws));
359359
}

src/workerd/io/io-gate.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,11 @@ class InputGate {
108108
};
109109

110110
// Wait until there are no `Lock`s, then create a new one and return it.
111+
//
111112
// If parentSpan is provided, child spans will be created to track:
112113
// - Time spent waiting for the lock (if waiting is required)
113114
// - Time spent holding the lock
114-
kj::Promise<Lock> wait(SpanParent parentSpan = nullptr);
115+
kj::Promise<Lock> wait(SpanParent parentSpan);
115116

116117
// Rejects if and when calls to `wait()` become broken due to a failed critical section. The
117118
// actor should be shut down in this case. This promise never resolves, only rejects.
@@ -188,7 +189,7 @@ class InputGate::CriticalSection: private InputGate, public kj::Refcounted {
188189
// The first call to wait() begins the CriticalSection. After that wait completes, until the
189190
// CriticalSection is done and dropped, no other locks will be allowed on this InputGate, except
190191
// locks requested by calling wait() on this CriticalSection -- or one of its children.
191-
kj::Promise<Lock> wait(SpanParent parentSpan = nullptr);
192+
kj::Promise<Lock> wait(SpanParent parentSpan);
192193

193194
// Call when the critical section has completed successfully. If this is not called before the
194195
// CriticalSection is dropped, then failed() is called implicitly.

src/workerd/io/worker.c++

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3749,7 +3749,7 @@ void Worker::Actor::ensureConstructed(IoContext& context) {
37493749
}
37503750

37513751
kj::Promise<void> Worker::Actor::ensureConstructedImpl(IoContext& context, ActorClassInfo& info) {
3752-
InputGate::Lock inputLock = co_await impl->inputGate.wait();
3752+
InputGate::Lock inputLock = co_await impl->inputGate.wait(context.getCurrentTraceSpan());
37533753

37543754
try {
37553755
bool containerRunning = false;

0 commit comments

Comments
 (0)