js-libp2p started with Node.js streams, then migrated to pull-streams, then to streaming (async) iterables.
Streaming iterables have served us well, however the sink/source style is not used anywhere else in JavaScript which increases the barrier to entry for new developers.
They have also introduced a lot of complexity around closing streams, as streams can be closed by the source passed to the sink ending, throwing, or by calling .closeRead, .close or .abort.
Once the passed source has ended, it's not clear what the behaviour should be if the caller wants to sink multiple sources, nor how multiple-reader, multiple-writer patterns should be applied.
For example, in the past some users have wanted to sink multiple sources, but we use the ending of the sunk source to detect the closing of the writable end of the stream, meaning they have to add a layer of indirection via a pushable, it-merge, or some other mechanism. It would perhaps be better to let users close the stream themselves when they are done with it, as with Node.js and Web streams.
They are also async-everywhere, which adds latency where it is not required. For example, if an incoming message was split into several smaller messages once length-decoded, each message would only be processed after a promise resolution, spreading the work over several event loop iterations. EventTargets would let us process multiple messages synchronously, which would improve throughput.
Internally we use it-pushables, which limit our ability to apply backpressure within libp2p since they can't tell us when data is consumed or when the internal queue is full. In some places we use it-queueless-pushables to mitigate this, but it complicates testing since once data is written to the pushable, no further data can be written until the existing data is read, which necessitates hacky workarounds and occasionally surprising behaviour.
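A minimal sketch of the backpressure problem, using it-pushable's push/end API - the producer gets no signal about consumption, so the internal queue can grow without bound:

import { pushable } from 'it-pushable'

// push() buffers data synchronously and returns immediately - nothing
// tells the producer whether the consumer has read anything, or how
// large the internal queue has grown
const source = pushable<Uint8Array>()

for (let i = 0; i < 1_000_000; i++) {
  source.push(Uint8Array.of(i % 256))
}
source.end()

// a slow consumer drains the queue long after the producer has finished
for await (const buf of source) {
  console.info(buf.byteLength)
}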
EventTarget?
At the time these decisions were made, the EventTarget API was not well supported, which would have meant the natural thing to do would be to depend on a polyfill for the Node.js EventEmitter API, something to be avoided if we want to embrace web standards and have small bundle sizes.
This is no longer the case - EventTargets are supported natively everywhere and we use them extensively in the codebase.
The EventTarget API is used by transports such as WebSockets and WebRTC and is a high-performance, simple-to-understand event emitter.
It has the distinct advantage of being a synchronous API as event listeners are typically invoked in the same event loop iteration - unlike promise-based continuations which add artificial latency by executing in subsequent microtask queue(s).
So EventTargets are easier for new developers to understand due to their familiarity, and they are faster due to their synchronous nature.
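For example, this short sketch shows listeners running in the same event loop iteration as the dispatch:

const target = new EventTarget()

target.addEventListener('message', () => {
  console.info('listener ran')
})

console.info('before dispatch')
// dispatchEvent() invokes listeners synchronously, so the three lines are
// logged in this exact order within a single event loop iteration
target.dispatchEvent(new Event('message'))
console.info('after dispatch')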
Web streams?
An alternative to EventTargets is a ReadableStream/WritableStream-based duplex in the style of WebTransport bidirectional streams:
interface Duplex {
readable: ReadableStream
writable: WritableStream
}
These have a few problems:
- Individual reads/writes are not cancellable, so the promises created (and any resources they are using) will exist until the underlying operation ends, wasting resources
- Each read/write is async - this adds unnecessary and easily avoidable latency to every operation, as the sketch below illustrates
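A minimal sketch of reading from such a duplex (the duplex instance here is hypothetical) makes both problems visible:

interface Duplex {
  readable: ReadableStream<Uint8Array>
  writable: WritableStream<Uint8Array>
}

// hypothetical duplex instance, for illustration only
declare const duplex: Duplex

const reader = duplex.readable.getReader()

while (true) {
  // read() always returns a promise, so even already-buffered chunks are
  // delivered in a later microtask, and this specific read cannot be
  // cancelled without cancelling the whole stream
  const { value, done } = await reader.read()

  if (done) {
    break
  }

  console.info(value)
}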
Performance
The benchmarks at streaming-benchmarks show EventTargets compare favourably to Node.js streams, outperform duplex iterables by about 25%, and are almost 3x faster than Web streams:
> [email protected] start
> node index.js
┌─────────┬─────────────────────────────────────────────┬──────┬───────┐
│ (index) │ Name │ Time │ Ops/s │
├─────────┼─────────────────────────────────────────────┼──────┼───────┤
│ 0 │ 'node streams' │ 360 │ 0.36 │
│ 1 │ 'event target' │ 371 │ 0.371 │
│ 2 │ 'duplex async iterators' │ 494 │ 0.494 │
│ 3 │ 'node streams as duplex async iterator' │ 684 │ 0.684 │
│ 4 │ 'web streams' │ 1092 │ 1.092 │
│ 5 │ 'web streams as duplex async iterator' │ 1162 │ 1.162 │
│ 6 │ 'web byte streams' │ 1246 │ 1.246 │
│ 7 │ 'node streams as web streams' │ 1278 │ 1.278 │
│ 8 │ 'web byte streams as duplex async iterator' │ 1521 │ 1.521 │
└─────────┴─────────────────────────────────────────────┴──────┴───────┘
EventTarget API
interface StreamEvents {
/**
* Data was received from the remote end of the stream
*/
'message': MessageEvent<Uint8Array | Uint8ArrayList>
/**
* The stream closed gracefully - all data written to the stream has been consumed by
* the underlying sink.
*
* No further events will be emitted and the stream cannot be used to send/receive
* any more data.
*/
'close': Event
/**
* A local or remote error occurred.
*
* No further events will be emitted and the stream cannot be used to send/receive
* any more data.
*/
'error': ErrorEvent
/**
* The local send buffer is empty and the stream can accept new data from the caller
*/
'drain': Event
}
interface ErrorEvent extends Event {
/**
* The error that occurred
*/
reason: Error
/**
* If true the error was local, otherwise it occurred on the remote end of the stream
*/
local: boolean
}
interface Stream extends TypedEventTarget<StreamEvents> {
/**
* When the stream is closed for writing via `close` or `closeWrite`, any buffered data
* must be consumed by the underlying sink within this many ms, otherwise the stream
* will be reset and an 'error' event emitted.
*
* @default 1_000
*/
closeTimeout: number
/**
* If no data is sent or received in this number of ms the stream will be reset and an
* 'error' event emitted.
*
* @default 120_000
*/
inactivityTimeout: number
/**
* The maximum number of bytes to store when paused. If receipt of more bytes from the
* remote end of the stream causes the buffer size to exceed this value the stream will
* be reset and an 'error' event emitted.
*/
maxPauseBufferLength: number
/**
* Write data to the stream. If the method returns false it means the internal buffer
* is now full and the caller should wait for the 'drain' event before sending more
* data.
*
* This method may throw if:
* - The internal send buffer is full
* - The stream has previously been closed for writing locally or remotely
*/
send (data: Uint8Array | Uint8ArrayList): boolean
/**
* Returns a promise that resolves once the passed data has been written to the
* underlying sink.
*
* This method can result in lower throughput since subsequent writes will be delayed
* until after promise resolution. If throughput is a priority, call `send` without
* passing any options.
*
* This method may reject if:
* - The internal send buffer is full
* - The stream has previously been closed for writing locally or remotely
*/
send (data: Uint8Array | Uint8ArrayList, options: AbortOptions): Promise<void>
/**
* Immediately close the stream for reading and writing, discard any unsent/unread
* data, and emit an error event on the stream.
*/
abort (err: Error): void
/**
* Gracefully close the stream for reading and writing.
*
* A 'close' event will be emitted on the stream once all outstanding data has been
* sent to the remote.
*/
close (): void
/**
* Sends a message to the remote informing them we will not read any more data from
* the stream.
*
* If the writable end of the stream is already closed, a 'close' event will be emitted
* on the stream.
*/
closeRead (): void
/**
* Gracefully close the stream for writing - any outstanding data will be sent to the
* remote and any further calls to `.send` will throw.
*
* If the readable end of the stream is already closed, a 'close' event will be emitted
* on the stream once any buffered data has been sent.
*/
closeWrite (): void
/**
* Stop emitting further 'message' events. Any received data will be stored in an
* internal buffer. If this buffer reaches a preset limit the stream will be reset
* and an 'error' event emitted.
*
* If the underlying source supports it, the remote peer will be instructed to pause
* transmission of further data.
*/
pause (): void
/**
* Resume emitting 'message' events.
*
* If the underlying source supports it, the remote peer will be informed that it is
* ok to start sending data again.
*/
unpause (): void
}
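As a sketch of how a consumer of this API might look (the stream instance here is hypothetical), reads are handled synchronously via events, and writes respect backpressure via the boolean returned from send and the 'drain' event:

// hypothetical stream instance, e.g. returned by opening a protocol stream
declare const stream: Stream

stream.addEventListener('message', (evt) => {
  // invoked synchronously for each incoming message
  console.info('received', evt.data)
})

stream.addEventListener('error', (evt) => {
  console.error(`stream errored ${evt.local ? 'locally' : 'remotely'}`, evt.reason)
})

stream.addEventListener('close', () => {
  console.info('stream closed gracefully')
})

function writeAll (chunks: Uint8Array[]): void {
  while (chunks.length > 0) {
    const chunk = chunks.shift()

    if (chunk == null) {
      break
    }

    // send() accepts the data; a false return means the internal buffer is
    // now full, so wait for 'drain' before writing the rest
    if (!stream.send(chunk)) {
      stream.addEventListener('drain', () => {
        writeAll(chunks)
      }, { once: true })
      return
    }
  }

  stream.closeWrite()
}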
Async Iterable mode
We can also make the Stream interface implement the async iterable protocol so the user can pass the stream to for await..of and for compatibility with the existing it-* ecosystem:
// read data
for await (const buf of stream) {
  console.info(buf)

  // write data
  stream.send(Uint8Array.from([0, 1, 2, 3]))
}
import all from 'it-all'
const bufs = await all(stream)
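Or, as a hedged sketch, slotting the readable side into an it-pipe pipeline as a source:

import { pipe } from 'it-pipe'

declare const stream: Stream

await pipe(
  stream,
  // a transform written as an async generator
  async function * (source) {
    for await (const buf of source) {
      // normalise Uint8ArrayList chunks to Uint8Array
      yield buf.subarray()
    }
  },
  // a simple sink
  async (source) => {
    for await (const buf of source) {
      console.info(buf)
    }
  }
)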
...etc