Skip to content

Commit 4c7a602

Browse files
committed
feat(kkrpc): add AsyncIterable streaming support
Server methods returning AsyncIterable are automatically detected and streamed chunk-by-chunk to the client, where they're consumed via for await...of. Supports consumer cancellation (break sends stream-cancel), error propagation, concurrent streams, per-chunk output validation, and interceptor integration. Adds streaming-demo example, docs page, 9 integration tests. Removes PLANNING.md — all roadmap items now complete.
1 parent 4cbc381 commit 4c7a602

File tree

13 files changed

+1269
-592
lines changed

13 files changed

+1269
-592
lines changed

.journal/2026-02-07.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,50 @@ Also fixed a pre-existing inconsistency: `ioredis`, `kafkajs`, `socket.io`, and
8181
- **`pnpm-lock.yaml`** — will need `pnpm install` to reconcile the lockfile after this change
8282
- **CI** — adapter tests that import these packages will need them installed explicitly in the test environment (likely already the case via devDependencies or workspace config)
8383
- **Docs** — README adapter sections should note the required peer install (e.g., `npm install kkrpc kafkajs`)
84+
85+
## 01:05 — AsyncIterable Streaming Support (PLANNING.md item 2)
86+
87+
### Core Decision/Topic
88+
89+
Implemented **streaming / subscription support** — the final major feature from the roadmap. Server methods can now return `AsyncIterable` (async generators), and kkrpc automatically streams chunks to the client where they're consumed via `for await...of`.
90+
91+
### Options Considered
92+
93+
- **Dedicated subscription API** (e.g. `channel.subscribe("topic")`) vs **transparent AsyncIterable detection** — chose transparent detection. If a handler returns something with `Symbol.asyncIterator`, kkrpc streams it automatically. No new API surface for users to learn; async generators "just work".
94+
- **Backpressure protocol** (consumer ACKs each chunk before producer sends next) vs **fire-and-forget chunks with buffering** — chose buffering. Simpler protocol, and most RPC streaming use cases (progress updates, log tailing, countdowns) don't need backpressure. Can be added later if needed.
95+
- **Per-chunk interceptor calls** vs **interceptors wrap initial handler only** — chose handler-only. Interceptors see the method call that starts the stream but don't fire per-chunk. This matches how middleware works in gRPC and tRPC.
96+
97+
### Final Decision & Rationale
98+
99+
**Protocol:** 4 new message types added to the wire format:
100+
- `stream-chunk` — producer → consumer, carries one value
101+
- `stream-end` — producer → consumer, signals completion
102+
- `stream-error` — producer → consumer, carries serialized error
103+
- `stream-cancel` — consumer → producer, signals `break` in `for await`
104+
105+
The initial response carries `{ __stream: true }` to signal the consumer to create an AsyncIterable instead of resolving the Promise directly.
106+
107+
**Consumer-side queue pattern:** Chunks arrive asynchronously; consumer reads synchronously via `next()`. Bridged with a buffer + waiters queue — chunks that arrive before `next()` are buffered; `next()` calls that arrive before a chunk park as pending resolvers.
108+
109+
**Cancellation:** `break` in `for await` triggers the iterator's `return()` method, which sends `stream-cancel`. Producer uses `AbortController` to stop its iteration loop.
110+
111+
**Validation integration:** Per-chunk output validation supported — if a schema is configured for the method's output, each chunk is validated before sending.
112+
113+
### Key Changes Made
114+
115+
| File | Change |
116+
|---|---|
117+
| `packages/kkrpc/src/serialization.ts` | Extended `Message.type` union with 4 streaming message types |
118+
| `packages/kkrpc/src/channel.ts` | Added ~200 lines: `StreamConsumerState`/`StreamProducerState` interfaces, `isAsyncIterable()` helper, stream routing in `processDecodedMessage`, producer methods (`streamResult`, `sendStreamError`, `handleStreamCancel`), consumer methods (`createStreamIterable`, `handleStreamChunk`, `handleStreamEnd`, `handleStreamError`), cleanup in `destroy()` |
119+
| `packages/kkrpc/__tests__/streaming.test.ts` | 9 integration tests over real WebSocket: basic countdown, coexistence with regular methods, error propagation, consumer cancellation, concurrent streams, interceptor interaction, delayed chunks, empty stream, nested method streaming |
120+
| `packages/kkrpc/README.md` | Added streaming section with code examples and comparison table row |
121+
| `docs/src/content/docs/guides/streaming.md` | Standalone docs page covering usage, cancellation, errors, concurrency, protocol details |
122+
| `examples/streaming-demo/` | Full working example: `api.ts` (3 streaming methods + 1 regular), `server.ts` (WebSocket + logging interceptor), `client.ts` (5 demo patterns), `README.md` |
123+
| `PLANNING.md` | Removed — all planned features now implemented |
124+
125+
### Future Considerations
126+
127+
- **Backpressure** — for high-throughput streaming (e.g. file transfer), a flow-control protocol (consumer ACKs) would prevent memory buildup
128+
- **Bidirectional streaming** — current design is server→client only. Client→server streaming would need a `sendStream()` API on the proxy
129+
- **Stream timeout** — currently streams can run indefinitely. A per-stream idle timeout could auto-cancel stalled streams
130+
- **PLANNING.md removed** — all 4 major roadmap items (middleware, streaming, timeout, type safety) are complete

0 commit comments

Comments
 (0)