Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 210 additions & 0 deletions src/workerd/api/streams/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -630,3 +630,213 @@ In each iteration of the loop we first perform a read followed by a write if the
not end the stream. The promise loop ends either when all of the data has been consumed or
the stream errors, whichever comes first.

# Memory Safety Patterns

The streams implementation uses several sophisticated patterns to ensure memory safety,
particularly around async operations and re-entrant JavaScript callbacks. Understanding
these patterns is critical for anyone maintaining this code.

## State Machine Architecture

Both ***Internal*** and ***Standard*** stream controllers use the `StateMachine<>` template
from `util/state-machine.h` to manage stream states. Key properties:

* **Terminal States:** `Closed` and `Errored` are terminal states - once entered, no
further transitions are allowed. This prevents double-close and double-error bugs.
* **Pending States (Standard only):** The `PendingStates<>` template parameter allows
deferring state transitions while an operation is in progress.
* **Active State:** The `ActiveState<>` marker identifies which state contains the
operational data (queues, consumers, etc.).

## Safety Patterns for Standard Streams

***Standard*** streams are more complex because user-provided JavaScript callbacks can
re-enter the stream implementation at arbitrary points.

### Deferred State Transitions

When a JS callback (like a pull algorithm) triggers a close or error during a read
operation, the state transition must be deferred until the read completes:

```cpp
// deferControllerStateChange() wraps read operations
controller.state.beginOperation(); // Increment counter
auto result = readCallback(); // May trigger JS that calls close()
controller.state.endOperation(); // Apply pending state if counter == 0
```

If `controller.close()` is called while `beginOperation()` is active, the transition is
stored as pending and applied when `endOperation()` sees the counter reach zero.

### Consumer Snapshot Pattern

When iterating over consumers (e.g., during a push to a teed stream), the consumer set
must be copied first because JS callbacks during iteration could modify the set:

```cpp
auto consumers = ready.consumers.snapshot(); // Copy the set
for (auto consumer: consumers) {
consumer->push(js, entry->clone(js)); // May trigger JS that modifies consumers
}
```

### WeakRef Pattern

For handles that user code may hold longer than the underlying object (like `ByobRequest`
or `PumpToReader`), the code uses a `WeakRef` pattern:

```cpp
KJ_IF_SOME(reader, pumpToReader->tryGet()) {
// Safe to use reader - it's still alive
reader.pumpLoop(js, ...);
} else {
// PumpToReader was destroyed, handle gracefully
}
```

### Reference Counting for Shared Entries

Queue entries shared across multiple consumers (e.g., teed streams) use `kj::Rc<Entry>`
reference counting to prevent use-after-free:

```cpp
class Entry: public kj::Refcounted {
kj::Rc<Entry> clone(jsg::Lock& js);
};
```

### Lambda Capture Safety in Pipe Loops

When capturing references in lambdas attached to promise continuations, the code must
re-check that the referenced object still exists:

```cpp
auto onSuccess = JSG_VISITABLE_LAMBDA((this, ref = addRef(), ...), ..., (...) {
auto maybePipeLock = lock.tryGetPipe();
if (maybePipeLock == kj::none) return js.resolvedPromise(); // Lock was released
auto& pipeLock = KJ_REQUIRE_NONNULL(maybePipeLock);
// Now safe to use pipeLock
});
```

**Important:** Never capture raw references (like `&source`) that may become invalid
before the lambda executes. Either re-acquire the reference inside the lambda or use
a refcounted/weak reference pattern.

### StateListener Callbacks

Consumer state listeners must not access `this` after calling methods that may destroy
the listener:

```cpp
void onConsumerClose(jsg::Lock& js) override {
KJ_IF_SOME(s, state) {
s.owner.doClose(js); // May destroy *this!
}
// DO NOT ACCESS *this AFTER THIS POINT
}
```

## Safety Patterns for Internal Streams

***Internal*** streams are simpler but still require careful handling across async
boundaries.

### Refcounted Pipe State

The `Pipe::State` structure uses `kj::Refcounted` to survive async operations:

```cpp
struct Pipe {
struct State: public kj::Refcounted {
bool aborted = false; // Set when Pipe is destroyed
// ...
};
kj::Own<State> state;

~Pipe() noexcept(false) {
state->aborted = true; // Signal continuations to stop
}
};
```

Lambda continuations capture `state = kj::addRef(*this)` and check `state->aborted`
before accessing any state that might have been destroyed.

### Generation Counter Pattern

The `writeLoop()` uses a generation counter to detect if the queue was modified during
an async operation:

```cpp
auto check = [expectedGeneration = queue.currentGeneration()]() {
KJ_ASSERT(queue.currentGeneration() == expectedGeneration);
return queue.front();
};
```

### Promise Resolution Ordering

State transitions occur BEFORE promise resolution to ensure continuations see consistent
state:

```cpp
void doClose(jsg::Lock& js) {
state.transitionTo<StreamStates::Closed>(); // State changes NOW
maybeResolvePromise(js, locked.getClosedFulfiller()); // Schedules microtask
}
// Continuations will see state == Closed
```

### V8Ref for Buffer Ownership

Write operations store a `jsg::V8Ref<v8::ArrayBuffer>` to prevent GC of the data buffer
while the async write is in progress:

```cpp
struct Write {
jsg::V8Ref<v8::ArrayBuffer> ownBytes; // Prevents GC
kj::ArrayPtr<kj::byte> bytes; // Raw pointer into ownBytes
};
```

## Cross-Request Considerations

In the Workers runtime, multiple requests can be processed concurrently by the same
isolate using green threads. When one request yields for I/O, another may run. The
`SetPromiseCrossContextResolveCallback` mechanism ensures promise reactions are deferred
to the correct request context.

Stream implementations interact with this model through:

* **`ioContext.addFunctor()`**: Binds continuations to the correct `IoContext`
* **`IoOwn<>`**: Ensures objects are accessed from the correct context
* **Promise context tagging**: All promises created during a request are tagged with
that request's `IoContext`

When a promise is resolved but its tag doesn't match the current context, the reactions
are deferred until the correct context is active.

## Guidelines for Maintenance

1. **Always use `deferControllerStateChange()`** when calling code that may invoke JS
callbacks during a read operation.

2. **Always use `snapshot()`** when iterating over consumers if the loop body may
trigger JavaScript.

3. **Never access `this`** after calling a StateListener callback that may destroy
the object.

4. **Always re-check lock state** in lambda continuations that might execute after
the lock is released.

5. **Use WeakRef** for any handle that user code may hold longer than the underlying
object.

6. **Order operations correctly**: Transition state before resolving promises, and
pop queue entries only after resolving/rejecting their associated promises.

7. **Be careful with reference captures**: Prefer capturing `this` and re-acquiring
references inside lambdas over capturing raw references that may become dangling.

Loading
Loading