Skip to content

Commit 9d7a83f

Browse files
committed
Update the streams readme with notes about memory safety
1 parent 05b299c commit 9d7a83f

File tree

1 file changed

+210
-0
lines changed

1 file changed

+210
-0
lines changed

src/workerd/api/streams/README.md

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,3 +630,213 @@ In each iteration of the loop we first perform a read followed by a write if the
630630
not end the stream. The promise loop ends either when all of the data has been consumed or
631631
the stream errors, whichever comes first.
632632

633+
# Memory Safety Patterns
634+
635+
The streams implementation uses several sophisticated patterns to ensure memory safety,
636+
particularly around async operations and re-entrant JavaScript callbacks. Understanding
637+
these patterns is critical for anyone maintaining this code.
638+
639+
## State Machine Architecture
640+
641+
Both ***Internal*** and ***Standard*** stream controllers use the `StateMachine<>` template
642+
from `util/state-machine.h` to manage stream states. Key properties:
643+
644+
* **Terminal States:** `Closed` and `Errored` are terminal states - once entered, no
645+
further transitions are allowed. This prevents double-close and double-error bugs.
646+
* **Pending States (Standard only):** The `PendingStates<>` template parameter allows
647+
deferring state transitions while an operation is in progress.
648+
* **Active State:** The `ActiveState<>` marker identifies which state contains the
649+
operational data (queues, consumers, etc.).
650+
651+
## Safety Patterns for Standard Streams
652+
653+
***Standard*** streams are more complex because user-provided JavaScript callbacks can
654+
re-enter the stream implementation at arbitrary points.
655+
656+
### Deferred State Transitions
657+
658+
When a JS callback (like a pull algorithm) triggers a close or error during a read
659+
operation, the state transition must be deferred until the read completes:
660+
661+
```cpp
662+
// deferControllerStateChange() wraps read operations
663+
controller.state.beginOperation(); // Increment counter
664+
auto result = readCallback(); // May trigger JS that calls close()
665+
controller.state.endOperation(); // Apply pending state if counter == 0
666+
```
667+
668+
If `controller.close()` is called while `beginOperation()` is active, the transition is
669+
stored as pending and applied when `endOperation()` sees the counter reach zero.
670+
671+
### Consumer Snapshot Pattern
672+
673+
When iterating over consumers (e.g., during a push to a teed stream), the consumer set
674+
must be copied first because JS callbacks during iteration could modify the set:
675+
676+
```cpp
677+
auto consumers = ready.consumers.snapshot(); // Copy the set
678+
for (auto consumer: consumers) {
679+
consumer->push(js, entry->clone(js)); // May trigger JS that modifies consumers
680+
}
681+
```
682+
683+
### WeakRef Pattern
684+
685+
For handles that user code may hold longer than the underlying object (like `ByobRequest`
686+
or `PumpToReader`), the code uses a `WeakRef` pattern:
687+
688+
```cpp
689+
KJ_IF_SOME(reader, pumpToReader->tryGet()) {
690+
// Safe to use reader - it's still alive
691+
reader.pumpLoop(js, ...);
692+
} else {
693+
// PumpToReader was destroyed, handle gracefully
694+
}
695+
```
696+
697+
### Reference Counting for Shared Entries
698+
699+
Queue entries shared across multiple consumers (e.g., teed streams) use `kj::Rc<Entry>`
700+
reference counting to prevent use-after-free:
701+
702+
```cpp
703+
class Entry: public kj::Refcounted {
704+
kj::Rc<Entry> clone(jsg::Lock& js);
705+
};
706+
```
707+
708+
### Lambda Capture Safety in Pipe Loops
709+
710+
When capturing references in lambdas attached to promise continuations, the code must
711+
re-check that the referenced object still exists:
712+
713+
```cpp
714+
auto onSuccess = JSG_VISITABLE_LAMBDA((this, ref = addRef(), ...), ..., (...) {
715+
auto maybePipeLock = lock.tryGetPipe();
716+
if (maybePipeLock == kj::none) return js.resolvedPromise(); // Lock was released
717+
auto& pipeLock = KJ_REQUIRE_NONNULL(maybePipeLock);
718+
// Now safe to use pipeLock
719+
});
720+
```
721+
722+
**Important:** Never capture raw references (like `&source`) that may become invalid
723+
before the lambda executes. Either re-acquire the reference inside the lambda or use
724+
a refcounted/weak reference pattern.
725+
726+
### StateListener Callbacks
727+
728+
Consumer state listeners must not access `this` after calling methods that may destroy
729+
the listener:
730+
731+
```cpp
732+
void onConsumerClose(jsg::Lock& js) override {
733+
KJ_IF_SOME(s, state) {
734+
s.owner.doClose(js); // May destroy *this!
735+
}
736+
// DO NOT ACCESS *this AFTER THIS POINT
737+
}
738+
```
739+
740+
## Safety Patterns for Internal Streams
741+
742+
***Internal*** streams are simpler but still require careful handling across async
743+
boundaries.
744+
745+
### Refcounted Pipe State
746+
747+
The `Pipe::State` structure uses `kj::Refcounted` to survive async operations:
748+
749+
```cpp
750+
struct Pipe {
751+
struct State: public kj::Refcounted {
752+
bool aborted = false; // Set when Pipe is destroyed
753+
// ...
754+
};
755+
kj::Own<State> state;
756+
757+
~Pipe() noexcept(false) {
758+
state->aborted = true; // Signal continuations to stop
759+
}
760+
};
761+
```
762+
763+
Lambda continuations capture `state = kj::addRef(*this)` and check `state->aborted`
764+
before accessing any state that might have been destroyed.
765+
766+
### Generation Counter Pattern
767+
768+
The `writeLoop()` uses a generation counter to detect if the queue was modified during
769+
an async operation:
770+
771+
```cpp
772+
auto check = [expectedGeneration = queue.currentGeneration()]() {
773+
KJ_ASSERT(queue.currentGeneration() == expectedGeneration);
774+
return queue.front();
775+
};
776+
```
777+
778+
### Promise Resolution Ordering
779+
780+
State transitions occur BEFORE promise resolution to ensure continuations see consistent
781+
state:
782+
783+
```cpp
784+
void doClose(jsg::Lock& js) {
785+
state.transitionTo<StreamStates::Closed>(); // State changes NOW
786+
maybeResolvePromise(js, locked.getClosedFulfiller()); // Schedules microtask
787+
}
788+
// Continuations will see state == Closed
789+
```
790+
791+
### V8Ref for Buffer Ownership
792+
793+
Write operations store a `jsg::V8Ref<v8::ArrayBuffer>` to prevent GC of the data buffer
794+
while the async write is in progress:
795+
796+
```cpp
797+
struct Write {
798+
jsg::V8Ref<v8::ArrayBuffer> ownBytes; // Prevents GC
799+
kj::ArrayPtr<kj::byte> bytes; // Raw pointer into ownBytes
800+
};
801+
```
802+
803+
## Cross-Request Considerations
804+
805+
In the Workers runtime, multiple requests can be processed concurrently by the same
806+
isolate using green threads. When one request yields for I/O, another may run. The
807+
`SetPromiseCrossContextResolveCallback` mechanism ensures promise reactions are deferred
808+
to the correct request context.
809+
810+
Stream implementations interact with this model through:
811+
812+
* **`ioContext.addFunctor()`**: Binds continuations to the correct `IoContext`
813+
* **`IoOwn<>`**: Ensures objects are accessed from the correct context
814+
* **Promise context tagging**: All promises created during a request are tagged with
815+
that request's `IoContext`
816+
817+
When a promise is resolved but its tag doesn't match the current context, the reactions
818+
are deferred until the correct context is active.
819+
820+
## Guidelines for Maintenance
821+
822+
1. **Always use `deferControllerStateChange()`** when calling code that may invoke JS
823+
callbacks during a read operation.
824+
825+
2. **Always use `snapshot()`** when iterating over consumers if the loop body may
826+
trigger JavaScript.
827+
828+
3. **Never access `this`** after calling a StateListener callback that may destroy
829+
the object.
830+
831+
4. **Always re-check lock state** in lambda continuations that might execute after
832+
the lock is released.
833+
834+
5. **Use WeakRef** for any handle that user code may hold longer than the underlying
835+
object.
836+
837+
6. **Order operations correctly**: Transition state before resolving promises, and
838+
pop queue entries only after resolving/rejecting their associated promises.
839+
840+
7. **Be careful with reference captures**: Prefer capturing `this` and re-acquiring
841+
references inside lambdas over capturing raw references that may become dangling.
842+

0 commit comments

Comments
 (0)