From 525c8c860511aa131555ee71ddbda17885a0edfb Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 30 Apr 2024 16:01:52 +0100 Subject: [PATCH 1/8] fix: update WebTransport implementation The PR pulls all of the non-`@fails/webtransport` parts out of There's a lot of work that's been done to re-use existing libp2p code such as the abstract stream class which handles a lot more closing scenarios than the existing implementation so it would be good to get that in. --- packages/transport-webtransport/.aegir.js | 14 +- packages/transport-webtransport/src/index.ts | 175 ++++-------- .../src/listener.browser.ts | 5 + .../transport-webtransport/src/listener.ts | 19 ++ packages/transport-webtransport/src/muxer.ts | 95 +++++++ packages/transport-webtransport/src/stream.ts | 254 +++++++----------- .../utils/generate-certificates.browser.ts | 3 + .../src/utils/generate-certificates.ts | 11 + .../src/webtransport.browser.ts | 1 + .../src/webtransport.ts | 17 ++ .../test/fixtures/random-bytes.ts | 12 + .../test/{browser.ts => webtransport.spec.ts} | 62 +++-- 12 files changed, 363 insertions(+), 305 deletions(-) create mode 100644 packages/transport-webtransport/src/listener.browser.ts create mode 100644 packages/transport-webtransport/src/listener.ts create mode 100644 packages/transport-webtransport/src/muxer.ts create mode 100644 packages/transport-webtransport/src/utils/generate-certificates.browser.ts create mode 100644 packages/transport-webtransport/src/utils/generate-certificates.ts create mode 100644 packages/transport-webtransport/src/webtransport.browser.ts create mode 100644 packages/transport-webtransport/src/webtransport.ts create mode 100644 packages/transport-webtransport/test/fixtures/random-bytes.ts rename packages/transport-webtransport/test/{browser.ts => webtransport.spec.ts} (74%) diff --git a/packages/transport-webtransport/.aegir.js b/packages/transport-webtransport/.aegir.js index b211f70e0a..5f7d52f3db 100644 --- a/packages/transport-webtransport/.aegir.js +++ b/packages/transport-webtransport/.aegir.js @@ -1,14 +1,18 @@ +/* eslint-disable no-console */ import { spawn, exec } from 'child_process' -import { existsSync } from 'fs' +import { existsSync } from 'node:fs' +import os from 'node:os' import defer from 'p-defer' /** @type {import('aegir/types').PartialOptions} */ export default { test: { - async before() { + async before () { + const main = os.platform() === 'win32' ? 'main.exe' : 'main' + if (!existsSync('./go-libp2p-webtransport-server/main')) { await new Promise((resolve, reject) => { - exec('go build -o main main.go', + exec(`go build -o ${main} main.go`, { cwd: './go-libp2p-webtransport-server' }, (error, stdout, stderr) => { if (error) { @@ -21,7 +25,7 @@ export default { }) } - const server = spawn('./main', [], { cwd: './go-libp2p-webtransport-server', killSignal: 'SIGINT' }) + const server = spawn(`./${main}`, [], { cwd: './go-libp2p-webtransport-server', killSignal: 'SIGINT' }) server.stderr.on('data', (data) => { console.log('stderr:', data.toString()) }) @@ -53,7 +57,7 @@ export default { } } }, - async after(_, { server }) { + async after (_, { server }) { server.kill('SIGINT') } }, diff --git a/packages/transport-webtransport/src/index.ts b/packages/transport-webtransport/src/index.ts index 65d414885a..591d217193 100644 --- a/packages/transport-webtransport/src/index.ts +++ b/packages/transport-webtransport/src/index.ts @@ -30,23 +30,38 @@ */ import { noise } from '@chainsafe/libp2p-noise' -import { type Transport, transportSymbol, type CreateListenerOptions, type DialOptions, type Listener, type ComponentLogger, type Logger, type Connection, type MultiaddrConnection, type Stream, type CounterGroup, type Metrics, type PeerId, type StreamMuxerFactory, type StreamMuxerInit, type StreamMuxer } from '@libp2p/interface' -import { type Multiaddr, type AbortOptions } from '@multiformats/multiaddr' +import { AbortError, CodeError, transportSymbol } from '@libp2p/interface' import { WebTransport as WebTransportMatcher } from '@multiformats/multiaddr-matcher' -import { webtransportBiDiStreamToStream } from './stream.js' +import { raceSignal } from 'race-signal' +import createListener from './listener.js' +import { webtransportMuxer } from './muxer.js' import { inertDuplex } from './utils/inert-duplex.js' import { isSubset } from './utils/is-subset.js' import { parseMultiaddr } from './utils/parse-multiaddr.js' +import WebTransport from './webtransport.js' +import type { Transport, CreateListenerOptions, DialOptions, Listener, ComponentLogger, Logger, Connection, MultiaddrConnection, CounterGroup, Metrics, PeerId } from '@libp2p/interface' +import type { Multiaddr } from '@multiformats/multiaddr' import type { Source } from 'it-stream-types' import type { MultihashDigest } from 'multiformats/hashes/interface' import type { Uint8ArrayList } from 'uint8arraylist' +/** + * PEM format server certificate and private key + */ +export interface WebTransportCertificate { + privateKey: string + pem: string + hash: MultihashDigest + secret: string +} + interface WebTransportSessionCleanup { (metric: string): void } export interface WebTransportInit { maxInboundStreams?: number + certificates?: WebTransportCertificate[] } export interface WebTransportComponents { @@ -69,7 +84,9 @@ class WebTransportTransport implements Transport { this.log = components.logger.forComponent('libp2p:webtransport') this.components = components this.config = { - maxInboundStreams: init.maxInboundStreams ?? 1000 + ...init, + maxInboundStreams: init.maxInboundStreams ?? 1000, + certificates: init.certificates ?? [] } if (components.metrics != null) { @@ -87,12 +104,14 @@ class WebTransportTransport implements Transport { readonly [transportSymbol] = true async dial (ma: Multiaddr, options: DialOptions): Promise { - options?.signal?.throwIfAborted() + if (options?.signal?.aborted === true) { + throw new AbortError() + } this.log('dialing %s', ma) const localPeer = this.components.peerId if (localPeer === undefined) { - throw new Error('Need a local peerid') + throw new CodeError('Need a local peerid', 'ERR_INVALID_PARAMETERS') } options = options ?? {} @@ -100,11 +119,11 @@ class WebTransportTransport implements Transport { const { url, certhashes, remotePeer } = parseMultiaddr(ma) if (remotePeer == null) { - throw new Error('Need a target peerid') + throw new CodeError('Need a target peerid', 'ERR_INVALID_PARAMETERS') } if (certhashes.length === 0) { - throw new Error('Expected multiaddr to contain certhashes') + throw new CodeError('Expected multiaddr to contain certhashes', 'ERR_INVALID_PARAMETERS') } let abortListener: (() => void) | undefined @@ -159,10 +178,12 @@ class WebTransportTransport implements Transport { once: true }) + this.log('wait for session to be ready') await Promise.race([ wt.closed, wt.ready ]) + this.log('session became ready') ready = true this.metrics?.dialerEvents.increment({ ready: true }) @@ -175,15 +196,19 @@ class WebTransportTransport implements Transport { cleanUpWTSession('remote_close') }) - if (!await this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes)) { + if (!await raceSignal(this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes), options.signal)) { throw new Error('Failed to authenticate webtransport') } + if (options?.signal?.aborted === true) { + throw new AbortError() + } + this.metrics?.dialerEvents.increment({ open: true }) maConn = { close: async () => { - this.log('Closing webtransport') + this.log('closing webtransport') cleanUpWTSession('close') }, abort: (err: Error) => { @@ -201,7 +226,11 @@ class WebTransportTransport implements Transport { authenticated = true - return await options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: this.webtransportMuxer(wt), skipProtection: true }) + return await options.upgrader.upgradeOutbound(maConn, { + skipEncryption: true, + muxerFactory: webtransportMuxer(wt, wt.incomingBidirectionalStreams.getReader(), this.components.logger, this.config), + skipProtection: true + }) } catch (err: any) { this.log.error('caught wt session err', err) @@ -221,11 +250,14 @@ class WebTransportTransport implements Transport { } } - async authenticateWebTransport (wt: InstanceType, localPeer: PeerId, remotePeer: PeerId, certhashes: Array>): Promise { + async authenticateWebTransport (wt: WebTransport, localPeer: PeerId, remotePeer: PeerId, certhashes: Array>, signal?: AbortSignal): Promise { + if (signal?.aborted === true) { + throw new AbortError() + } + const stream = await wt.createBidirectionalStream() const writer = stream.writable.getWriter() const reader = stream.readable.getReader() - await writer.ready const duplex = { source: (async function * () { @@ -241,13 +273,15 @@ class WebTransportTransport implements Transport { } } })(), - sink: async function (source: Source) { + sink: async (source: Source) => { for await (const chunk of source) { - if (chunk instanceof Uint8Array) { - await writer.write(chunk) - } else { - await writer.write(chunk.subarray()) - } + await raceSignal(writer.ready, signal) + + const buf = chunk instanceof Uint8Array ? chunk : chunk.subarray() + + writer.write(buf).catch(err => { + this.log.error('could not write chunk during authentication of WebTransport stream', err) + }) } } } @@ -273,105 +307,12 @@ class WebTransportTransport implements Transport { return true } - webtransportMuxer (wt: WebTransport): StreamMuxerFactory { - let streamIDCounter = 0 - const config = this.config - const self = this - return { - protocol: 'webtransport', - createStreamMuxer: (init?: StreamMuxerInit): StreamMuxer => { - // !TODO handle abort signal when WebTransport supports this. - - if (typeof init === 'function') { - // The api docs say that init may be a function - init = { onIncomingStream: init } - } - - const activeStreams: Stream[] = []; - - (async function () { - //! TODO unclear how to add backpressure here? - - const reader = wt.incomingBidirectionalStreams.getReader() - while (true) { - const { done, value: wtStream } = await reader.read() - - if (done) { - break - } - - if (activeStreams.length >= config.maxInboundStreams) { - // We've reached our limit, close this stream. - wtStream.writable.close().catch((err: Error) => { - self.log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) - }) - wtStream.readable.cancel().catch((err: Error) => { - self.log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) - }) - } else { - const stream = await webtransportBiDiStreamToStream( - wtStream, - String(streamIDCounter++), - 'inbound', - activeStreams, - init?.onStreamEnd, - self.components.logger - ) - activeStreams.push(stream) - init?.onIncomingStream?.(stream) - } - } - })().catch(() => { - this.log.error('WebTransport failed to receive incoming stream') - }) - - const muxer: StreamMuxer = { - protocol: 'webtransport', - streams: activeStreams, - newStream: async (name?: string): Promise => { - const wtStream = await wt.createBidirectionalStream() - - const stream = await webtransportBiDiStreamToStream( - wtStream, - String(streamIDCounter++), - init?.direction ?? 'outbound', - activeStreams, - init?.onStreamEnd, - self.components.logger - ) - activeStreams.push(stream) - - return stream - }, - - /** - * Close or abort all tracked streams and stop the muxer - */ - close: async (options?: AbortOptions) => { - this.log('Closing webtransport muxer') - - await Promise.all( - activeStreams.map(async s => s.close(options)) - ) - }, - abort: (err: Error) => { - this.log('Aborting webtransport muxer with err:', err) - - for (const stream of activeStreams) { - stream.abort(err) - } - }, - // This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex. - ...inertDuplex() - } - - return muxer - } - } - } - createListener (options: CreateListenerOptions): Listener { - throw new Error('Webtransport servers are not supported in Node or the browser') + return createListener(this.components, { + ...options, + certificates: this.config.certificates, + maxInboundStreams: this.config.maxInboundStreams + }) } /** diff --git a/packages/transport-webtransport/src/listener.browser.ts b/packages/transport-webtransport/src/listener.browser.ts new file mode 100644 index 0000000000..fc3851379e --- /dev/null +++ b/packages/transport-webtransport/src/listener.browser.ts @@ -0,0 +1,5 @@ +import type { CreateListenerOptions, Listener } from '@libp2p/interface' + +export default function createListener (options: CreateListenerOptions): Listener { + throw new Error('Not implemented') +} diff --git a/packages/transport-webtransport/src/listener.ts b/packages/transport-webtransport/src/listener.ts new file mode 100644 index 0000000000..a0e15ff49d --- /dev/null +++ b/packages/transport-webtransport/src/listener.ts @@ -0,0 +1,19 @@ +import type { WebTransportCertificate } from './index.js' +import type { Connection, Upgrader, Listener, CreateListenerOptions, PeerId, ComponentLogger, Metrics } from '@libp2p/interface' + +export interface WebTransportListenerComponents { + peerId: PeerId + logger: ComponentLogger + metrics?: Metrics +} + +export interface WebTransportListenerInit extends CreateListenerOptions { + handler?(conn: Connection): void + upgrader: Upgrader + certificates?: WebTransportCertificate[] + maxInboundStreams?: number +} + +export default function createListener (components: WebTransportListenerComponents, options: WebTransportListenerInit): Listener { + throw new Error('Only supported in browsers') +} diff --git a/packages/transport-webtransport/src/muxer.ts b/packages/transport-webtransport/src/muxer.ts new file mode 100644 index 0000000000..7547076074 --- /dev/null +++ b/packages/transport-webtransport/src/muxer.ts @@ -0,0 +1,95 @@ +import { webtransportBiDiStreamToStream } from './stream.js' +import { inertDuplex } from './utils/inert-duplex.js' +import type WebTransport from './webtransport.js' +import type { ComponentLogger, Stream, StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface' + +export interface WebTransportMuxerInit { + maxInboundStreams: number +} + +export function webtransportMuxer (wt: Pick, reader: ReadableStreamDefaultReader, logger: ComponentLogger, config: WebTransportMuxerInit): StreamMuxerFactory { + let streamIDCounter = 0 + const log = logger.forComponent('libp2p:webtransport:muxer') + + return { + protocol: 'webtransport', + createStreamMuxer: (init?: StreamMuxerInit): StreamMuxer => { + // !TODO handle abort signal when WebTransport supports this. + + if (typeof init === 'function') { + // The api docs say that init may be a function + init = { onIncomingStream: init } + } + + const activeStreams: Stream[] = [] + + void Promise.resolve().then(async () => { + //! TODO unclear how to add backpressure here? + while (true) { + const { done, value: wtStream } = await reader.read() + + if (done) { + break + } + + if (activeStreams.length >= config.maxInboundStreams) { + log('too many inbound streams open, closing new incoming stream') + // We've reached our limit, close this stream. + wtStream.writable.close().catch((err: Error) => { + log.error(`failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) + }) + wtStream.readable.cancel().catch((err: Error) => { + log.error(`failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) + }) + } else { + const stream = await webtransportBiDiStreamToStream( + wtStream, + String(streamIDCounter++), + 'inbound', + activeStreams, + init?.onStreamEnd, + logger + ) + activeStreams.push(stream) + init?.onIncomingStream?.(stream) + } + } + }) + + const muxer: StreamMuxer = { + protocol: 'webtransport', + streams: activeStreams, + newStream: async (name?: string): Promise => { + log('new outgoing stream', name) + + const wtStream = await wt.createBidirectionalStream() + const stream = await webtransportBiDiStreamToStream(wtStream, String(streamIDCounter++), init?.direction ?? 'outbound', activeStreams, init?.onStreamEnd, logger) + activeStreams.push(stream) + + return stream + }, + + /** + * Close all tracked streams and stop the muxer + */ + close: async () => { + log('closing webtransport muxer gracefully') + wt.close() + }, + + /** + * Abort all tracked streams and stop the muxer + */ + abort: (err: Error) => { + log('closing webtransport muxer with err:', err) + wt.close() + }, + + // This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex. + ...inertDuplex() + } + + return muxer + } + } +} diff --git a/packages/transport-webtransport/src/stream.ts b/packages/transport-webtransport/src/stream.ts index 43672408c9..4730dd60bd 100644 --- a/packages/transport-webtransport/src/stream.ts +++ b/packages/transport-webtransport/src/stream.ts @@ -1,184 +1,110 @@ +import { AbstractStream, type AbstractStreamInit } from '@libp2p/utils/abstract-stream' +import { raceSignal } from 'race-signal' import { Uint8ArrayList } from 'uint8arraylist' import type { AbortOptions, ComponentLogger, Direction, Stream } from '@libp2p/interface' -import type { Source } from 'it-stream-types' -export async function webtransportBiDiStreamToStream (bidiStream: WebTransportBidirectionalStream, streamId: string, direction: Direction, activeStreams: Stream[], onStreamEnd: undefined | ((s: Stream) => void), logger: ComponentLogger): Promise { - const log = logger.forComponent(`libp2p:webtransport:stream:${direction}:${streamId}`) - const writer = bidiStream.writable.getWriter() - const reader = bidiStream.readable.getReader() - await writer.ready - - function cleanupStreamFromActiveStreams (): void { - const index = activeStreams.findIndex(s => s === stream) - if (index !== -1) { - activeStreams.splice(index, 1) - stream.timeline.close = Date.now() - onStreamEnd?.(stream) - } - } +interface WebTransportStreamInit extends AbstractStreamInit { + bidiStream: WebTransportBidirectionalStream +} - let writerClosed = false - let readerClosed = false; - (async function () { - const err: Error | undefined = await writer.closed.catch((err: Error) => err) - if (err != null) { - const msg = err.message - if (!(msg.includes('aborted by the remote server') || msg.includes('STOP_SENDING'))) { - log.error(`WebTransport writer closed unexpectedly: streamId=${streamId} err=${err.message}`) - } - } - writerClosed = true - if (writerClosed && readerClosed) { - cleanupStreamFromActiveStreams() - } - })().catch(() => { - log.error('WebTransport failed to cleanup closed stream') - }); - - (async function () { - const err: Error | undefined = await reader.closed.catch((err: Error) => err) - if (err != null) { - log.error(`WebTransport reader closed unexpectedly: streamId=${streamId} err=${err.message}`) - } - readerClosed = true - if (writerClosed && readerClosed) { - cleanupStreamFromActiveStreams() - } - })().catch(() => { - log.error('WebTransport failed to cleanup closed stream') - }) +class WebTransportStream extends AbstractStream { + private readonly writer: WritableStreamDefaultWriter + private readonly reader: ReadableStreamDefaultReader - let sinkSunk = false - const stream: Stream = { - id: streamId, - status: 'open', - writeStatus: 'ready', - readStatus: 'ready', - abort (err: Error) { - if (!writerClosed) { - writer.abort(err) - .catch(err => { - log.error('could not abort stream', err) - }) - writerClosed = true - } - readerClosed = true - - this.status = 'aborted' - this.writeStatus = 'closed' - this.readStatus = 'closed' - - this.timeline.reset = - this.timeline.close = - this.timeline.closeRead = - this.timeline.closeWrite = Date.now() - - cleanupStreamFromActiveStreams() - }, - async close (options?: AbortOptions) { - this.status = 'closing' - - await Promise.all([ - stream.closeRead(options), - stream.closeWrite(options) - ]) - - cleanupStreamFromActiveStreams() - - this.status = 'closed' - this.timeline.close = Date.now() - }, - - async closeRead (options?: AbortOptions) { - if (!readerClosed) { - this.readStatus = 'closing' - - try { - await reader.cancel() - } catch (err: any) { - if (err.toString().includes('RESET_STREAM') === true) { - writerClosed = true - } - } + constructor (init: WebTransportStreamInit) { + super(init) - this.timeline.closeRead = Date.now() - this.readStatus = 'closed' + this.writer = init.bidiStream.writable.getWriter() + this.reader = init.bidiStream.readable.getReader() - readerClosed = true - } + Promise.resolve().then(async () => { + while (true) { + const result = await this.reader.read() - if (writerClosed) { - cleanupStreamFromActiveStreams() - } - }, + if (result.done) { + init.log('remote closed write') + return + } - async closeWrite (options?: AbortOptions) { - if (!writerClosed) { - writerClosed = true + if (result.value != null) { + this.sourcePush(new Uint8ArrayList(result.value)) + } + } + }) + .catch(err => { + init.log.error('error reading from stream', err) + this.abort(err) + }) + .finally(() => { + this.remoteCloseWrite() + }) + + void this.writer.closed + .then(() => { + init.log('writer closed') + }) + .catch((err) => { + init.log('writer close promise rejected', err) + }) + .finally(() => { + this.remoteCloseRead() + }) + } - this.writeStatus = 'closing' + sendNewStream (options?: AbortOptions | undefined): void { + // this is a no-op + } - try { - await writer.close() - } catch (err: any) { - if (err.toString().includes('RESET_STREAM') === true) { - readerClosed = true - } - } + async sendData (buf: Uint8ArrayList, options?: AbortOptions): Promise { + for await (const chunk of buf) { + this.log('sendData waiting for writer to be ready') + await raceSignal(this.writer.ready, options?.signal) + + // the streams spec recommends not waiting for data to be sent + // https://streams.spec.whatwg.org/#example-manual-write-dont-await + this.writer.write(chunk) + .catch(err => { + this.log.error('error sending stream data', err) + }) + } + } - this.timeline.closeWrite = Date.now() - this.writeStatus = 'closed' - } + async sendReset (options?: AbortOptions): Promise { + this.log('sendReset aborting writer') + await raceSignal(this.writer.abort(), options?.signal) + this.log('sendReset aborted writer') + } - if (readerClosed) { - cleanupStreamFromActiveStreams() - } - }, - direction, - timeline: { open: Date.now() }, - metadata: {}, - source: (async function * () { - while (true) { - const val = await reader.read() - if (val.done) { - readerClosed = true - if (writerClosed) { - cleanupStreamFromActiveStreams() - } - return - } + async sendCloseWrite (options?: AbortOptions): Promise { + this.log('sendCloseWrite closing writer') + await raceSignal(this.writer.close(), options?.signal) + this.log('sendCloseWrite closed writer') + } - yield new Uint8ArrayList(val.value) - } - })(), - sink: async function (source: Source) { - if (sinkSunk) { - throw new Error('sink already called on stream') - } - sinkSunk = true - try { - this.writeStatus = 'writing' - - for await (const chunks of source) { - if (chunks instanceof Uint8Array) { - await writer.write(chunks) - } else { - for (const buf of chunks) { - await writer.write(buf) - } - } - } + async sendCloseRead (options?: AbortOptions): Promise { + this.log('sendCloseRead cancelling reader') + await raceSignal(this.reader.cancel(), options?.signal) + this.log('sendCloseRead cancelled reader') + } +} - this.writeStatus = 'done' - } finally { - this.timeline.closeWrite = Date.now() - this.writeStatus = 'closed' +export async function webtransportBiDiStreamToStream (bidiStream: WebTransportBidirectionalStream, streamId: string, direction: Direction, activeStreams: Stream[], onStreamEnd: undefined | ((s: Stream) => void), logger: ComponentLogger): Promise { + const log = logger.forComponent(`libp2p:webtransport:stream:${direction}:${streamId}`) - await stream.closeWrite() + const stream = new WebTransportStream({ + bidiStream, + id: streamId, + direction, + log, + onEnd: () => { + const index = activeStreams.findIndex(s => s === stream) + if (index !== -1) { + activeStreams.splice(index, 1) } - }, - log - } + + onStreamEnd?.(stream) + } + }) return stream } diff --git a/packages/transport-webtransport/src/utils/generate-certificates.browser.ts b/packages/transport-webtransport/src/utils/generate-certificates.browser.ts new file mode 100644 index 0000000000..ba2c23f0a5 --- /dev/null +++ b/packages/transport-webtransport/src/utils/generate-certificates.browser.ts @@ -0,0 +1,3 @@ +export async function generateWebTransportCertificates (): Promise { + throw new Error('Not implemented') +} diff --git a/packages/transport-webtransport/src/utils/generate-certificates.ts b/packages/transport-webtransport/src/utils/generate-certificates.ts new file mode 100644 index 0000000000..57d3d2692a --- /dev/null +++ b/packages/transport-webtransport/src/utils/generate-certificates.ts @@ -0,0 +1,11 @@ +import type { WebTransportCertificate } from '../../src/index.js' + +export interface GenerateWebTransportCertificateOptions { + days: number + start?: Date + extensions?: any[] +} + +export async function generateWebTransportCertificates (options: GenerateWebTransportCertificateOptions[] = []): Promise { + throw new Error('Not implemented') +} diff --git a/packages/transport-webtransport/src/webtransport.browser.ts b/packages/transport-webtransport/src/webtransport.browser.ts new file mode 100644 index 0000000000..349cb18505 --- /dev/null +++ b/packages/transport-webtransport/src/webtransport.browser.ts @@ -0,0 +1 @@ +export default WebTransport diff --git a/packages/transport-webtransport/src/webtransport.ts b/packages/transport-webtransport/src/webtransport.ts new file mode 100644 index 0000000000..af19fed0e0 --- /dev/null +++ b/packages/transport-webtransport/src/webtransport.ts @@ -0,0 +1,17 @@ +export default class WebTransport { + constructor (url: string | URL, options?: WebTransportOptions) { + throw new Error('Only supported in browsers') + } + + close (): void { + throw new Error('Only supported in browsers') + } + + async createBidirectionalStream (): Promise { + throw new Error('Only supported in browsers') + } + + public closed = Promise.reject(new Error('Only supported in browsers')) + public ready = Promise.reject(new Error('Only supported in browsers')) + public incomingBidirectionalStreams: ReadableStream +} diff --git a/packages/transport-webtransport/test/fixtures/random-bytes.ts b/packages/transport-webtransport/test/fixtures/random-bytes.ts new file mode 100644 index 0000000000..c34a09a5b4 --- /dev/null +++ b/packages/transport-webtransport/test/fixtures/random-bytes.ts @@ -0,0 +1,12 @@ +import { CodeError } from '@libp2p/interface' +import { randomBytes as randB } from '@noble/hashes/utils' + +/** + * Generates a Uint8Array with length `number` populated by random bytes + */ +export function randomBytes (length: number): Uint8Array { + if (isNaN(length) || length <= 0) { + throw new CodeError('random bytes length must be a Number bigger than 0', 'ERR_INVALID_LENGTH') + } + return randB(length) +} diff --git a/packages/transport-webtransport/test/browser.ts b/packages/transport-webtransport/test/webtransport.spec.ts similarity index 74% rename from packages/transport-webtransport/test/browser.ts rename to packages/transport-webtransport/test/webtransport.spec.ts index c4a54f6ad5..5f4ab6a758 100644 --- a/packages/transport-webtransport/test/browser.ts +++ b/packages/transport-webtransport/test/webtransport.spec.ts @@ -4,8 +4,12 @@ import { noise } from '@chainsafe/libp2p-noise' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' +import map from 'it-map' +import toBuffer from 'it-to-buffer' import { createLibp2p, type Libp2p } from 'libp2p' +import pWaitFor from 'p-wait-for' import { webTransport } from '../src/index.js' +import { randomBytes } from './fixtures/random-bytes.js' describe('libp2p-webtransport', () => { let node: Libp2p @@ -16,12 +20,16 @@ describe('libp2p-webtransport', () => { connectionEncryption: [noise()], connectionGater: { denyDialMultiaddr: async () => false + }, + connectionManager: { + minConnections: 0 } }) }) afterEach(async () => { if (node != null) { + console.info('stop node') await node.stop() const conns = node.getConnections() @@ -47,8 +55,7 @@ describe('libp2p-webtransport', () => { // we can use the builtin ping system const stream = await node.dialProtocol(ma, '/ipfs/ping/1.0.0') - const data = new Uint8Array(32) - globalThis.crypto.getRandomValues(data) + const data = randomBytes(32) const pong = new Promise((resolve, reject) => { (async () => { @@ -109,7 +116,7 @@ describe('libp2p-webtransport', () => { expect(err.toString()).to.contain('aborted') }) - it('connects to ipv6 addresses', async function () { + it.skip('connects to ipv6 addresses', async function () { if (process.env.disableIp6 === 'true') { return this.skip() } @@ -133,29 +140,46 @@ describe('libp2p-webtransport', () => { const maStr: string = process.env.serverAddr const ma = multiaddr(maStr) - async function * gen (): AsyncGenerator { - yield new Uint8Array([0]) - yield new Uint8Array([1, 2, 3, 4]) - yield new Uint8Array([5, 6, 7]) - yield new Uint8Array([8, 9, 10, 11]) - yield new Uint8Array([12, 13, 14, 15]) + const data = [ + Uint8Array.from([0]), + Uint8Array.from([1, 2, 3, 4]), + Uint8Array.from([5, 6, 7]), + Uint8Array.from([8, 9, 10, 11]), + Uint8Array.from([12, 13, 14, 15]) + ] + + async function * gen (): AsyncGenerator { + yield * data } const stream = await node.dialProtocol(ma, 'echo') - await stream.sink(gen()) + expect(stream.timeline.closeWrite).to.be.undefined() + expect(stream.timeline.closeRead).to.be.undefined() + expect(stream.timeline.close).to.be.undefined() + + // send and receive data + const [, output] = await Promise.all([ + stream.sink(gen()), + toBuffer(map(stream.source, buf => buf.subarray())) + ]) + + // closing takes a little bit of time + await pWaitFor(() => { + return stream.writeStatus === 'closed' + }, { + interval: 100 + }) - let expectedNextNumber = 0 - for await (const chunk of stream.source) { - for (const byte of chunk.subarray()) { - expect(byte).to.equal(expectedNextNumber++) - } - } - expect(expectedNextNumber).to.equal(16) + expect(stream.writeStatus).to.equal('closed') + expect(stream.timeline.closeWrite).to.be.greaterThan(0) - // Close read, we've should have closed the write side during sink - await stream.closeRead() + // should have read all of the bytes + expect(output).to.equalBytes(toBuffer(data)) + // should have set timeline events + expect(stream.timeline.closeWrite).to.be.greaterThan(0) + expect(stream.timeline.closeRead).to.be.greaterThan(0) expect(stream.timeline.close).to.be.greaterThan(0) }) }) From cab7ec6f6da2be3a38613fd649f91589c1e20776 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 30 Apr 2024 16:05:51 +0100 Subject: [PATCH 2/8] chore: fix deps --- packages/transport-webtransport/package.json | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/transport-webtransport/package.json b/packages/transport-webtransport/package.json index 7fa64c517c..8d6c25065a 100644 --- a/packages/transport-webtransport/package.json +++ b/packages/transport-webtransport/package.json @@ -53,19 +53,25 @@ "@chainsafe/libp2p-noise": "^15.0.0", "@libp2p/interface": "^1.3.0", "@libp2p/peer-id": "^4.1.0", + "@libp2p/utils": "^5.3.2", "@multiformats/multiaddr": "^12.2.1", "@multiformats/multiaddr-matcher": "^1.2.0", "it-stream-types": "^2.0.1", "multiformats": "^13.1.0", + "race-signal": "^1.0.2", "uint8arraylist": "^2.4.8", "uint8arrays": "^5.0.3" }, "devDependencies": { "@libp2p/logger": "^4.0.11", "@libp2p/peer-id-factory": "^4.1.0", + "@noble/hashes": "^1.4.0", "aegir": "^42.2.5", + "it-map": "^3.1.0", + "it-to-buffer": "^4.0.7", "libp2p": "^1.4.3", - "p-defer": "^4.0.1" + "p-defer": "^4.0.1", + "p-wait-for": "^5.0.2" }, "browser": { "./dist/src/listener.js": "./dist/src/listener.browser.js" From b3fff8c176c61aaa8d19896f32e9fb12229226b6 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 30 Apr 2024 16:19:46 +0100 Subject: [PATCH 3/8] chore: rename file back --- .../test/{webtransport.spec.ts => browser.ts} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename packages/transport-webtransport/test/{webtransport.spec.ts => browser.ts} (100%) diff --git a/packages/transport-webtransport/test/webtransport.spec.ts b/packages/transport-webtransport/test/browser.ts similarity index 100% rename from packages/transport-webtransport/test/webtransport.spec.ts rename to packages/transport-webtransport/test/browser.ts From 5c1f6753c2fa968faa3da01326aa3422b1b02743 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 30 Apr 2024 16:20:47 +0100 Subject: [PATCH 4/8] chore: unskip test --- packages/transport-webtransport/test/browser.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/transport-webtransport/test/browser.ts b/packages/transport-webtransport/test/browser.ts index 5f4ab6a758..8b5fa6bc69 100644 --- a/packages/transport-webtransport/test/browser.ts +++ b/packages/transport-webtransport/test/browser.ts @@ -116,7 +116,7 @@ describe('libp2p-webtransport', () => { expect(err.toString()).to.contain('aborted') }) - it.skip('connects to ipv6 addresses', async function () { + it('connects to ipv6 addresses', async function () { if (process.env.disableIp6 === 'true') { return this.skip() } From b513a5cef0b529f7c6d4f31f58d41b359c42a8a7 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 30 Apr 2024 17:10:29 +0100 Subject: [PATCH 5/8] chore: fix tests --- packages/transport-webtransport/package.json | 7 +++++-- packages/transport-webtransport/test/browser.ts | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/transport-webtransport/package.json b/packages/transport-webtransport/package.json index 8d6c25065a..d1a53a9545 100644 --- a/packages/transport-webtransport/package.json +++ b/packages/transport-webtransport/package.json @@ -74,10 +74,13 @@ "p-wait-for": "^5.0.2" }, "browser": { - "./dist/src/listener.js": "./dist/src/listener.browser.js" + "./dist/src/listener.js": "./dist/src/listener.browser.js", + "./dist/src/webtransport.js": "./dist/src/webtransport.browser.js" }, "react-native": { - "./dist/src/listener.js": "./dist/src/listener.browser.js" + "./dist/src/listener.js": "./dist/src/listener.browser.js", + "./dist/src/webtransport.js": "./dist/src/webtransport.browser.js", + "./dist/src/utils/generate-certificates.js": "./dist/src/utils/generate-certificates.browser.js" }, "sideEffects": false } diff --git a/packages/transport-webtransport/test/browser.ts b/packages/transport-webtransport/test/browser.ts index 8b5fa6bc69..0c714e437c 100644 --- a/packages/transport-webtransport/test/browser.ts +++ b/packages/transport-webtransport/test/browser.ts @@ -96,7 +96,7 @@ describe('libp2p-webtransport', () => { const ma = multiaddr(maStrNoCerthash + '/p2p/' + maStrP2p) await expect(node.dial(ma)).to.eventually.be.rejected() - .with.property('code', 'ERR_NO_VALID_ADDRESSES') + .with.property('code', 'ERR_INVALID_PARAMETERS') }) it('fails to connect due to an aborted signal', async () => { From 5d9b59b96648da28618b2dac3ab1fd3b48fe1ea1 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 30 Apr 2024 18:12:56 +0100 Subject: [PATCH 6/8] chore: pr comments --- packages/transport-webtransport/src/index.ts | 8 +++----- packages/transport-webtransport/src/muxer.ts | 2 +- packages/transport-webtransport/test/browser.ts | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/packages/transport-webtransport/src/index.ts b/packages/transport-webtransport/src/index.ts index 344cfa9973..e9ef02f984 100644 --- a/packages/transport-webtransport/src/index.ts +++ b/packages/transport-webtransport/src/index.ts @@ -196,12 +196,10 @@ class WebTransportTransport implements Transport { cleanUpWTSession('remote_close') }) - if (!await raceSignal(this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes), options.signal)) { - throw new Error('Failed to authenticate webtransport') - } + const authenticated = await raceSignal(this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes), options.signal) - if (options?.signal?.aborted === true) { - throw new AbortError() + if (!authenticated) { + throw new CodeError('Failed to authenticate webtransport', 'ERR_AUTHENTICATION_FAILED') } this.metrics?.dialerEvents.increment({ open: true }) diff --git a/packages/transport-webtransport/src/muxer.ts b/packages/transport-webtransport/src/muxer.ts index 7547076074..fa5c1e0492 100644 --- a/packages/transport-webtransport/src/muxer.ts +++ b/packages/transport-webtransport/src/muxer.ts @@ -33,7 +33,7 @@ export function webtransportMuxer (wt: Pick= config.maxInboundStreams) { - log('too many inbound streams open, closing new incoming stream') + log(`too many inbound streams open - ${activeStreams.length}/${config.maxInboundStreams}, closing new incoming stream`) // We've reached our limit, close this stream. wtStream.writable.close().catch((err: Error) => { log.error(`failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) diff --git a/packages/transport-webtransport/test/browser.ts b/packages/transport-webtransport/test/browser.ts index 0c714e437c..8b5fa6bc69 100644 --- a/packages/transport-webtransport/test/browser.ts +++ b/packages/transport-webtransport/test/browser.ts @@ -96,7 +96,7 @@ describe('libp2p-webtransport', () => { const ma = multiaddr(maStrNoCerthash + '/p2p/' + maStrP2p) await expect(node.dial(ma)).to.eventually.be.rejected() - .with.property('code', 'ERR_INVALID_PARAMETERS') + .with.property('code', 'ERR_NO_VALID_ADDRESSES') }) it('fails to connect due to an aborted signal', async () => { From 139c837af00eb09e42962fd40bcbd1437ce47d7b Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 30 Apr 2024 18:18:53 +0100 Subject: [PATCH 7/8] chore: fix build --- packages/transport-webtransport/src/index.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/transport-webtransport/src/index.ts b/packages/transport-webtransport/src/index.ts index e9ef02f984..24f7c7a8d6 100644 --- a/packages/transport-webtransport/src/index.ts +++ b/packages/transport-webtransport/src/index.ts @@ -196,7 +196,7 @@ class WebTransportTransport implements Transport { cleanUpWTSession('remote_close') }) - const authenticated = await raceSignal(this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes), options.signal) + authenticated = await raceSignal(this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes), options.signal) if (!authenticated) { throw new CodeError('Failed to authenticate webtransport', 'ERR_AUTHENTICATION_FAILED') @@ -222,8 +222,6 @@ class WebTransportTransport implements Transport { ...inertDuplex() } - authenticated = true - return await options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: webtransportMuxer(wt, wt.incomingBidirectionalStreams.getReader(), this.components.logger, this.config), From 7098875bdbe24e76a89a3c51697a861524b47fe9 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 30 Apr 2024 18:19:24 +0100 Subject: [PATCH 8/8] chore: remove console --- packages/transport-webtransport/test/browser.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/transport-webtransport/test/browser.ts b/packages/transport-webtransport/test/browser.ts index 8b5fa6bc69..ae8b3d3a42 100644 --- a/packages/transport-webtransport/test/browser.ts +++ b/packages/transport-webtransport/test/browser.ts @@ -1,4 +1,3 @@ -/* eslint-disable no-console */ /* eslint-env mocha */ import { noise } from '@chainsafe/libp2p-noise' @@ -29,7 +28,6 @@ describe('libp2p-webtransport', () => { afterEach(async () => { if (node != null) { - console.info('stop node') await node.stop() const conns = node.getConnections()