Skip to content

Commit 1cdc1e5

Browse files
authored
Merge pull request #5296 from cloudflare/jasnell/streams-minor-tweaks
2 parents fab2143 + 97f2ddb commit 1cdc1e5

File tree

2 files changed

+76
-59
lines changed

2 files changed

+76
-59
lines changed

src/workerd/api/streams/standard.c++

Lines changed: 56 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -171,14 +171,19 @@ class WritableLockImpl {
171171
struct PipeLocked {
172172
ReadableStreamController::PipeController& source;
173173
jsg::Ref<ReadableStream> readableStreamRef;
174-
bool preventAbort;
175-
bool preventCancel;
176-
bool preventClose;
177-
bool pipeThrough;
174+
178175
kj::Maybe<jsg::Ref<AbortSignal>> maybeSignal;
179176

180177
kj::Maybe<jsg::Promise<void>> checkSignal(jsg::Lock& js, Controller& self);
181178

179+
struct Flags {
180+
uint8_t preventAbort : 1 = 0;
181+
uint8_t preventCancel : 1 = 0;
182+
uint8_t preventClose : 1 = 0;
183+
uint8_t pipeThrough : 1 = 0;
184+
};
185+
Flags flags{};
186+
182187
JSG_MEMORY_INFO(PipeLocked) {
183188
tracker.trackField("readableStreamRef", readableStreamRef);
184189
tracker.trackField("signal", maybeSignal);
@@ -424,11 +429,14 @@ bool WritableLockImpl<Controller>::pipeLock(
424429
state.template init<PipeLocked>(PipeLocked{
425430
.source = sourceLock,
426431
.readableStreamRef = kj::mv(source),
427-
.preventAbort = options.preventAbort.orDefault(false),
428-
.preventCancel = options.preventCancel.orDefault(false),
429-
.preventClose = options.preventClose.orDefault(false),
430-
.pipeThrough = options.pipeThrough,
431432
.maybeSignal = kj::mv(options.signal),
433+
.flags =
434+
{
435+
.preventAbort = options.preventAbort.orDefault(false),
436+
.preventCancel = options.preventCancel.orDefault(false),
437+
.preventClose = options.preventClose.orDefault(false),
438+
.pipeThrough = options.pipeThrough,
439+
},
432440
});
433441
return true;
434442
}
@@ -463,17 +471,17 @@ kj::Maybe<jsg::Promise<void>> WritableLockImpl<Controller>::PipeLocked::checkSig
463471
KJ_IF_SOME(signal, maybeSignal) {
464472
if (signal->getAborted(js)) {
465473
auto reason = signal->getReason(js);
466-
if (!preventCancel) {
474+
if (!flags.preventCancel) {
467475
source.release(js, v8::Local<v8::Value>(reason));
468476
} else {
469477
source.release(js);
470478
}
471-
if (!preventAbort) {
479+
if (!flags.preventAbort) {
472480
return self.abort(js, reason).then(js, JSG_VISITABLE_LAMBDA((this, reason = reason.addRef(js), ref = self.addRef()), (reason, ref), (jsg::Lock& js) {
473-
return rejectedMaybeHandledPromise<void>(js, reason.getHandle(js), pipeThrough);
481+
return rejectedMaybeHandledPromise<void>(js, reason.getHandle(js), flags.pipeThrough);
474482
}));
475483
}
476-
return rejectedMaybeHandledPromise<void>(js, reason, pipeThrough);
484+
return rejectedMaybeHandledPromise<void>(js, reason, flags.pipeThrough);
477485
}
478486
}
479487
return kj::none;
@@ -717,9 +725,9 @@ class ReadableStreamJsController final: public ReadableStreamController {
717725
// The lock state is separate because a closed or errored stream can still be locked.
718726
ReadableLockImpl lock;
719727

720-
bool disturbed = false;
721728
size_t pendingReadCount = 0;
722729
kj::Maybe<kj::OneOf<StreamStates::Closed, StreamStates::Errored>> maybePendingState = kj::none;
730+
bool disturbed = false;
723731

724732
template <typename T>
725733
jsg::Promise<T> readAll(jsg::Lock& js, uint64_t limit);
@@ -861,19 +869,19 @@ ReadableImpl<Self>::ReadableImpl(
861869

862870
template <typename Self>
863871
void ReadableImpl<Self>::start(jsg::Lock& js, jsg::Ref<Self> self) {
864-
KJ_ASSERT(!started && !starting);
865-
starting = true;
872+
KJ_ASSERT(!flags.started && !flags.starting);
873+
flags.starting = true;
866874

867875
auto onSuccess = JSG_VISITABLE_LAMBDA((this, self = self.addRef()), (self), (jsg::Lock& js) {
868-
started = true;
869-
starting = false;
876+
flags.started = true;
877+
flags.starting = false;
870878
pullIfNeeded(js, kj::mv(self));
871879
});
872880

873881
auto onFailure = JSG_VISITABLE_LAMBDA(
874882
(this, self = self.addRef()), (self), (jsg::Lock& js, jsg::Value reason) {
875-
started = true;
876-
starting = false;
883+
flags.started = true;
884+
flags.starting = false;
877885
doError(js, kj::mv(reason));
878886
});
879887

@@ -1069,24 +1077,24 @@ void ReadableImpl<Self>::pullIfNeeded(jsg::Lock& js, jsg::Ref<Self> self) {
10691077
return;
10701078
}
10711079

1072-
if (pulling) {
1073-
pullAgain = true;
1080+
if (flags.pulling) {
1081+
flags.pullAgain = true;
10741082
return;
10751083
}
1076-
KJ_ASSERT(!pullAgain);
1077-
pulling = true;
1084+
KJ_ASSERT(!flags.pullAgain);
1085+
flags.pulling = true;
10781086

10791087
auto onSuccess = JSG_VISITABLE_LAMBDA((this, self = self.addRef()), (self), (jsg::Lock& js) {
1080-
pulling = false;
1081-
if (pullAgain) {
1082-
pullAgain = false;
1088+
flags.pulling = false;
1089+
if (flags.pullAgain) {
1090+
flags.pullAgain = false;
10831091
pullIfNeeded(js, kj::mv(self));
10841092
}
10851093
});
10861094

10871095
auto onFailure = JSG_VISITABLE_LAMBDA(
10881096
(this, self = self.addRef()), (self), (jsg::Lock& js, jsg::Value reason) {
1089-
pulling = false;
1097+
flags.pulling = false;
10901098
doError(js, kj::mv(reason));
10911099
});
10921100

@@ -1170,7 +1178,7 @@ ssize_t WritableImpl<Self>::getDesiredSize() {
11701178

11711179
template <typename Self>
11721180
void WritableImpl<Self>::advanceQueueIfNeeded(jsg::Lock& js, jsg::Ref<Self> self) {
1173-
if (!started || inFlightWrite != kj::none) {
1181+
if (!flags.started || inFlightWrite != kj::none) {
11741182
return;
11751183
}
11761184
KJ_ASSERT(isWritable() || state.template is<StreamStates::Erroring>());
@@ -1247,7 +1255,7 @@ jsg::Promise<void> WritableImpl<Self>::close(jsg::Lock& js, jsg::Ref<Self> self)
12471255
auto prp = js.newPromiseAndResolver<void>();
12481256
closeRequest = kj::mv(prp.resolver);
12491257

1250-
if (backpressure && isWritable()) {
1258+
if (flags.backpressure && isWritable()) {
12511259
KJ_IF_SOME(owner, tryGetOwner()) {
12521260
owner.maybeResolveReadyPromise(js);
12531261
}
@@ -1418,8 +1426,8 @@ void WritableImpl<Self>::setup(jsg::Lock& js,
14181426
jsg::Ref<Self> self,
14191427
UnderlyingSink underlyingSink,
14201428
StreamQueuingStrategy queuingStrategy) {
1421-
KJ_ASSERT(!started && !starting);
1422-
starting = true;
1429+
KJ_ASSERT(!flags.started && !flags.starting);
1430+
flags.starting = true;
14231431

14241432
highWaterMark = queuingStrategy.highWaterMark.orDefault(1);
14251433
auto startAlgorithm = kj::mv(underlyingSink.start);
@@ -1441,8 +1449,8 @@ void WritableImpl<Self>::setup(jsg::Lock& js,
14411449
}
14421450
}
14431451

1444-
started = true;
1445-
starting = false;
1452+
flags.started = true;
1453+
flags.starting = false;
14461454
advanceQueueIfNeeded(js, kj::mv(self));
14471455
});
14481456

@@ -1455,12 +1463,12 @@ void WritableImpl<Self>::setup(jsg::Lock& js,
14551463
} else {
14561464
// Else block to avert dangling else compiler warning.
14571465
}
1458-
started = true;
1459-
starting = false;
1466+
flags.started = true;
1467+
flags.starting = false;
14601468
dealWithRejection(js, kj::mv(self), handle);
14611469
});
14621470

1463-
backpressure = getDesiredSize() < 0;
1471+
flags.backpressure = getDesiredSize() < 0;
14641472

14651473
maybeRunAlgorithm(js, startAlgorithm, kj::mv(onSuccess), kj::mv(onFailure), self.addRef());
14661474
}
@@ -1473,7 +1481,7 @@ void WritableImpl<Self>::startErroring(
14731481
owner.maybeRejectReadyPromise(js, reason);
14741482
}
14751483
state.template init<StreamStates::Erroring>(js.v8Ref(reason));
1476-
if (inFlightWrite == kj::none && inFlightClose == kj::none && started) {
1484+
if (inFlightWrite == kj::none && inFlightClose == kj::none && flags.started) {
14771485
finishErroring(js, kj::mv(self));
14781486
}
14791487
}
@@ -1492,7 +1500,8 @@ void WritableImpl<Self>::updateBackpressure(jsg::Lock& js) {
14921500
// This is fairly arbitrary and may need to be tuned further.
14931501
int warningMultiplier = highWaterMark <= 10 ? 10 : 2;
14941502

1495-
if (warnAboutExcessiveBackpressure && (amountBuffered >= warningMultiplier * highWaterMark)) {
1503+
if (flags.warnAboutExcessiveBackpressure &&
1504+
(amountBuffered >= warningMultiplier * highWaterMark)) {
14961505
excessiveBackpressureWarningCount++;
14971506
auto warning = kj::str("A WritableStream is experiencing excessive backpressure. "
14981507
"The current write buffer size is ",
@@ -1502,15 +1511,15 @@ void WritableImpl<Self>::updateBackpressure(jsg::Lock& js) {
15021511
"mark may cause excessive memory usage. ", "(Count ", excessiveBackpressureWarningCount,
15031512
")");
15041513
js.logWarning(warning);
1505-
warnAboutExcessiveBackpressure = false;
1514+
flags.warnAboutExcessiveBackpressure = false;
15061515
}
15071516

1508-
if (!bp) warnAboutExcessiveBackpressure = true;
1517+
if (!bp) flags.warnAboutExcessiveBackpressure = true;
15091518

1510-
if (bp != backpressure) {
1511-
backpressure = bp;
1519+
if (bp != flags.backpressure) {
1520+
flags.backpressure = bp;
15121521
KJ_IF_SOME(owner, tryGetOwner()) {
1513-
owner.updateBackpressure(js, backpressure);
1522+
owner.updateBackpressure(js, flags.backpressure);
15141523
}
15151524
}
15161525
}
@@ -3485,10 +3494,10 @@ jsg::Promise<void> WritableStreamJsController::pipeLoop(jsg::Lock& js) {
34853494
if (maybePipeLock == kj::none) return js.resolvedPromise();
34863495
auto& pipeLock = KJ_REQUIRE_NONNULL(maybePipeLock);
34873496

3488-
auto preventAbort = pipeLock.preventAbort;
3489-
auto preventCancel = pipeLock.preventCancel;
3490-
auto preventClose = pipeLock.preventClose;
3491-
auto pipeThrough = pipeLock.pipeThrough;
3497+
auto preventAbort = pipeLock.flags.preventAbort;
3498+
auto preventCancel = pipeLock.flags.preventCancel;
3499+
auto preventClose = pipeLock.flags.preventClose;
3500+
auto pipeThrough = pipeLock.flags.pipeThrough;
34923501
auto& source = pipeLock.source;
34933502
// At the start of each pipe step, we check to see if either the source or
34943503
// the destination has closed or errored and propagate that on to the other.

src/workerd/api/streams/standard.h

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,6 @@ class ReadableImpl {
222222
kj::OneOf<StreamStates::Closed, StreamStates::Errored, Queue> state;
223223
Algorithms algorithms;
224224

225-
bool disturbed = false;
226-
bool pullAgain = false;
227-
bool pulling = false;
228-
bool started = false;
229-
bool starting = false;
230225
size_t highWaterMark = 1;
231226

232227
struct PendingCancel {
@@ -239,6 +234,14 @@ class ReadableImpl {
239234
};
240235
kj::Maybe<PendingCancel> maybePendingCancel;
241236

237+
struct Flags {
238+
uint8_t pullAgain : 1 = 0;
239+
uint8_t pulling : 1 = 0;
240+
uint8_t started : 1 = 0;
241+
uint8_t starting : 1 = 0;
242+
};
243+
Flags flags{};
244+
242245
friend Self;
243246
};
244247

@@ -368,23 +371,28 @@ class WritableImpl {
368371
kj::OneOf<StreamStates::Closed, StreamStates::Errored, StreamStates::Erroring, Writable> state =
369372
Writable();
370373
Algorithms algorithms;
371-
bool started = false;
372-
bool starting = false;
373-
bool backpressure = false;
374+
374375
size_t highWaterMark = 1;
376+
size_t amountBuffered = 0;
377+
size_t excessiveBackpressureWarningCount = 0;
375378

376379
// `writeRequests` is often going to be empty in common usage patterns, in which case std::list
377380
// is more memory efficient than a std::deque, for example.
378381
std::list<WriteRequest> writeRequests;
379-
size_t amountBuffered = 0;
380-
bool warnAboutExcessiveBackpressure = true;
381-
size_t excessiveBackpressureWarningCount = 0;
382382

383383
kj::Maybe<WriteRequest> inFlightWrite;
384384
kj::Maybe<jsg::Promise<void>::Resolver> inFlightClose;
385385
kj::Maybe<jsg::Promise<void>::Resolver> closeRequest;
386386
kj::Maybe<kj::Own<PendingAbort>> maybePendingAbort;
387387

388+
struct Flags {
389+
uint8_t started : 1 = 0;
390+
uint8_t starting : 1 = 0;
391+
uint8_t backpressure : 1 = 0;
392+
uint8_t warnAboutExcessiveBackpressure : 1 = 1;
393+
};
394+
Flags flags{};
395+
388396
friend Self;
389397
};
390398

@@ -601,7 +609,7 @@ class WritableStreamDefaultController: public jsg::Object {
601609
kj::Maybe<v8::Local<v8::Value>> isErroring(jsg::Lock& js);
602610

603611
bool isStarted() {
604-
return impl.started;
612+
return impl.flags.started;
605613
}
606614

607615
void setup(jsg::Lock& js, UnderlyingSink underlyingSink, StreamQueuingStrategy queuingStrategy);

0 commit comments

Comments
 (0)