Skip to content

Commit cb1c14e

Browse files
authored
feat: allow async stream handlers (#3212)
Allow `await`ing promises inside stream handlers.
1 parent 4420fad commit cb1c14e

File tree

3 files changed

+6
-10
lines changed

3 files changed

+6
-10
lines changed

packages/interface-compliance-tests/src/mocks/connection.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
144144
mss.handle(muxedStream, registrar.getProtocols(), {
145145
log
146146
})
147-
.then(({ stream, protocol }) => {
147+
.then(async ({ stream, protocol }) => {
148148
log('%s: incoming stream opened on %s', direction, protocol)
149149
muxedStream.protocol = protocol
150150
muxedStream.sink = stream.sink
@@ -153,9 +153,10 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
153153
connection.streams.push(muxedStream)
154154
const { handler } = registrar.getHandler(protocol)
155155

156-
handler({ connection, stream: muxedStream })
156+
await handler({ connection, stream: muxedStream })
157157
}).catch(err => {
158158
log.error(err)
159+
muxedStream.abort(err)
159160
})
160161
} catch (err: any) {
161162
log.error(err)

packages/interface/src/stream-handler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ export interface StreamHandler {
1717
/**
1818
* A callback function that accepts the incoming stream data
1919
*/
20-
(data: IncomingStreamData): void
20+
(data: IncomingStreamData): void | Promise<void>
2121
}
2222

2323
export interface StreamHandlerOptions extends AbortOptions {

packages/libp2p/src/connection.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -273,17 +273,12 @@ export class Connection implements ConnectionInterface {
273273
throw new LimitedConnectionError('Cannot open protocol stream on limited connection')
274274
}
275275

276-
handler({ connection: this, stream: muxedStream })
276+
await handler({ connection: this, stream: muxedStream })
277277
})
278278
.catch(async err => {
279279
this.log.error('error handling incoming stream id %s - %e', muxedStream.id, err)
280280

281-
if (muxedStream.timeline.close == null) {
282-
await muxedStream.close({
283-
signal
284-
})
285-
.catch(err => muxedStream.abort(err))
286-
}
281+
muxedStream.abort(err)
287282
})
288283
}
289284

0 commit comments

Comments
 (0)