Skip to content

Commit 7e58596

Browse files
committed
feat: add stream middleware
Adds middleware handlers for protocol streams. They are invoked for incoming and outgoing streams and allow access to the stream and connection before the handler (incoming) or caller (outgoing) receive them. This way middleware can wrap streams in transforms, or deny access, or something else. ```ts libp2p.use('/my/protocol/1.0.0', (stream, connection, next) => { const originalSource = stream.source // increment all byte values in the stream by one stream.source = (async function * () { for await (const buf of originalSource) { buf = buf.map(val => val + 1) yield buf } })() // pass the stream on to the next middleware next(stream, connection) }) ```
1 parent 748f962 commit 7e58596

File tree

10 files changed

+297
-10
lines changed

10 files changed

+297
-10
lines changed

.github/dictionary.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ additionals
1414
SECG
1515
Certicom
1616
RSAES
17+
unuse

packages/interface-compliance-tests/src/mocks/registrar.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import { mergeOptions } from '@libp2p/utils/merge-options'
2-
import type { Connection, PeerId, Topology, IncomingStreamData, StreamHandler, StreamHandlerOptions, StreamHandlerRecord } from '@libp2p/interface'
2+
import type { Connection, PeerId, Topology, IncomingStreamData, StreamHandler, StreamHandlerOptions, StreamHandlerRecord, StreamMiddleware } from '@libp2p/interface'
33
import type { Registrar } from '@libp2p/interface-internal'
44

55
export class MockRegistrar implements Registrar {
66
private readonly topologies = new Map<string, Array<{ id: string, topology: Topology }>>()
77
private readonly handlers = new Map<string, StreamHandlerRecord>()
8+
private readonly middleware = new Map<string, StreamMiddleware[]>()
89

910
getProtocols (): string[] {
1011
return Array.from(this.handlers.keys()).sort()
@@ -69,6 +70,18 @@ export class MockRegistrar implements Registrar {
6970
getTopologies (protocol: string): Topology[] {
7071
return (this.topologies.get(protocol) ?? []).map(t => t.topology)
7172
}
73+
74+
use(protocol: string, middleware: StreamMiddleware[]): void {
75+
this.middleware.set(protocol, middleware)
76+
}
77+
78+
unuse(protocol: string): void {
79+
this.middleware.delete(protocol)
80+
}
81+
82+
getMiddleware(protocol: string): StreamMiddleware[] {
83+
return this.middleware.get(protocol) ?? []
84+
}
7285
}
7386

7487
export function mockRegistrar (): Registrar {

packages/interface-internal/src/registrar.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { StreamHandler, StreamHandlerOptions, StreamHandlerRecord, Topology, IncomingStreamData } from '@libp2p/interface'
1+
import type { StreamHandler, StreamHandlerOptions, StreamHandlerRecord, Topology, IncomingStreamData, StreamMiddleware } from '@libp2p/interface'
22
import type { AbortOptions } from '@multiformats/multiaddr'
33

44
export type {
@@ -69,6 +69,30 @@ export interface Registrar {
6969
*/
7070
getHandler(protocol: string): StreamHandlerRecord
7171

72+
/**
73+
* Retrieve any registered middleware for a given protocol.
74+
*
75+
* @param protocol - The protocol to fetch middleware for
76+
* @returns A list of `StreamMiddleware` implementations
77+
*/
78+
use(protocol: string, middleware: StreamMiddleware[]): void
79+
80+
/**
81+
* Retrieve any registered middleware for a given protocol.
82+
*
83+
* @param protocol - The protocol to fetch middleware for
84+
* @returns A list of `StreamMiddleware` implementations
85+
*/
86+
unuse(protocol: string): void
87+
88+
/**
89+
* Retrieve any registered middleware for a given protocol.
90+
*
91+
* @param protocol - The protocol to fetch middleware for
92+
* @returns A list of `StreamMiddleware` implementations
93+
*/
94+
getMiddleware(protocol: string): StreamMiddleware[]
95+
7296
/**
7397
* Register a topology handler for a protocol - the topology will be
7498
* invoked when peers are discovered on the network that support the

packages/interface/src/index.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import type { PeerInfo } from './peer-info.js'
2323
import type { PeerRouting } from './peer-routing.js'
2424
import type { Address, Peer, PeerStore } from './peer-store.js'
2525
import type { Startable } from './startable.js'
26-
import type { StreamHandler, StreamHandlerOptions } from './stream-handler.js'
26+
import type { StreamHandler, StreamHandlerOptions, StreamMiddleware } from './stream-handler.js'
2727
import type { Topology } from './topology.js'
2828
import type { Listener, OutboundConnectionUpgradeEvents } from './transport.js'
2929
import type { Multiaddr } from '@multiformats/multiaddr'
@@ -720,6 +720,33 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
720720
*/
721721
unregister(id: string): void
722722

723+
/**
724+
* Registers one or more middleware implementations that will be invoked for
725+
* incoming and outgoing protocol streams that match the passed protocol.
726+
*
727+
* @example
728+
*
729+
* ```TypeScript
730+
* libp2p.use('/my/protocol/1.0.0', (stream, connection, next) => {
731+
* // do something with stream and/or connection
732+
* next(stream, connection)
733+
* })
734+
* ```
735+
*/
736+
use (protocol: string, middleware: StreamMiddleware | StreamMiddleware[]): void
737+
738+
/**
739+
* Deregisters all middleware for the passed protocol.
740+
*
741+
* @example
742+
*
743+
* ```TypeScript
744+
* libp2p.unuse('/my/protocol/1.0.0')
745+
* // any previously registered middleware will no longer be invoked
746+
* ```
747+
*/
748+
unuse (protocol: string): void
749+
723750
/**
724751
* Returns the public key for the passed PeerId. If the PeerId is of the 'RSA'
725752
* type this may mean searching the routing if the peer's key is not present

packages/interface/src/stream-handler.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@ export interface StreamHandler {
2020
(data: IncomingStreamData): void
2121
}
2222

23+
/**
24+
* Stream middleware allows accessing stream data outside of the stream handler
25+
*/
26+
export interface StreamMiddleware {
27+
(stream: Stream, connection: Connection, next: (stream: Stream, connection: Connection) => void): void
28+
}
29+
2330
export interface StreamHandlerOptions extends AbortOptions {
2431
/**
2532
* How many incoming streams can be open for this protocol at the same time on each connection
@@ -46,6 +53,11 @@ export interface StreamHandlerOptions extends AbortOptions {
4653
* protocol(s), the existing handler will be discarded.
4754
*/
4855
force?: true
56+
57+
/**
58+
* Middleware allows accessing stream data outside of the stream handler
59+
*/
60+
middleware?: StreamMiddleware[]
4961
}
5062

5163
export interface StreamHandlerRecord {

packages/libp2p/src/libp2p.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { userAgent } from './user-agent.js'
2424
import * as pkg from './version.js'
2525
import type { Components } from './components.js'
2626
import type { Libp2p as Libp2pInterface, Libp2pInit } from './index.js'
27-
import type { PeerRouting, ContentRouting, Libp2pEvents, PendingDial, ServiceMap, AbortOptions, ComponentLogger, Logger, Connection, NewStreamOptions, Stream, Metrics, PeerId, PeerInfo, PeerStore, Topology, Libp2pStatus, IsDialableOptions, DialOptions, PublicKey, Ed25519PeerId, Secp256k1PeerId, RSAPublicKey, RSAPeerId, URLPeerId, Ed25519PublicKey, Secp256k1PublicKey, StreamHandler, StreamHandlerOptions } from '@libp2p/interface'
27+
import type { PeerRouting, ContentRouting, Libp2pEvents, PendingDial, ServiceMap, AbortOptions, ComponentLogger, Logger, Connection, NewStreamOptions, Stream, Metrics, PeerId, PeerInfo, PeerStore, Topology, Libp2pStatus, IsDialableOptions, DialOptions, PublicKey, Ed25519PeerId, Secp256k1PeerId, RSAPublicKey, RSAPeerId, URLPeerId, Ed25519PublicKey, Secp256k1PublicKey, StreamHandler, StreamHandlerOptions, StreamMiddleware } from '@libp2p/interface'
2828
import type { Multiaddr } from '@multiformats/multiaddr'
2929

3030
export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter<Libp2pEvents> implements Libp2pInterface<T> {
@@ -402,6 +402,14 @@ export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter
402402
this.components.registrar.unregister(id)
403403
}
404404

405+
use (protocol: string, middleware: StreamMiddleware | StreamMiddleware[]): void {
406+
this.components.registrar.use(protocol, Array.isArray(middleware) ? middleware : [middleware])
407+
}
408+
409+
unuse (protocol: string): void {
410+
this.components.registrar.unuse(protocol)
411+
}
412+
405413
async isDialable (multiaddr: Multiaddr, options: IsDialableOptions = {}): Promise<boolean> {
406414
return this.components.connectionManager.isDialable(multiaddr, options)
407415
}

packages/libp2p/src/registrar.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { InvalidParametersError } from '@libp2p/interface'
22
import { mergeOptions } from '@libp2p/utils/merge-options'
33
import { trackedMap } from '@libp2p/utils/tracked-map'
44
import * as errorsJs from './errors.js'
5-
import type { IdentifyResult, Libp2pEvents, Logger, PeerUpdate, PeerId, PeerStore, Topology, StreamHandler, StreamHandlerRecord, StreamHandlerOptions, AbortOptions, Metrics } from '@libp2p/interface'
5+
import type { IdentifyResult, Libp2pEvents, Logger, PeerUpdate, PeerId, PeerStore, Topology, StreamHandler, StreamHandlerRecord, StreamHandlerOptions, AbortOptions, Metrics, StreamMiddleware } from '@libp2p/interface'
66
import type { Registrar as RegistrarInterface } from '@libp2p/interface-internal'
77
import type { ComponentLogger } from '@libp2p/logger'
88
import type { TypedEventTarget } from 'main-event'
@@ -26,10 +26,12 @@ export class Registrar implements RegistrarInterface {
2626
private readonly topologies: Map<string, Map<string, Topology>>
2727
private readonly handlers: Map<string, StreamHandlerRecord>
2828
private readonly components: RegistrarComponents
29+
private readonly middleware: Map<string, StreamMiddleware[]>
2930

3031
constructor (components: RegistrarComponents) {
3132
this.components = components
3233
this.log = components.logger.forComponent('libp2p:registrar')
34+
this.middleware = new Map()
3335
this.topologies = new Map()
3436
components.metrics?.registerMetricGroup('libp2p_registrar_topologies', {
3537
calculate: () => {
@@ -165,6 +167,18 @@ export class Registrar implements RegistrarInterface {
165167
}
166168
}
167169

170+
use (protocol: string, middleware: StreamMiddleware[]): void {
171+
this.middleware.set(protocol, middleware)
172+
}
173+
174+
unuse (protocol: string): void {
175+
this.middleware.delete(protocol)
176+
}
177+
178+
getMiddleware (protocol: string): StreamMiddleware[] {
179+
return this.middleware.get(protocol) ?? []
180+
}
181+
168182
/**
169183
* Remove a disconnected peer from the record
170184
*/

packages/libp2p/src/upgrader.ts

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ export class Upgrader implements UpgraderInterface {
395395

396396
let muxer: StreamMuxer | undefined
397397
let newStream: ((multicodecs: string[], options?: AbortOptions) => Promise<Stream>) | undefined
398-
let connection: Connection // eslint-disable-line prefer-const
398+
let connection: Connection
399399

400400
if (muxerFactory != null) {
401401
// Create the muxer
@@ -488,7 +488,7 @@ export class Upgrader implements UpgraderInterface {
488488
}
489489

490490
connection.log.trace('starting new stream for protocols %s', protocols)
491-
const muxedStream = await muxer.newStream()
491+
let muxedStream = await muxer.newStream()
492492
connection.log.trace('started new stream %s for protocols %s', muxedStream.id, protocols)
493493

494494
try {
@@ -556,6 +556,23 @@ export class Upgrader implements UpgraderInterface {
556556

557557
this.components.metrics?.trackProtocolStream(muxedStream, connection)
558558

559+
const middleware = this.components.registrar.getMiddleware(protocol)
560+
561+
middleware.push((stream, connection, next) => {
562+
next(stream, connection)
563+
})
564+
565+
let i = 0
566+
567+
while (i < middleware.length) {
568+
// eslint-disable-next-line no-loop-func
569+
middleware[i](muxedStream, connection, (s, c) => {
570+
muxedStream = s
571+
connection = c
572+
i++
573+
})
574+
}
575+
559576
return muxedStream
560577
} catch (err: any) {
561578
connection.log.error('could not create new outbound stream on connection %s %a for protocols %s - %e', direction === 'inbound' ? 'from' : 'to', opts.maConn.remoteAddr, protocols, err)
@@ -652,14 +669,30 @@ export class Upgrader implements UpgraderInterface {
652669
* Routes incoming streams to the correct handler
653670
*/
654671
_onStream (opts: OnStreamOptions): void {
655-
const { connection, stream, protocol } = opts
672+
let { connection, stream, protocol } = opts
656673
const { handler, options } = this.components.registrar.getHandler(protocol)
657674

658675
if (connection.limits != null && options.runOnLimitedConnection !== true) {
659676
throw new LimitedConnectionError('Cannot open protocol stream on limited connection')
660677
}
661678

662-
handler({ connection, stream })
679+
const middleware = this.components.registrar.getMiddleware(protocol)
680+
681+
middleware.push((stream, connection, next) => {
682+
handler({ connection, stream })
683+
next(stream, connection)
684+
})
685+
686+
let i = 0
687+
688+
while (i < middleware.length) {
689+
// eslint-disable-next-line no-loop-func
690+
middleware[i](stream, connection, (s, c) => {
691+
stream = s
692+
connection = c
693+
i++
694+
})
695+
}
663696
}
664697

665698
/**

0 commit comments

Comments
 (0)