Skip to content

Commit 72a7ea1

Browse files
committed
fix: allow aborting drain waiting
Add onDrain method that can be aborted
1 parent afa930e commit 72a7ea1

File tree

5 files changed

+148
-4
lines changed

5 files changed

+148
-4
lines changed

doc/migrations/v2.0.0-v3.0.0.md

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ and `sink` properties, we now listen for `message` events and call `.send` with
8484
`Uint8Array`/`Uint8ArrayList` values.
8585

8686
We can detect when write backpressure needs to be applied by `.send` returning
87-
`false` (we should await a `drain` event when this happens). and read
88-
backpressure can be explicitly applied by calling the new `.pause` and `.resume`
89-
methods.
87+
`false` (we should await a `drain` event or for the promise returned from
88+
`onDrain()` to resolve when this happens). and read backpressure can be
89+
explicitly applied by calling the new `.pause` and `.resume` methods.
9090

9191
> [!CAUTION]
9292
> If no `message` event handler is added, streams will buffer incoming data
@@ -179,7 +179,8 @@ When streams close they emit a `close` event. This event has an `error: Error` p
179179

180180
### Write backpressure
181181

182-
You can use [p-event](https://www.npmjs.com/package/p-event) or [race-event](https://www.npmjs.com/package/race-event) to pause writing due to backpressure:
182+
When `.send()` returns false, the promise returned from `onDrain` will resolve
183+
when the stream can accept new data:
183184

184185
```ts
185186
import { createLibp2p } from 'libp2p'
@@ -198,6 +199,37 @@ const bufs = [
198199
// a lot of data
199200
]
200201

202+
for (const buf of bufs) {
203+
if (!stream.send(buf)) {
204+
await stream.onDrain({
205+
signal: AbortSignal.timeout(5_000)
206+
})
207+
}
208+
}
209+
```
210+
211+
Alternatively you can use [p-event](https://www.npmjs.com/package/p-event) or
212+
[race-event](https://www.npmjs.com/package/race-event) to wait for the stream to
213+
emit a `drain` event before continuing:
214+
215+
```ts
216+
import { createLibp2p } from 'libp2p'
217+
import { peerIdFromString } from '@libp2p/peer-id'
218+
import { pEvent } from 'p-event'
219+
220+
const node = createLibp2p({
221+
// libp2p config here
222+
})
223+
224+
const remotePeer = peerIdFromString('123Foo...')
225+
const stream = await node.dialProtocol(remotePeer, '/echo/1.0.0', {
226+
signal: AbortSignal.timeout(5_000)
227+
})
228+
229+
const bufs = [
230+
// a lot of data
231+
]
232+
201233
for (const buf of bufs) {
202234
if (!stream.send(buf)) {
203235
await pEvent(stream, 'drain', {

packages/interface/src/message-stream.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,4 +196,10 @@ export interface MessageStream<Timeline extends MessageStreamTimeline = MessageS
196196
* emitted before any other queued data.
197197
*/
198198
unshift (data: Uint8Array | Uint8ArrayList): void
199+
200+
/**
201+
* Returns a promise that resolves when the stream can accept new data or
202+
* rejects if the stream is closed or reset before this occurs.
203+
*/
204+
onDrain (options?: AbortOptions): Promise<void>
199205
}

packages/utils/src/abstract-message-stream.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { StreamResetError, TypedEventEmitter, StreamMessageEvent, StreamBufferError, StreamResetEvent, StreamAbortEvent, StreamCloseEvent, StreamStateError } from '@libp2p/interface'
22
import { pushable } from 'it-pushable'
3+
import { raceSignal } from 'race-signal'
34
import { Uint8ArrayList } from 'uint8arraylist'
5+
import { StreamClosedError } from './errors.ts'
46
import type { MessageStreamEvents, MessageStreamStatus, MessageStream, AbortOptions, MessageStreamTimeline, MessageStreamDirection, EventHandler, StreamOptions, MessageStreamReadStatus, MessageStreamWriteStatus } from '@libp2p/interface'
57
import type { Logger } from '@libp2p/logger'
68

@@ -63,6 +65,8 @@ export abstract class AbstractMessageStream<Timeline extends MessageStreamTimeli
6365
protected readonly writeBuffer: Uint8ArrayList
6466
protected sendingData: boolean
6567

68+
private onDrainPromise?: PromiseWithResolvers<void>
69+
6670
constructor (init: MessageStreamInit) {
6771
super()
6872

@@ -96,8 +100,15 @@ export abstract class AbstractMessageStream<Timeline extends MessageStreamTimeli
96100
this.writableNeedsDrain = false
97101
this.processSendQueue()
98102
}
103+
104+
this.onDrainPromise?.resolve()
99105
}
100106
this.addEventListener('drain', continueSendingOnDrain)
107+
108+
const rejectOnDrainOnClose = (evt: StreamCloseEvent): void => {
109+
this.onDrainPromise?.reject(evt.error ?? new StreamClosedError())
110+
}
111+
this.addEventListener('close', rejectOnDrainOnClose)
101112
}
102113

103114
get readBufferLength (): number {
@@ -108,6 +119,18 @@ export abstract class AbstractMessageStream<Timeline extends MessageStreamTimeli
108119
return this.writeBuffer.byteLength
109120
}
110121

122+
async onDrain (options?: AbortOptions): Promise<void> {
123+
if (this.writableNeedsDrain !== true) {
124+
return Promise.resolve()
125+
}
126+
127+
if (this.onDrainPromise == null) {
128+
this.onDrainPromise = Promise.withResolvers()
129+
}
130+
131+
return raceSignal(this.onDrainPromise.promise, options?.signal)
132+
}
133+
111134
async * [Symbol.asyncIterator] (): AsyncGenerator<Uint8Array | Uint8ArrayList> {
112135
if (this.readStatus !== 'readable' && this.readStatus !== 'paused') {
113136
return

packages/utils/src/errors.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,8 @@ export class MaxEarlyStreamsError extends Error {
3737
static name = 'MaxEarlyStreamsError'
3838
name = 'MaxEarlyStreamsError'
3939
}
40+
41+
export class StreamClosedError extends Error {
42+
static name = 'StreamClosedError'
43+
name = 'StreamClosedError'
44+
}

packages/utils/test/stream-utils-test.spec.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,4 +313,82 @@ describe('stream-pair', () => {
313313
Uint8Array.from([4, 5, 6, 7, 0, 1, 2, 3])
314314
)
315315
})
316+
317+
it('should return a promise from onDrain', async () => {
318+
const [outgoing] = await streamPair({
319+
delay: 100
320+
})
321+
322+
// should resolve immediately when backpressure is not being applied
323+
await expect(outgoing.onDrain()).to.eventually.be.undefined()
324+
325+
while (true) {
326+
if (outgoing.send(Uint8Array.from([0, 1, 2, 3])) === false) {
327+
break
328+
}
329+
}
330+
331+
await expect(outgoing.onDrain()).to.eventually.be.undefined()
332+
})
333+
334+
it('should abort a promise from onDrain via signal', async () => {
335+
const [outgoing] = await streamPair({
336+
delay: 100
337+
})
338+
339+
// should resolve immediately when backpressure is not being applied
340+
await expect(outgoing.onDrain()).to.eventually.be.undefined()
341+
342+
while (true) {
343+
if (outgoing.send(Uint8Array.from([0, 1, 2, 3])) === false) {
344+
break
345+
}
346+
}
347+
348+
const controller = new AbortController()
349+
controller.abort()
350+
351+
await expect(outgoing.onDrain({
352+
signal: controller.signal
353+
})).to.eventually.be.rejected
354+
.with.property('name', 'AbortError')
355+
})
356+
357+
it('should abort a promise from onDrain via abort', async () => {
358+
const [outgoing] = await streamPair({
359+
delay: 100
360+
})
361+
362+
while (true) {
363+
if (outgoing.send(Uint8Array.from([0, 1, 2, 3])) === false) {
364+
break
365+
}
366+
}
367+
368+
await expect(
369+
Promise.all([
370+
outgoing.onDrain(),
371+
// eslint-disable-next-line @typescript-eslint/await-thenable
372+
outgoing.abort(new Error('Abort!'))
373+
])
374+
).to.eventually.be.rejected
375+
.with.property('message', 'Abort!')
376+
})
377+
378+
it('should abort a promise from onDrain via reset', async () => {
379+
const [outgoing, incoming] = await streamPair({
380+
delay: 100
381+
})
382+
383+
while (true) {
384+
if (outgoing.send(Uint8Array.from([0, 1, 2, 3])) === false) {
385+
break
386+
}
387+
}
388+
389+
incoming.abort(new Error('Reset!'))
390+
391+
await expect(outgoing.onDrain()).to.eventually.be.rejected
392+
.with.property('name', 'StreamResetError')
393+
})
316394
})

0 commit comments

Comments
 (0)