diff --git a/doc/package.json b/doc/package.json index 5bbaf8c8d5..0251b86486 100644 --- a/doc/package.json +++ b/doc/package.json @@ -17,7 +17,7 @@ "doc-check": "aegir doc-check" }, "devDependencies": { - "aegir": "^47.0.21" + "aegir": "^47.0.22" }, "private": true } diff --git a/interop/Dockerfile b/interop/Dockerfile index e75df3fcec..cd67344482 100644 --- a/interop/Dockerfile +++ b/interop/Dockerfile @@ -1,5 +1,5 @@ # install node and browsers -FROM mcr.microsoft.com/playwright:v1.50.1 +FROM mcr.microsoft.com/playwright:v1.55.0 WORKDIR /app diff --git a/interop/package.json b/interop/package.json index 355eb7bb62..658e9c7b39 100644 --- a/interop/package.json +++ b/interop/package.json @@ -24,7 +24,7 @@ "@libp2p/websockets": "^9.2.19", "@libp2p/webtransport": "^5.0.51", "@multiformats/multiaddr": "^13.0.1", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "libp2p": "^2.10.0", "p-event": "^6.0.1", "redis": "^4.7.1" diff --git a/interop/test/fixtures/get-libp2p.ts b/interop/test/fixtures/get-libp2p.ts index db0c47fb31..56c92b5e6e 100644 --- a/interop/test/fixtures/get-libp2p.ts +++ b/interop/test/fixtures/get-libp2p.ts @@ -15,7 +15,7 @@ import { yamux } from '@libp2p/yamux' import { createLibp2p } from 'libp2p' import type { Identify } from '@libp2p/identify' import type { Libp2p } from '@libp2p/interface' -import type { PingService } from '@libp2p/ping' +import type { Ping } from '@libp2p/ping' import type { Libp2pOptions } from 'libp2p' const isDialer: boolean = process.env.is_dialer === 'true' @@ -26,8 +26,8 @@ const SECURE_CHANNEL = process.env.security const MUXER = process.env.muxer const IP = process.env.ip ?? '0.0.0.0' -export async function getLibp2p (): Promise> { - const options: Libp2pOptions<{ ping: PingService, identify: Identify }> = { +export async function getLibp2p (): Promise> { + const options: Libp2pOptions<{ ping: Ping, identify: Identify }> = { start: true, connectionGater: { denyDialMultiaddr: async () => false diff --git a/package.json b/package.json index 63f09d20e9..cdb3e92279 100644 --- a/package.json +++ b/package.json @@ -39,12 +39,15 @@ "docs:no-publish": "aegir docs --publish false -- --exclude interop --exclude doc" }, "devDependencies": { - "aegir": "^47.0.6", + "aegir": "^47.0.22", "npm-run-all": "^4.1.5" }, "workspaces": [ "doc", "interop", "packages/*" - ] + ], + "overrides": { + "playwright-core": "next" + } } diff --git a/packages/config/package.json b/packages/config/package.json index 902c27217a..b0af093e62 100644 --- a/packages/config/package.json +++ b/packages/config/package.json @@ -46,7 +46,7 @@ "interface-datastore": "^8.3.2" }, "devDependencies": { - "aegir": "^47.0.21", + "aegir": "^47.0.22", "datastore-core": "^10.0.4" }, "sideEffects": false diff --git a/packages/connection-encrypter-noise/package.json b/packages/connection-encrypter-noise/package.json index 0365ae699e..e4262bf813 100644 --- a/packages/connection-encrypter-noise/package.json +++ b/packages/connection-encrypter-noise/package.json @@ -76,7 +76,7 @@ "@libp2p/yamux": "^7.0.1", "@multiformats/multiaddr": "^13.0.1", "@types/sinon": "^17.0.4", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "execa": "^9.6.0", "go-libp2p": "^1.6.0", "iso-random-stream": "^2.0.2", diff --git a/packages/connection-encrypter-plaintext/package.json b/packages/connection-encrypter-plaintext/package.json index adf19bfcdd..05a12b962b 100644 --- a/packages/connection-encrypter-plaintext/package.json +++ b/packages/connection-encrypter-plaintext/package.json @@ -56,7 +56,7 @@ "devDependencies": { "@libp2p/crypto": "^5.1.8", "@libp2p/logger": "^5.2.0", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "protons": "^7.7.0", "sinon": "^21.0.0" }, diff --git a/packages/connection-encrypter-tls/package.json b/packages/connection-encrypter-tls/package.json index 146d650dfd..a59ecc4609 100644 --- a/packages/connection-encrypter-tls/package.json +++ b/packages/connection-encrypter-tls/package.json @@ -57,7 +57,7 @@ }, "devDependencies": { "@libp2p/logger": "^5.2.0", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "protons": "^7.7.0", "sinon": "^21.0.0", "sinon-ts": "^2.0.0" diff --git a/packages/crypto/package.json b/packages/crypto/package.json index 9deecef718..dc15264a67 100644 --- a/packages/crypto/package.json +++ b/packages/crypto/package.json @@ -95,7 +95,7 @@ }, "devDependencies": { "@types/mocha": "^10.0.10", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "asn1js": "^3.0.6", "benchmark": "^2.1.4", "protons": "^7.7.0" diff --git a/packages/integration-tests/.aegir.js b/packages/integration-tests/.aegir.js index 669591b04c..4e33586886 100644 --- a/packages/integration-tests/.aegir.js +++ b/packages/integration-tests/.aegir.js @@ -155,6 +155,9 @@ export default { await before.libp2p?.stop() await before.goLibp2pRelay?.proc.kill() await before.libp2pLimitedRelay?.stop() + + // node-datachannel sometimes causes the process to hang + process.exit(0) } } } diff --git a/packages/integration-tests/package.json b/packages/integration-tests/package.json index e0c9098534..b35da776d5 100644 --- a/packages/integration-tests/package.json +++ b/packages/integration-tests/package.json @@ -9,8 +9,10 @@ "test": "aegir test -- --exit", "clean": "aegir clean", "lint": "aegir lint", - "test:chrome": "aegir test -t browser --cov", + "test:chrome": "aegir test -t browser --cov -- --exit", "test:chrome-webworker": "aegir test -t webworker", + "test:firefox": "aegir test -t browser -- --browser firefox", + "test:firefox-webworker": "aegir test -t webworker -- --browser firefox", "test:node": "aegir test -t node -f dist/test/node.js -f dist/test/**/*.spec.js --cov -- --exit", "test:interop": "aegir test -t node -f dist/test/interop.js", "dep-check": "aegir dep-check" @@ -51,7 +53,7 @@ "@multiformats/dns": "^1.0.6", "@multiformats/multiaddr": "^13.0.1", "@multiformats/multiaddr-matcher": "^3.0.1", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "delay": "^6.0.0", "detect-browser": "^5.3.0", "execa": "^9.6.0", diff --git a/packages/integration-tests/test/bootstrap.spec.ts b/packages/integration-tests/test/bootstrap.spec.ts index 50a36a08be..01669427fb 100644 --- a/packages/integration-tests/test/bootstrap.spec.ts +++ b/packages/integration-tests/test/bootstrap.spec.ts @@ -31,11 +31,8 @@ describe('bootstrap', () => { let libp2p: Libp2p beforeEach(async () => { - [remotePeerId1, remotePeerId2] = await Promise.all([ - peerIdFromPrivateKey(await generateKeyPair('Ed25519')), - peerIdFromPrivateKey(await generateKeyPair('Ed25519')), - peerIdFromPrivateKey(await generateKeyPair('Ed25519')) - ]) + remotePeerId1 = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + remotePeerId2 = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) }) afterEach(async () => { diff --git a/packages/integration-tests/test/dht.node.ts b/packages/integration-tests/test/dht.node.ts index 52a5cd666b..54a7d78495 100644 --- a/packages/integration-tests/test/dht.node.ts +++ b/packages/integration-tests/test/dht.node.ts @@ -1,6 +1,7 @@ /* eslint-env mocha */ import { identify } from '@libp2p/identify' +import { start, stop } from '@libp2p/interface' import { kadDHT, passthroughMapper } from '@libp2p/kad-dht' import { mplex } from '@libp2p/mplex' import { ping } from '@libp2p/ping' @@ -97,10 +98,10 @@ describe('DHT subsystem operates correctly', () => { } }) - await Promise.all([ - libp2p.start(), - remoteLibp2p.start() - ]) + await start( + libp2p, + remoteLibp2p + ) await libp2p.peerStore.patch(remoteLibp2p.peerId, { multiaddrs: [remoteListenAddr] @@ -109,13 +110,10 @@ describe('DHT subsystem operates correctly', () => { }) afterEach(async () => { - if (libp2p != null) { - await libp2p.stop() - } - - if (remoteLibp2p != null) { - await remoteLibp2p.stop() - } + await stop( + libp2p, + remoteLibp2p + ) }) it('should get notified of connected peers on dial', async () => { @@ -186,11 +184,11 @@ describe('DHT subsystem operates correctly', () => { } }) - await Promise.all([ - libp2p.start(), - remoteLibp2p1.start(), - remoteLibp2p2.start() - ]) + await start( + libp2p, + remoteLibp2p1, + remoteLibp2p2 + ) await libp2p.peerStore.patch(remoteLibp2p1.peerId, { multiaddrs: remoteLibp2p1.getMultiaddrs() @@ -208,9 +206,9 @@ describe('DHT subsystem operates correctly', () => { ]) await deferred.promise - return Promise.all([ - remoteLibp2p1.stop(), - remoteLibp2p2.stop() - ]) + return stop( + remoteLibp2p1, + remoteLibp2p2 + ) }) }) diff --git a/packages/integration-tests/test/events.spec.ts b/packages/integration-tests/test/events.spec.ts index b0f2b82ea4..aa835fcd16 100644 --- a/packages/integration-tests/test/events.spec.ts +++ b/packages/integration-tests/test/events.spec.ts @@ -69,10 +69,10 @@ describe('events', () => { await dialer.dial(listener.getMultiaddrs()) // Verify onConnection is called with the connection - const connections = await Promise.all([ + const connections = [ ...dialer.getConnections(listener.peerId), ...listener.getConnections(dialer.peerId) - ]) + ] expect(connections).to.have.lengthOf(2) await Promise.all([ diff --git a/packages/integration-tests/test/mdns.node.ts b/packages/integration-tests/test/mdns.node.ts index d4adfe07a9..50f2dda2ae 100644 --- a/packages/integration-tests/test/mdns.node.ts +++ b/packages/integration-tests/test/mdns.node.ts @@ -1,6 +1,7 @@ /* eslint-env mocha */ import { randomBytes } from '@libp2p/crypto' +import { start, stop } from '@libp2p/interface' import { mdns } from '@libp2p/mdns' import { tcp } from '@libp2p/tcp' import { multiaddr } from '@multiformats/multiaddr' @@ -70,15 +71,17 @@ describe('mdns', () => { } }) - await Promise.all([ - remoteLibp2p1.start(), - remoteLibp2p2.start(), - libp2p.start() - ]) + await start( + remoteLibp2p1, + remoteLibp2p2, + libp2p + ) await deferred.promise - await remoteLibp2p1.stop() - await remoteLibp2p2.stop() + await stop( + remoteLibp2p1, + remoteLibp2p2 + ) }) }) diff --git a/packages/interface-compliance-tests/package.json b/packages/interface-compliance-tests/package.json index e5fab116b8..68e557b5aa 100644 --- a/packages/interface-compliance-tests/package.json +++ b/packages/interface-compliance-tests/package.json @@ -92,7 +92,7 @@ "@libp2p/utils": "^6.7.2", "@multiformats/multiaddr": "^13.0.1", "@multiformats/multiaddr-matcher": "^3.0.1", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "delay": "^6.0.0", "it-all": "^3.0.9", "it-drain": "^3.0.10", diff --git a/packages/interface-compliance-tests/src/stream-muxer/close-test.ts b/packages/interface-compliance-tests/src/stream-muxer/close-test.ts index d4a434b890..52bab95bd5 100644 --- a/packages/interface-compliance-tests/src/stream-muxer/close-test.ts +++ b/packages/interface-compliance-tests/src/stream-muxer/close-test.ts @@ -368,6 +368,7 @@ export default (common: TestSetup): void => { await Promise.all([ pEvent(listenerStream, 'close'), + // eslint-disable-next-line @typescript-eslint/await-thenable dialerStream.abort(new Error('Urk!')) ]) diff --git a/packages/interface-compliance-tests/src/stream-muxer/stream-test.ts b/packages/interface-compliance-tests/src/stream-muxer/stream-test.ts index f53e98691c..96735cfedb 100644 --- a/packages/interface-compliance-tests/src/stream-muxer/stream-test.ts +++ b/packages/interface-compliance-tests/src/stream-muxer/stream-test.ts @@ -259,6 +259,7 @@ export default (common: TestSetup): void => { await Promise.all([ pEvent(outboundStream, 'close'), + // eslint-disable-next-line @typescript-eslint/await-thenable inboundStream.abort(new Error('Urk!')) ]) diff --git a/packages/interface-compliance-tests/src/transport/index.ts b/packages/interface-compliance-tests/src/transport/index.ts index d687706ce3..00c3b260e4 100644 --- a/packages/interface-compliance-tests/src/transport/index.ts +++ b/packages/interface-compliance-tests/src/transport/index.ts @@ -250,8 +250,9 @@ export default (common: TestSetup): void => { expect(connection).to.have.property('streams').that.has.lengthOf(5) if (remoteConn != null) { - await pWaitFor(() => remoteConn.streams.length === 5, { - timeout: 5000 + await pWaitFor(() => remoteConn.streams.filter(s => s.protocol === '/echo/1.0.0').length === 5, { + timeout: 5_000, + interval: 1_000 }) } @@ -360,7 +361,7 @@ export default (common: TestSetup): void => { const connection = await dialer.dial(dialAddrs[0]) const echoProtocol = dialer.services.echo.protocol - for (let i = 0; i < 2000; i++) { + for (let i = 0; i < 2_000; i++) { const input = new Uint8Array(1024).fill(5) const output = await dialer.services.echo.echo(connection.remotePeer, input, { signal: AbortSignal.timeout(timeout) diff --git a/packages/interface-compliance-tests/src/transport/utils.ts b/packages/interface-compliance-tests/src/transport/utils.ts index 4f517346b7..dfa2fad969 100644 --- a/packages/interface-compliance-tests/src/transport/utils.ts +++ b/packages/interface-compliance-tests/src/transport/utils.ts @@ -28,7 +28,8 @@ export async function createPeer (config: Partial = {}): Promise< services: { ...config.services, echo: echo({ - maxInboundStreams: 5_000 + maxInboundStreams: 5_000, + timeout: 180_000 }) } }) diff --git a/packages/interface-internal/package.json b/packages/interface-internal/package.json index 902d215550..4e3232df96 100644 --- a/packages/interface-internal/package.json +++ b/packages/interface-internal/package.json @@ -47,7 +47,7 @@ "progress-events": "^1.0.1" }, "devDependencies": { - "aegir": "^47.0.21" + "aegir": "^47.0.22" }, "sideEffects": false } diff --git a/packages/interface/package.json b/packages/interface/package.json index 3a3baa4cc2..cd2f3f669f 100644 --- a/packages/interface/package.json +++ b/packages/interface/package.json @@ -50,7 +50,7 @@ "uint8arraylist": "^2.4.8" }, "devDependencies": { - "aegir": "^47.0.21" + "aegir": "^47.0.22" }, "sideEffects": false } diff --git a/packages/interface/src/stream-muxer.ts b/packages/interface/src/stream-muxer.ts index 280b4b407c..7bd3f96b67 100644 --- a/packages/interface/src/stream-muxer.ts +++ b/packages/interface/src/stream-muxer.ts @@ -142,7 +142,7 @@ export interface StreamMuxer extends TypedE /** * Create a new stream */ - createStream(options?: CreateStreamOptions): MuxedStream | Promise + createStream(options?: CreateStreamOptions): Promise /** * Immediately close the muxer, abort every open stream and discard any diff --git a/packages/interop/package.json b/packages/interop/package.json index f5ee2139ac..8f116e1acd 100644 --- a/packages/interop/package.json +++ b/packages/interop/package.json @@ -57,10 +57,10 @@ "uint8arrays": "^5.1.0" }, "devDependencies": { - "aegir": "^47.0.21", + "aegir": "^47.0.22", "protons": "^7.7.0" }, "peerDependencies": { - "aegir": "^47.0.21" + "aegir": "^47.0.22" } } diff --git a/packages/kad-dht/package.json b/packages/kad-dht/package.json index 27d26f1c07..ed477b7028 100644 --- a/packages/kad-dht/package.json +++ b/packages/kad-dht/package.json @@ -86,7 +86,7 @@ "@types/lodash.range": "^3.2.9", "@types/sinon": "^17.0.4", "@types/which": "^3.0.4", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "benchmark": "^2.1.4", "datastore-core": "^10.0.4", "delay": "^6.0.0", diff --git a/packages/keychain/package.json b/packages/keychain/package.json index a839b7a8e9..6a4d092cec 100644 --- a/packages/keychain/package.json +++ b/packages/keychain/package.json @@ -64,7 +64,7 @@ }, "devDependencies": { "@libp2p/logger": "^5.2.0", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "datastore-core": "^10.0.4" }, "sideEffects": false diff --git a/packages/libp2p-daemon-client/package.json b/packages/libp2p-daemon-client/package.json index b15663103b..76806812c6 100644 --- a/packages/libp2p-daemon-client/package.json +++ b/packages/libp2p-daemon-client/package.json @@ -55,7 +55,7 @@ "@chainsafe/libp2p-gossipsub": "^14.1.1", "@libp2p/daemon-server": "^8.0.6", "@libp2p/kad-dht": "^15.1.11", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "it-all": "^3.0.9", "p-event": "^6.0.1", "sinon": "^21.0.0", diff --git a/packages/libp2p-daemon-protocol/package.json b/packages/libp2p-daemon-protocol/package.json index 6e5ce44370..de3373b7dc 100644 --- a/packages/libp2p-daemon-protocol/package.json +++ b/packages/libp2p-daemon-protocol/package.json @@ -68,7 +68,7 @@ "uint8arraylist": "^2.4.8" }, "devDependencies": { - "aegir": "^47.0.21", + "aegir": "^47.0.22", "protons": "^7.7.0" } } diff --git a/packages/libp2p-daemon-server/package.json b/packages/libp2p-daemon-server/package.json index 805e006b01..143fb35e6a 100644 --- a/packages/libp2p-daemon-server/package.json +++ b/packages/libp2p-daemon-server/package.json @@ -60,7 +60,7 @@ "uint8arrays": "^5.1.0" }, "devDependencies": { - "aegir": "^47.0.21", + "aegir": "^47.0.22", "sinon-ts": "^2.0.0" } } diff --git a/packages/libp2p-daemon/package.json b/packages/libp2p-daemon/package.json index 78bd3eb0f1..bbc29dfac4 100644 --- a/packages/libp2p-daemon/package.json +++ b/packages/libp2p-daemon/package.json @@ -52,7 +52,7 @@ }, "devDependencies": { "@types/yargs": "^17.0.33", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "sinon": "^21.0.0" } } diff --git a/packages/libp2p/package.json b/packages/libp2p/package.json index d1c6ab88ba..18cd5fd39e 100644 --- a/packages/libp2p/package.json +++ b/packages/libp2p/package.json @@ -107,7 +107,7 @@ "uint8arrays": "^5.1.0" }, "devDependencies": { - "aegir": "^47.0.21", + "aegir": "^47.0.22", "delay": "^6.0.0", "it-all": "^3.0.9", "it-drain": "^3.0.10", diff --git a/packages/libp2p/src/libp2p.ts b/packages/libp2p/src/libp2p.ts index b9dc53833b..b7158e06b9 100644 --- a/packages/libp2p/src/libp2p.ts +++ b/packages/libp2p/src/libp2p.ts @@ -237,7 +237,7 @@ export class Libp2p extends TypedEventEmitter this.status = 'started' this.safeDispatchEvent('start', { detail: this }) - this.log('libp2p has started') + this.log('libp2p has started with peer id %p', this.peerId) } catch (err: any) { this.log.error('An error occurred starting libp2p', err) // set status to 'started' so this.stop() will stop any running components diff --git a/packages/libp2p/test/upgrading/upgrader.spec.ts b/packages/libp2p/test/upgrading/upgrader.spec.ts index bbcfdfed4d..ef39d17927 100644 --- a/packages/libp2p/test/upgrading/upgrader.spec.ts +++ b/packages/libp2p/test/upgrading/upgrader.spec.ts @@ -384,7 +384,7 @@ describe('upgrader', () => { protocol: muxerProtocol, createStreamMuxer: () => stubInterface({ status: 'open', - createStream: () => { + createStream: async () => { return outboundStream } }) @@ -422,7 +422,7 @@ describe('upgrader', () => { protocol: muxerProtocol, createStreamMuxer: () => stubInterface({ status: 'open', - createStream: () => { + createStream: async () => { return outboundStream } }) diff --git a/packages/logger/package.json b/packages/logger/package.json index 33fe11f5c7..82d2eb9fc5 100644 --- a/packages/logger/package.json +++ b/packages/logger/package.json @@ -55,7 +55,7 @@ }, "devDependencies": { "@libp2p/peer-id": "^5.1.9", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "sinon": "^21.0.0", "uint8arrays": "^5.1.0" }, diff --git a/packages/logger/src/index.ts b/packages/logger/src/index.ts index 356b88fca6..ff20b41673 100644 --- a/packages/logger/src/index.ts +++ b/packages/logger/src/index.ts @@ -79,7 +79,33 @@ debug.formatters.a = (v?: Multiaddr): string => { // Add a formatter for stringifying Errors debug.formatters.e = (v?: Error): string => { - return v == null ? 'undefined' : notEmpty(v.stack) ?? notEmpty(v.message) ?? v.toString() + if (v == null) { + return 'undefined' + } + + const message = notEmpty(v.message) + const stack = notEmpty(v.stack) + + // some browser errors (mostly from Firefox) have no message or no stack, + // sometimes both, sometimes neither. Sometimes the message is in the stack, + // sometimes is isn't so try to do *something* useful + if (message != null && stack != null) { + if (stack.includes(message)) { + return stack + } + + return `${message}\n${stack}` + } + + if (stack != null) { + return stack + } + + if (message != null) { + return message + } + + return v.toString() } export type { Logger, ComponentLogger } diff --git a/packages/metrics-opentelemetry/package.json b/packages/metrics-opentelemetry/package.json index 10bb2326de..0358179598 100644 --- a/packages/metrics-opentelemetry/package.json +++ b/packages/metrics-opentelemetry/package.json @@ -46,7 +46,7 @@ }, "devDependencies": { "@libp2p/logger": "^5.2.0", - "aegir": "^47.0.21" + "aegir": "^47.0.22" }, "browser": { "./dist/src/system-metrics.js": "./dist/src/system-metrics.browser.js" diff --git a/packages/metrics-prometheus/package.json b/packages/metrics-prometheus/package.json index ab785d3048..fb4caf0708 100644 --- a/packages/metrics-prometheus/package.json +++ b/packages/metrics-prometheus/package.json @@ -47,7 +47,7 @@ "devDependencies": { "@libp2p/logger": "^5.2.0", "@libp2p/utils": "^6.7.2", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "p-event": "^6.0.1" }, "sideEffects": false diff --git a/packages/metrics-simple/package.json b/packages/metrics-simple/package.json index 4b924817c2..12248baad9 100644 --- a/packages/metrics-simple/package.json +++ b/packages/metrics-simple/package.json @@ -48,7 +48,7 @@ }, "devDependencies": { "@types/tdigest": "^0.1.5", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "p-defer": "^4.0.1" }, "sideEffects": false diff --git a/packages/multistream-select/package.json b/packages/multistream-select/package.json index 5b9ffb230b..46eae4f9cf 100644 --- a/packages/multistream-select/package.json +++ b/packages/multistream-select/package.json @@ -58,7 +58,7 @@ "uint8arrays": "^5.1.0" }, "devDependencies": { - "aegir": "^47.0.21", + "aegir": "^47.0.22", "iso-random-stream": "^2.0.2", "it-all": "^3.0.9", "it-drain": "^3.0.10", diff --git a/packages/multistream-select/src/handle.ts b/packages/multistream-select/src/handle.ts index a08c576d58..9c8559de01 100644 --- a/packages/multistream-select/src/handle.ts +++ b/packages/multistream-select/src/handle.ts @@ -61,8 +61,7 @@ export async function handle (stream: Stream, pro const log = stream.log.newScope('mss:select') const lp = lpStream(stream, { ...options, - maxDataLength: MAX_PROTOCOL_LENGTH, - stopPropagation: true + maxDataLength: MAX_PROTOCOL_LENGTH }) for (let i = 0; i < protocols.length; i++) { diff --git a/packages/peer-collections/package.json b/packages/peer-collections/package.json index 0961c4a058..fdbcef9b71 100644 --- a/packages/peer-collections/package.json +++ b/packages/peer-collections/package.json @@ -55,7 +55,7 @@ "devDependencies": { "@libp2p/crypto": "^5.1.8", "@types/sinon": "^17.0.4", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "sinon": "^21.0.0", "sinon-ts": "^2.0.0" }, diff --git a/packages/peer-discovery-bootstrap/package.json b/packages/peer-discovery-bootstrap/package.json index 235f4c8aa7..ca68692c51 100644 --- a/packages/peer-discovery-bootstrap/package.json +++ b/packages/peer-discovery-bootstrap/package.json @@ -57,7 +57,7 @@ "devDependencies": { "@libp2p/interface-compliance-tests": "^6.5.0", "@libp2p/logger": "^5.2.0", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "sinon-ts": "^2.0.0" }, "sideEffects": false diff --git a/packages/peer-discovery-mdns/package.json b/packages/peer-discovery-mdns/package.json index d4dd0a31fc..362c43c096 100644 --- a/packages/peer-discovery-mdns/package.json +++ b/packages/peer-discovery-mdns/package.json @@ -57,7 +57,7 @@ "@libp2p/crypto": "^5.1.8", "@libp2p/interface-compliance-tests": "^6.5.0", "@libp2p/logger": "^5.2.0", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "p-event": "^6.0.1", "p-wait-for": "^5.0.2", "sinon-ts": "^2.0.0" diff --git a/packages/peer-discovery-mdns/test/multicast-dns.spec.ts b/packages/peer-discovery-mdns/test/multicast-dns.spec.ts index dc89ea767c..6a70c64c28 100644 --- a/packages/peer-discovery-mdns/test/multicast-dns.spec.ts +++ b/packages/peer-discovery-mdns/test/multicast-dns.spec.ts @@ -37,11 +37,9 @@ describe('MulticastDNS', () => { before(async function () { this.timeout(80 * 1000) - ;[pA, pB, pD] = await Promise.all([ - peerIdFromPrivateKey(await generateKeyPair('Ed25519')), - peerIdFromPrivateKey(await generateKeyPair('Ed25519')), - peerIdFromPrivateKey(await generateKeyPair('Ed25519')) - ]) + pA = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + pB = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + pD = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) aMultiaddrs = [ multiaddr('/ip4/192.168.1.142/tcp/20001'), diff --git a/packages/peer-id/package.json b/packages/peer-id/package.json index dc2c458f79..daa2ae1baf 100644 --- a/packages/peer-id/package.json +++ b/packages/peer-id/package.json @@ -53,7 +53,7 @@ "uint8arrays": "^5.1.0" }, "devDependencies": { - "aegir": "^47.0.21", + "aegir": "^47.0.22", "sinon": "^21.0.0" }, "sideEffects": false diff --git a/packages/peer-record/package.json b/packages/peer-record/package.json index da1402c91a..30295e3adf 100644 --- a/packages/peer-record/package.json +++ b/packages/peer-record/package.json @@ -59,7 +59,7 @@ "uint8arrays": "^5.1.0" }, "devDependencies": { - "aegir": "^47.0.21", + "aegir": "^47.0.22", "protons": "^7.7.0" }, "sideEffects": false diff --git a/packages/peer-store/package.json b/packages/peer-store/package.json index 9434d21e8a..380a2b085b 100644 --- a/packages/peer-store/package.json +++ b/packages/peer-store/package.json @@ -66,7 +66,7 @@ "devDependencies": { "@libp2p/logger": "^5.2.0", "@types/sinon": "^17.0.4", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "benchmark": "^2.1.4", "datastore-core": "^10.0.4", "delay": "^6.0.0", diff --git a/packages/pnet/package.json b/packages/pnet/package.json index a224f9e055..3dd8e424e9 100644 --- a/packages/pnet/package.json +++ b/packages/pnet/package.json @@ -52,7 +52,7 @@ }, "devDependencies": { "@types/xsalsa20": "^1.1.3", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "p-event": "^6.0.1" }, "sideEffects": false diff --git a/packages/protocol-autonat-v2/package.json b/packages/protocol-autonat-v2/package.json index d04cb0db88..ef5247fc5f 100644 --- a/packages/protocol-autonat-v2/package.json +++ b/packages/protocol-autonat-v2/package.json @@ -60,7 +60,7 @@ "@libp2p/crypto": "^5.1.8", "@libp2p/logger": "^5.2.0", "@libp2p/peer-id": "^5.1.9", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "delay": "^6.0.0", "it-all": "^3.0.9", "it-length-prefixed": "^10.0.1", diff --git a/packages/protocol-autonat/package.json b/packages/protocol-autonat/package.json index fb5a819eb7..be7226a9f9 100644 --- a/packages/protocol-autonat/package.json +++ b/packages/protocol-autonat/package.json @@ -60,7 +60,7 @@ "devDependencies": { "@libp2p/crypto": "^5.1.8", "@libp2p/logger": "^5.2.0", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "delay": "^6.0.0", "it-all": "^3.0.9", "it-length-prefixed": "^10.0.1", diff --git a/packages/protocol-dcutr/package.json b/packages/protocol-dcutr/package.json index 39e9ebab18..0b1b2f43fe 100644 --- a/packages/protocol-dcutr/package.json +++ b/packages/protocol-dcutr/package.json @@ -55,7 +55,7 @@ "uint8arraylist": "^2.4.8" }, "devDependencies": { - "aegir": "^47.0.21", + "aegir": "^47.0.22", "protons": "^7.7.0", "sinon": "^21.0.0", "sinon-ts": "^2.0.0" diff --git a/packages/protocol-echo/package.json b/packages/protocol-echo/package.json index f1092d6850..0282d9cd7e 100644 --- a/packages/protocol-echo/package.json +++ b/packages/protocol-echo/package.json @@ -52,7 +52,7 @@ "uint8arraylist": "^2.4.8" }, "devDependencies": { - "aegir": "^47.0.21", + "aegir": "^47.0.22", "it-all": "^3.0.9", "sinon": "^21.0.0", "sinon-ts": "^2.0.0" diff --git a/packages/protocol-fetch/package.json b/packages/protocol-fetch/package.json index 8f664159e2..8f6d160175 100644 --- a/packages/protocol-fetch/package.json +++ b/packages/protocol-fetch/package.json @@ -56,7 +56,7 @@ "devDependencies": { "@libp2p/crypto": "^5.1.8", "@libp2p/peer-id": "^5.1.9", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "protons": "^7.7.0", "sinon": "^21.0.0", "sinon-ts": "^2.0.0" diff --git a/packages/protocol-identify/package.json b/packages/protocol-identify/package.json index 391a9ddaa9..07ce2ab73d 100644 --- a/packages/protocol-identify/package.json +++ b/packages/protocol-identify/package.json @@ -62,7 +62,7 @@ }, "devDependencies": { "@libp2p/logger": "^5.2.0", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "delay": "^6.0.0", "it-length-prefixed": "^10.0.1", "protons": "^7.7.0", diff --git a/packages/protocol-perf/package.json b/packages/protocol-perf/package.json index 625b9f06c5..adae72951e 100644 --- a/packages/protocol-perf/package.json +++ b/packages/protocol-perf/package.json @@ -55,7 +55,7 @@ "devDependencies": { "@libp2p/logger": "^5.2.0", "@libp2p/utils": "^6.7.2", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "it-last": "^3.0.9", "sinon-ts": "^2.0.0" }, diff --git a/packages/protocol-ping/package.json b/packages/protocol-ping/package.json index 24e67f28e5..495737a050 100644 --- a/packages/protocol-ping/package.json +++ b/packages/protocol-ping/package.json @@ -55,7 +55,7 @@ "devDependencies": { "@libp2p/peer-id": "^5.1.9", "@libp2p/utils": "^6.7.2", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "delay": "^6.0.0", "sinon": "^21.0.0", "sinon-ts": "^2.0.0" diff --git a/packages/protocol-ping/src/ping.ts b/packages/protocol-ping/src/ping.ts index d6bd008950..c7a5e5c90b 100644 --- a/packages/protocol-ping/src/ping.ts +++ b/packages/protocol-ping/src/ping.ts @@ -6,7 +6,7 @@ import { Uint8ArrayList } from 'uint8arraylist' import { equals as uint8ArrayEquals } from 'uint8arrays/equals' import { PROTOCOL_PREFIX, PROTOCOL_NAME, PING_LENGTH, PROTOCOL_VERSION, TIMEOUT, MAX_INBOUND_STREAMS, MAX_OUTBOUND_STREAMS } from './constants.js' import type { PingComponents, PingInit, Ping as PingInterface } from './index.js' -import type { AbortOptions, Stream, PeerId, Startable, Connection } from '@libp2p/interface' +import type { AbortOptions, Stream, PeerId, Startable, Connection, StreamMessageEvent } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' export class Ping implements Startable, PingInterface { @@ -108,20 +108,32 @@ export class Ping implements Startable, PingInterface { const finished = Promise.withResolvers() const received = new Uint8ArrayList() - stream.addEventListener('message', (evt) => { + const onPong = (evt: StreamMessageEvent): void => { received.append(evt.data) if (received.byteLength === PING_LENGTH) { + stream.removeEventListener('message', onPong) + const rtt = Date.now() - start - if (!uint8ArrayEquals(data, received.subarray())) { - finished.reject(new ProtocolError(`Received wrong ping ack after ${rtt}ms`)) - } else { - finished.resolve(rtt) - } + Promise.all([ + stream.closeRead(options) + ]) + .then(() => { + if (!uint8ArrayEquals(data, received.subarray())) { + throw new ProtocolError(`Received wrong ping ack after ${rtt}ms`) + } else { + finished.resolve(rtt) + } + }) + .catch(err => { + stream.abort(err) + finished.reject(err) + }) } - }) + } + stream.addEventListener('message', onPong) stream.send(data) await stream.close(options) diff --git a/packages/pubsub-floodsub/package.json b/packages/pubsub-floodsub/package.json index ab353fad49..12851edaab 100644 --- a/packages/pubsub-floodsub/package.json +++ b/packages/pubsub-floodsub/package.json @@ -67,7 +67,7 @@ "@libp2p/peer-id": "^5.1.9", "@multiformats/multiaddr": "^13.0.1", "@types/sinon": "^17.0.4", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "multiformats": "^13.4.0", "p-wait-for": "^5.0.2", "protons": "^7.7.0", diff --git a/packages/pubsub/package.json b/packages/pubsub/package.json index c72c7a366b..c83da020aa 100644 --- a/packages/pubsub/package.json +++ b/packages/pubsub/package.json @@ -92,7 +92,7 @@ "devDependencies": { "@libp2p/logger": "^5.2.0", "@types/sinon": "^17.0.4", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "delay": "^6.0.0", "it-all": "^3.0.9", "p-defer": "^4.0.1", diff --git a/packages/record/package.json b/packages/record/package.json index e64e8a94fb..47efd121d4 100644 --- a/packages/record/package.json +++ b/packages/record/package.json @@ -55,7 +55,7 @@ "@types/lodash.random": "^3.2.9", "@types/lodash.range": "^3.2.9", "@types/which": "^3.0.4", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "multiformats": "^13.4.0", "protons": "^7.7.0" }, diff --git a/packages/stream-multiplexer-mplex/package.json b/packages/stream-multiplexer-mplex/package.json index 7277f24a14..d7d5c6eef5 100644 --- a/packages/stream-multiplexer-mplex/package.json +++ b/packages/stream-multiplexer-mplex/package.json @@ -65,7 +65,7 @@ "devDependencies": { "@libp2p/interface-compliance-tests": "^6.5.0", "@libp2p/logger": "^5.2.0", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "benchmark": "^2.1.4", "iso-random-stream": "^2.0.2", "it-all": "^3.0.9", diff --git a/packages/stream-multiplexer-mplex/test/stream.spec.ts b/packages/stream-multiplexer-mplex/test/stream.spec.ts index c39c298f8e..f5f9243439 100644 --- a/packages/stream-multiplexer-mplex/test/stream.spec.ts +++ b/packages/stream-multiplexer-mplex/test/stream.spec.ts @@ -106,6 +106,7 @@ describe('stream', () => { const [evt] = await Promise.all([ pEvent<'close', StreamCloseEvent>(pair.initiatorStream, 'close'), + // eslint-disable-next-line @typescript-eslint/await-thenable pair.initiatorStream.abort(error) ]) @@ -270,6 +271,7 @@ describe('stream', () => { await Promise.all([ pEvent(pair.receiverStream, 'close'), + // eslint-disable-next-line @typescript-eslint/await-thenable pair.initiatorStream.abort(error) ]) @@ -301,6 +303,7 @@ describe('stream', () => { await Promise.all([ pEvent(pair.initiatorStream, 'close'), + // eslint-disable-next-line @typescript-eslint/await-thenable pair.receiverStream.abort(error) ]) @@ -442,6 +445,7 @@ describe('stream', () => { await Promise.all([ pEvent(pair.receiverStream, 'close'), + // eslint-disable-next-line @typescript-eslint/await-thenable pair.initiatorStream.abort(new Error('wat')) ]) @@ -493,6 +497,7 @@ describe('stream', () => { await Promise.all([ pEvent(pair.initiatorStream, 'close'), + // eslint-disable-next-line @typescript-eslint/await-thenable pair.receiverStream.abort(new Error('wat')) ]) diff --git a/packages/stream-multiplexer-yamux/package.json b/packages/stream-multiplexer-yamux/package.json index cb66f5d51a..3b15b8f70e 100644 --- a/packages/stream-multiplexer-yamux/package.json +++ b/packages/stream-multiplexer-yamux/package.json @@ -86,7 +86,7 @@ "@dapplion/benchmark": "^1.0.0", "@libp2p/interface-compliance-tests": "^6.5.0", "@libp2p/mplex": "^11.0.47", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "it-all": "^3.0.9", "it-drain": "^3.0.10", "it-pushable": "^3.2.3", diff --git a/packages/transport-circuit-relay-v2/package.json b/packages/transport-circuit-relay-v2/package.json index d1e81ca539..a0f5fdc2ea 100644 --- a/packages/transport-circuit-relay-v2/package.json +++ b/packages/transport-circuit-relay-v2/package.json @@ -66,7 +66,7 @@ }, "devDependencies": { "@libp2p/logger": "^5.2.0", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "delay": "^6.0.0", "it-all": "^3.0.9", "it-protobuf-stream": "^2.0.3", diff --git a/packages/transport-circuit-relay-v2/src/constants.ts b/packages/transport-circuit-relay-v2/src/constants.ts index cbd7e7faea..98873df550 100644 --- a/packages/transport-circuit-relay-v2/src/constants.ts +++ b/packages/transport-circuit-relay-v2/src/constants.ts @@ -21,7 +21,7 @@ export const DEFAULT_RESERVATION_CONCURRENCY = 1 /** * How long to wait for a reservation attempt to finish */ -export const DEFAULT_RESERVATION_COMPLETION_TIMEOUT = 2000 +export const DEFAULT_RESERVATION_COMPLETION_TIMEOUT = 5_000 /** * How long to let the reservation attempt queue to grow @@ -31,7 +31,6 @@ export const DEFAULT_MAX_RESERVATION_QUEUE_LENGTH = 100 export const RELAY_SOURCE_TAG = 'circuit-relay-source' export const KEEP_ALIVE_TAG = `${KEEP_ALIVE}-circuit-relay` -export const KEEP_ALIVE_SOURCE_TAG = `${KEEP_ALIVE}-circuit-relay-source` // circuit v2 connection limits // https://github.com/libp2p/go-libp2p/blob/master/p2p/protocol/circuitv2/relay/resources.go#L61-L66 diff --git a/packages/transport-circuit-relay-v2/src/server/index.ts b/packages/transport-circuit-relay-v2/src/server/index.ts index 5052f5b86b..313fc3f338 100644 --- a/packages/transport-circuit-relay-v2/src/server/index.ts +++ b/packages/transport-circuit-relay-v2/src/server/index.ts @@ -8,7 +8,6 @@ import { TypedEventEmitter, setMaxListeners } from 'main-event' import * as Digest from 'multiformats/hashes/digest' import { DEFAULT_HOP_TIMEOUT, - KEEP_ALIVE_SOURCE_TAG, MAX_CONNECTIONS, RELAY_SOURCE_TAG, RELAY_V2_HOP_CODEC, @@ -188,8 +187,7 @@ export class CircuitRelayServer extends TypedEventEmitter imp const ttl = (result.expire * 1000) - Date.now() await this.components.peerStore.merge(connection.remotePeer, { tags: { - [RELAY_SOURCE_TAG]: { value: 1, ttl }, - [KEEP_ALIVE_SOURCE_TAG]: { value: 1, ttl } + [RELAY_SOURCE_TAG]: { value: 1, ttl } } }, options) } @@ -211,8 +209,7 @@ export class CircuitRelayServer extends TypedEventEmitter imp try { await this.components.peerStore.merge(connection.remotePeer, { tags: { - [RELAY_SOURCE_TAG]: undefined, - [KEEP_ALIVE_SOURCE_TAG]: undefined + [RELAY_SOURCE_TAG]: undefined } }, options) } catch (err) { diff --git a/packages/transport-circuit-relay-v2/test/hop.spec.ts b/packages/transport-circuit-relay-v2/test/hop.spec.ts index 1fbdeb77a2..416dd7a4a1 100644 --- a/packages/transport-circuit-relay-v2/test/hop.spec.ts +++ b/packages/transport-circuit-relay-v2/test/hop.spec.ts @@ -8,7 +8,7 @@ import { expect } from 'aegir/chai' import { TypedEventEmitter } from 'main-event' import Sinon from 'sinon' import { stubInterface } from 'sinon-ts' -import { DEFAULT_MAX_RESERVATION_STORE_SIZE, KEEP_ALIVE_SOURCE_TAG, RELAY_SOURCE_TAG, RELAY_V2_HOP_CODEC, RELAY_V2_STOP_CODEC } from '../src/constants.js' +import { DEFAULT_MAX_RESERVATION_STORE_SIZE, RELAY_SOURCE_TAG, RELAY_V2_HOP_CODEC, RELAY_V2_STOP_CODEC } from '../src/constants.js' import { HopMessage, Status } from '../src/pb/index.js' import { CircuitRelayServer } from '../src/server/index.js' import { CircuitRelayTransport } from '../src/transport/index.ts' @@ -283,10 +283,6 @@ describe('circuit-relay hop protocol', function () { [RELAY_SOURCE_TAG]: { value: 1, ttl: Sinon.match.number as unknown as number - }, - [KEEP_ALIVE_SOURCE_TAG]: { - value: 1, - ttl: Sinon.match.number as unknown as number } } })).to.be.true() diff --git a/packages/transport-memory/package.json b/packages/transport-memory/package.json index d69429c366..1c99297497 100644 --- a/packages/transport-memory/package.json +++ b/packages/transport-memory/package.json @@ -60,7 +60,7 @@ "devDependencies": { "@libp2p/logger": "^5.2.0", "@libp2p/peer-id": "^5.1.9", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "sinon": "^21.0.0", "sinon-ts": "^2.0.0" }, diff --git a/packages/transport-tcp/package.json b/packages/transport-tcp/package.json index 71d8e13a52..a12926c586 100644 --- a/packages/transport-tcp/package.json +++ b/packages/transport-tcp/package.json @@ -65,7 +65,7 @@ }, "devDependencies": { "@libp2p/logger": "^5.2.0", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "delay": "^6.0.0", "p-defer": "^4.0.1", "p-wait-for": "^5.0.2", diff --git a/packages/transport-tcp/test/connection.spec.ts b/packages/transport-tcp/test/connection.spec.ts index a2ab9f2d64..956755bd9c 100644 --- a/packages/transport-tcp/test/connection.spec.ts +++ b/packages/transport-tcp/test/connection.spec.ts @@ -81,7 +81,7 @@ describe('valid localAddr and remoteAddr', () => { // Close the dialer with two simultaneous calls to `close` await Promise.race([ new Promise((resolve, reject) => setTimeout(() => { reject(new Error('Timed out waiting for connection close')) }, 500)), - await Promise.all([ + Promise.all([ dialerConn.close(), dialerConn.close() ]) diff --git a/packages/transport-webrtc/package.json b/packages/transport-webrtc/package.json index 6d4349a07a..8b9d85234a 100644 --- a/packages/transport-webrtc/package.json +++ b/packages/transport-webrtc/package.json @@ -81,7 +81,7 @@ "devDependencies": { "@libp2p/logger": "^5.2.0", "@types/sinon": "^17.0.4", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "any-signal": "^4.1.1", "datastore-core": "^10.0.4", "delay": "^6.0.0", diff --git a/packages/transport-webrtc/src/constants.ts b/packages/transport-webrtc/src/constants.ts index b940795e9b..28c9c58b3e 100644 --- a/packages/transport-webrtc/src/constants.ts +++ b/packages/transport-webrtc/src/constants.ts @@ -73,6 +73,11 @@ export const PROTOBUF_OVERHEAD = calculateProtobufOverhead() */ export const DATA_CHANNEL_DRAIN_TIMEOUT = 30_000 +/** + * Wait for the remote to acknowledge our FIN for this long + */ +export const DEFAULT_FIN_ACK_TIMEOUT = 10_000 + /** * Set as the 'negotiated' muxer protocol name */ diff --git a/packages/transport-webrtc/src/index.ts b/packages/transport-webrtc/src/index.ts index 956f8a8371..ffdbd6faf4 100644 --- a/packages/transport-webrtc/src/index.ts +++ b/packages/transport-webrtc/src/index.ts @@ -308,6 +308,15 @@ export interface DataChannelOptions { * @default 5_000 */ openTimeout?: number + + /** + * Due to bugs in WebRTC implementations it's necessary for the remote end of + * the connection to acknowledge the FIN message we send during stream + * closing. A stream will wait for this many ms. + * + * @default 10_000 + */ + finAckTimeout?: number } /** diff --git a/packages/transport-webrtc/src/muxer.ts b/packages/transport-webrtc/src/muxer.ts index a775bd1e43..1f61101d8d 100644 --- a/packages/transport-webrtc/src/muxer.ts +++ b/packages/transport-webrtc/src/muxer.ts @@ -1,5 +1,4 @@ import { AbstractStreamMuxer } from '@libp2p/utils' -import { pEvent } from 'p-event' import { MUXER_PROTOCOL } from './constants.js' import { createStream, WebRTCStream } from './stream.js' import type { DataChannelOptions } from './index.js' @@ -90,11 +89,12 @@ export class DataChannelMuxer extends AbstractStreamMuxer implemen * {@link https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/datachannel_event} */ this.peerConnection.ondatachannel = ({ channel }) => { - this.log.trace('incoming %s datachannel with channel id %d and status', channel.protocol, channel.id, channel.readyState) + this.log.trace('incoming %s datachannel with channel id %d, protocol %s and status %s', channel.protocol, channel.id, channel.protocol, channel.readyState) - // 'init' channel is only used during connection establishment + // 'init' channel is only used during connection establishment, it is + // closed by the initiator if (channel.label === 'init') { - this.log.trace('closing init channel') + this.log.trace('closing init channel %d', channel.id) channel.close() return @@ -114,20 +114,13 @@ export class DataChannelMuxer extends AbstractStreamMuxer implemen async onCreateStream (options?: CreateStreamOptions): Promise { // The spec says the label MUST be an empty string: https://github.com/libp2p/specs/blob/master/webrtc/README.md#rtcdatachannel-label - const channel = this.peerConnection.createDataChannel('wtf', { + const channel = this.peerConnection.createDataChannel('', { // TODO: pre-negotiate stream protocol - protocol: options?.protocol + // protocol: options?.protocol }) this.log('open channel %d for protocol %s', channel.id, options?.protocol) - if (channel.readyState !== 'open') { - this.log('channel %d state is "%s" and not "open", waiting for "open" event before sending data', channel.id, channel.readyState) - await pEvent(channel, 'open', options) - - this.log('channel %d state is now "%s", sending data', channel.id, channel.readyState) - } - const stream = createStream({ ...options, ...this.dataChannelOptions, diff --git a/packages/transport-webrtc/src/private-to-private/initiate-connection.ts b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts index 475a73e0d4..bcb3ca2f70 100644 --- a/packages/transport-webrtc/src/private-to-private/initiate-connection.ts +++ b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts @@ -94,10 +94,20 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa // setup callback to write ICE candidates to the remote peer peerConnection.onicecandidate = ({ candidate }) => { + if (peerConnection.connectionState === 'connected') { + log.trace('ignore new ice candidate as peer connection is already connected') + return + } + // a null candidate means end-of-candidates, an empty string candidate // means end-of-candidates for this generation, otherwise this should // be a valid candidate object // see - https://www.w3.org/TR/webrtc/#rtcpeerconnectioniceevent + if (candidate == null || candidate?.candidate === '') { + log.trace('initiator detected end of ICE candidates') + return + } + const data = JSON.stringify(candidate?.toJSON() ?? null) log.trace('initiator sending ICE candidate %o', candidate) @@ -178,10 +188,16 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa }) } - log.trace('closing init channel, starting status') - + log.trace('closing init channel') channel.close() + // wait for init channel to close before proceeding, otherwise the channel + // id can be reused before both sides have seen the channel close + log.trace('waiting for init channel to close') + await pEvent(channel, 'close', { + signal + }) + onProgress?.(new CustomProgressEvent('webrtc:close-signaling-stream')) log.trace('closing signaling channel') diff --git a/packages/transport-webrtc/src/private-to-private/signaling-stream-handler.ts b/packages/transport-webrtc/src/private-to-private/signaling-stream-handler.ts index a19e64853d..d844496516 100644 --- a/packages/transport-webrtc/src/private-to-private/signaling-stream-handler.ts +++ b/packages/transport-webrtc/src/private-to-private/signaling-stream-handler.ts @@ -3,7 +3,7 @@ import { multiaddr } from '@multiformats/multiaddr' import { SDPHandshakeFailedError } from '../error.js' import { RTCSessionDescription } from '../webrtc/index.js' import { Message } from './pb/message.js' -import { getConnectionState, getRemotePeer, readCandidatesUntilConnected } from './util.js' +import { getRemotePeer, readCandidatesUntilConnected } from './util.js' import type { RTCPeerConnection } from '../webrtc/index.js' import type { AbortOptions, Connection, Logger, PeerId, Stream } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' @@ -21,10 +21,20 @@ export async function handleIncomingStream (stream: Stream, connection: Connecti try { // candidate callbacks peerConnection.onicecandidate = ({ candidate }) => { + if (peerConnection.connectionState === 'connected') { + log.trace('ignore new ice candidate as peer connection is already connected') + return + } + // a null candidate means end-of-candidates, an empty string candidate // means end-of-candidates for this generation, otherwise this should // be a valid candidate object // see - https://www.w3.org/TR/webrtc/#rtcpeerconnectioniceevent + if (candidate == null || candidate?.candidate === '') { + log.trace('recipient detected end of ICE candidates') + return + } + const data = JSON.stringify(candidate?.toJSON() ?? null) log.trace('recipient sending ICE candidate %s', data) @@ -90,7 +100,7 @@ export async function handleIncomingStream (stream: Stream, connection: Connecti log }) } catch (err: any) { - if (getConnectionState(peerConnection) !== 'connected') { + if (peerConnection.connectionState !== 'connected') { log.error('error while handling signaling stream from peer %a', connection.remoteAddr, err) peerConnection.close() diff --git a/packages/transport-webrtc/src/private-to-private/util.ts b/packages/transport-webrtc/src/private-to-private/util.ts index c432865f0f..5049ceb6fc 100644 --- a/packages/transport-webrtc/src/private-to-private/util.ts +++ b/packages/transport-webrtc/src/private-to-private/util.ts @@ -1,7 +1,6 @@ import { ConnectionFailedError, InvalidMessageError, InvalidMultiaddrError } from '@libp2p/interface' import { peerIdFromString } from '@libp2p/peer-id' import { CustomProgressEvent } from 'progress-events' -import { isFirefox } from '../util.js' import { RTCIceCandidate } from '../webrtc/index.js' import { Message } from './pb/message.js' import type { WebRTCDialEvents } from './transport.js' @@ -28,7 +27,7 @@ export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream connectedPromise.promise, stream.read({ signal: options.signal - }).catch(() => {}) + }) ]) // stream ended or we became connected @@ -63,32 +62,33 @@ export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream options.onProgress?.(new CustomProgressEvent('webrtc:add-ice-candidate', candidate.candidate)) await pc.addIceCandidate(candidate) } catch (err) { - options.log.error('%s bad candidate received', options.direction, candidateInit, err) + options.log.error('%s bad candidate received %o - %e', options.direction, candidateInit, err) } } } catch (err) { - options.log.error('%s error parsing ICE candidate', options.direction, err) + options.log.error('%s error parsing ICE candidate - %e', options.direction, err) - if (options.signal?.aborted === true && getConnectionState(pc) !== 'connected') { + if (options.signal?.aborted === true && pc.connectionState !== 'connected') { throw err } } } -export function getConnectionState (pc: RTCPeerConnection): string { - return isFirefox ? pc.iceConnectionState : pc.connectionState -} - function resolveOnConnected (pc: RTCPeerConnection, promise: DeferredPromise): void { - pc[isFirefox ? 'oniceconnectionstatechange' : 'onconnectionstatechange'] = (_) => { - switch (getConnectionState(pc)) { + if (pc.connectionState === 'connected') { + promise.resolve() + return + } + + pc.onconnectionstatechange = (_) => { + switch (pc.connectionState) { case 'connected': promise.resolve() break case 'failed': case 'disconnected': case 'closed': - promise.reject(new ConnectionFailedError('RTCPeerConnection was closed')) + promise.reject(new ConnectionFailedError(`RTCPeerConnection connection state became "${pc.connectionState}"`)) break default: break diff --git a/packages/transport-webrtc/src/rtcpeerconnection-to-conn.ts b/packages/transport-webrtc/src/rtcpeerconnection-to-conn.ts index 02bd2f9b78..43170ec1f6 100644 --- a/packages/transport-webrtc/src/rtcpeerconnection-to-conn.ts +++ b/packages/transport-webrtc/src/rtcpeerconnection-to-conn.ts @@ -24,6 +24,10 @@ class RTCPeerConnectionMultiaddrConnection extends AbstractMultiaddrConnection { if (this.peerConnection.connectionState === 'disconnected' || this.peerConnection.connectionState === 'failed' || this.peerConnection.connectionState === 'closed') { // nothing else to do but close the connection this.onTransportClosed() + + // only necessary with node-datachannel + // https://github.com/murat-dogan/node-datachannel/issues/366#issuecomment-3228453155 + this.peerConnection.close() } } } diff --git a/packages/transport-webrtc/src/stream.ts b/packages/transport-webrtc/src/stream.ts index 83a63714c7..f55881a6eb 100644 --- a/packages/transport-webrtc/src/stream.ts +++ b/packages/transport-webrtc/src/stream.ts @@ -1,11 +1,13 @@ -import { StreamStateError } from '@libp2p/interface' +import { StreamResetError, StreamStateError } from '@libp2p/interface' import { AbstractStream } from '@libp2p/utils' import * as lengthPrefixed from 'it-length-prefixed' import { pushable } from 'it-pushable' +import { pEvent } from 'p-event' import { raceSignal } from 'race-signal' import { Uint8ArrayList } from 'uint8arraylist' -import { MAX_BUFFERED_AMOUNT, MAX_MESSAGE_SIZE, PROTOBUF_OVERHEAD } from './constants.js' +import { DEFAULT_FIN_ACK_TIMEOUT, MAX_BUFFERED_AMOUNT, MAX_MESSAGE_SIZE, PROTOBUF_OVERHEAD } from './constants.js' import { Message } from './private-to-public/pb/message.js' +import { isFirefox } from './util.js' import type { DataChannelOptions } from './index.js' import type { AbortOptions, MessageStreamDirection, Logger } from '@libp2p/interface' import type { AbstractStreamInit, SendResult } from '@libp2p/utils' @@ -35,7 +37,8 @@ export class WebRTCStream extends AbstractStream { */ private readonly incomingData: Pushable private readonly maxBufferedAmount: number - private readonly receivedFinAck: PromiseWithResolvers + private receivedFinAck?: PromiseWithResolvers + private finAckTimeout: number constructor (init: WebRTCStreamInit) { super({ @@ -47,44 +50,26 @@ export class WebRTCStream extends AbstractStream { this.channel.binaryType = 'arraybuffer' this.incomingData = pushable() this.maxBufferedAmount = init.maxBufferedAmount ?? MAX_BUFFERED_AMOUNT - this.receivedFinAck = Promise.withResolvers() - - // set up initial state - switch (this.channel.readyState) { - case 'open': - break - - case 'closed': - case 'closing': - if (this.timeline.close === undefined || this.timeline.close === 0) { - this.timeline.close = Date.now() - } - break - case 'connecting': - // noop - break - - default: - this.log.error('unknown datachannel state %s', this.channel.readyState) - throw new StreamStateError('Unknown datachannel state') - } + this.finAckTimeout = init.finAckTimeout ?? DEFAULT_FIN_ACK_TIMEOUT // handle RTCDataChannel events - this.channel.onclose = (_evt) => { - this.log.trace('received onclose event') + this.channel.onclose = () => { + this.log.trace('received datachannel close event') this.onRemoteCloseWrite() this.onTransportClosed() } this.channel.onerror = (evt) => { - this.log.trace('received onerror event') - const err = (evt as RTCErrorEvent).error + + this.log.trace('received datachannel error event - %e', err) + this.abort(err) } this.channel.onmessage = async (event: MessageEvent) => { + this.log('incoming message %d bytes', event.data.byteLength) const { data } = event if (data === null || data.byteLength === 0) { @@ -96,32 +81,49 @@ export class WebRTCStream extends AbstractStream { // dispatch drain event when the buffered amount drops to zero this.channel.bufferedAmountLowThreshold = 0 + this.channel.onbufferedamountlow = () => { - this.safeDispatchEvent('drain') + if (this.writableNeedsDrain) { + this.safeDispatchEvent('drain') + } } - const self = this + if (this.channel.readyState !== 'open') { + this.log('channel ready state is "%s" and not "open", waiting for "open" event before sending data', this.channel.readyState) + pEvent(this.channel, 'open', { + rejectionEvents: [ + 'close', + 'error' + ] + }) + .then(() => { + this.log('channel ready state is now "%s", dispatching drain', this.channel.readyState) + this.safeDispatchEvent('drain') + }) + .catch(err => { + this.abort(err.error ?? err) + }) + } // pipe framed protobuf messages through a length prefixed decoder, and // surface data from the `Message.message` field through a source. Promise.resolve().then(async () => { for await (const buf of lengthPrefixed.decode(this.incomingData)) { - const message = self.processIncomingProtobuf(buf) - - if (message != null) { - self.onData(new Uint8ArrayList(message)) - } + this.processIncomingProtobuf(buf) } }) .catch(err => { this.log.error('error processing incoming data channel messages', err) }) - // clean up the datachannel when both ends have sent a FIN - const webRTCStreamOnClose = (): void => { - this.channel.close() + // close when both writable ends are closed or an error occurs + const cleanUpDatachannelOnClose = (): void => { + if (this.channel.readyState === 'open') { + this.log.trace('stream closed, closing underlying datachannel') + this.channel.close() + } } - this.addEventListener('close', webRTCStreamOnClose) + this.addEventListener('close', cleanUpDatachannelOnClose) } sendNewStream (): void { @@ -129,27 +131,57 @@ export class WebRTCStream extends AbstractStream { } _sendMessage (data: Uint8ArrayList): void { - if (this.channel.readyState === 'closed' || this.channel.readyState === 'closing') { + if (this.channel.readyState !== 'open') { throw new StreamStateError(`Invalid datachannel state - ${this.channel.readyState}`) } - try { - this.log.trace('sending message, channel state "%s"', this.channel.readyState) - // send message without copying data - for (const buf of data) { - this.channel.send(buf) - } - } catch (err: any) { - this.log.error('error while sending message', err) + this.log.trace('sending message, channel state "%s"', this.channel.readyState) + + if (isFirefox) { + // TODO: firefox can deliver small messages out of order - remove once a + // browser with https://bugzilla.mozilla.org/show_bug.cgi?id=1983831 is + // available in playwright-test + this.channel.send(data.subarray()) + return + } + + // send message without copying data + for (const buf of data) { + this.channel.send(buf) } } sendData (data: Uint8ArrayList): SendResult { - const messageBuf = Message.encode({ - message: data.subarray() - }) - const prefixedBuf = lengthPrefixed.encode.single(messageBuf) - this._sendMessage(prefixedBuf) + if (this.channel.readyState !== 'open') { + return { + sentBytes: 0, + canSendMore: false + } + } + + // TODO: firefox can deliver small messages out of order - remove once a + // browser with https://bugzilla.mozilla.org/show_bug.cgi?id=1983831 is + // available in playwright-test + // ---- + // this is also necessary to work with rust-libp2p 0.54 though 0.53 seems ok + this._sendMessage( + lengthPrefixed.encode.single(Message.encode({ + message: data.subarray() + })) + ) + + /* + // TODO: enable this when FF and rust-libp2p are not broken + // send message without copying data + for (const message of data) { + this._sendMessage( + lengthPrefixed.encode.single(Message.encode({ + message + })) + ) + } + } + */ return { sentBytes: data.byteLength, @@ -157,69 +189,73 @@ export class WebRTCStream extends AbstractStream { } } - sendReset (): void { - this.receivedFinAck.resolve() - + sendReset (err: Error): void { try { + this.log.error('sending reset - %e', err) this._sendFlag(Message.Flag.RESET) + this.receivedFinAck?.reject(err) } catch (err) { this.log.error('failed to send reset - %e', err) - } finally { - this.channel.close() } } async sendCloseWrite (options?: AbortOptions): Promise { - if (this.channel.readyState === 'open') { - this._sendFlag(Message.Flag.FIN) - } - - await raceSignal(this.receivedFinAck.promise, options?.signal) + this._sendFlag(Message.Flag.FIN) + options?.signal?.throwIfAborted() + this.receivedFinAck = Promise.withResolvers() + + await Promise.any([ + raceSignal(this.receivedFinAck.promise, options?.signal), + new Promise(resolve => { + AbortSignal.timeout(this.finAckTimeout) + .addEventListener('abort', () => { + resolve() + }) + }) + ]) } async sendCloseRead (options?: AbortOptions): Promise { - if (this.channel.readyState === 'open') { - this._sendFlag(Message.Flag.STOP_SENDING) - } - + this._sendFlag(Message.Flag.STOP_SENDING) options?.signal?.throwIfAborted() } /** * Handle incoming */ - private processIncomingProtobuf (buffer: Uint8ArrayList): Uint8Array | undefined { + private processIncomingProtobuf (buffer: Uint8ArrayList): void { const message = Message.decode(buffer) + // ignore data messages if we've closed the readable end already + if (message.message != null && (this.readStatus === 'readable' || this.readStatus === 'paused')) { + this.onData(new Uint8ArrayList(message.message)) + } + if (message.flag !== undefined) { this.log.trace('incoming flag %s, write status "%s", read status "%s"', message.flag, this.writeStatus, this.readStatus) if (message.flag === Message.Flag.FIN) { - // We should expect no more data from the remote, stop reading - this.onRemoteCloseWrite() + // we should expect no more data from the remote, stop reading this._sendFlag(Message.Flag.FIN_ACK) + this.onRemoteCloseWrite() } if (message.flag === Message.Flag.RESET) { - this.receivedFinAck.resolve() - // Stop reading and writing to the stream immediately + // stop reading and writing to the stream immediately + this.receivedFinAck?.reject(new StreamResetError('The stream was reset')) this.onRemoteReset() } if (message.flag === Message.Flag.STOP_SENDING) { - // The remote has stopped reading + // the remote has stopped reading this.onRemoteCloseRead() } if (message.flag === Message.Flag.FIN_ACK) { - this.receivedFinAck.resolve() + // remote received our FIN + this.receivedFinAck?.resolve() } } - - // ignore data messages if we've closed the readable end already - if (this.readStatus === 'readable' || this.readStatus === 'paused') { - return message.message - } } private _sendFlag (flag: Message.Flag): boolean { diff --git a/packages/transport-webrtc/test/stream.spec.ts b/packages/transport-webrtc/test/stream.spec.ts index 18d3408f82..fc4388c83f 100644 --- a/packages/transport-webrtc/test/stream.spec.ts +++ b/packages/transport-webrtc/test/stream.spec.ts @@ -8,6 +8,7 @@ import { stubInterface } from 'sinon-ts' import { MAX_MESSAGE_SIZE, PROTOBUF_OVERHEAD } from '../src/constants.js' import { Message } from '../src/private-to-public/pb/message.js' import { createStream } from '../src/stream.js' +import { isFirefox } from '../src/util.ts' import { RTCPeerConnection } from '../src/webrtc/index.js' import { receiveFinAck, receiveRemoteCloseWrite } from './util.js' import type { WebRTCStream } from '../src/stream.js' @@ -38,7 +39,14 @@ describe('Max message size', () => { const sendMore = webrtcStream.send(data) expect(sendMore).to.be.true() - expect(channel.send).to.have.property('callCount', 2) + if (isFirefox) { + // TODO: firefox can deliver small messages out of order - remove once a + // browser with https://bugzilla.mozilla.org/show_bug.cgi?id=1983831 is + // available in playwright-test + expect(channel.send).to.have.property('callCount', 1) + } else { + expect(channel.send).to.have.property('callCount', 2) + } const bytes = channel.send.getCalls().reduce((acc, curr) => { return acc + curr.args[0].byteLength @@ -46,8 +54,13 @@ describe('Max message size', () => { expect(bytes).to.be.lessThan(MAX_MESSAGE_SIZE) - // minus 2x bytes because there is no flag field in the protobuf message - expect(channel.send.getCall(1).args[0]).to.have.lengthOf(MAX_MESSAGE_SIZE - 4) + if (isFirefox) { + // minus 2x bytes because there is no flag field in the protobuf message + expect(channel.send.getCall(0).args[0]).to.have.lengthOf(MAX_MESSAGE_SIZE - 2) + } else { + // minus 2x bytes because there is no flag field in the protobuf message + expect(channel.send.getCall(1).args[0]).to.have.lengthOf(MAX_MESSAGE_SIZE - 4) + } }) it(`sends messages greater than ${MAX_MESSAGE_SIZE} bytes in parts`, async () => { @@ -74,9 +87,17 @@ describe('Max message size', () => { const TEST_MESSAGE = 'test_message' -function setup (): { peerConnection: RTCPeerConnection, dataChannel: RTCDataChannel, stream: WebRTCStream } { +async function setup (): Promise<{ peerConnection: RTCPeerConnection, dataChannel: RTCDataChannel, stream: WebRTCStream }> { const peerConnection = new RTCPeerConnection() const dataChannel = peerConnection.createDataChannel('whatever', { negotiated: true, id: 91 }) + + await pEvent(dataChannel, 'open', { + rejectionEvents: [ + 'close', + 'error' + ] + }) + const stream = createStream({ channel: dataChannel, direction: 'outbound', @@ -96,13 +117,14 @@ function generatePbByFlag (flag?: Message.Flag): Uint8Array { return lengthPrefixed.encode.single(buf).subarray() } -describe('Stream Stats', () => { +// TODO: move to transport interface compliance suite +describe.skip('Stream Stats', () => { let stream: WebRTCStream let peerConnection: RTCPeerConnection let dataChannel: RTCDataChannel beforeEach(async () => { - ({ stream, peerConnection, dataChannel } = setup()) + ({ stream, peerConnection, dataChannel } = await setup()) }) afterEach(() => { @@ -163,13 +185,14 @@ describe('Stream Stats', () => { }) }) -describe('Stream Read Stats Transition By Incoming Flag', () => { +// TODO: move to transport interface compliance suite +describe.skip('Stream Read Stats Transition By Incoming Flag', () => { let dataChannel: RTCDataChannel let stream: Stream let peerConnection: RTCPeerConnection beforeEach(async () => { - ({ dataChannel, stream, peerConnection } = setup()) + ({ dataChannel, stream, peerConnection } = await setup()) }) afterEach(() => { @@ -205,13 +228,14 @@ describe('Stream Read Stats Transition By Incoming Flag', () => { }) }) -describe('Stream Write Stats Transition By Incoming Flag', () => { +// TODO: move to transport interface compliance suite +describe.skip('Stream Write Stats Transition By Incoming Flag', () => { let dataChannel: RTCDataChannel let stream: Stream let peerConnection: RTCPeerConnection beforeEach(async () => { - ({ dataChannel, stream, peerConnection } = setup()) + ({ dataChannel, stream, peerConnection } = await setup()) }) afterEach(() => { diff --git a/packages/transport-websockets/package.json b/packages/transport-websockets/package.json index 129490b184..6e24a2df4a 100644 --- a/packages/transport-websockets/package.json +++ b/packages/transport-websockets/package.json @@ -83,7 +83,7 @@ }, "devDependencies": { "@libp2p/logger": "^5.2.0", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "is-loopback-addr": "^2.0.2", "p-wait-for": "^5.0.2", "sinon": "^21.0.0", diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index ed66148780..e7e367c761 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -331,12 +331,20 @@ export class WebSocketListener extends TypedEventEmitter impleme // abort and in-flight connection upgrades this.shutdownController.abort() - await Promise.all([ + const events = [ pEvent(this.server, 'close'), - this.http == null ? null : pEvent(this.http, 'close'), - this.https == null ? null : pEvent(this.https, 'close'), pEvent(this.wsServer, 'close') - ]) + ] + + if (this.http != null) { + events.push(pEvent(this.http, 'close')) + } + + if (this.https != null) { + events.push(pEvent(this.https, 'close')) + } + + await Promise.all(events) this.safeDispatchEvent('close') } diff --git a/packages/transport-webtransport/package.json b/packages/transport-webtransport/package.json index cbf717f09a..5083df2b83 100644 --- a/packages/transport-webtransport/package.json +++ b/packages/transport-webtransport/package.json @@ -62,7 +62,7 @@ "@libp2p/logger": "^5.2.0", "@libp2p/ping": "^2.0.37", "@noble/hashes": "^1.8.0", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "execa": "^9.6.0", "go-libp2p": "^1.6.0", "it-all": "^3.0.9", diff --git a/packages/transport-webtransport/test/browser.ts b/packages/transport-webtransport/test/browser.ts index bf57c6e80d..1886b63a82 100644 --- a/packages/transport-webtransport/test/browser.ts +++ b/packages/transport-webtransport/test/browser.ts @@ -1,5 +1,6 @@ /* eslint-env mocha */ +import { stop } from '@libp2p/interface' import { noise } from '@libp2p/noise' import { ping } from '@libp2p/ping' import { multiaddr } from '@multiformats/multiaddr' @@ -32,12 +33,7 @@ describe('libp2p-webtransport', () => { }) afterEach(async () => { - if (node != null) { - await node.stop() - - const conns = node.getConnections() - expect(conns.length).to.equal(0) - } + await stop(node) }) it('webtransport connects to go-libp2p', async () => { diff --git a/packages/upnp-nat/package.json b/packages/upnp-nat/package.json index fc3bdf8b7e..0675c8d20c 100644 --- a/packages/upnp-nat/package.json +++ b/packages/upnp-nat/package.json @@ -62,7 +62,7 @@ "@libp2p/crypto": "^5.1.8", "@libp2p/logger": "^5.2.0", "@libp2p/peer-id": "^5.1.9", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "sinon-ts": "^2.0.0", "wherearewe": "^2.0.1" }, diff --git a/packages/utils/package.json b/packages/utils/package.json index 0dd1373c44..1c167588a0 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -72,7 +72,7 @@ "devDependencies": { "@libp2p/peer-id": "^5.1.9", "@types/netmask": "^2.0.5", - "aegir": "^47.0.21", + "aegir": "^47.0.22", "benchmark": "^2.1.4", "it-all": "^3.0.9", "it-drain": "^3.0.10", diff --git a/packages/utils/src/abstract-message-stream.ts b/packages/utils/src/abstract-message-stream.ts index aee3e722c4..2ec51d1469 100644 --- a/packages/utils/src/abstract-message-stream.ts +++ b/packages/utils/src/abstract-message-stream.ts @@ -91,9 +91,11 @@ export abstract class AbstractMessageStream { - this.log.trace('drain event received, continue sending data') - this.writableNeedsDrain = false - this.processSendQueue() + if (this.writableNeedsDrain) { + this.log.trace('drain event received, continue sending data') + this.writableNeedsDrain = false + this.processSendQueue() + } } this.addEventListener('drain', continueSendingOnDrain) } @@ -304,6 +306,7 @@ export abstract class AbstractMessageStream { } } - onCreateStream (options: CreateStreamOptions): MockMuxedStream { + async onCreateStream (options: CreateStreamOptions): Promise { this.nextStreamId += 2 return this._createStream(`${this.nextStreamId}`, 'outbound', options) diff --git a/packages/utils/src/stream-utils.ts b/packages/utils/src/stream-utils.ts index eee1b6d7da..e5c6463363 100644 --- a/packages/utils/src/stream-utils.ts +++ b/packages/utils/src/stream-utils.ts @@ -50,14 +50,6 @@ export interface ByteStreamOpts { * @default 4_194_304 */ maxBufferSize?: number - - /** - * If true, prevent message events propagating after they have been received, - * - * This is useful for when there are be other observers of messages and the - * caller does not wish to them to receive anything - */ - stopPropagation?: boolean } export interface ReadBytesOptions extends AbortOptions { @@ -132,10 +124,6 @@ export function byteStream (stream: T, opts?: ByteStre } const byteStreamOnMessageListener = (evt: StreamMessageEvent): void => { - if (opts?.stopPropagation === true) { - // evt.stopImmediatePropagation() - } - readBuffer.append(evt.data) if (readBuffer.byteLength > maxBufferSize) { @@ -177,6 +165,7 @@ export function byteStream (stream: T, opts?: ByteStre } if (readBuffer.byteLength < options.bytes) { + stream.log.error('closed after reading %d/%d bytes', readBuffer.byteLength, options.bytes) throw new UnexpectedEOFError(`Unexpected EOF - stream closed after reading ${readBuffer.byteLength}/${options.bytes} bytes`) } } @@ -210,6 +199,7 @@ export function byteStream (stream: T, opts?: ByteStre if (readBuffer.byteLength < toRead) { if (isEOF(stream)) { + stream.log.error('closed while reading %d/%d bytes', readBuffer.byteLength, toRead) throw new UnexpectedEOFError(`Unexpected EOF - stream closed while reading ${readBuffer.byteLength}/${toRead} bytes`) } @@ -352,10 +342,12 @@ export function lpStream (stream: T, opts: Partial { const [sinkError, sourceError] = await Promise.all([ it.sink(source()).catch(err => err), drain(it.source).catch(err => err), + // eslint-disable-next-line @typescript-eslint/await-thenable outgoing.abort(err) ])