From b7361fbf8953f80cb59a8ab3c96c0657a53ce352 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 14 Nov 2024 12:12:25 +0000 Subject: [PATCH 1/4] chore: make tests more reliable in ci CI is underpowered, the "many small writes" test overloads process and means the connection monitor thinks the remote has gone away so disable it during interface tests. --- packages/integration-tests/.aegir.js | 4 +++- .../test/compliance/transport/circuit-relay.spec.ts | 3 +++ .../test/compliance/transport/memory.spec.ts | 5 ++++- .../test/compliance/transport/tcp.spec.ts | 5 ++++- .../test/compliance/transport/webrtc.spec.ts | 7 ++++++- .../test/compliance/transport/websockets.spec.ts | 3 +++ 6 files changed, 23 insertions(+), 4 deletions(-) diff --git a/packages/integration-tests/.aegir.js b/packages/integration-tests/.aegir.js index 6c2537bdf3..01517980e5 100644 --- a/packages/integration-tests/.aegir.js +++ b/packages/integration-tests/.aegir.js @@ -20,6 +20,7 @@ export default { const { identify } = await import('@libp2p/identify') const { echo } = await import('@libp2p/echo') const { mockMuxer } = await import('@libp2p/interface-compliance-tests/mocks') + const { ping } = await import('@libp2p/ping') const libp2p = await createLibp2p({ connectionManager: { @@ -54,7 +55,8 @@ export default { }), echo: echo({ maxInboundStreams: 5 - }) + }), + ping: ping() } }) diff --git a/packages/integration-tests/test/compliance/transport/circuit-relay.spec.ts b/packages/integration-tests/test/compliance/transport/circuit-relay.spec.ts index 2d73e076d2..48cedc68ed 100644 --- a/packages/integration-tests/test/compliance/transport/circuit-relay.spec.ts +++ b/packages/integration-tests/test/compliance/transport/circuit-relay.spec.ts @@ -35,6 +35,9 @@ describe('Circuit relay transport interface compliance', () => { }, connectionGater: { denyDialMultiaddr: () => false + }, + connectionMonitor: { + enabled: false } } diff --git a/packages/integration-tests/test/compliance/transport/memory.spec.ts b/packages/integration-tests/test/compliance/transport/memory.spec.ts index 9517a39df8..b31a615443 100644 --- a/packages/integration-tests/test/compliance/transport/memory.spec.ts +++ b/packages/integration-tests/test/compliance/transport/memory.spec.ts @@ -16,7 +16,10 @@ describe('memory transport interface compliance tests', () => { ], streamMuxers: [ yamux() - ] + ], + connectionMonitor: { + enabled: false + } } return { diff --git a/packages/integration-tests/test/compliance/transport/tcp.spec.ts b/packages/integration-tests/test/compliance/transport/tcp.spec.ts index 38f5495ca1..031b4e61ae 100644 --- a/packages/integration-tests/test/compliance/transport/tcp.spec.ts +++ b/packages/integration-tests/test/compliance/transport/tcp.spec.ts @@ -21,7 +21,10 @@ describe('tcp transport interface compliance IPv4', () => { ], streamMuxers: [ yamux() - ] + ], + connectionMonitor: { + enabled: false + } } return { diff --git a/packages/integration-tests/test/compliance/transport/webrtc.spec.ts b/packages/integration-tests/test/compliance/transport/webrtc.spec.ts index a9a155c8c7..7b4c8c1f41 100644 --- a/packages/integration-tests/test/compliance/transport/webrtc.spec.ts +++ b/packages/integration-tests/test/compliance/transport/webrtc.spec.ts @@ -5,6 +5,7 @@ import { yamux } from '@chainsafe/libp2p-yamux' import { circuitRelayTransport } from '@libp2p/circuit-relay-v2' import { identify } from '@libp2p/identify' import tests from '@libp2p/interface-compliance-tests/transport' +import { ping } from '@libp2p/ping' import { webRTC } from '@libp2p/webrtc' import { webSockets } from '@libp2p/websockets' import { all } from '@libp2p/websockets/filters' @@ -33,10 +34,14 @@ describe('WebRTC transport interface compliance', () => { yamux() ], services: { - identify: identify() + identify: identify(), + ping: ping() }, connectionGater: { denyDialMultiaddr: () => false + }, + connectionMonitor: { + enabled: false } } diff --git a/packages/integration-tests/test/compliance/transport/websockets.spec.ts b/packages/integration-tests/test/compliance/transport/websockets.spec.ts index 4e0914c910..ab6e8fe9a1 100644 --- a/packages/integration-tests/test/compliance/transport/websockets.spec.ts +++ b/packages/integration-tests/test/compliance/transport/websockets.spec.ts @@ -28,6 +28,9 @@ describe('websocket transport interface compliance', () => { ], connectionGater: { denyDialMultiaddr: () => false + }, + connectionMonitor: { + enabled: false } } From b898a820bf08afbbcd3cec0fead58d1dae119024 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 14 Nov 2024 12:47:11 +0000 Subject: [PATCH 2/4] chore: turn off on relay --- packages/integration-tests/.aegir.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/integration-tests/.aegir.js b/packages/integration-tests/.aegir.js index 01517980e5..91a8a94030 100644 --- a/packages/integration-tests/.aegir.js +++ b/packages/integration-tests/.aegir.js @@ -57,6 +57,9 @@ export default { maxInboundStreams: 5 }), ping: ping() + }, + connectionMonitor: { + enabled: false } }) From e566d76fccbf9b1b4d461a747429941643c3de3b Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 14 Nov 2024 14:09:59 +0000 Subject: [PATCH 3/4] chore: check connection streams --- .../src/transport/index.ts | 10 +++++----- packages/transport-webrtc/src/muxer.ts | 13 ++++++++++--- packages/transport-webrtc/src/stream.ts | 8 ++++++-- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/packages/interface-compliance-tests/src/transport/index.ts b/packages/interface-compliance-tests/src/transport/index.ts index 1e2f840b98..54ecdef632 100644 --- a/packages/interface-compliance-tests/src/transport/index.ts +++ b/packages/interface-compliance-tests/src/transport/index.ts @@ -140,7 +140,7 @@ export default (common: TestSetup): void => { .with.property('name', 'AbortError') }) - it('should close all streams when the connection closes', async () => { + it.only('should close all streams when the connection closes', async () => { ({ dialer, listener, dialAddrs } = await getSetup(common)) let incomingConnectionPromise: DeferredPromise | undefined @@ -164,14 +164,14 @@ export default (common: TestSetup): void => { remoteConn = await incomingConnectionPromise.promise } - const streams: Stream[] = [] - for (let i = 0; i < 5; i++) { - streams.push(await connection.newStream('/echo/1.0.0', { + await connection.newStream('/echo/1.0.0', { maxOutboundStreams: 5 - })) + }) } + const streams = connection.streams + // Close the connection and verify all streams have been closed await connection.close() diff --git a/packages/transport-webrtc/src/muxer.ts b/packages/transport-webrtc/src/muxer.ts index 63dccd16ff..cf6a5b7588 100644 --- a/packages/transport-webrtc/src/muxer.ts +++ b/packages/transport-webrtc/src/muxer.ts @@ -155,12 +155,16 @@ export class DataChannelMuxer implements StreamMuxer { return } + // lib-datachannel throws if `.getId` is called on a closed channel so + // memoize it + const id = channel.id + const stream = createStream({ channel, direction: 'inbound', onEnd: () => { - this.log('incoming channel %s ended with state %s', channel.id, channel.readyState) this.#onStreamEnd(stream, channel) + this.log('incoming channel %s ended', id) }, logger: this.logger, ...this.dataChannelOptions @@ -241,15 +245,18 @@ export class DataChannelMuxer implements StreamMuxer { newStream (): Stream { // The spec says the label SHOULD be an empty string: https://github.com/libp2p/specs/blob/master/webrtc/README.md#rtcdatachannel-label const channel = this.peerConnection.createDataChannel('') + // lib-datachannel throws if `.getId` is called on a closed channel so + // memoize it + const id = channel.id - this.log.trace('opened outgoing datachannel with channel id %s', channel.id) + this.log.trace('opened outgoing datachannel with channel id %s', id) const stream = createStream({ channel, direction: 'outbound', onEnd: () => { - this.log('outgoing channel %s ended with state %s', channel.id, channel.readyState) this.#onStreamEnd(stream, channel) + this.log('outgoing channel %s ended', id) }, logger: this.logger, ...this.dataChannelOptions diff --git a/packages/transport-webrtc/src/stream.ts b/packages/transport-webrtc/src/stream.ts index 694b9ba7ac..b345414c44 100644 --- a/packages/transport-webrtc/src/stream.ts +++ b/packages/transport-webrtc/src/stream.ts @@ -274,7 +274,11 @@ export class WebRTCStream extends AbstractStream { } async sendReset (): Promise { - await this._sendFlag(Message.Flag.RESET) + try { + await this._sendFlag(Message.Flag.RESET) + } catch (err) { + this.log.error('failed to send reset - %e', err) + } } async sendCloseWrite (options: AbortOptions): Promise { @@ -362,7 +366,7 @@ export class WebRTCStream extends AbstractStream { return true } catch (err: any) { - this.log.error('could not send flag %s', flag.toString(), err) + this.log.error('could not send flag %s - %e', flag.toString(), err) } return false From de52802d30a4575bf4740d40e7e5931097ac0752 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 14 Nov 2024 14:17:48 +0000 Subject: [PATCH 4/4] chore: fix linting --- packages/interface-compliance-tests/src/transport/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/interface-compliance-tests/src/transport/index.ts b/packages/interface-compliance-tests/src/transport/index.ts index 54ecdef632..eca91c1627 100644 --- a/packages/interface-compliance-tests/src/transport/index.ts +++ b/packages/interface-compliance-tests/src/transport/index.ts @@ -9,7 +9,7 @@ import { isValidTick } from '../is-valid-tick.js' import { createPeer, getTransportManager, getUpgrader, slowNetwork } from './utils.js' import type { TestSetup } from '../index.js' import type { Echo } from '@libp2p/echo' -import type { Connection, Libp2p, Stream } from '@libp2p/interface' +import type { Connection, Libp2p } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { MultiaddrMatcher } from '@multiformats/multiaddr-matcher' import type { Libp2pInit } from 'libp2p' @@ -140,7 +140,7 @@ export default (common: TestSetup): void => { .with.property('name', 'AbortError') }) - it.only('should close all streams when the connection closes', async () => { + it('should close all streams when the connection closes', async () => { ({ dialer, listener, dialAddrs } = await getSetup(common)) let incomingConnectionPromise: DeferredPromise | undefined