Skip to content

Commit 7939dbd

Browse files
achingbrainmaschad
andauthored
feat: add connection monitor (#2644)
Adds a connection monitor that periodically ensures remote peers are still online and contactable by trying to send a single byte via the ping protocol, and sets the `.rtt` property of the connection to how long it took. If the ping protocol is not supported by the remote, it tries to infer the round trip time by how long it took to fail. If the remote is unresponsive or opening the stream fails for any other reason, the connection is aborted with the throw error. It's possible to configure the ping interval, how long we wait before considering a peer to be inactive and whether or not to close the connection on failure. Closes #2643 --------- Co-authored-by: Chad Nehemiah <[email protected]>
1 parent c5dba70 commit 7939dbd

File tree

6 files changed

+270
-0
lines changed

6 files changed

+270
-0
lines changed

packages/interface/src/connection/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,13 @@ export interface Connection {
287287
*/
288288
transient: boolean
289289

290+
/**
291+
* The time in milliseconds it takes to make a round trip to the remote peer.
292+
*
293+
* This is updated periodically by the connection monitor.
294+
*/
295+
rtt?: number
296+
290297
/**
291298
* Create a new stream on this connection and negotiate one of the passed protocols
292299
*/

packages/libp2p/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
"any-signal": "^4.1.1",
102102
"datastore-core": "^9.2.9",
103103
"interface-datastore": "^8.2.11",
104+
"it-byte-stream": "^1.0.12",
104105
"it-merge": "^3.0.5",
105106
"it-parallel": "^3.0.7",
106107
"merge-options": "^3.0.4",
@@ -126,6 +127,7 @@
126127
"it-all": "^3.0.6",
127128
"it-drain": "^3.0.7",
128129
"it-map": "^3.1.0",
130+
"it-pair": "^2.0.6",
129131
"it-pipe": "^3.0.1",
130132
"it-pushable": "^3.2.3",
131133
"it-stream-types": "^2.0.1",
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import { serviceCapabilities } from '@libp2p/interface'
2+
import { AdaptiveTimeout } from '@libp2p/utils/adaptive-timeout'
3+
import { byteStream } from 'it-byte-stream'
4+
import type { ComponentLogger, Logger, Metrics, Startable } from '@libp2p/interface'
5+
import type { ConnectionManager } from '@libp2p/interface-internal'
6+
import type { AdaptiveTimeoutInit } from '@libp2p/utils/adaptive-timeout'
7+
8+
const DEFAULT_PING_INTERVAL_MS = 10000
9+
10+
export interface ConnectionMonitorInit {
11+
/**
12+
* Whether the connection monitor is enabled
13+
*
14+
* @default true
15+
*/
16+
enabled?: boolean
17+
18+
/**
19+
* How often to ping remote peers in ms
20+
*
21+
* @default 10000
22+
*/
23+
pingInterval?: number
24+
25+
/**
26+
* Timeout settings for how long the ping is allowed to take before the
27+
* connection will be judged inactive and aborted.
28+
*
29+
* The timeout is adaptive to cope with slower networks or nodes that
30+
* have changing network characteristics, such as mobile.
31+
*/
32+
pingTimeout?: Omit<AdaptiveTimeoutInit, 'metricsName' | 'metrics'>
33+
34+
/**
35+
* If true, any connection that fails the ping will be aborted
36+
*
37+
* @default true
38+
*/
39+
abortConnectionOnPingFailure?: boolean
40+
}
41+
42+
export interface ConnectionMonitorComponents {
43+
logger: ComponentLogger
44+
connectionManager: ConnectionManager
45+
metrics?: Metrics
46+
}
47+
48+
export class ConnectionMonitor implements Startable {
49+
private readonly components: ConnectionMonitorComponents
50+
private readonly log: Logger
51+
private heartbeatInterval?: ReturnType<typeof setInterval>
52+
private readonly pingIntervalMs: number
53+
private abortController?: AbortController
54+
private readonly timeout: AdaptiveTimeout
55+
56+
constructor (components: ConnectionMonitorComponents, init: ConnectionMonitorInit = {}) {
57+
this.components = components
58+
59+
this.log = components.logger.forComponent('libp2p:connection-monitor')
60+
this.pingIntervalMs = init.pingInterval ?? DEFAULT_PING_INTERVAL_MS
61+
62+
this.timeout = new AdaptiveTimeout({
63+
...(init.pingTimeout ?? {}),
64+
metrics: components.metrics,
65+
metricName: 'libp2p_connection_monitor_ping_time_milliseconds'
66+
})
67+
}
68+
69+
readonly [Symbol.toStringTag] = '@libp2p/connection-monitor'
70+
71+
readonly [serviceCapabilities]: string[] = [
72+
'@libp2p/connection-monitor'
73+
]
74+
75+
start (): void {
76+
this.abortController = new AbortController()
77+
78+
this.heartbeatInterval = setInterval(() => {
79+
this.components.connectionManager.getConnections().forEach(conn => {
80+
Promise.resolve().then(async () => {
81+
let start = Date.now()
82+
try {
83+
const signal = this.timeout.getTimeoutSignal({
84+
signal: this.abortController?.signal
85+
})
86+
const stream = await conn.newStream('/ipfs/ping/1.0.0', {
87+
signal,
88+
runOnTransientConnection: true
89+
})
90+
const bs = byteStream(stream)
91+
start = Date.now()
92+
93+
await Promise.all([
94+
bs.write(new Uint8Array(1), {
95+
signal
96+
}),
97+
bs.read(1, {
98+
signal
99+
})
100+
])
101+
102+
conn.rtt = Date.now() - start
103+
104+
await bs.unwrap().close({
105+
signal
106+
})
107+
} catch (err: any) {
108+
if (err.code !== 'ERR_UNSUPPORTED_PROTOCOL') {
109+
throw err
110+
}
111+
112+
// protocol was unsupported, but that's ok as it means the remote
113+
// peer was still alive. We ran multistream-select which means two
114+
// round trips (e.g. 1x for the mss header, then another for the
115+
// protocol) so divide the time it took by two
116+
conn.rtt = (Date.now() - start) / 2
117+
}
118+
})
119+
.catch(err => {
120+
this.log.error('error during heartbeat, aborting connection', err)
121+
conn.abort(err)
122+
})
123+
})
124+
}, this.pingIntervalMs)
125+
}
126+
127+
stop (): void {
128+
this.abortController?.abort()
129+
130+
if (this.heartbeatInterval != null) {
131+
clearInterval(this.heartbeatInterval)
132+
}
133+
}
134+
}

packages/libp2p/src/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { createLibp2pNode } from './libp2p.js'
1818
import type { AddressManagerInit } from './address-manager/index.js'
1919
import type { Components } from './components.js'
2020
import type { ConnectionManagerInit } from './connection-manager/index.js'
21+
import type { ConnectionMonitorInit } from './connection-monitor.js'
2122
import type { TransportManagerInit } from './transport-manager.js'
2223
import type { Libp2p, ServiceMap, ComponentLogger, NodeInfo, ConnectionProtector, ConnectionEncrypter, ConnectionGater, ContentRouting, Metrics, PeerDiscovery, PeerId, PeerRouting, StreamMuxerFactory, Transport, PrivateKey } from '@libp2p/interface'
2324
import type { PersistentPeerStoreInit } from '@libp2p/peer-store'
@@ -57,6 +58,11 @@ export interface Libp2pInit<T extends ServiceMap = ServiceMap> {
5758
*/
5859
connectionManager?: ConnectionManagerInit
5960

61+
/**
62+
* libp2p Connection Monitor configuration
63+
*/
64+
connectionMonitor?: ConnectionMonitorInit
65+
6066
/**
6167
* A connection gater can deny new connections based on user criteria
6268
*/

packages/libp2p/src/libp2p.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { checkServiceDependencies, defaultComponents } from './components.js'
1414
import { connectionGater } from './config/connection-gater.js'
1515
import { validateConfig } from './config.js'
1616
import { DefaultConnectionManager } from './connection-manager/index.js'
17+
import { ConnectionMonitor } from './connection-monitor.js'
1718
import { CompoundContentRouting } from './content-routing.js'
1819
import { codes } from './errors.js'
1920
import { DefaultPeerRouting } from './peer-routing.js'
@@ -121,6 +122,11 @@ export class Libp2pNode<T extends ServiceMap = ServiceMap> extends TypedEventEmi
121122
// Create the Connection Manager
122123
this.configureComponent('connectionManager', new DefaultConnectionManager(this.components, init.connectionManager))
123124

125+
if (init.connectionMonitor?.enabled !== false) {
126+
// Create the Connection Monitor if not disabled
127+
this.configureComponent('connectionMonitor', new ConnectionMonitor(this.components, init.connectionMonitor))
128+
}
129+
124130
// Create the Registrar
125131
this.configureComponent('registrar', new DefaultRegistrar(this.components))
126132

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/* eslint-env mocha */
2+
3+
import { CodeError, start, stop } from '@libp2p/interface'
4+
import { defaultLogger } from '@libp2p/logger'
5+
import { expect } from 'aegir/chai'
6+
import delay from 'delay'
7+
import { pair } from 'it-pair'
8+
import { type StubbedInstance, stubInterface } from 'sinon-ts'
9+
import { ConnectionMonitor } from '../../src/connection-monitor.js'
10+
import type { ComponentLogger, Stream, Connection } from '@libp2p/interface'
11+
import type { ConnectionManager } from '@libp2p/interface-internal'
12+
13+
interface StubbedConnectionMonitorComponents {
14+
logger: ComponentLogger
15+
connectionManager: StubbedInstance<ConnectionManager>
16+
}
17+
18+
describe('connection monitor', () => {
19+
let monitor: ConnectionMonitor
20+
let components: StubbedConnectionMonitorComponents
21+
22+
beforeEach(() => {
23+
components = {
24+
logger: defaultLogger(),
25+
connectionManager: stubInterface<ConnectionManager>()
26+
}
27+
})
28+
29+
afterEach(async () => {
30+
await stop(monitor)
31+
})
32+
33+
it('should monitor the liveness of a connection', async () => {
34+
monitor = new ConnectionMonitor(components, {
35+
pingInterval: 10
36+
})
37+
38+
await start(monitor)
39+
40+
const connection = stubInterface<Connection>()
41+
const stream = stubInterface<Stream>({
42+
...pair<any>()
43+
})
44+
connection.newStream.withArgs('/ipfs/ping/1.0.0').resolves(stream)
45+
46+
components.connectionManager.getConnections.returns([connection])
47+
48+
await delay(100)
49+
50+
expect(connection.rtt).to.be.gte(0)
51+
})
52+
53+
it('should monitor the liveness of a connection that does not support ping', async () => {
54+
monitor = new ConnectionMonitor(components, {
55+
pingInterval: 10
56+
})
57+
58+
await start(monitor)
59+
60+
const connection = stubInterface<Connection>()
61+
connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async () => {
62+
await delay(10)
63+
throw new CodeError('Unsupported protocol', 'ERR_UNSUPPORTED_PROTOCOL')
64+
})
65+
66+
components.connectionManager.getConnections.returns([connection])
67+
68+
await delay(100)
69+
70+
expect(connection.rtt).to.be.gte(0)
71+
})
72+
73+
it('should abort a connection that times out', async () => {
74+
monitor = new ConnectionMonitor(components, {
75+
pingInterval: 50,
76+
pingTimeout: {
77+
initialValue: 10
78+
}
79+
})
80+
81+
await start(monitor)
82+
83+
const connection = stubInterface<Connection>()
84+
connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => {
85+
await delay(200)
86+
opts?.signal?.throwIfAborted()
87+
return stubInterface<Stream>()
88+
})
89+
90+
components.connectionManager.getConnections.returns([connection])
91+
92+
await delay(500)
93+
94+
expect(connection.abort).to.have.property('called', true)
95+
})
96+
97+
it('should abort a connection that fails', async () => {
98+
monitor = new ConnectionMonitor(components, {
99+
pingInterval: 10
100+
})
101+
102+
await start(monitor)
103+
104+
const connection = stubInterface<Connection>()
105+
connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => {
106+
throw new CodeError('Connection closed', 'ERR_CONNECTION_CLOSED')
107+
})
108+
109+
components.connectionManager.getConnections.returns([connection])
110+
111+
await delay(100)
112+
113+
expect(connection.abort).to.have.property('called', true)
114+
})
115+
})

0 commit comments

Comments
 (0)