Skip to content

fix: WebTransport stream now extends abstract stream #2514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions packages/transport-webtransport/.aegir.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
/* eslint-disable no-console */
import { spawn, exec } from 'child_process'
import { existsSync } from 'fs'
import { existsSync } from 'node:fs'
import os from 'node:os'
import defer from 'p-defer'

/** @type {import('aegir/types').PartialOptions} */
export default {
test: {
async before() {
async before () {
const main = os.platform() === 'win32' ? 'main.exe' : 'main'

if (!existsSync('./go-libp2p-webtransport-server/main')) {
await new Promise((resolve, reject) => {
exec('go build -o main main.go',
exec(`go build -o ${main} main.go`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these build files are being cleaned by npm run clean, which they probably should be

Copy link
Member Author

@achingbrain achingbrain Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I want to remove this transport server entirely and just rely on precomplied binaries in the interop tests because they exercise more functionality which gives us more certainty and we don't need to be recompiling this unchanging go script over and over again.

{ cwd: './go-libp2p-webtransport-server' },
(error, stdout, stderr) => {
if (error) {
Expand All @@ -21,7 +25,7 @@ export default {
})
}

const server = spawn('./main', [], { cwd: './go-libp2p-webtransport-server', killSignal: 'SIGINT' })
const server = spawn(`./${main}`, [], { cwd: './go-libp2p-webtransport-server', killSignal: 'SIGINT' })
server.stderr.on('data', (data) => {
console.log('stderr:', data.toString())
})
Expand Down Expand Up @@ -53,7 +57,7 @@ export default {
}
}
},
async after(_, { server }) {
async after (_, { server }) {
server.kill('SIGINT')
}
},
Expand Down
15 changes: 12 additions & 3 deletions packages/transport-webtransport/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,34 @@
"@chainsafe/libp2p-noise": "^15.0.0",
"@libp2p/interface": "^1.3.0",
"@libp2p/peer-id": "^4.1.0",
"@libp2p/utils": "^5.3.2",
"@multiformats/multiaddr": "^12.2.1",
"@multiformats/multiaddr-matcher": "^1.2.0",
"it-stream-types": "^2.0.1",
"multiformats": "^13.1.0",
"race-signal": "^1.0.2",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.0.3"
},
"devDependencies": {
"@libp2p/logger": "^4.0.11",
"@libp2p/peer-id-factory": "^4.1.0",
"@noble/hashes": "^1.4.0",
"aegir": "^42.2.5",
"it-map": "^3.1.0",
"it-to-buffer": "^4.0.7",
"libp2p": "^1.4.3",
"p-defer": "^4.0.1"
"p-defer": "^4.0.1",
"p-wait-for": "^5.0.2"
},
"browser": {
"./dist/src/listener.js": "./dist/src/listener.browser.js"
"./dist/src/listener.js": "./dist/src/listener.browser.js",
"./dist/src/webtransport.js": "./dist/src/webtransport.browser.js"
},
"react-native": {
"./dist/src/listener.js": "./dist/src/listener.browser.js"
"./dist/src/listener.js": "./dist/src/listener.browser.js",
"./dist/src/webtransport.js": "./dist/src/webtransport.browser.js",
"./dist/src/utils/generate-certificates.js": "./dist/src/utils/generate-certificates.browser.js"
},
"sideEffects": false
}
177 changes: 57 additions & 120 deletions packages/transport-webtransport/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,38 @@
*/

import { noise } from '@chainsafe/libp2p-noise'
import { type Transport, transportSymbol, type CreateListenerOptions, type DialOptions, type Listener, type ComponentLogger, type Logger, type Connection, type MultiaddrConnection, type Stream, type CounterGroup, type Metrics, type PeerId, type StreamMuxerFactory, type StreamMuxerInit, type StreamMuxer } from '@libp2p/interface'
import { type Multiaddr, type AbortOptions } from '@multiformats/multiaddr'
import { AbortError, CodeError, transportSymbol } from '@libp2p/interface'
import { WebTransport as WebTransportMatcher } from '@multiformats/multiaddr-matcher'
import { webtransportBiDiStreamToStream } from './stream.js'
import { raceSignal } from 'race-signal'
import createListener from './listener.js'
import { webtransportMuxer } from './muxer.js'
import { inertDuplex } from './utils/inert-duplex.js'
import { isSubset } from './utils/is-subset.js'
import { parseMultiaddr } from './utils/parse-multiaddr.js'
import WebTransport from './webtransport.js'
import type { Transport, CreateListenerOptions, DialOptions, Listener, ComponentLogger, Logger, Connection, MultiaddrConnection, CounterGroup, Metrics, PeerId } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Source } from 'it-stream-types'
import type { MultihashDigest } from 'multiformats/hashes/interface'
import type { Uint8ArrayList } from 'uint8arraylist'

/**
* PEM format server certificate and private key
*/
export interface WebTransportCertificate {
privateKey: string
pem: string
hash: MultihashDigest<number>
secret: string
}

interface WebTransportSessionCleanup {
(metric: string): void
}

export interface WebTransportInit {
maxInboundStreams?: number
certificates?: WebTransportCertificate[]
}

export interface WebTransportComponents {
Expand All @@ -69,7 +84,9 @@ class WebTransportTransport implements Transport {
this.log = components.logger.forComponent('libp2p:webtransport')
this.components = components
this.config = {
maxInboundStreams: init.maxInboundStreams ?? 1000
...init,
maxInboundStreams: init.maxInboundStreams ?? 1000,
certificates: init.certificates ?? []
}

if (components.metrics != null) {
Expand All @@ -87,24 +104,26 @@ class WebTransportTransport implements Transport {
readonly [transportSymbol] = true

async dial (ma: Multiaddr, options: DialOptions): Promise<Connection> {
options?.signal?.throwIfAborted()
if (options?.signal?.aborted === true) {
throw new AbortError()
}

this.log('dialing %s', ma)
const localPeer = this.components.peerId
if (localPeer === undefined) {
throw new Error('Need a local peerid')
throw new CodeError('Need a local peerid', 'ERR_INVALID_PARAMETERS')
}

options = options ?? {}

const { url, certhashes, remotePeer } = parseMultiaddr(ma)

if (remotePeer == null) {
throw new Error('Need a target peerid')
throw new CodeError('Need a target peerid', 'ERR_INVALID_PARAMETERS')
}

if (certhashes.length === 0) {
throw new Error('Expected multiaddr to contain certhashes')
throw new CodeError('Expected multiaddr to contain certhashes', 'ERR_INVALID_PARAMETERS')
}

let abortListener: (() => void) | undefined
Expand Down Expand Up @@ -159,10 +178,12 @@ class WebTransportTransport implements Transport {
once: true
})

this.log('wait for session to be ready')
await Promise.race([
wt.closed,
wt.ready
])
this.log('session became ready')

ready = true
this.metrics?.dialerEvents.increment({ ready: true })
Expand All @@ -175,15 +196,17 @@ class WebTransportTransport implements Transport {
cleanUpWTSession('remote_close')
})

if (!await this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes)) {
throw new Error('Failed to authenticate webtransport')
authenticated = await raceSignal(this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes), options.signal)

if (!authenticated) {
throw new CodeError('Failed to authenticate webtransport', 'ERR_AUTHENTICATION_FAILED')
}

this.metrics?.dialerEvents.increment({ open: true })

maConn = {
close: async () => {
this.log('Closing webtransport')
this.log('closing webtransport')
cleanUpWTSession('close')
},
abort: (err: Error) => {
Expand All @@ -199,9 +222,11 @@ class WebTransportTransport implements Transport {
...inertDuplex()
}

authenticated = true

return await options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: this.webtransportMuxer(wt), skipProtection: true })
return await options.upgrader.upgradeOutbound(maConn, {
skipEncryption: true,
muxerFactory: webtransportMuxer(wt, wt.incomingBidirectionalStreams.getReader(), this.components.logger, this.config),
skipProtection: true
})
} catch (err: any) {
this.log.error('caught wt session err', err)

Expand All @@ -221,11 +246,14 @@ class WebTransportTransport implements Transport {
}
}

async authenticateWebTransport (wt: InstanceType<typeof WebTransport>, localPeer: PeerId, remotePeer: PeerId, certhashes: Array<MultihashDigest<number>>): Promise<boolean> {
async authenticateWebTransport (wt: WebTransport, localPeer: PeerId, remotePeer: PeerId, certhashes: Array<MultihashDigest<number>>, signal?: AbortSignal): Promise<boolean> {
if (signal?.aborted === true) {
throw new AbortError()
}

const stream = await wt.createBidirectionalStream()
const writer = stream.writable.getWriter()
const reader = stream.readable.getReader()
await writer.ready

const duplex = {
source: (async function * () {
Expand All @@ -241,13 +269,15 @@ class WebTransportTransport implements Transport {
}
}
})(),
sink: async function (source: Source<Uint8Array | Uint8ArrayList>) {
sink: async (source: Source<Uint8Array | Uint8ArrayList>) => {
for await (const chunk of source) {
if (chunk instanceof Uint8Array) {
await writer.write(chunk)
} else {
await writer.write(chunk.subarray())
}
await raceSignal(writer.ready, signal)

const buf = chunk instanceof Uint8Array ? chunk : chunk.subarray()

writer.write(buf).catch(err => {
this.log.error('could not write chunk during authentication of WebTransport stream', err)
})
}
}
}
Expand All @@ -273,105 +303,12 @@ class WebTransportTransport implements Transport {
return true
}

webtransportMuxer (wt: WebTransport): StreamMuxerFactory {
let streamIDCounter = 0
const config = this.config
const self = this
return {
protocol: 'webtransport',
createStreamMuxer: (init?: StreamMuxerInit): StreamMuxer => {
// !TODO handle abort signal when WebTransport supports this.

if (typeof init === 'function') {
// The api docs say that init may be a function
init = { onIncomingStream: init }
}

const activeStreams: Stream[] = [];

(async function () {
//! TODO unclear how to add backpressure here?

const reader = wt.incomingBidirectionalStreams.getReader()
while (true) {
const { done, value: wtStream } = await reader.read()

if (done) {
break
}

if (activeStreams.length >= config.maxInboundStreams) {
// We've reached our limit, close this stream.
wtStream.writable.close().catch((err: Error) => {
self.log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`)
})
wtStream.readable.cancel().catch((err: Error) => {
self.log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`)
})
} else {
const stream = await webtransportBiDiStreamToStream(
wtStream,
String(streamIDCounter++),
'inbound',
activeStreams,
init?.onStreamEnd,
self.components.logger
)
activeStreams.push(stream)
init?.onIncomingStream?.(stream)
}
}
})().catch(() => {
this.log.error('WebTransport failed to receive incoming stream')
})

const muxer: StreamMuxer = {
protocol: 'webtransport',
streams: activeStreams,
newStream: async (name?: string): Promise<Stream> => {
const wtStream = await wt.createBidirectionalStream()

const stream = await webtransportBiDiStreamToStream(
wtStream,
String(streamIDCounter++),
init?.direction ?? 'outbound',
activeStreams,
init?.onStreamEnd,
self.components.logger
)
activeStreams.push(stream)

return stream
},

/**
* Close or abort all tracked streams and stop the muxer
*/
close: async (options?: AbortOptions) => {
this.log('Closing webtransport muxer')

await Promise.all(
activeStreams.map(async s => s.close(options))
)
},
abort: (err: Error) => {
this.log('Aborting webtransport muxer with err:', err)

for (const stream of activeStreams) {
stream.abort(err)
}
},
// This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex.
...inertDuplex()
}

return muxer
}
}
}

createListener (options: CreateListenerOptions): Listener {
throw new Error('Webtransport servers are not supported in Node or the browser')
return createListener(this.components, {
...options,
certificates: this.config.certificates,
maxInboundStreams: this.config.maxInboundStreams
})
}

/**
Expand Down
5 changes: 5 additions & 0 deletions packages/transport-webtransport/src/listener.browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import type { CreateListenerOptions, Listener } from '@libp2p/interface'

export default function createListener (options: CreateListenerOptions): Listener {
throw new Error('Not implemented')
}
19 changes: 19 additions & 0 deletions packages/transport-webtransport/src/listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { WebTransportCertificate } from './index.js'
import type { Connection, Upgrader, Listener, CreateListenerOptions, PeerId, ComponentLogger, Metrics } from '@libp2p/interface'

export interface WebTransportListenerComponents {
peerId: PeerId
logger: ComponentLogger
metrics?: Metrics
}

export interface WebTransportListenerInit extends CreateListenerOptions {
handler?(conn: Connection): void
upgrader: Upgrader
certificates?: WebTransportCertificate[]
maxInboundStreams?: number
}

export default function createListener (components: WebTransportListenerComponents, options: WebTransportListenerInit): Listener {
throw new Error('Only supported in browsers')
}
Loading